Данислав обнови решението на 26.11.2016 10:56 (преди над 1 година)
+package main
+
+import (
+        "errors"
+        "sync"
+        "sync/atomic"
+        "time"
+)
+
+type Task interface {
+        Execute(int) (int, error)
+}
+
+type Pipeliner struct {
+        tasks []Task
+}
+
+func (p *Pipeliner) Execute(number int) (int, error) {
+        numberOfTasks := len(p.tasks)
+        if numberOfTasks == 0 {
+                return 0, errors.New("No tasks to execute!")
+        }
+
+        res := number
+        err := errors.New("")
тук май може да е просто var err error
+        for _, task := range p.tasks {
+                res, err = task.Execute(res)
+                if err != nil {
+                        return 0, err
+                }
+        }
+
+        return res, nil
+}
+
+func Pipeline(tasks ...Task) Task {
+        p := new(Pipeliner)
+        p.tasks = tasks[:]
+        return p
+}
+
+type Fast struct {
+        tasks []Task
+}
+
+func (f *Fast) Execute(number int) (int, error) {
+        numberOfTasks := len(f.tasks)
+        if numberOfTasks == 0 {
+                return 0, errors.New("No tasks to execute!")
+        }
+
+        ch := make(chan struct {
+                res int
+                err error
+        }, numberOfTasks)
+        for _, task := range f.tasks {
+                go func(task Task) {
+                        res, err := task.Execute(number)
+                        ch <- struct {
+                                res int
+                                err error
+                        }{res, err}
+                }(task)
+        }
+
+        first := <-ch
+        return first.res, first.err
+}
+
+func Fastest(tasks ...Task) Task {
+        f := new(Fast)
+        f.tasks = tasks[:]
+        return f
+}
+
+type Timer struct {
+        task    Task
+        timeout time.Duration
+}
+
+func (t *Timer) Execute(number int) (int, error) {
+        ch := make(chan struct {
+                res int
+                err error
+        }, 1)
+        go func() {
+                res, err := t.task.Execute(number)
+                ch <- struct {
+                        res int
+                        err error
+                }{res, err}
+                close(ch)
+        }()
+
+        select {
+        case s := <-ch:
+                return s.res, s.err
+        case <-time.After(t.timeout):
+                return 0, errors.New("Timeout")
+        }
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+        t := new(Timer)
+        t.task = task
+        t.timeout = timeout
+        return t
+}
+
+type Reducer struct {
+        reduce func(results []int) int
+        tasks  []Task
+}
+
+func (r *Reducer) Execute(number int) (int, error) {
+        numberOfTasks := len(r.tasks)
+        if numberOfTasks == 0 {
+                return 0, errors.New("No tasks to execute!")
+        }
+
+        var results []int
+        ch := make(chan struct {
+                res int
+                err error
+        }, numberOfTasks)
+        for _, task := range r.tasks {
+                go func(task Task) {
+                        res, err := task.Execute(number)
+                        ch <- struct {
+                                res int
+                                err error
+                        }{res, err}
+                }(task)
+        }
+
+        for i := 0; i < numberOfTasks; i++ {
+                s := <-ch
+                if s.err != nil {
+                        return 0, s.err
+                }
+                results = append(results, s.res)
+        }
+        return r.reduce(results), nil
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+        r := new(Reducer)
+        r.reduce = reduce
+        r.tasks = tasks[:]
+        return r
+}
+
+type Searcher struct {
+        errorLimit int
+        tasks      <-chan Task
+}
+
+func (s *Searcher) Execute(number int) (int, error) {
+        var results []int
+        errs := int64(0)
+        var wg sync.WaitGroup
+        for task := range s.tasks {
+                wg.Add(1)
+                go func(task Task) {
+                        res, err := task.Execute(number)
+                        if err != nil {
+                                errs = atomic.AddInt64(&errs, 1)
+                        } else {
+                                results = append(results, res)
+                        }
+                        wg.Done()
+                }(task)
+        }
+
+        wg.Wait()
+        if len(results) == 0 && errs == 0 {
+                return 0, errors.New("No tasks to execute!")
+        }
+        if errs > int64(s.errorLimit) {
+                return 0, errors.New("Too many errors!")
+        }
+
+        max := results[0]
+        for _, res := range results {
+                if res > max {
+                        max = res
+                }
+        }
+
+        return int(max), nil
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+        s := new(Searcher)
+        s.errorLimit = errorLimit
+        s.tasks = tasks
+        return s
+}
Може да си дефинираш някакъв твой тип за тази структура с грешката и резултата, за да не се налага да му пишеш дефиницията навсякъде.
Иначе изглежда супер. Оше не сме написали всичките тестове и не мога да кажа със сигурност, на на пръв поглед не виждам някакив очевидни синхронизационни проблеми или висящи горутини.
