Диан обнови решението на 29.11.2016 09:36 (преди над 1 година)
+package main
+
+import (
+ "fmt"
+ "sync"
+ "time"
+)
+
+// Task ...
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type pipelineManager struct {
+ tasks []Task
+}
+
+func (pe pipelineManager) Execute(x int) (int, error) {
+ if len(pe.tasks) == 0 {
+ return 0, nil
+ }
+ res := x
+ for _, t := range pe.tasks {
+ r, err := t.Execute(res)
+ res = r
+ if err != nil {
+ return 0, err
+ }
+ }
+ return res, nil
+}
+
+// Pipeline ...
+func Pipeline(tasks ...Task) Task {
+ return pipelineManager{tasks: tasks}
+}
+
+// ======================================
+type fastest struct {
+ tasks []Task
+}
+
+type result struct {
+ data int
+ err error
+}
+
+func (f fastest) Execute(x int) (int, error) {
+ if len(f.tasks) == 0 {
+ return 0, nil
+ }
+ ch := make(chan result, 1)
+ for _, task := range f.tasks {
+ go func(t Task) {
+ res, err := t.Execute(x)
+ select {
+ case ch <- result{res, err}:
+ default:
+ }
+ }(task)
+ }
+ finalResult := <-ch
+ return finalResult.data, finalResult.err
+}
+
+// Fastest ..
+func Fastest(tasks ...Task) Task {
+ return fastest{tasks}
+}
+
+// ======================================
+type timed struct {
+ task Task
+ timeout time.Duration
+}
+
+func (t timed) Execute(x int) (int, error) {
+ ch := make(chan result)
+ go func() {
+ res, err := t.task.Execute(x)
+ ch <- result{res, err}
+ }()
+ select {
+ case res := <-ch:
+ return res.data, res.err
+ case <-time.After(t.timeout):
+ return 0, fmt.Errorf("main: timed out on Execute")
+ }
+}
+
+// Timed ...
+func Timed(task Task, timeout time.Duration) Task {
+ return timed{task: task, timeout: timeout}
+}
+
+// ======================================
+type concurrentMapReduce struct {
+ reduce func(results []int) int
+ tasks []Task
+}
+
+func (c concurrentMapReduce) Execute(x int) (int, error) {
+ if len(c.tasks) == 0 {
+ return 0, nil
+ }
+ ch := make(chan result, 1)
+ for _, task := range c.tasks {
+ go func(t Task) {
+ res, err := t.Execute(x)
+ select {
+ case ch <- result{res, err}:
+ default:
+ }
+ }(task)
+ }
+ results := make([]int, 0)
+ for i := 0; i < len(c.tasks); i++ {
+ r := <-ch
+ if r.err != nil {
+ return 0, r.err
+ }
+ results = append(results, r.data)
+ }
+ return c.reduce(results), nil
+}
+
+// ConcurrentMapReduce ...
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ return concurrentMapReduce{reduce: reduce, tasks: tasks}
+}
+
+// ======================================
+type greatestSearcher struct {
+ tasks <-chan Task
+ errorLimit int
+}
+
+// Execute is hedious.
+func (g greatestSearcher) Execute(x int) (int, error) {
+ ch := make(chan result)
+ finish := make(chan struct{})
+ final := make(chan result)
+
+ go func() {
+ results := make([]result, 0)
+ for {
+ select {
+ case res := <-ch:
+ results = append(results, res)
+ case <-finish:
+ close(ch)
+ errorCount := 0
+ finalResult := result{}
+ if len(results) == 0 {
+ finalResult = result{0, fmt.Errorf("No Tasks")}
+ final <- finalResult
+ }
+
+ max := results[0].data
+ for _, r := range results[1:] {
+ if r.data > max {
+ max = r.data
+ }
+ if r.err != nil {
+ errorCount++
+ }
+ }
+
+ if errorCount > g.errorLimit {
+ finalResult = result{0, fmt.Errorf("ErrorLimit reached")}
+ } else {
+ finalResult = result{max, nil}
+ }
+ final <- finalResult
+ }
+ }
+ }()
+
+ var wg sync.WaitGroup
+ for task := range g.tasks {
+ wg.Add(1)
+ go func(t Task) {
+ res, err := t.Execute(x)
+ ch <- result{res, err}
+ wg.Done()
+ }(task)
+ }
+
+ wg.Wait()
+ finish <- struct{}{}
+ y := <-final
+ return y.data, y.err
+}
+
+// GreatestSearcher ...
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ return greatestSearcher{errorLimit: errorLimit, tasks: tasks}
+}
Когато трябва да върнеш грешка, не ползвай nil
, а errors.New("някакво съобщение")
или fmt.Errorf("Форматирано съобщение %s", err)
Когато върнеш nil
като error
означава "няма грешка": http://lectures.fmi.golang.bg/06-concurrency102.slide#8