Николай обнови решението на 29.11.2016 02:12 (преди над 1 година)
+package main
+
+import (
+        "fmt"
+        "math"
+        "sync"
+        "time"
+)
+
+type Task interface {
+        Execute(int) (int, error)
+}
+
+func Pipeline(tasks ...Task) Task {
+        return &pipeline{tasks}
+}
+
+type pipeline struct {
+        tasks []Task
+}
+
+func (p pipeline) Execute(addend int) (int, error) {
+        var err error
+        if len(p.tasks) == 0 {
+                return 0, fmt.Errorf("No tasks given")
+        }
+        val := addend
+        for _, task := range p.tasks {
+                if val, err = task.Execute(val); err != nil {
+                        return 0, err
+                }
+        }
+        return val, nil
+}
+
+func Fastest(tasks ...Task) Task {
+        return &fastest{tasks}
+}
+
+type fastest struct {
+        tasks []Task
+}
+
+type taskResult struct {
+        val int
+        err error
+}
+
+func (f fastest) Execute(addend int) (int, error) {
+        if len(f.tasks) == 0 {
+                return 0, fmt.Errorf("No tasks given")
+        }
+        c := make(chan taskResult)
+        executeTask := func(i int) {
+                val, err := f.tasks[i].Execute(addend)
+                c <- taskResult{val, err}
+        }
+        for i := range f.tasks {
+                go executeTask(i)
+        }
+        fastestTaskResult := <-c
+        return fastestTaskResult.val, fastestTaskResult.err
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+        return &timed{task, timeout}
+}
+
+type timed struct {
+        task    Task
+        timeout time.Duration
+}
+
+func (t timed) Execute(addend int) (int, error) {
+        c := make(chan taskResult)
+        go func() {
+                val, err := t.task.Execute(addend)
+                c <- taskResult{val, err}
+        }()
+        select {
+        case result := <-c:
+                close(c)
+                return result.val, result.err
+        case <-time.After(t.timeout):
+                return 0, fmt.Errorf("Execution timed out after %v", t.timeout)
+        }
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+        return &concurrentMapReduce{tasks, reduce}
+}
+
+type concurrentMapReduce struct {
+        tasks  []Task
+        reduce func(results []int) int
+}
+
+func (cmr concurrentMapReduce) Execute(addend int) (int, error) {
+        if len(cmr.tasks) == 0 {
+                return 0, fmt.Errorf("No tasks given")
+        }
+        group := sync.WaitGroup{}
+        c := make(chan taskResult)
+        results := make([]int, 0)
+
+        executeTask := func(i int) {
+                val, err := cmr.tasks[i].Execute(addend)
+                c <- taskResult{val, err}
+        }
+        for i := range cmr.tasks {
+                group.Add(1)
+                go executeTask(i)
+        }
+        go func() {
+                group.Wait()
+                close(c)
+        }()
+        for result := range c {
+                val, err := result.val, result.err
+                if err == nil {
+                        results = append(results, val)
+                } else {
+                        return 0, err
+                }
+                group.Done()
+        }
+        return cmr.reduce(results), nil
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+        return &greatestSearcher{tasks, errorLimit}
+}
+
+type greatestSearcher struct {
+        tasks      <-chan Task
+        errorLimit int
+}
+
+func (g greatestSearcher) Execute(addend int) (int, error) {
+        errorCount := 0
+        max := math.MinInt64
+        hasSuccessfulTask, hasFailedTask := false, false
+        group := sync.WaitGroup{}
+
+        c := concurentHelper(g.tasks, &group, addend)
+
+        for taskResult := range c {
+                val, err := taskResult.val, taskResult.err
+                if err == nil {
+                        hasSuccessfulTask = true
+                        if val > max {
+                                max = val
+                        }
+                } else {
+                        hasFailedTask = true
+                        errorCount++
+                        if errorCount > g.errorLimit {
+                                return 0, fmt.Errorf("There are [%v] errors, but the limit is [%v]", errorCount, g.errorLimit)
+                        }
+                }
+                group.Done()
+        }
+        if !hasFailedTask && !hasSuccessfulTask {
+                return 0, fmt.Errorf("No task were given")
+        } else if !hasSuccessfulTask {
+                return 0, fmt.Errorf("No successful tasks were executed")
+        }
+        return max, nil
+}
+
+func concurentHelper(tasks <-chan Task, group *sync.WaitGroup, addend int) <-chan taskResult {
+        c := make(chan taskResult)
+        go func() {
+                defer close(c)
+                defer group.Wait()
+                for task := range tasks {
+                        group.Add(1)
+                        go func(task Task) {
+                                val, err := task.Execute(addend)
+                                c <- taskResult{val, err}
+                        }(task)
+                }
+        }()
+        return c
+}
