Димитър обнови решението на 28.11.2016 23:22 (преди над 1 година)
+package main
+
+import (
+ "fmt"
+ "math"
+ "sync"
+ "time"
+)
+
// Task is a unit of work that takes an int and produces an int result
// together with an error.
type Task interface {
	// Execute runs the task on value and reports its result and error.
	Execute(value int) (result int, err error)
}
+
// ValErr bundles the two return values of Task.Execute into one value so
// that both can travel over a channel as a single message.
type ValErr struct {
	value int   // result reported by the task
	err   error // error reported by the task
}
+
+//-------------------------------------
+type PipelinedTasks struct {
+ tasks []Task
+}
+
+func Pipeline(tasks ...Task) Task {
+ p := new(PipelinedTasks)
+ p.tasks = tasks
+ return p
+}
+
+func (p PipelinedTasks) Execute(value int) (int, error) {
+ if len(p.tasks) == 0 {
+ return 0, fmt.Errorf("no tasks!")
+ }
+
+ var err error
+
+ for _, task := range p.tasks {
+ value, err = task.Execute(value)
+ if err != nil {
+ return 0, err
+ }
+ }
+
+ return value, nil
+}
+
+//-------------------------------------
+type FastestTask struct {
+ tasks []Task
+}
+
+func Fastest(tasks ...Task) Task {
+ f := new(FastestTask)
+ f.tasks = tasks
+ return f
+}
+
+func (f FastestTask) Execute(value int) (int, error) {
+ if len(f.tasks) == 0 {
+ return 0, fmt.Errorf("no tasks!")
+ }
+
+ ch := make(chan ValErr)
+
+ for _, task := range f.tasks {
+ go func(task Task) {
+ res, err := task.Execute(value)
+ select {
+ case ch <- ValErr{res, err}:
+ default:
+ }
+ }(task)
+ }
+
+ valErr := <-ch
+ return valErr.value, valErr.err
+}
+
+//------------------------------------------------------
+type TimedTask struct {
+ task Task
+ timeout time.Duration
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ t := new(TimedTask)
+ t.task = task
+ t.timeout = timeout
+ return t
+}
+
+func (t TimedTask) Execute(value int) (int, error) {
+ ch := make(chan ValErr)
+
+ go func(task Task) {
+ val, err := task.Execute(value)
+ select {
+ case ch <- ValErr{val, err}:
+ default:
+ }
+ }(t.task)
+
+ select {
+ case valErr := <-ch:
+ return valErr.value, valErr.err
+ case <-time.After(t.timeout):
+ return 0, fmt.Errorf("timed-out")
+ }
+}
+
+//--------------------------
+type MapReduceTasks struct {
+ tasks []Task
+ reduce func(results []int) int
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ t := new(MapReduceTasks)
+ t.tasks = tasks
+ t.reduce = reduce
+ return t
+}
+
+func (m MapReduceTasks) Execute(value int) (int, error) {
+ if len(m.tasks) == 0 {
+ return 0, fmt.Errorf("no tasks!")
+ }
+
+ ch := make(chan ValErr)
+ errCh := make(chan struct{}, len(m.tasks))
+
+ for _, task := range m.tasks {
+ go func(task Task) {
+ res, err := task.Execute(value)
+ select {
+ case ch <- ValErr{res, err}:
+ case <-errCh:
+ return
+ }
+ }(task)
+ }
+
+ values := make([]int, 0)
+
+ for valErr := range ch {
+ if valErr.err != nil {
+ for i := 0; i < len(m.tasks); i++ {
+ errCh <- struct{}{}
+ }
+ return 0, valErr.err
+ }
+ values = append(values, valErr.value)
+
+ if len(values) == len(m.tasks) {
+ close(ch)
+ }
+ }
+
+ return m.reduce(values), nil
+}
+
+//---------------------------------------------------
+type GreatestTask struct {
+ errorLimit int
+ tasks <-chan Task
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ g := new(GreatestTask)
+ g.tasks = tasks
+ g.errorLimit = errorLimit
+ return g
+}
+
+func (g GreatestTask) Execute(value int) (int, error) {
+ ch := make(chan ValErr)
+ flag := false
+
+ go func() {
+ var wg sync.WaitGroup
+
+ for task := range g.tasks {
+ flag = true
+ wg.Add(1)
+ go func(task Task) {
+ val, err := task.Execute(value)
+ ch <- ValErr{val, err}
+ wg.Done()
+ }(task)
+ }
+ wg.Wait()
+ close(ch)
+ }()
+
+ if flag == false {
+ return 0, fmt.Errorf("no tasks passed!")
+ }
+
+ errorCount := 0
+ max := math.MinInt64
+
+ for valErr := range ch {
+ if valErr.err != nil {
+ errorCount++
+ if errorCount > g.errorLimit {
+ return 0, fmt.Errorf("error limit reached!")
+ }
+ }
+
+ if valErr.value > max {
+ max = valErr.value
+ }
+ }
+
+ return max, nil
+}
Погледни си GreatestSearcher: дали правилно синхронизираш нещата покрай проверката за липса на задачи?