Решение на Concurrent Tasks от Радостина Димова

Обратно към всички решения

Към профила на Радостина Димова

Резултати

  • 13 точки от тестове
  • 0 бонус точки
  • 13 точки общо
  • 13 успешни тест(а)
  • 0 неуспешни тест(а)

Код

package main
import (
"errors"
"math"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
type result struct {
res int
err error
}
//1
type ExecuterPipe struct {
tasks []Task
}
func (e ExecuterPipe) Execute(arg int) (int, error) {
if len(e.tasks) == 0 {
return 0, errors.New("no tasks")
}
var result int
for _, task := range e.tasks {
var err error
if result, err = task.Execute(arg); err != nil {
return 0, errors.New("error")
}
arg = result
}
return arg, nil
}
func Pipeline(tasks ...Task) Task {
e := new(ExecuterPipe)
e.tasks = tasks
return e
}
//2
type ExecuterFast struct {
tasks []Task
}
func (e ExecuterFast) Execute(arg int) (int, error) {
if len(e.tasks) == 0 {
return 0, errors.New("no tasks")
}
var once sync.Once
ret := make(chan result)
for _, task := range e.tasks {
go func(task Task) {
res, err := task.Execute(arg)
once.Do(func() {
ret <- result{res, err}
})
}(task)
}
r := <-ret
return r.res, r.err
}
func Fastest(tasks ...Task) Task {
e := new(ExecuterFast)
e.tasks = tasks
return e
}
//3
type ExecuterTimed struct {
task Task
timeout time.Duration
}
func Timed(task Task, timeout time.Duration) Task {
e := new(ExecuterTimed)
e.task = task
e.timeout = timeout
return e
}
func (e ExecuterTimed) Execute(arg int) (int, error) {
ch := make(chan result)
go func() {
res, err := e.task.Execute(arg)
ch <- result{res, err}
}()
select {
case r := <-ch:
return r.res, r.err
case <-time.After(e.timeout):
go func() {
<-ch
}()
return 0, errors.New("timeout")
}
}
//4
type ExecuterConcurrent struct {
tasks []Task
reduce func(results []int) int
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
e := new(ExecuterConcurrent)
e.tasks = tasks
e.reduce = reduce
return e
}
func (e ExecuterConcurrent) Execute(arg int) (int, error) {
if len(e.tasks) == 0 {
return 0, errors.New("no tasks")
}
var wg sync.WaitGroup
var argMtx sync.Mutex
var once sync.Once
finish := make(chan struct{})
fail := make(chan struct{})
var reduceArg []int
reduceArg = make([]int, 0)
go func() {
for _, task := range e.tasks {
wg.Add(1)
go func(task Task) {
if res, err := task.Execute(arg); err != nil {
once.Do(func() {
close(fail)
})
} else {
argMtx.Lock()
reduceArg = append(reduceArg, res)
argMtx.Unlock()
}
wg.Done()
}(task)
}
wg.Wait()
finish <- struct{}{}
}()
select {
case <-fail:
go func() {
<-finish
}()
return 0, errors.New("fail")
case <-finish:
return e.reduce(reduceArg), nil
}
}
//5
type ExecuterSearcher struct {
errorLimit int
tasks <-chan Task
limitMtx sync.Mutex
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
e := new(ExecuterSearcher)
e.tasks = tasks
e.errorLimit = errorLimit
return e
}
func (e ExecuterSearcher) Execute(arg int) (int, error) {
closed := make(chan struct{})
var wg sync.WaitGroup
max := math.MinInt64
var maxMtx sync.Mutex
haveResult := false
count := 0
go func() {
for task := range e.tasks {
wg.Add(1)
go func(task Task) {
if res, err := task.Execute(arg); err != nil {
e.limitMtx.Lock()
e.errorLimit--
e.limitMtx.Unlock()
} else {
maxMtx.Lock()
if res > max {
max = res
haveResult = true
}
maxMtx.Unlock()
}
wg.Done()
}(task)
count++
}
closed <- struct{}{}
}()
<-closed
wg.Wait()
if !haveResult {
return 0, errors.New("no success")
} else if e.errorLimit < 0 {
return 0, errors.New("error limit")
} else if count == 0 {
return 0, errors.New("no tasks")
} else {
return max, nil
}
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161129-30451-qgnta9	0.003s
PASS
ok  	_/tmp/d20161129-30451-qgnta9	0.003s
PASS
ok  	_/tmp/d20161129-30451-qgnta9	0.003s
PASS
ok  	_/tmp/d20161129-30451-qgnta9	0.103s
PASS
ok  	_/tmp/d20161129-30451-qgnta9	0.203s
PASS
ok  	_/tmp/d20161129-30451-qgnta9	0.136s
PASS
ok  	_/tmp/d20161129-30451-qgnta9	0.203s
PASS
ok  	_/tmp/d20161129-30451-qgnta9	0.003s
PASS
ok  	_/tmp/d20161129-30451-qgnta9	0.003s
PASS
ok  	_/tmp/d20161129-30451-qgnta9	0.003s
PASS
ok  	_/tmp/d20161129-30451-qgnta9	0.055s
PASS
ok  	_/tmp/d20161129-30451-qgnta9	0.053s
PASS
ok  	_/tmp/d20161129-30451-qgnta9	0.124s

История (5 версии и 8 коментара)

Радостина обнови решението на 26.11.2016 02:37 (преди над 1 година)

+package main
+
+import (
+ "errors"
+ "sync"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+type result struct {
+ res int
+ err error
+}
+
+//1
+type ExecuterPipe struct {
+ tasks []Task
+}
+
+func (e ExecuterPipe) Execute(arg int) (int, error) {
+ if len(e.tasks) == 0 {
+ return 0, errors.New("no tasks")
+ }
+ var result int
+ for _, task := range e.tasks {
+ var err error
+ if result, err = task.Execute(arg); err != nil {
+ return 0, errors.New("error")
+ }
+ arg = result
+ }
+ return arg, nil
+}
+
+func Pipeline(tasks ...Task) Task {
+ e := new(ExecuterPipe)
+ e.tasks = make([]Task, 0)
+ for _, t := range tasks {
+ e.tasks = append(e.tasks, t)
+ }
+ return e
+}
+
+//2
+type ExecuterFast struct {
+ tasks []Task
+}
+
+func (e ExecuterFast) Execute(arg int) (int, error) {
+ if len(e.tasks) == 0 {
+ return 0, errors.New("no tasks")
+ }
+ var once sync.Once
+ ret := make(chan result)
+
+ for _, task := range e.tasks {
+ go func(task Task) {
+ res, err := task.Execute(arg)
+ once.Do(func() {
+ ret <- result{res, err}
+ })
+ }(task)
+ }
+ r := <-ret
+ return r.res, r.err
+}
+
+func Fastest(tasks ...Task) Task {
+ e := new(ExecuterFast)
+ e.tasks = make([]Task, 0)
+ for _, t := range tasks {
+ e.tasks = append(e.tasks, t)
+ }
+ return e
+}
+
+//3
+type ExecuterTimed struct {
+ task Task
+ timeout time.Duration
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ e := new(ExecuterTimed)
+ e.task = task
+ e.timeout = timeout
+ return e
+}
+
+func (e ExecuterTimed) Execute(arg int) (int, error) {
+ ch := make(chan result)
+ go func() {
+ res, err := e.task.Execute(arg)
+ ch <- result{res, err}
+ }()
+
+ select {
+ case r := <-ch:
+ return r.res, r.err
+ case <-time.After(e.timeout):
+ return 0, errors.New("timeout")
+ }
+}
+
+//4
+type ExecuterConcurrent struct {
+ tasks []Task
+ reduce func(results []int) int
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ e := new(ExecuterConcurrent)
+ e.tasks = make([]Task, 0)
+ for _, t := range tasks {
+ e.tasks = append(e.tasks, t)
+ }
+ e.reduce = reduce
+ return e
+}
+
+func (e ExecuterConcurrent) Execute(arg int) (int, error) {
+ if len(e.tasks) == 0 {
+ return 0, errors.New("no tasks")
+ }
+
+ var wg sync.WaitGroup
+ var argMtx sync.Mutex
+
+ finish := make(chan struct{})
+ fail := make(chan struct{})
+
+ var reduceArg []int
+ reduceArg = make([]int, 0)
+ go func() {
+ for _, task := range e.tasks {
+ wg.Add(1)
+ go func(task Task) {
+ if res, err := task.Execute(arg); err != nil {
+ close(fail)
+ } else {
+ argMtx.Lock()
+ reduceArg = append(reduceArg, res)
+ argMtx.Unlock()
+ }
+ wg.Done()
+ }(task)
+ }
+ wg.Wait()
+ finish <- struct{}{}
+ }()
+
+ for {
+ select {
+ case <-fail:
+ return 0, errors.New("fail")
+ case <-finish:
+ return e.reduce(reduceArg), nil
+ }
+ }
+}
+
+//5
+type ExecuterSearcher struct {
+ errorLimit int
+ tasks <-chan Task
+ limitMtx sync.Mutex
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ e := new(ExecuterSearcher)
+ e.tasks = tasks
+ e.errorLimit = errorLimit
+ return e
+}
+
+func (e ExecuterSearcher) Execute(arg int) (int, error) {
+ closed := make(chan struct{})
+ var wg sync.WaitGroup
+ var max int
+ var maxMtx sync.Mutex
+
+ count := 0
+ go func() {
+ for task := range e.tasks {
+ wg.Add(1)
+ go func(task Task) {
+ if res, err := task.Execute(arg); err != nil {
+ e.limitMtx.Lock()
+ e.errorLimit--
+ e.limitMtx.Unlock()
+ } else {
+ if res > max {
+ maxMtx.Lock()
+ max = res
+ maxMtx.Unlock()
+ }
+ }
+ wg.Done()
+ }(task)
+ count++
+ }
+ closed <- struct{}{}
+ }()
+
+ <-closed
+ wg.Wait()
+
+ if e.errorLimit < 0 {
+ return 0, errors.New("error limit")
+ } else if count == 0 {
+ return 0, errors.New("no tasks")
+ } else {
+ return max, nil
+ }
+}

Радостина обнови решението на 26.11.2016 13:19 (преди над 1 година)

package main
import (
"errors"
+ "math"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
type result struct {
res int
err error
}
//1
type ExecuterPipe struct {
tasks []Task
}
func (e ExecuterPipe) Execute(arg int) (int, error) {
if len(e.tasks) == 0 {
return 0, errors.New("no tasks")
}
var result int
for _, task := range e.tasks {
var err error
if result, err = task.Execute(arg); err != nil {
return 0, errors.New("error")
}
arg = result
}
return arg, nil
}
func Pipeline(tasks ...Task) Task {
e := new(ExecuterPipe)
e.tasks = make([]Task, 0)
for _, t := range tasks {
e.tasks = append(e.tasks, t)

Имайки предвид, че при функции с произволен брой аргументи реално получаваш директно slice, в случая можеш директно да си присвоиш tasks, няма нужда да циклиш по него. И при долните "конструктори" също.

}
return e
}
//2
type ExecuterFast struct {
tasks []Task
}
func (e ExecuterFast) Execute(arg int) (int, error) {
if len(e.tasks) == 0 {
return 0, errors.New("no tasks")
}
var once sync.Once
ret := make(chan result)
for _, task := range e.tasks {
go func(task Task) {
res, err := task.Execute(arg)
once.Do(func() {
ret <- result{res, err}
})
}(task)
}
r := <-ret
return r.res, r.err
}
func Fastest(tasks ...Task) Task {
e := new(ExecuterFast)
e.tasks = make([]Task, 0)
for _, t := range tasks {
e.tasks = append(e.tasks, t)
}
return e
}
//3
type ExecuterTimed struct {
task Task
timeout time.Duration
}
func Timed(task Task, timeout time.Duration) Task {
e := new(ExecuterTimed)
e.task = task
e.timeout = timeout
return e
}
func (e ExecuterTimed) Execute(arg int) (int, error) {
ch := make(chan result)
go func() {
res, err := e.task.Execute(arg)
ch <- result{res, err}
}()
select {
case r := <-ch:
return r.res, r.err
case <-time.After(e.timeout):
return 0, errors.New("timeout")
}
}
//4
type ExecuterConcurrent struct {
tasks []Task
reduce func(results []int) int
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
e := new(ExecuterConcurrent)
e.tasks = make([]Task, 0)
for _, t := range tasks {
e.tasks = append(e.tasks, t)
}
e.reduce = reduce
return e
}
func (e ExecuterConcurrent) Execute(arg int) (int, error) {
if len(e.tasks) == 0 {
return 0, errors.New("no tasks")
}
var wg sync.WaitGroup
var argMtx sync.Mutex
finish := make(chan struct{})
fail := make(chan struct{})
var reduceArg []int
reduceArg = make([]int, 0)
go func() {
for _, task := range e.tasks {
wg.Add(1)
go func(task Task) {
if res, err := task.Execute(arg); err != nil {
close(fail)
} else {
argMtx.Lock()
reduceArg = append(reduceArg, res)
argMtx.Unlock()
}
wg.Done()
}(task)
}
wg.Wait()
finish <- struct{}{}
}()
for {
select {
case <-fail:
return 0, errors.New("fail")
case <-finish:
return e.reduce(reduceArg), nil
}
}
}
//5
type ExecuterSearcher struct {
errorLimit int
tasks <-chan Task
limitMtx sync.Mutex
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
e := new(ExecuterSearcher)
e.tasks = tasks
e.errorLimit = errorLimit
return e
}
func (e ExecuterSearcher) Execute(arg int) (int, error) {
closed := make(chan struct{})
var wg sync.WaitGroup
- var max int
+ max := math.MinInt64
var maxMtx sync.Mutex
count := 0
go func() {
for task := range e.tasks {
wg.Add(1)
go func(task Task) {
if res, err := task.Execute(arg); err != nil {
e.limitMtx.Lock()
e.errorLimit--
e.limitMtx.Unlock()
} else {
if res > max {
maxMtx.Lock()
max = res
maxMtx.Unlock()
}
}
wg.Done()
}(task)
count++
}
closed <- struct{}{}
}()
<-closed
wg.Wait()
if e.errorLimit < 0 {
return 0, errors.New("error limit")
} else if count == 0 {
return 0, errors.New("no tasks")
} else {
return max, nil
}
-}
+}

Радостина обнови решението на 27.11.2016 10:09 (преди над 1 година)

package main
import (
"errors"
"math"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
type result struct {
res int
err error
}
//1
type ExecuterPipe struct {
tasks []Task
}
func (e ExecuterPipe) Execute(arg int) (int, error) {
if len(e.tasks) == 0 {
return 0, errors.New("no tasks")
}
var result int
for _, task := range e.tasks {
var err error
if result, err = task.Execute(arg); err != nil {
return 0, errors.New("error")
}
arg = result
}
return arg, nil
}
func Pipeline(tasks ...Task) Task {
e := new(ExecuterPipe)
- e.tasks = make([]Task, 0)
- for _, t := range tasks {
- e.tasks = append(e.tasks, t)
- }
+ e.tasks = tasks
return e
}
//2
type ExecuterFast struct {
tasks []Task
}
func (e ExecuterFast) Execute(arg int) (int, error) {
if len(e.tasks) == 0 {
return 0, errors.New("no tasks")
}
var once sync.Once
ret := make(chan result)
for _, task := range e.tasks {
go func(task Task) {
res, err := task.Execute(arg)
once.Do(func() {
ret <- result{res, err}
})
}(task)
}
r := <-ret
return r.res, r.err
}
func Fastest(tasks ...Task) Task {
e := new(ExecuterFast)
- e.tasks = make([]Task, 0)
- for _, t := range tasks {
- e.tasks = append(e.tasks, t)
- }
+ e.tasks = tasks
return e
}
//3
type ExecuterTimed struct {
task Task
timeout time.Duration
}
func Timed(task Task, timeout time.Duration) Task {
e := new(ExecuterTimed)
e.task = task
e.timeout = timeout
return e
}
func (e ExecuterTimed) Execute(arg int) (int, error) {
ch := make(chan result)
go func() {
res, err := e.task.Execute(arg)
ch <- result{res, err}
}()
select {
case r := <-ch:
return r.res, r.err
case <-time.After(e.timeout):
return 0, errors.New("timeout")
}
}
//4
type ExecuterConcurrent struct {
tasks []Task
reduce func(results []int) int
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
e := new(ExecuterConcurrent)
- e.tasks = make([]Task, 0)
- for _, t := range tasks {
- e.tasks = append(e.tasks, t)
- }
+ e.tasks = tasks
e.reduce = reduce
return e
}
func (e ExecuterConcurrent) Execute(arg int) (int, error) {
if len(e.tasks) == 0 {
return 0, errors.New("no tasks")
}
var wg sync.WaitGroup
var argMtx sync.Mutex
finish := make(chan struct{})
fail := make(chan struct{})
var reduceArg []int
reduceArg = make([]int, 0)
go func() {
for _, task := range e.tasks {
wg.Add(1)
go func(task Task) {
if res, err := task.Execute(arg); err != nil {
close(fail)
} else {
argMtx.Lock()
reduceArg = append(reduceArg, res)
argMtx.Unlock()
}
wg.Done()
}(task)
}
wg.Wait()
finish <- struct{}{}
}()
for {
select {
case <-fail:
return 0, errors.New("fail")
case <-finish:
return e.reduce(reduceArg), nil
}
}
}
//5
type ExecuterSearcher struct {
errorLimit int
tasks <-chan Task
limitMtx sync.Mutex
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
e := new(ExecuterSearcher)
e.tasks = tasks
e.errorLimit = errorLimit
return e
}
func (e ExecuterSearcher) Execute(arg int) (int, error) {
closed := make(chan struct{})
var wg sync.WaitGroup
max := math.MinInt64
var maxMtx sync.Mutex
count := 0
go func() {
for task := range e.tasks {
wg.Add(1)
go func(task Task) {
if res, err := task.Execute(arg); err != nil {
e.limitMtx.Lock()
e.errorLimit--
e.limitMtx.Unlock()
} else {
if res > max {
maxMtx.Lock()
max = res
maxMtx.Unlock()
}
}
wg.Done()
}(task)
count++
}
closed <- struct{}{}
}()
<-closed
wg.Wait()
if e.errorLimit < 0 {
return 0, errors.New("error limit")
} else if count == 0 {
return 0, errors.New("no tasks")
} else {
return max, nil
}
}

Радостина обнови решението на 27.11.2016 11:11 (преди над 1 година)

package main
import (
"errors"
"math"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
type result struct {
res int
err error
}
//1
type ExecuterPipe struct {
tasks []Task
}
func (e ExecuterPipe) Execute(arg int) (int, error) {
if len(e.tasks) == 0 {
return 0, errors.New("no tasks")
}
var result int
for _, task := range e.tasks {
var err error
if result, err = task.Execute(arg); err != nil {
return 0, errors.New("error")
}
arg = result
}
return arg, nil
}
func Pipeline(tasks ...Task) Task {
e := new(ExecuterPipe)
e.tasks = tasks
return e
}
//2
type ExecuterFast struct {
tasks []Task
}
func (e ExecuterFast) Execute(arg int) (int, error) {
if len(e.tasks) == 0 {
return 0, errors.New("no tasks")
}
var once sync.Once
ret := make(chan result)
for _, task := range e.tasks {
go func(task Task) {
res, err := task.Execute(arg)
once.Do(func() {
ret <- result{res, err}
})
}(task)
}
r := <-ret
return r.res, r.err
}
func Fastest(tasks ...Task) Task {
e := new(ExecuterFast)
e.tasks = tasks
return e
}
//3
type ExecuterTimed struct {
task Task
timeout time.Duration
}
func Timed(task Task, timeout time.Duration) Task {
e := new(ExecuterTimed)
e.task = task
e.timeout = timeout
return e
}
func (e ExecuterTimed) Execute(arg int) (int, error) {
ch := make(chan result)
go func() {
res, err := e.task.Execute(arg)
ch <- result{res, err}
}()
select {
case r := <-ch:
return r.res, r.err
case <-time.After(e.timeout):
+ go func() {
+ <-ch
+ }()
return 0, errors.New("timeout")
}
}
//4
type ExecuterConcurrent struct {
tasks []Task
reduce func(results []int) int
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
e := new(ExecuterConcurrent)
e.tasks = tasks
e.reduce = reduce
return e
}
func (e ExecuterConcurrent) Execute(arg int) (int, error) {
if len(e.tasks) == 0 {
return 0, errors.New("no tasks")
}
var wg sync.WaitGroup
var argMtx sync.Mutex
+ var once sync.Once
finish := make(chan struct{})
fail := make(chan struct{})
var reduceArg []int
reduceArg = make([]int, 0)
go func() {
for _, task := range e.tasks {
wg.Add(1)
go func(task Task) {
if res, err := task.Execute(arg); err != nil {
- close(fail)
+ once.Do(func() {
+ close(fail)
+ })
} else {
argMtx.Lock()
reduceArg = append(reduceArg, res)
argMtx.Unlock()
}
wg.Done()
}(task)
}
wg.Wait()
finish <- struct{}{}
}()
- for {
- select {
- case <-fail:
- return 0, errors.New("fail")
- case <-finish:
- return e.reduce(reduceArg), nil
- }
+ select {
+ case <-fail:
+ go func() {
+ <-finish
+ }()
+ return 0, errors.New("fail")
+ case <-finish:
+ return e.reduce(reduceArg), nil
}
}
//5
type ExecuterSearcher struct {
errorLimit int
tasks <-chan Task
limitMtx sync.Mutex
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
e := new(ExecuterSearcher)
e.tasks = tasks
e.errorLimit = errorLimit
return e
}
func (e ExecuterSearcher) Execute(arg int) (int, error) {
closed := make(chan struct{})
var wg sync.WaitGroup
max := math.MinInt64
var maxMtx sync.Mutex
count := 0
go func() {
for task := range e.tasks {
wg.Add(1)
go func(task Task) {
if res, err := task.Execute(arg); err != nil {
e.limitMtx.Lock()
e.errorLimit--
e.limitMtx.Unlock()
} else {
if res > max {
maxMtx.Lock()
max = res
maxMtx.Unlock()
}
}
wg.Done()
}(task)
count++
}
closed <- struct{}{}
}()
<-closed
wg.Wait()
if e.errorLimit < 0 {
return 0, errors.New("error limit")
} else if count == 0 {
return 0, errors.New("no tasks")
} else {
return max, nil
}
-}
+}

Радостина обнови решението на 29.11.2016 12:54 (преди над 1 година)

package main
import (
"errors"
"math"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
type result struct {
res int
err error
}
//1
type ExecuterPipe struct {
tasks []Task
}
func (e ExecuterPipe) Execute(arg int) (int, error) {
if len(e.tasks) == 0 {
return 0, errors.New("no tasks")
}
var result int
for _, task := range e.tasks {
var err error
if result, err = task.Execute(arg); err != nil {
return 0, errors.New("error")
}
arg = result
}
return arg, nil
}
func Pipeline(tasks ...Task) Task {
e := new(ExecuterPipe)
e.tasks = tasks
return e
}
//2
type ExecuterFast struct {
tasks []Task
}
func (e ExecuterFast) Execute(arg int) (int, error) {
if len(e.tasks) == 0 {
return 0, errors.New("no tasks")
}
var once sync.Once
ret := make(chan result)
for _, task := range e.tasks {
go func(task Task) {
res, err := task.Execute(arg)
once.Do(func() {
ret <- result{res, err}
})
}(task)
}
r := <-ret
return r.res, r.err
}
func Fastest(tasks ...Task) Task {
e := new(ExecuterFast)
e.tasks = tasks
return e
}
//3
type ExecuterTimed struct {
task Task
timeout time.Duration
}
func Timed(task Task, timeout time.Duration) Task {
e := new(ExecuterTimed)
e.task = task
e.timeout = timeout
return e
}
func (e ExecuterTimed) Execute(arg int) (int, error) {
ch := make(chan result)
go func() {
res, err := e.task.Execute(arg)
ch <- result{res, err}
}()
select {
case r := <-ch:
return r.res, r.err
case <-time.After(e.timeout):
go func() {
<-ch
}()
return 0, errors.New("timeout")
}
}
//4
type ExecuterConcurrent struct {
tasks []Task
reduce func(results []int) int
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
e := new(ExecuterConcurrent)
e.tasks = tasks
e.reduce = reduce
return e
}
func (e ExecuterConcurrent) Execute(arg int) (int, error) {
if len(e.tasks) == 0 {
return 0, errors.New("no tasks")
}
var wg sync.WaitGroup
var argMtx sync.Mutex
var once sync.Once
finish := make(chan struct{})
fail := make(chan struct{})
var reduceArg []int
reduceArg = make([]int, 0)
go func() {
for _, task := range e.tasks {
wg.Add(1)
go func(task Task) {
if res, err := task.Execute(arg); err != nil {
once.Do(func() {
close(fail)
})
} else {
argMtx.Lock()
reduceArg = append(reduceArg, res)
argMtx.Unlock()
}
wg.Done()
}(task)
}
wg.Wait()
finish <- struct{}{}
}()
select {
case <-fail:
go func() {
<-finish
}()
return 0, errors.New("fail")
case <-finish:
return e.reduce(reduceArg), nil
}
}
//5
type ExecuterSearcher struct {
errorLimit int
tasks <-chan Task
limitMtx sync.Mutex
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
e := new(ExecuterSearcher)
e.tasks = tasks
e.errorLimit = errorLimit
return e
}
func (e ExecuterSearcher) Execute(arg int) (int, error) {
closed := make(chan struct{})
var wg sync.WaitGroup
max := math.MinInt64
var maxMtx sync.Mutex
+ haveResult := false
count := 0
go func() {
for task := range e.tasks {
wg.Add(1)
go func(task Task) {
if res, err := task.Execute(arg); err != nil {
e.limitMtx.Lock()
e.errorLimit--
e.limitMtx.Unlock()
} else {
+ maxMtx.Lock()
if res > max {
- maxMtx.Lock()
max = res
- maxMtx.Unlock()
+ haveResult = true
}
+ maxMtx.Unlock()
}
wg.Done()
}(task)
count++
}
closed <- struct{}{}
}()
<-closed
wg.Wait()
- if e.errorLimit < 0 {
+ if !haveResult {
+ return 0, errors.New("no success")
+ } else if e.errorLimit < 0 {
return 0, errors.New("error limit")
} else if count == 0 {
return 0, errors.New("no tasks")
} else {
return max, nil
}
}