Живко обнови решението на 29.11.2016 16:15 (преди над 1 година)
+package main
+
+import (
+        "fmt"
+        "sync"
+        "time"
+)
+
// Task is a unit of work that turns one integer into another and may fail.
type Task interface {
	// Execute runs the task on input and returns the computed result, or a
	// non-nil error when the task could not produce one.
	Execute(input int) (result int, err error)
}
+
// Task1 is a Task whose Execute call is a hand-off over two signal channels:
// the input is stored in result, ch1 is signalled, and the final result and
// err are read back once a signal arrives on ch2.
type Task1 struct {
	// result holds the input while the work is pending and the output
	// once it has completed.
	result int
	// err is the error reported together with result.
	err error
	// ch1 receives a signal from Execute when the input has been stored.
	ch1 chan struct{}
	// ch2 delivers the signal that result and err are ready to be read.
	ch2 chan struct{}
}
+
+func (task1 *Task1) Execute(arg int) (int, error) {
+        task1.result = arg
+        task1.ch1 <- struct{}{}
+        <-task1.ch2
+        return task1.result, task1.err
+}
+
+func Pipeline(tasks ...Task) Task {
+        myTask := Task1{result: 0, err: nil}
+        myTask.ch1 = make(chan struct{})
+        myTask.ch2 = make(chan struct{})
+        go func() {
+                <-myTask.ch1
+                for _, task := range tasks {
+                        if myTask.err == nil {
+                                myTask.result, myTask.err = task.Execute(myTask.result)
+                        } else {
+                                break
+                        }
+                }
+                if len(tasks) == 0 {
+                        myTask.err = fmt.Errorf("no tasks")
+                }
+                myTask.ch2 <- struct{}{}
+        }()
+        return &myTask
+}
+
+func Fastest(tasks ...Task) Task {
+        myTask := Task1{err: nil}
+        myTask.ch1 = make(chan struct{})
+        myTask.ch2 = make(chan struct{})
+        var countMtx sync.Mutex
+        go func() {
+                <-myTask.ch1
+                ch3 := make(chan int, len(tasks)+1)
+                ch4 := make(chan error, len(tasks)+1)
+                for _, task := range tasks {
+                        go func(currentTask Task) {
+                                curruntResult, currentError := currentTask.Execute(myTask.result)
+                                countMtx.Lock()
+                                ch3 <- curruntResult
+                                ch4 <- currentError
+                                countMtx.Unlock()
+                        }(task)
+                }
+                if len(tasks) == 0 {
+                        myTask.err = fmt.Errorf("no tasks")
+                        myTask.ch2 <- struct{}{}
+                        return
+                }
+                myTask.result = <-ch3
+                myTask.err = <-ch4
+                myTask.ch2 <- struct{}{}
+        }()
+        return &myTask
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+        myTask := Task1{err: nil}
+        myTask.ch1 = make(chan struct{})
+        myTask.ch2 = make(chan struct{})
+        go func() {
+                <-myTask.ch1
+                ch3 := make(chan struct{})
+                go func() {
+                        myTask.result, myTask.err = task.Execute(myTask.result)
+                        ch3 <- struct{}{}
+                }()
+                select {
+                case <-ch3:
+                        myTask.ch2 <- struct{}{}
+                case <-time.After(timeout):
+                        myTask.err = fmt.Errorf("time is over")
+                        myTask.ch2 <- struct{}{}
+                }
+        }()
+        return &myTask
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+        myTask := Task1{err: nil}
+        myTask.ch1 = make(chan struct{})
+        myTask.ch2 = make(chan struct{})
+        var countMtx sync.Mutex
+        go func() {
+                <-myTask.ch1
+                ch3 := make(chan int, len(tasks)+1)
+                ch4 := make(chan error, len(tasks)+1)
+                ch5 := make(chan struct{})
+                for _, task := range tasks {
+                        go func(currentTask Task) {
+                                curruntResult, currentError := currentTask.Execute(myTask.result)
+                                if currentError != nil {
+                                        myTask.err = currentError
+                                        myTask.ch2 <- struct{}{}
+                                }
+                                countMtx.Lock()
+                                ch3 <- curruntResult
+                                ch4 <- currentError
+                                countMtx.Unlock()
+                                ch5 <- struct{}{}
+                        }(task)
+                }
+                for i := 0; i < len(tasks); i++ {
+                        <-ch5
+                }
+                if len(tasks) == 0 {
+                        myTask.err = fmt.Errorf("no tasks")
+                        myTask.ch2 <- struct{}{}
+                        return
+                }
+                for currentError := range ch4 {
+                        if currentError != nil {
+                                return
+                        }
+                }
+                results := make([]int, 0, len(tasks))
+                for curruntResult := range ch3 {
+                        results = append(results, curruntResult)
+                }
+                myTask.result = reduce(results)
+                myTask.ch2 <- struct{}{}
+        }()
+        return &myTask
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+        myTask := Task1{err: nil}
+        myTask.ch1 = make(chan struct{})
+        myTask.ch2 = make(chan struct{})
+        var countMtx sync.Mutex
+        ch3 := make(chan int)
+        ch4 := make(chan error)
+        count := 0
+        go func() {
+                <-myTask.ch1
+                ch5 := make(chan struct{})
+                for {
+                        task, ok := <-tasks
+                        if ok == false {
+                                break
+                        }
+                        count = count + 1
+                        go func(currentTask Task) {
+                                curruntResult, currentError := currentTask.Execute(myTask.result)
+                                countMtx.Lock()
+                                ch3 <- curruntResult
+                                ch4 <- currentError
+                                countMtx.Unlock()
+                                ch5 <- struct{}{}
+                        }(task)
+                }
+                for i := 0; i < count; i++ {
+                        <-ch5
+                }
+                close(ch3)
+                close(ch4)
+        }()
+        go func() {
+                max := 0
+                firstResult := true
+                for {
+                        curruntResult, ok1 := <-ch3
+                        currentError, ok2 := <-ch4
+                        if ok1 == false || ok2 == false {
+                                break
+                        }
+                        if currentError != nil {
+                                errorLimit = errorLimit - 1
+                        } else if firstResult {
+                                max = curruntResult
+                                firstResult = false
+                        } else if max < curruntResult {
+                                max = curruntResult
+                        }
+                }
+                if errorLimit < 0 {
+                        myTask.err = fmt.Errorf("too many errors")
+                        myTask.ch2 <- struct{}{}
+                        return
+                }
+                if count == 0 {
+                        myTask.err = fmt.Errorf("no tasks")
+                        myTask.ch2 <- struct{}{}
+                        return
+                }
+                if firstResult {
+                        myTask.err = fmt.Errorf("only errors")
+                        myTask.ch2 <- struct{}{}
+                        return
+                }
+                myTask.result = max
+                myTask.ch2 <- struct{}{}
+        }()
+        return &myTask
+}
