Александър обнови решението на 27.11.2016 15:53 (преди над 1 година)
+package main
+
+import (
+        "fmt"
+        "time"
+)
+
+type Task interface {
+        Execute(int) (int, error)
+}
+
+// Pipeline stuff
+type pipelineTask struct {
+        tasks []Task
+}
+
+func (p pipelineTask) Execute(input int) (int, error) {
+        if len(p.tasks) == 0 {
+                return 0, fmt.Errorf("No tasks to execute in Pipeline Task")
+        }
+
+        var (
+                output int
+                err    error
+        )
+        for _, t := range p.tasks {
+                output, err = t.Execute(input)
+                if err != nil {
+                        return 0, err
+                }
+                input = output
+        }
+
+        return output, nil
+}
+
+func Pipeline(tasks ...Task) Task {
+        var t pipelineTask
+        t.tasks = tasks
+        return t
+}
+
+// Fastest stuff
+type fastestTask struct {
+        tasks []Task
+}
+
+func (f fastestTask) Execute(in int) (int, error) {
+        if len(f.tasks) == 0 {
+                return 0, fmt.Errorf("No input tasks")
+        }
+
+        doneSync := make(chan struct{}, 1)
+        setOutputSync := make(chan struct{}, 1)
+        var (
+                output      int
+                outputError error
+        )
+
+        for _, t := range f.tasks {
+                go func(task Task) {
+                        out, err := task.Execute(in)
+                        if _, ok := <-setOutputSync; ok {
+                                output = out
+                                outputError = err
+                                doneSync <- struct{}{}
+                        }
+                }(t)
+        }
+        setOutputSync <- struct{}{}
+        <-doneSync
+        close(setOutputSync)
+        close(doneSync)
+        return output, outputError
+}
+
+func Fastest(tasks ...Task) Task {
+        var f fastestTask
+        f.tasks = tasks
+        return f
+}
+
+// Timed stuff
+type timedTask struct {
+        task    Task
+        timeout time.Duration
+}
+
+func (t timedTask) Execute(in int) (int, error) {
+        doneSync := make(chan struct{}, 1)
+        var (
+                output      int
+                outputError error
+        )
+        go func() {
+                out, err := t.task.Execute(in)
+                output = out
+                outputError = err
+                doneSync <- struct{}{}
+        }()
+
+        select {
+        case <-doneSync:
+                return output, outputError
+        case <-time.After(t.timeout):
+                return 0, fmt.Errorf("Timed out waiting to task")
+        }
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+        var t timedTask
+        t.task = task
+        t.timeout = timeout
+        return t
+}
+
+// ConcurrentMapReduce
+type concurrentMapReduceTask struct {
+        tasks  []Task
+        reduce func(results []int) int
+}
+
+func (c concurrentMapReduceTask) Execute(in int) (int, error) {
+        numTasks := len(c.tasks)
+        if numTasks == 0 {
+                return 0, fmt.Errorf("No input tasks")
+        }
+
+        results := make([]int, 0, numTasks)
+        errorChannel := make(chan error)
+        resultChannel := make(chan int)
+        syncChannel := make(chan struct{})
+        defer close(syncChannel)
+        defer close(errorChannel)
+        defer close(resultChannel)
+
+        for _, t := range c.tasks {
+                go func(task Task) {
+                        output, outputError := task.Execute(in)
+                        if _, ok := <-syncChannel; ok {
+                                if outputError == nil {
+                                        resultChannel <- output
+                                } else {
+                                        errorChannel <- outputError
+                                }
+                        }
+                }(t)
+        }
+
+        for i := 0; i < numTasks; i++ {
+                syncChannel <- struct{}{}
+                select {
+                case res := <-resultChannel:
+                        results = append(results, res)
+                case err := <-errorChannel:
+                        return 0, err
+                }
+        }
+
+        return c.reduce(results), nil
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+        var c concurrentMapReduceTask
+        c.reduce = reduce
+        c.tasks = tasks
+        return c
+}
+
+// GreatestSearcher
+type greatestSearcherTask struct {
+        errorLimit int
+        tasks      <-chan Task
+}
+
+func (g greatestSearcherTask) Execute(in int) (int, error) {
+        errorChannel := make(chan struct{})
+        resultChannel := make(chan int)
+        currentErrors := 0
+        maxNumber := 0
+        var (
+                counter     int64
+                shouldQuit  bool
+                hadAnyTasks bool
+        )
+
+        for !shouldQuit || counter != 0 {
+                select {
+                case <-errorChannel:
+                        currentErrors++
+                        counter--
+                case res := <-resultChannel:
+                        if res > maxNumber {
+                                maxNumber = res
+                        }
+                        counter--
+                default:
+                        if t, ok := <-g.tasks; ok {
+                                hadAnyTasks = true
+                                counter++
+                                go func(task Task) {
+                                        out, err := task.Execute(in)
+                                        if err != nil {
+                                                errorChannel <- struct{}{}
+                                        } else {
+                                                resultChannel <- out
+                                        }
+                                }(t)
+                        } else {
+                                shouldQuit = true
+                        }
+                }
+        }
+
+        if !hadAnyTasks {
+                return 0, fmt.Errorf("No input tasks")
+        }
+
+        if currentErrors > g.errorLimit {
+                return 0, fmt.Errorf("Too much errors generated from tasks")
+        }
+
+        return maxNumber, nil
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+        var g greatestSearcherTask
+        g.errorLimit = errorLimit
+        g.tasks = tasks
+        return g
+}
