Никола обнови решението на 29.11.2016 15:31 (преди над 1 година)
+package main
+
+import (
+        "fmt"
+        "time"
+)
+
+type Task interface { // Task is a unit of work that is run with a single int argument.
+        Execute(int) (int, error) // runs the task on the argument; yields a result or an error
+}
+
+type res struct { // res bundles the two return values of Task.Execute so they can be sent over a channel.
+        r   int   // result value returned by Execute
+        err error // error returned by Execute
+}
+
+type responseTask struct { // responseTask adapts a plain function so it can be used as a Task.
+        exec func(int) (int, error) // function invoked by Execute
+}
+
+// Execute runs the wrapped function with arg and returns its result and error
+// unchanged. A responseTask whose exec field was never set reports an error
+// instead of panicking on a nil function call.
+func (rt responseTask) Execute(arg int) (int, error) {
+        if rt.exec == nil {
+                return 0, fmt.Errorf("Nil exec function!")
+        }
+        return rt.exec(arg)
+}
+
+// runner executes task with arg and delivers the outcome on c as one res
+// value. The send blocks until c is able to accept the value.
+func runner(task Task, arg int, c chan<- res) {
+        value, err := task.Execute(arg)
+        c <- res{r: value, err: err}
+}
+
+// runWrapper runs task with arg in its own goroutine and forwards the outcome
+// to c, unless a value arrives on selfDestruct first, in which case it returns
+// without forwarding anything.
+//
+// The task reports on a private one-slot channel rather than on c itself, so a
+// wrapper only ever forwards the outcome of its own task, and the task
+// goroutine can always deliver and exit even when nobody is listening on c.
+func runWrapper(task Task, arg int, c chan res, selfDestruct <-chan struct{}) {
+        own := make(chan res, 1)
+        go runner(task, arg, own)
+
+        select {
+        case result := <-own:
+                // Keep honouring selfDestruct while handing the result over, so
+                // a full or abandoned c cannot block this goroutine forever.
+                select {
+                case c <- result:
+                case <-selfDestruct:
+                }
+        case <-selfDestruct:
+        }
+}
+
+// Pipeline returns a Task that feeds its argument to the first task and then
+// passes each task's result on as the argument of the next one. Executing it
+// yields the result of the last task.
+//
+// Execution fails when no tasks were supplied, or as soon as any task fails.
+// In the latter case that task's error is returned unchanged, whichever
+// position the task holds, so callers can compare it with the errors they
+// expect.
+func Pipeline(tasks ...Task) Task {
+        response := new(responseTask)
+
+        response.exec = func(arg int) (int, error) {
+                if len(tasks) == 0 {
+                        return 0, fmt.Errorf("Zero tasks passed to Pipeline.")
+                }
+
+                current := arg
+                for _, t := range tasks {
+                        next, err := t.Execute(current)
+                        if err != nil {
+                                return 0, err
+                        }
+                        current = next
+                }
+                return current, nil
+        }
+
+        return response
+}
+
+// Fastest returns a Task that runs every given task concurrently with the same
+// argument and yields the result or error of whichever task finishes first.
+// Executing it when no tasks were supplied yields an error.
+func Fastest(tasks ...Task) Task {
+        response := new(responseTask)
+
+        response.exec = func(arg int) (int, error) {
+                if len(tasks) == 0 {
+                        return 0, fmt.Errorf("No tasks!")
+                }
+
+                // One slot per task: the slower tasks can still deposit their
+                // outcome after the winner has been taken and then exit, so no
+                // goroutine stays blocked on a send that nobody will receive.
+                agg := make(chan res, len(tasks))
+                for _, t := range tasks {
+                        go runner(t, arg, agg)
+                }
+
+                first := <-agg
+                return first.r, first.err
+        }
+
+        return response
+}
+
+// Timed returns a Task that runs task and yields its result and error, or a
+// timeout error when task has not finished within timeout. Executing it when
+// task is nil yields an error.
+//
+// A task that overruns the timeout is not interrupted; it keeps running in its
+// goroutine and its outcome is discarded.
+func Timed(task Task, timeout time.Duration) Task {
+        response := new(responseTask)
+
+        response.exec = func(arg int) (int, error) {
+                if task == nil {
+                        return 0, fmt.Errorf("Nil task given!")
+                }
+
+                // Buffered so the task goroutine can deliver and exit even after
+                // the timeout has already been reported.
+                c := make(chan res, 1)
+                go runner(task, arg, c)
+
+                // An explicit timer is stopped as soon as this call returns,
+                // instead of lingering until the full timeout has elapsed.
+                timer := time.NewTimer(timeout)
+                defer timer.Stop()
+
+                select {
+                case r := <-c:
+                        return r.r, r.err
+                case <-timer.C:
+                        return 0, fmt.Errorf("Task timed out!")
+                }
+        }
+
+        return response
+}
+
+// ConcurrentMapReduce returns a Task that runs every given task concurrently
+// with the same argument, collects their results and yields reduce applied to
+// them. The results are passed to reduce in the order the tasks finished.
+//
+// Execution fails when no tasks were supplied, or with the error of the first
+// task observed to fail; reduce is not called in either case.
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+        response := new(responseTask)
+
+        response.exec = func(arg int) (int, error) {
+                if len(tasks) == 0 {
+                        return 0, fmt.Errorf("No tasks!")
+                }
+
+                // One slot per task: after an early return on error the
+                // remaining tasks can still deposit their outcome and exit, so
+                // no goroutine is left waiting for a signal or a receiver.
+                agg := make(chan res, len(tasks))
+                for _, t := range tasks {
+                        go runner(t, arg, agg)
+                }
+
+                results := make([]int, 0, len(tasks))
+                for range tasks {
+                        msg := <-agg
+                        if msg.err != nil {
+                                return 0, msg.err
+                        }
+                        results = append(results, msg.r)
+                }
+                return reduce(results), nil
+        }
+
+        return response
+}
+
+// GreatestSearcher returns a Task that starts every task received from the
+// tasks channel concurrently with the same argument and, once the channel has
+// been closed and the tasks have reported, yields the greatest result among
+// the tasks that succeeded.
+//
+// Execution fails when the channel delivers no tasks, when more than
+// errorLimit tasks fail, or when no task succeeds. Results of failed tasks
+// never take part in the comparison, and the error budget is counted afresh
+// on every execution.
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+        response := new(responseTask)
+
+        response.exec = func(arg int) (int, error) {
+                agg := make(chan res)
+                done := make(chan struct{})
+                // Closing done on return releases every worker that has not
+                // reported yet, whichever way this call ends.
+                defer close(done)
+
+                pending := 0
+                for t := range tasks {
+                        pending++
+                        go func(t Task) {
+                                value, err := t.Execute(arg)
+                                select {
+                                case agg <- res{r: value, err: err}:
+                                case <-done:
+                                }
+                        }(t)
+                }
+                if pending == 0 {
+                        return 0, fmt.Errorf("No tasks!")
+                }
+
+                failures := 0
+                best, found := 0, false
+                for ; pending > 0; pending-- {
+                        out := <-agg
+                        if out.err != nil {
+                                // Count failures locally so the limit passed to
+                                // GreatestSearcher is never modified.
+                                failures++
+                                if failures > errorLimit {
+                                        return 0, fmt.Errorf("Error limit exceeded!")
+                                }
+                                continue
+                        }
+                        if !found || out.r > best {
+                                best, found = out.r, true
+                        }
+                }
+
+                if !found {
+                        return 0, fmt.Errorf("No successful results!")
+                }
+                return best, nil
+        }
+
+        return response
+}
