Слави обнови решението на 29.11.2016 12:04 (преди над 1 година)
+package main
+
+import (
+        "errors"
+        "sync"
+        "time"
+)
+
+// Task is a unit of work: Execute takes an int input and returns an int
+// result or an error.
+type Task interface {
+        Execute(int) (int, error)
+}
+
+// pipeline is a Task holding an ordered list of tasks; its Execute method
+// feeds each task's result into the next task.
+type pipeline struct {
+        tasks []Task
+}
+
+func (pl pipeline) Execute(input int) (int, error) {
+        if len(pl.tasks) == 0 {
+                return 0, errors.New("no tasks given!")
+        }
+        result := input
+        var err error
+        for _, task := range pl.tasks {
+                result, err = task.Execute(result)
+                if err != nil {
+                        return 0, err
+                }
+        }
+        return result, nil
+}
+
+// Pipeline returns a Task that executes the given tasks sequentially,
+// chaining each task's result into the next task's input.
+func Pipeline(tasks ...Task) Task {
+        return pipeline{tasks: tasks}
+}
+
+// fastest is a Task holding a set of tasks that its Execute method runs
+// concurrently, keeping only the first outcome.
+type fastest struct {
+        tasks []Task
+}
+
+// Execute runs every task concurrently with the same input and returns the
+// result and error of whichever task finishes first. With no tasks it
+// returns an error.
+func (fs fastest) Execute(input int) (int, error) {
+        if len(fs.tasks) == 0 {
+                return 0, errors.New("No tasks given")
+        }
+        type outcome struct {
+                result int
+                err    error
+        }
+        // Buffered for every task so the slower goroutines can deliver their
+        // outcome and exit instead of blocking after the winner was received.
+        first := make(chan outcome, len(fs.tasks))
+        for _, task := range fs.tasks {
+                // Pass task as an argument: before Go 1.22 the loop variable is
+                // shared by all iterations, so a closure capturing it could end
+                // up executing a different (typically the last) task.
+                go func(task Task) {
+                        result, err := task.Execute(input)
+                        first <- outcome{result, err}
+                }(task)
+        }
+        winner := <-first
+        return winner.result, winner.err
+}
+
+// Fastest returns a Task that runs all given tasks concurrently and yields
+// the outcome of the one that completes first.
+func Fastest(tasks ...Task) Task {
+        return fastest{tasks: tasks}
+}
+
+// timed is a Task that pairs a wrapped task with the timeout applied to it
+// by Execute.
+type timed struct {
+        task    Task
+        timeout time.Duration
+}
+
+// Execute runs the wrapped task in its own goroutine and returns its result
+// and error if it finishes within the timeout. Otherwise it returns
+// (0, error) once the timeout elapses; the wrapped task keeps running in the
+// background and its outcome is discarded.
+func (td timed) Execute(input int) (int, error) {
+        type outcome struct {
+                result int
+                err    error
+        }
+        // Capacity 1 lets the worker deliver its outcome and exit even when
+        // nobody is receiving any more because the timeout already fired.
+        // (A sync.Once around an unbuffered send could deadlock the caller:
+        // Once.Do blocks until the in-flight call returns.)
+        output := make(chan outcome, 1)
+        go func() {
+                result, err := td.task.Execute(input)
+                output <- outcome{result, err}
+        }()
+
+        timer := time.NewTimer(td.timeout)
+        defer timer.Stop()
+
+        select {
+        case out := <-output:
+                return out.result, out.err
+        case <-timer.C:
+                return 0, errors.New("Task timed out")
+        }
+}
+
+// Timed returns a Task that executes task but gives up with an error when
+// it has not finished within timeout.
+func Timed(task Task, timeout time.Duration) Task {
+        return timed{task: task, timeout: timeout}
+}
+
+// mapReducer is a Task holding the tasks to run concurrently and the reduce
+// function that combines their results into a single value.
+type mapReducer struct {
+        tasks  []Task
+        reduce func(results []int) int
+}
+
+// Execute runs every task concurrently with the same input. If all tasks
+// succeed, their results (stored in task order) are passed to reduce and its
+// value is returned. If any task fails, the first reported error is returned
+// right away with a zero result. With no tasks it returns an error.
+func (mr mapReducer) Execute(input int) (int, error) {
+        count := len(mr.tasks)
+        if count == 0 {
+                return 0, errors.New("No tasks given")
+        }
+
+        var (
+                pending  sync.WaitGroup
+                firstErr sync.Once
+        )
+        outputs := make([]int, count)
+        failed := make(chan error)
+        finished := make(chan struct{}, 1)
+
+        pending.Add(count)
+        for i, t := range mr.tasks {
+                go func(slot int, t Task) {
+                        defer pending.Done()
+                        value, err := t.Execute(input)
+                        if err != nil {
+                                // Only the first failure is reported to the caller.
+                                firstErr.Do(func() { failed <- err })
+                                return
+                        }
+                        outputs[slot] = value
+                }(i, t)
+        }
+
+        // Signal completion once every task goroutine has returned.
+        go func() {
+                pending.Wait()
+                finished <- struct{}{}
+        }()
+
+        select {
+        case err := <-failed:
+                return 0, err
+        case <-finished:
+                return mr.reduce(outputs), nil
+        }
+}
+
+// ConcurrentMapReduce returns a Task that runs all given tasks concurrently
+// and combines their results with reduce.
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+        return mapReducer{tasks: tasks, reduce: reduce}
+}
+
+// searcher is a Task that reads tasks from a channel and carries the number
+// of task errors its Execute method tolerates.
+type searcher struct {
+        tasks      <-chan Task
+        errorLimit int
+}
+
+// Execute starts every task received from the tasks channel in its own
+// goroutine with the same input, waits until the channel is closed and all
+// started tasks have finished, and returns the greatest successful result.
+// It returns an error when no task succeeded or when more than errorLimit
+// tasks failed.
+func (gs searcher) Execute(input int) (int, error) {
+        var (
+                mtex     sync.Mutex // guards results and errorNum
+                wg       sync.WaitGroup
+                results  []int
+                errorNum int
+        )
+        for task := range gs.tasks {
+                // Register the goroutine before starting it; calling Add inside
+                // the goroutine races with Wait, which could then return before
+                // the task has even been counted.
+                wg.Add(1)
+                go func(task Task) {
+                        defer wg.Done()
+                        result, err := task.Execute(input)
+                        mtex.Lock()
+                        defer mtex.Unlock()
+                        if err != nil {
+                                errorNum++
+                                return
+                        }
+                        results = append(results, result)
+                }(task)
+        }
+        wg.Wait()
+
+        if len(results) == 0 {
+                return 0, errors.New("Not task finished succesfuly")
+        }
+        if errorNum > gs.errorLimit {
+                return 0, errors.New("Too many errors")
+        }
+        greatest := results[0]
+        for _, result := range results[1:] {
+                if greatest < result {
+                        greatest = result
+                }
+        }
+        return greatest, nil
+}
+
+// GreatestSearcher returns a Task that executes the tasks arriving on the
+// channel and yields the greatest result, tolerating at most errorLimit
+// failed tasks.
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+        return searcher{tasks: tasks, errorLimit: errorLimit}
+}
Погледни във Fastest какво ще бъде task в цикъла, който въртиш (и горутините, които пускаш).
