Диан обнови решението на 29.11.2016 09:36 (преди над 1 година)
+package main
+
+import (
+        "fmt"
+        "sync"
+        "time"
+)
+
+// Task ...
+type Task interface {
+        Execute(int) (int, error)
+}
+
+type pipelineManager struct {
+        tasks []Task
+}
+
+func (pe pipelineManager) Execute(x int) (int, error) {
+        if len(pe.tasks) == 0 {
+                return 0, nil
+        }
+        res := x
+        for _, t := range pe.tasks {
+                r, err := t.Execute(res)
+                res = r
+                if err != nil {
+                        return 0, err
+                }
+        }
+        return res, nil
+}
+
+// Pipeline ...
+func Pipeline(tasks ...Task) Task {
+        return pipelineManager{tasks: tasks}
+}
+
+// ======================================
+type fastest struct {
+        tasks []Task
+}
+
+type result struct {
+        data int
+        err  error
+}
+
+func (f fastest) Execute(x int) (int, error) {
+        if len(f.tasks) == 0 {
+                return 0, nil
+        }
+        ch := make(chan result, 1)
+        for _, task := range f.tasks {
+                go func(t Task) {
+                        res, err := t.Execute(x)
+                        select {
+                        case ch <- result{res, err}:
+                        default:
+                        }
+                }(task)
+        }
+        finalResult := <-ch
+        return finalResult.data, finalResult.err
+}
+
+// Fastest ..
+func Fastest(tasks ...Task) Task {
+        return fastest{tasks}
+}
+
+// ======================================
+type timed struct {
+        task    Task
+        timeout time.Duration
+}
+
+func (t timed) Execute(x int) (int, error) {
+        ch := make(chan result)
+        go func() {
+                res, err := t.task.Execute(x)
+                ch <- result{res, err}
+        }()
+        select {
+        case res := <-ch:
+                return res.data, res.err
+        case <-time.After(t.timeout):
+                return 0, fmt.Errorf("main: timed out on Execute")
+        }
+}
+
+// Timed ...
+func Timed(task Task, timeout time.Duration) Task {
+        return timed{task: task, timeout: timeout}
+}
+
+// ======================================
+type concurrentMapReduce struct {
+        reduce func(results []int) int
+        tasks  []Task
+}
+
+func (c concurrentMapReduce) Execute(x int) (int, error) {
+        if len(c.tasks) == 0 {
+                return 0, nil
+        }
+        ch := make(chan result, 1)
+        for _, task := range c.tasks {
+                go func(t Task) {
+                        res, err := t.Execute(x)
+                        select {
+                        case ch <- result{res, err}:
+                        default:
+                        }
+                }(task)
+        }
+        results := make([]int, 0)
+        for i := 0; i < len(c.tasks); i++ {
+                r := <-ch
+                if r.err != nil {
+                        return 0, r.err
+                }
+                results = append(results, r.data)
+        }
+        return c.reduce(results), nil
+}
+
+// ConcurrentMapReduce ...
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+        return concurrentMapReduce{reduce: reduce, tasks: tasks}
+}
+
+// ======================================
+type greatestSearcher struct {
+        tasks      <-chan Task
+        errorLimit int
+}
+
+// Execute is hedious.
+func (g greatestSearcher) Execute(x int) (int, error) {
+        ch := make(chan result)
+        finish := make(chan struct{})
+        final := make(chan result)
+
+        go func() {
+                results := make([]result, 0)
+                for {
+                        select {
+                        case res := <-ch:
+                                results = append(results, res)
+                        case <-finish:
+                                close(ch)
+                                errorCount := 0
+                                finalResult := result{}
+                                if len(results) == 0 {
+                                        finalResult = result{0, fmt.Errorf("No Tasks")}
+                                        final <- finalResult
+                                }
+
+                                max := results[0].data
+                                for _, r := range results[1:] {
+                                        if r.data > max {
+                                                max = r.data
+                                        }
+                                        if r.err != nil {
+                                                errorCount++
+                                        }
+                                }
+
+                                if errorCount > g.errorLimit {
+                                        finalResult = result{0, fmt.Errorf("ErrorLimit reached")}
+                                } else {
+                                        finalResult = result{max, nil}
+                                }
+                                final <- finalResult
+                        }
+                }
+        }()
+
+        var wg sync.WaitGroup
+        for task := range g.tasks {
+                wg.Add(1)
+                go func(t Task) {
+                        res, err := t.Execute(x)
+                        ch <- result{res, err}
+                        wg.Done()
+                }(task)
+        }
+
+        wg.Wait()
+        finish <- struct{}{}
+        y := <-final
+        return y.data, y.err
+}
+
+// GreatestSearcher ...
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+        return greatestSearcher{errorLimit: errorLimit, tasks: tasks}
+}
Когато трябва да върнеш грешка, не ползвай nil, а errors.New("някакво съобщение") или fmt.Errorf("Форматирано съобщение %s", err)
Когато върнеш nil като error означава "няма грешка": http://lectures.fmi.golang.bg/06-concurrency102.slide#8
