Георги обнови решението на 28.11.2016 19:31 (преди над 1 година)
+package main
+
+import (
+        "errors"
+        "fmt"
+        "math"
+        "sync"
+        "time"
+)
+
+type Task interface {
+        Execute(int) (int, error)
+}
+
+type TaskOutput struct {
+        result int
+        err    error
+}
+
+// SequentialExecutor
+
+type SequentialExecutor struct {
+        tasks []Task
+}
+
+func NewSequentialExecutor() *SequentialExecutor {
+        return &SequentialExecutor{}
+}
+
+func (s *SequentialExecutor) AppendTask(task Task) {
+        s.tasks = append(s.tasks, task)
+}
+
+func (s *SequentialExecutor) Execute(arg int) (int, error) {
+        if len(s.tasks) == 0 {
+                return -1, errors.New("There aren't any tasks to be executed.")
+        }
+
+        execArg := arg
+        var err error
+
+        for _, task := range s.tasks {
+                execArg, err = task.Execute(execArg)
+
+                if err != nil {
+                        return -1, err
+                }
+        }
+
+        return execArg, err
+}
+
+// ConcurrentExecutor
+
+type ConcurrentExecutor struct {
+        tasks []Task
+}
+
+func NewConcurrentExecutor() *ConcurrentExecutor {
+        return &ConcurrentExecutor{}
+}
+
+func (c *ConcurrentExecutor) AppendTask(task Task) {
+        c.tasks = append(c.tasks, task)
+}
+
+func (c *ConcurrentExecutor) Execute(arg int) (int, error) {
+        if len(c.tasks) == 0 {
+                return -1, errors.New("The aren't any tasks to be executed.")
+        }
+
+        resultChan := make(chan TaskOutput, 1)
+
+        for _, task := range c.tasks {
+                // Run new goroutines only if we still don't have a completed task
+                select {
+                case res := <-resultChan:
+                        return res.result, res.err
+                default:
+                        go func(t Task, arg int) {
+                                res, err := t.Execute(arg)
+                                resultChan <- TaskOutput{res, err}
+                        }(task, arg)
+                }
+        }
+
+        // In case the last task is the fastest one
+        result := <-resultChan
+
+        return result.result, result.err
+}
+
+// TimedExecutor
+
+type TimedExecutor struct {
+        task    Task
+        timeout time.Duration
+}
+
+func NewTimedExecutor(task Task, timeout time.Duration) *TimedExecutor {
+        return &TimedExecutor{task, timeout}
+}
+
+func (t *TimedExecutor) Execute(arg int) (int, error) {
+        timer := time.Now()
+        resultChan := make(chan TaskOutput, 1)
+
+        go func(t Task, arg int) {
+                res, err := t.Execute(arg)
+                resultChan <- TaskOutput{res, err}
+        }(t.task, arg)
+
+        final := <-resultChan
+
+        if t.timeout > time.Since(timer) {
+                return final.result, final.err
+        }
+
+        return -1, errors.New("The task execution time exceeded the provided timeout")
+}
+
+// ConcurrentMapExecutor
+
+type ConcurrentMapExecutor struct {
+        reduce func(result []int) int
+        tasks  []Task
+}
+
+func NewConcurrentMapExecutor(reduce func(result []int) int) *ConcurrentMapExecutor {
+        obj := &ConcurrentMapExecutor{}
+        obj.reduce = reduce
+
+        return obj
+}
+
+func (c *ConcurrentMapExecutor) AppendTask(task Task) {
+        c.tasks = append(c.tasks, task)
+}
+
+func (c *ConcurrentMapExecutor) Execute(arg int) (int, error) {
+        if len(c.tasks) == 0 {
+                return -1, errors.New("The aren't any tasks to be executed.")
+        }
+
+        var wg sync.WaitGroup
+        resultChan := make(chan TaskOutput, len(c.tasks))
+        errorChan := make(chan error, 1)
+
+        for _, task := range c.tasks {
+                select {
+                case err := <-errorChan:
+                        return -1, err
+                default:
+                        wg.Add(1)
+                        go func(t Task, arg int) {
+                                res, err := t.Execute(arg)
+
+                                if err != nil {
+                                        errorChan <- err
+                                }
+
+                                resultChan <- TaskOutput{res, err}
+                                wg.Done()
+                        }(task, arg)
+                }
+        }
+
+        wg.Wait()
+        close(resultChan)
+
+        results := []int{}
+        for output := range resultChan {
+                results = append(results, output.result)
+        }
+
+        return c.reduce(results), nil
+}
+
+// GreedilyTaskExecutor
+type GreedilyTaskExecutor struct {
+        errorLimit int
+        tasks      <-chan Task
+}
+
+func NewGreedilyTaskExecutor(errorLimit int, tasks <-chan Task) *GreedilyTaskExecutor {
+        return &GreedilyTaskExecutor{errorLimit, tasks}
+}
+
+func (g *GreedilyTaskExecutor) Execute(arg int) (int, error) {
+        var wg sync.WaitGroup
+        var mtx sync.Mutex
+        results := []TaskOutput{}
+
+        for {
+                task, ok := <-g.tasks
+
+                if ok {
+                        wg.Add(1)
+                        go func(t Task, arg int) {
+                                res, err := t.Execute(arg)
+                                mtx.Lock()
+                                results = append(results, TaskOutput{res, err})
+                                mtx.Unlock()
+                                wg.Done()
+                        }(task, arg)
+                } else {
+                        break
+                }
+        }
+
+        wg.Wait()
+
+        if len(results) == 0 {
+                return -1, errors.New("No tasks were executed.")
+        }
+
+        max := math.MinInt32
+        errCount := 0
+        for _, output := range results {
+                if output.result > max {
+                        max = output.result
+                }
+                if output.err != nil {
+                        errCount++
+                }
+        }
+
+        if errCount > g.errorLimit {
+                return -1, errors.New("Exceeded error limit.")
+        }
+
+        return max, nil
+}
+
+// Functions
+
+func Pipeline(tasks ...Task) Task {
+        se := NewSequentialExecutor()
+
+        for _, task := range tasks {
+                se.AppendTask(task)
+        }
+
+        return se
+}
+
+func Fastest(tasks ...Task) Task {
+        ce := NewConcurrentExecutor()
+
+        for _, task := range tasks {
+                ce.AppendTask(task)
+        }
+
+        return ce
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+        return NewTimedExecutor(task, timeout)
+}
+
+func ConcurrentMapReduce(reduce func(result []int) int, tasks ...Task) Task {
+        cmr := NewConcurrentMapExecutor(reduce)
+
+        for _, task := range tasks {
+                cmr.AppendTask(task)
+        }
+
+        return cmr
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+        return NewGreedilyTaskExecutor(errorLimit, tasks)
+}
+
+// TESTING
+
+// Pipeline stuff
+type adder struct {
+        augend int
+}
+
+func (a adder) Execute(addend int) (int, error) {
+        result := a.augend + addend
+        if result > 127 {
+                return 0, fmt.Errorf("Result %d exceeds the adder threshold", a)
+        }
+        return result, nil
+}
+
+// Fastest and Timed stuff
+type lazyAdder struct {
+        adder
+        delay time.Duration
+}
+
+func (la lazyAdder) Execute(addend int) (int, error) {
+        time.Sleep(la.delay * time.Millisecond)
+        return la.adder.Execute(addend)
+}
+
+func main() {
+
+}

Какво ще се случи с goroutine-ите които не са най бързи ?
Това не е
for task := range g.tasks {защото ?Какво очакваш да е result при задача която е върнала грешка?