Добромир обнови решението на 28.11.2016 00:02 (преди над 1 година)
+package main
+
+import "sync"
+import "time"
+import "fmt"
+
+type Task interface {
+        Execute(int) (int, error)
+}
+
+type pipelinedTasks struct {
+        tasks []Task
+}
+
+type fastestTask struct {
+        once  sync.Once
+        start sync.WaitGroup
+        tasks []Task
+}
+
+type timedTask struct {
+        task     Task
+        duration time.Duration
+}
+
+type mapReduce struct {
+        err    bool
+        tasks  []Task
+        reduce func([]int) int
+        lock   sync.Mutex
+}
+
+type faultTollerantTaskExecutor struct {
+        errorLimit int
+        tasks      <-chan Task
+        running sync.WaitGroup
+}
+
+func Pipeline(tasks ...Task) Task {
+        return pipelinedTasks{tasks}
+}
+
+func (pipeline pipelinedTasks) Execute(firstArg int) (int, error) {
+        result := firstArg
+        var err error
+        for i := 0; i < len(pipeline.tasks); i++ {
+                result, err = pipeline.tasks[i].Execute(result)
+                if err != nil {
+                        return 0, err
+                }
+        }
+        return result, nil
+}
+
+func Fastest(tasks ...Task) Task {
+        runner := &fastestTask{}
+        runner.tasks = tasks
+        return runner
+}
+
+func (fastest *fastestTask) Execute(arg int) (int, error) {
+        if len(fastest.tasks) == 0 {
+                return 0, fmt.Errorf("No tasks given to Fastest")
+        }
+
+        resChan := make(chan struct {
+                res int
+                err error
+        })
+        fastest.start.Add(len(fastest.tasks))
+
+        for _, task := range fastest.tasks {
+                go func(t Task, arg int, resChan chan struct {
+                        res int
+                        err error
+                }) {
+                        fastest.start.Wait()
+
+                        result, err := t.Execute(arg)
+                        fastest.once.Do(func() {
+                                resChan <- struct {
+                                        res int
+                                        err error
+                                }{result, err}
+                        })
+                }(task, arg, resChan)
+                fastest.start.Done()
+        }
+
+        res := <-resChan
+        return res.res, res.err
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+        return timedTask{task, timeout}
+}
+
+func (timeLimited timedTask) Execute(arg int) (int, error) {
+        start := time.Now()
+        res, err := timeLimited.task.Execute(arg)
+        duration := time.Since(start)
+
+        if timeLimited.duration >= duration {
+                return res, err
+        }
+
+        return 0, fmt.Errorf("")
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+        mr := &mapReduce{}
+        mr.err = false
+        mr.reduce = reduce
+        mr.tasks = tasks
+
+        return mr
+}
+
+func (mr *mapReduce) Execute(arg int) (int, error) {
+        if len(mr.tasks) == 0 {
+                return 0, fmt.Errorf("No tasks given!")
+        }
+
+        results := make([]int, 0)
+        resultChan := make(chan int)
+        errorChan := make(chan error)
+
+        for _, task := range mr.tasks {
+                go func(task Task, resultChan chan int, errorChan chan error, mr *mapReduce) {
+                        res, err := task.Execute(arg)
+
+                        mr.lock.Lock()
+                        defer func() { mr.lock.Unlock() }()
+
+                        if mr.err {
+                                return
+                        }
+
+                        if err != nil {
+                                errorChan <- err
+                                mr.err = true
+                        } else {
+                                resultChan <- res
+                        }
+                }(task, resultChan, errorChan, mr)
+        }
+
+        done := false
+        for !done {
+                select {
+                case result, _ := <-resultChan:
+                        results = append(results, result)
+                        if len(results) == len(mr.tasks) {
+                                done = true
+                                break
+                        }
+                case err := <-errorChan:
+                        return 0, err
+                }
+        }
+
+        return mr.reduce(results), nil
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+        executor := &faultTollerantTaskExecutor{}
+
+        executor.errorLimit = errorLimit
+        executor.tasks = tasks
+
+        return executor
+}
+
+func (executor *faultTollerantTaskExecutor) Execute(arg int) (int, error) {
+        resultChan := make(chan int)
+        errorChan := make(chan error)
+        tasksRun := 0
+
+        for task := range(executor.tasks) {
+                tasksRun++
+                executor.running.Add(1)
+                go func(task Task, resultChan chan int, errorChan chan error) {
+                        defer func() { executor.running.Done() }()
+                        res, err := task.Execute(arg)
+                        if err != nil {
+                                errorChan <- err
+                        } else {
+                                resultChan <- res
+                        }
+                }(task, resultChan, errorChan)
+        }
+
+        if tasksRun == 0 {
+                return 0, fmt.Errorf("No tasks given!")
+        }
+
+        result, errors := 0, 0
+        go func() {
+                for tasksRun > 0 {
+                        select {
+                                case res, _ := <-resultChan:
+                                        if res > result {
+                                                result = res
+                                        }
+                                        tasksRun--
+                                case  <-errorChan:
+                                        tasksRun--
+                                        errors++
+                        }
+                }
+        }()
+
+        executor.running.Wait()
+        if errors >= executor.errorLimit {
+                return 0, fmt.Errorf("More tasks errored out than allowed limit!")
+        }
+
+        return result, nil
+}
Идеята на Timed е че не искаме да чакаме 2 часа за задача която ни трябва резултата в следващите 10 секунди. Така че трябва след като изтече timeout-а Timed да върне че не е стигнало времето. В допълнение е хубаво ако не оставя goroutine които няма да завършат преди края на програмата.
