Николай обнови решението на 26.11.2016 16:38 (преди над 1 година)
+package main
+
+import (
+        "errors"
+        "fmt"
+        "sync"
+        "time"
+        "unsafe"
+)
+
+//
+// Help functions and types
+//
+type Task interface {
+        Execute(int) (int, error)
+}
+
+type Result struct {
+        res int
+        err error
+}
+
+type SyncInt struct {
+        sync.RWMutex
+        num int
+}
+
+func CreateSyncInt(start int) SyncInt {
+        return SyncInt{num: start}
+}
+
+func (ai *SyncInt) Increment() {
+        ai.Lock()
+        ai.num++
+        ai.Unlock()
+}
+
+func (ai *SyncInt) SetMax(val int) {
+        if val > ai.Get() {
+                ai.Lock()
+                ai.num = val
+                ai.Unlock()
+        }
+}
+
+func (ai *SyncInt) Get() (ret int) {
+        ai.RLock()
+        ret = ai.num
+        ai.RUnlock()
+        return
+}
+
+//----------------------------------------------------------------------------
+
+func Pipeline(tasks ...Task) Task {
+        return Pipeliner{tasks}
+}
+
+type Pipeliner struct {
+        tasks []Task
+}
+
+func (p Pipeliner) Execute(arg int) (int, error) {
+        if len(p.tasks) == 0 {
+                return 0, errors.New("No tasks given!")
+        }
+
+        finChan := make(chan Result)
+        prevChan, nextChan := finChan, finChan
+
+        for i := len(p.tasks) - 1; i >= 0; i-- {
+                nextChan = make(chan Result)
+                go chainTask(prevChan, nextChan, p.tasks[i])
+                prevChan = nextChan
+        }
+
+        go func(feedChan chan Result) {
+                feedChan <- Result{arg, nil}
+        }(nextChan)
+
+        result := <-finChan
+        return result.res, result.err
+}
+
+func chainTask(prevChan, nextChan chan Result, task Task) {
+        result := <-nextChan
+
+        if result.err != nil {
+                prevChan <- result
+        } else {
+                res, err := task.Execute(result.res)
+                prevChan <- Result{res, err}
+        }
+}
+
+//----------------------------------------------------------------------------
+
+func Fastest(tasks ...Task) Task {
+        return Timer{tasks}
+}
+
+type Timer struct {
+        tasks []Task
+}
+
+func (t Timer) Execute(arg int) (int, error) {
+        if len(t.tasks) == 0 {
+                return 0, errors.New("No tasks given!")
+        }
+
+        var once sync.Once
+        fastResChan := make(chan Result)
+
+        for _, task := range t.tasks {
+                go func(task Task) {
+                        res, err := task.Execute(arg)
+                        once.Do(func() {
+                                fastResChan <- Result{res, err}
+                        })
+                }(task)
+        }
+
+        result := <-fastResChan
+        return result.res, result.err
+}
+
+//----------------------------------------------------------------------------
+
+func Timed(task Task, timeout time.Duration) Task {
+        return Limiter{task, timeout}
+}
+
+type Limiter struct {
+        task  Task
+        limit time.Duration
+}
+
+func (l Limiter) Execute(arg int) (int, error) {
+        resultChan := make(chan Result)
+
+        go func() {
+                res, err := l.task.Execute(arg)
+                resultChan <- Result{res, err}
+        }()
+
+        select {
+        case result := <-resultChan:
+                return result.res, result.err
+        case <-time.After(l.limit):
+                return 0, fmt.Errorf("Task didn't finish for %s!", l.limit)
+        }
+}
+
+//----------------------------------------------------------------------------
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+        return MapReducer{reduce, tasks}
+}
+
+type MapReducer struct {
+        reduce func(results []int) int
+        tasks  []Task
+}
+
+func (mr MapReducer) Execute(arg int) (int, error) {
+        if len(mr.tasks) == 0 {
+                return 0, errors.New("No tasks given!")
+        }
+
+        results := make([]int, len(mr.tasks))
+        resultChan := make(chan Result)
+        errCnt := CreateSyncInt(0)
+
+        var taskWg sync.WaitGroup
+        for ind, task := range mr.tasks {
+                taskWg.Add(1)
+                go func(ind int, task Task) {
+                        res, err := task.Execute(arg)
+                        if err == nil {
+                                results[ind] = res
+                        } else {
+                                resultChan <- Result{0, fmt.Errorf("Error in task %d", ind)}
+                                if errCnt.Get() == 0 {
+                                        errCnt.Increment()
+                                }
+                        }
+                        taskWg.Done()
+                }(ind, task)
+        }
+
+        go func() {
+                taskWg.Wait()
+                if errCnt.Get() == 0 {
+                        resultChan <- Result{mr.reduce(results), nil}
+                }
+        }()
+
+        result := <-resultChan
+        return result.res, result.err
+}
+
+//----------------------------------------------------------------------------
+
+func MinInt() int {
+        // 'i' is used to take the size of an int for the current build
+        var i int
+        return -(1 << ((uint(unsafe.Sizeof(i)) * 8) - 1))
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+        return AsyncExecuter{errorLimit, tasks}
+}
+
+type AsyncExecuter struct {
+        errorLimit int
+        tasks      <-chan Task
+}
+
+func (ae AsyncExecuter) Execute(arg int) (int, error) {
+        var taskWg sync.WaitGroup
+        errorCountSync, maxSync := CreateSyncInt(0), CreateSyncInt(MinInt())
+
+        taskCnt := 0
+        for task := range ae.tasks {
+                taskWg.Add(1)
+                taskCnt++
+                go func(task Task) {
+                        res, err := task.Execute(arg)
+                        if err == nil {
+                                maxSync.SetMax(res)
+                        } else {
+                                errorCountSync.Increment()
+                        }
+                        taskWg.Done()
+                }(task)
+        }
+
+        taskWg.Wait()
+        max, errorCnt := maxSync.Get(), errorCountSync.Get()
+        var err error = nil
+        if taskCnt == 0 {
+                err = fmt.Errorf("No tasks given!")
+        } else if taskCnt <= errorCnt {
+                err = fmt.Errorf("All tasks failed!")
+        } else if ae.errorLimit < errorCnt {
+                err = fmt.Errorf("The error limit of %d was exceeded!", ae.errorLimit)
+        }
+        return max, err
+}
- Виж пак условието за 
Pipeline(), реално всичките тези горутини и канали не са ли лек overkill там? - Какво ще стане с горутината, която пускаш в
Execute()наTimed(), ако все пак се стигне до timeout? Или в тези, които пускаш вConcurrentMapReduce(), след като си прочел един резултат отresultChanи си го върнал? - Вместо 
MinInt(иunsafe!), помисли за по-добър начин да укажеш "нищо" или "няма стойност". Или разгледай math пакета от стандартната библиотека и ползвай MinInt нещата от там :) 
