Решение на Concurrent Tasks от Георги Иванов

Обратно към всички решения

Към профила на Георги Иванов

Резултати

  • 13 точки от тестове
  • 0 бонус точки
  • 13 точки общо
  • 13 успешни тест(а)
  • 0 неуспешни тест(а)

Код

package main
import (
"fmt"
"math"
"sync"
"time"
)
// Task is a unit of work that accepts an int argument and yields an int
// result or an error.
type Task interface {
// Execute runs the task with the given argument.
Execute(int) (int, error)
}
// pipelineStruct holds the ordered list of tasks that make up a pipeline.
type pipelineStruct struct {
	tasks []Task
}

// Pipeline builds a Task that runs the given tasks one after another,
// passing each task's result as the argument of the next one.
func Pipeline(tasks ...Task) Task {
	var p pipelineStruct
	p.tasks = tasks
	return p
}

// Execute feeds param into the first task and threads every result into the
// following task. It returns the last task's result, or (0, err) as soon as
// any task fails; later tasks are not run in that case. A pipeline with no
// tasks always fails.
func (p pipelineStruct) Execute(param int) (int, error) {
	if len(p.tasks) == 0 {
		return 0, fmt.Errorf("%d tasks given to execute.", 0)
	}

	acc := param
	for i := 0; i < len(p.tasks); i++ {
		next, stepErr := p.tasks[i].Execute(acc)
		if stepErr != nil {
			return 0, stepErr
		}
		acc = next
	}
	return acc, nil
}
// fastestStruct holds the tasks that race against each other.
type fastestStruct struct {
	tasks []Task
}

// Fastest builds a Task that runs all given tasks concurrently and reports
// whatever the first one to finish produced.
func Fastest(tasks ...Task) Task {
	var f fastestStruct
	f.tasks = tasks
	return f
}

// resultStruct bundles the two return values of a Task so that they can be
// sent over a channel as one message.
type resultStruct struct {
	res int
	err error
}

// Execute starts every task in its own goroutine with the same param and
// returns the result and error of the first task to finish. With no tasks
// it fails immediately.
func (f fastestStruct) Execute(param int) (int, error) {
	count := len(f.tasks)
	if count == 0 {
		return 0, fmt.Errorf("%d tasks given to execute.", count)
	}

	// One buffer slot per task: the tasks that lose the race can still
	// deliver their result and exit instead of blocking forever.
	finished := make(chan resultStruct, count)
	for i := 0; i < count; i++ {
		current := f.tasks[i]
		go func() {
			value, taskErr := current.Execute(param)
			finished <- resultStruct{res: value, err: taskErr}
		}()
	}

	winner := <-finished
	return winner.res, winner.err
}
// timedStruct pairs a task with the maximum time it is allowed to run.
type timedStruct struct {
	task    Task
	timeout time.Duration
}

// Timed wraps task so that its Execute fails if the task has not finished
// within timeout.
func Timed(task Task, timeout time.Duration) Task {
	return timedStruct{task: task, timeout: timeout}
}

// execTask runs task.Execute(param) in a new goroutine and returns a channel
// on which the outcome is delivered. The channel has a buffer of one so the
// goroutine can finish even if nobody reads the result (e.g. after a
// timeout).
func execTask(task Task, param int) <-chan resultStruct {
	ch := make(chan resultStruct, 1)
	go func() {
		res, err := task.Execute(param)
		ch <- resultStruct{res: res, err: err}
	}()
	return ch
}

// Execute runs the wrapped task and returns its result and error if it
// finishes within the timeout. Otherwise it returns (0, error) without
// waiting for the task; the task itself keeps running in the background
// until it completes on its own.
func (t timedStruct) Execute(param int) (int, error) {
	done := execTask(t.task, param)

	// An explicit timer (instead of time.After) lets us release it as soon
	// as the task finishes, rather than keeping it alive for the whole
	// timeout.
	timer := time.NewTimer(t.timeout)
	defer timer.Stop()

	select {
	case s := <-done:
		return s.res, s.err
	case <-timer.C:
		return 0, fmt.Errorf("Function timed out.")
	}
}
// mapReduceStruct holds the tasks to run concurrently and the function that
// folds their results into a single value.
type mapReduceStruct struct {
	tasks  []Task
	reduce func(results []int) int
}

// ConcurrentMapReduce builds a Task that runs all given tasks concurrently
// with the same argument and passes their results to reduce.
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
	return mapReduceStruct{tasks: tasks, reduce: reduce}
}

// Execute starts every task in its own goroutine with param, gathers the
// results in the order in which the tasks finish, and returns
// reduce(results). If any task fails, Execute returns (0, that error)
// immediately without waiting for the remaining tasks. With no tasks it
// fails.
//
// The results slice is allocated per call, so repeated or concurrent calls
// never share storage and reduce may safely retain the slice it is given.
func (m mapReduceStruct) Execute(param int) (int, error) {
	numTasks := len(m.tasks)
	if numTasks == 0 {
		return 0, fmt.Errorf("No tasks given to execute.")
	}

	// Buffered so that tasks still running after an early error return can
	// deliver their result and exit.
	resultChan := make(chan resultStruct, numTasks)
	for _, task := range m.tasks {
		go func(t Task) {
			res, err := t.Execute(param)
			resultChan <- resultStruct{res: res, err: err}
		}(task)
	}

	results := make([]int, 0, numTasks)
	for i := 0; i < numTasks; i++ {
		s := <-resultChan
		if s.err != nil {
			return 0, s.err
		}
		results = append(results, s.res)
	}
	return m.reduce(results), nil
}
type greatestSearcherStruct struct {
mtx sync.Mutex
tasks <-chan Task
errorLimit int
maxRes int
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
var mtx sync.Mutex
return greatestSearcherStruct{
mtx: mtx,
tasks: tasks,
errorLimit: errorLimit,
maxRes: math.MinInt64,
}
}
func (g greatestSearcherStruct) Execute(param int) (int, error) {
 Като резултат `Execute()` метода на вашия тип, след приключването на всички задачи, трябва да върне най-голямото число, което *някоя задача е върнала*. Но ако повече от `errorLimit` задачи са върнали грешка или по `tasks` не бъдат подадени никакви задачи, `Execute()` трябва да върне грешка.
cntErrors := 0
cntTasks := 0
var wg sync.WaitGroup
for task := range g.tasks {
cntTasks++
wg.Add(1)
go func(task Task) {
defer wg.Done()
res, err := task.Execute(param)
if err != nil {
g.mtx.Lock()
cntErrors++
g.mtx.Unlock()
} else {
g.mtx.Lock()
if res > g.maxRes {
g.maxRes = res
}
g.mtx.Unlock()
}
}(task)
}
wg.Wait()
if cntErrors > g.errorLimit || cntTasks == 0 || cntErrors == cntTasks {
return 0, fmt.Errorf("Exceeded error limit or no tasks given.")
}
return g.maxRes, nil
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161129-30451-7smwkj	0.004s
PASS
ok  	_/tmp/d20161129-30451-7smwkj	0.008s
PASS
ok  	_/tmp/d20161129-30451-7smwkj	0.008s
PASS
ok  	_/tmp/d20161129-30451-7smwkj	0.103s
PASS
ok  	_/tmp/d20161129-30451-7smwkj	0.203s
PASS
ok  	_/tmp/d20161129-30451-7smwkj	0.134s
PASS
ok  	_/tmp/d20161129-30451-7smwkj	0.203s
PASS
ok  	_/tmp/d20161129-30451-7smwkj	0.003s
PASS
ok  	_/tmp/d20161129-30451-7smwkj	0.003s
PASS
ok  	_/tmp/d20161129-30451-7smwkj	0.003s
PASS
ok  	_/tmp/d20161129-30451-7smwkj	0.048s
PASS
ok  	_/tmp/d20161129-30451-7smwkj	0.048s
PASS
ok  	_/tmp/d20161129-30451-7smwkj	0.123s

История (4 версии и 6 коментара)

Георги обнови решението на 25.11.2016 20:01 (преди над 1 година)

+package main
+
+import (
+ "fmt"
+ "math"
+ "sync"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type Pipeline_struct struct {
+ tasksChan chan Task
+ numTasks int
+}
+
+func Pipeline(tasks ...Task) Task {
+ tasksChan := make(chan Task, len(tasks))
+ for _, t := range tasks {
+ tasksChan <- t
+ }
+ return Pipeline_struct{tasksChan: tasksChan, numTasks: len(tasks)}
+}
+
+func (p Pipeline_struct) Execute(param int) (int, error) {
+ if p.numTasks == 0 {
+ return 0, fmt.Errorf("%d tasks given to execute.", 0)
+ }
+
+ res := param
+ var err error
+ for indF := 0; indF < p.numTasks; indF++ {
+ t := <-p.tasksChan
+ res, err = t.Execute(res)
+ if err != nil {
+ return 0, fmt.Errorf("Function %d returned error.", indF)
+ }
+ }
+ return res, nil
+}
+
+type fastestStruct struct {
+ tasks []Task
+}
+
+func Fastest(tasks ...Task) Task {
+ return fastestStruct{tasks: tasks}
+}
+
+type resultStruct struct {
+ res int
+ err error
+}
+
+func (f fastestStruct) Execute(param int) (int, error) {
+ if len(f.tasks) == 0 {
+ return 0, fmt.Errorf("%d tasks given to execute.", len(f.tasks))
+ }
+ resultChan := make(chan resultStruct)
+ defer close(resultChan)
+ for _, t := range f.tasks {
+ go func(tt Task) {
+ res, err := tt.Execute(param)
+ resultChan <- resultStruct{res, err}
+ }(t)
+ }
+ s := <-resultChan
+ numTasks := len(f.tasks)
+ for i := 1; i < numTasks; i++ {
+ <-resultChan
+ }
+
+ return s.res, s.err
+}
+
+type timedStruct struct {
+ task Task
+ timeout time.Duration
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ return timedStruct{task: task, timeout: timeout}
+}
+
+func execTask(task Task, param int, syncChan chan struct{} /*wg *sync.WaitGroup*/) <-chan resultStruct {
+ ch := make(chan resultStruct, 1)
+
+ go func() {
+ defer func() { syncChan <- struct{}{} }()
+ res, err := task.Execute(param)
+ ch <- resultStruct{res: res, err: err}
+ }()
+ return ch
+}
+
+func (t timedStruct) Execute(param int) (int, error) {
+ var (
+ res int
+ err error
+ )
+ syncChan := make(chan struct{})
+ defer close(syncChan)
+ select {
+ case s := <-execTask(t.task, param, syncChan /* &wg*/):
+ res = s.res
+ err = s.err
+ case <-time.After(t.timeout):
+ err = fmt.Errorf("Function timed out.")
+
+ }
+ <-syncChan
+ return res, err
+}
+
+type mapReduceStruct struct {
+ tasks []Task
+ reduce func(results []int) int
+ results []int
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ results := make([]int, len(tasks))
+ return mapReduceStruct{tasks: tasks, reduce: reduce, results: results /*, mtx: mtx*/}
+}
+
+func (m mapReduceStruct) Execute(param int) (int, error) {
+ if len(m.tasks) == 0 {
+ return 0, fmt.Errorf("%d tasks given to execute.", len(m.tasks))
+ }
+ numTasks := len(m.tasks)
+ resultChan := make(chan resultStruct, len(m.tasks))
+ defer close(resultChan)
+ for _, task := range m.tasks {
+ go func(t Task) {
+ res, err := t.Execute(param)
+ resultChan <- resultStruct{res: res, err: err}
+ }(task)
+ }
+
+ cntSuccessful := 0
+ for i := 0; i < numTasks; i++ {
+ s := <-resultChan
+ if s.err != nil {
+ break
+ } else {
+ cntSuccessful++
+ }
+ m.results[i] = s.res
+ }
+ if cntSuccessful != numTasks {
+ for i := 0; i < numTasks-cntSuccessful; i++ {
+ <-resultChan
+ }
+ return 0, fmt.Errorf("Only %d successful functions.", cntSuccessful)
+ } else {
+ return m.reduce(m.results), nil
+ }
+
+}
+
+type greatestSearcherStruct struct {
+ mtx sync.Mutex
+ tasks <-chan Task
+ errorLimit int
+ maxRes int
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ var mtx sync.Mutex
+ return greatestSearcherStruct{
+ mtx: mtx,
+ tasks: tasks,
+ errorLimit: errorLimit,
+ maxRes: math.MinInt64,
+ }
+}
+
+func (g greatestSearcherStruct) Execute(param int) (int, error) {
+ cntErrors := 0
+ cntTasks := 0
+ var wg sync.WaitGroup
+ for task := range g.tasks {
+ cntTasks++
+ wg.Add(1)
+ go func(task Task) {
+ defer wg.Done()
+ res, err := task.Execute(param)
+ if err != nil {
+ g.mtx.Lock()
+ cntErrors++
+ g.mtx.Unlock()
+ } else {
+ g.mtx.Lock()
+ if res > g.maxRes {
+ g.maxRes = res
+ }
+ g.mtx.Unlock()
+ }
+
+ }(task)
+ }
+ wg.Wait()
+
+ if cntErrors > 2 || cntTasks == 0 {
+ return 0, fmt.Errorf("Exceeded error limit or no tasks given.")
+ }
+ return g.maxRes, nil
+}
  • Защо ползваш канал в Pipeline структурата? Какви предимства ти дава пред slice при положение, че там дори нямаме конкурентност?
  • Синхронизацията ти в Timed ми се струва леко странна, сигурен ли си, че работи както се очаква?
  • Виж къде в GreatestSearcher() (не) ползваш errorLimit ;) Съвсем набързо

За 1 и 3 абсолютно недоглеждане. А за 2, не съм сигурен дали съм разбрал правилно. Нали се очаква ако задачата не успее да завърши в даденото време, да върнем грешка, но все пак да я изчакаме да приключи преди това?

Георги обнови решението на 26.11.2016 10:55 (преди над 1 година)

package main
import (
"fmt"
"math"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
type Pipeline_struct struct {
- tasksChan chan Task
- numTasks int
+ tasks []Task
}
func Pipeline(tasks ...Task) Task {
- tasksChan := make(chan Task, len(tasks))
- for _, t := range tasks {
- tasksChan <- t
- }
- return Pipeline_struct{tasksChan: tasksChan, numTasks: len(tasks)}
+ return Pipeline_struct{tasks: tasks}
}
func (p Pipeline_struct) Execute(param int) (int, error) {
- if p.numTasks == 0 {
+ cntTasks := len(p.tasks)
+ if cntTasks == 0 {
return 0, fmt.Errorf("%d tasks given to execute.", 0)
}
res := param
var err error
- for indF := 0; indF < p.numTasks; indF++ {
- t := <-p.tasksChan
- res, err = t.Execute(res)
+ for _, task := range p.tasks {
+ res, err = task.Execute(res)
if err != nil {
- return 0, fmt.Errorf("Function %d returned error.", indF)
+ return 0, fmt.Errorf("Function %d returned error.")
}
}
return res, nil
}
type fastestStruct struct {
tasks []Task
}
func Fastest(tasks ...Task) Task {
return fastestStruct{tasks: tasks}
}
type resultStruct struct {
res int
err error
}
func (f fastestStruct) Execute(param int) (int, error) {
if len(f.tasks) == 0 {
return 0, fmt.Errorf("%d tasks given to execute.", len(f.tasks))
}
- resultChan := make(chan resultStruct)
- defer close(resultChan)
+ resultChan := make(chan resultStruct, len(f.tasks))
for _, t := range f.tasks {
go func(tt Task) {
res, err := tt.Execute(param)
resultChan <- resultStruct{res, err}
}(t)
}
s := <-resultChan
- numTasks := len(f.tasks)
- for i := 1; i < numTasks; i++ {
- <-resultChan
- }
return s.res, s.err
}
type timedStruct struct {
task Task
timeout time.Duration
}
func Timed(task Task, timeout time.Duration) Task {
return timedStruct{task: task, timeout: timeout}
}
-func execTask(task Task, param int, syncChan chan struct{} /*wg *sync.WaitGroup*/) <-chan resultStruct {
+func execTask(task Task, param int) <-chan resultStruct {
ch := make(chan resultStruct, 1)
go func() {
- defer func() { syncChan <- struct{}{} }()
res, err := task.Execute(param)
ch <- resultStruct{res: res, err: err}
}()
return ch
}
func (t timedStruct) Execute(param int) (int, error) {
var (
res int
err error
)
- syncChan := make(chan struct{})
- defer close(syncChan)
select {
- case s := <-execTask(t.task, param, syncChan /* &wg*/):
+ case s := <-execTask(t.task, param /*, syncChan /* &wg*/):
res = s.res
err = s.err
case <-time.After(t.timeout):
err = fmt.Errorf("Function timed out.")
}
- <-syncChan
return res, err
}
type mapReduceStruct struct {
tasks []Task
reduce func(results []int) int
results []int
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
results := make([]int, len(tasks))
- return mapReduceStruct{tasks: tasks, reduce: reduce, results: results /*, mtx: mtx*/}
+ return mapReduceStruct{tasks: tasks, reduce: reduce, results: results}
}
func (m mapReduceStruct) Execute(param int) (int, error) {
if len(m.tasks) == 0 {
- return 0, fmt.Errorf("%d tasks given to execute.", len(m.tasks))
+ return 0, fmt.Errorf("No tasks given to execute.")
}
numTasks := len(m.tasks)
resultChan := make(chan resultStruct, len(m.tasks))
defer close(resultChan)
for _, task := range m.tasks {
go func(t Task) {
res, err := t.Execute(param)
resultChan <- resultStruct{res: res, err: err}
}(task)
}
- cntSuccessful := 0
for i := 0; i < numTasks; i++ {
s := <-resultChan
if s.err != nil {
- break
- } else {
- cntSuccessful++
+ return 0, fmt.Errorf("Function returned error.")
}
+
m.results[i] = s.res
}
- if cntSuccessful != numTasks {
- for i := 0; i < numTasks-cntSuccessful; i++ {
- <-resultChan
- }
- return 0, fmt.Errorf("Only %d successful functions.", cntSuccessful)
- } else {
- return m.reduce(m.results), nil
- }
+ return m.reduce(m.results), nil
}
type greatestSearcherStruct struct {
mtx sync.Mutex
tasks <-chan Task
errorLimit int
maxRes int
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
var mtx sync.Mutex
return greatestSearcherStruct{
mtx: mtx,
tasks: tasks,
errorLimit: errorLimit,
maxRes: math.MinInt64,
}
}
func (g greatestSearcherStruct) Execute(param int) (int, error) {
 Като резултат `Execute()` метода на вашия тип, след приключването на всички задачи, трябва да върне най-голямото число, което *някоя задача е върнала*. Но ако повече от `errorLimit` задачи са върнали грешка или по `tasks` не бъдат подадени никакви задачи, `Execute()` трябва да върне грешка.
cntErrors := 0
cntTasks := 0
var wg sync.WaitGroup
for task := range g.tasks {
cntTasks++
wg.Add(1)
go func(task Task) {
defer wg.Done()
res, err := task.Execute(param)
if err != nil {
g.mtx.Lock()
cntErrors++
g.mtx.Unlock()
} else {
g.mtx.Lock()
if res > g.maxRes {
g.maxRes = res
}
g.mtx.Unlock()
}
}(task)
}
wg.Wait()
- if cntErrors > 2 || cntTasks == 0 {
+ if cntErrors > g.errorLimit || cntTasks == 0 {
return 0, fmt.Errorf("Exceeded error limit or no tasks given.")
}
return g.maxRes, nil
}

Георги обнови решението на 28.11.2016 23:01 (преди над 1 година)

package main
import (
"fmt"
"math"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
-type Pipeline_struct struct {
+type pipelineStruct struct {
tasks []Task
}
func Pipeline(tasks ...Task) Task {
- return Pipeline_struct{tasks: tasks}
+ return pipelineStruct{tasks: tasks}
}
-func (p Pipeline_struct) Execute(param int) (int, error) {
+func (p pipelineStruct) Execute(param int) (int, error) {
cntTasks := len(p.tasks)
if cntTasks == 0 {
return 0, fmt.Errorf("%d tasks given to execute.", 0)
}
res := param
var err error
for _, task := range p.tasks {
res, err = task.Execute(res)
if err != nil {
return 0, fmt.Errorf("Function %d returned error.")
}
}
return res, nil
}
type fastestStruct struct {
tasks []Task
}
func Fastest(tasks ...Task) Task {
return fastestStruct{tasks: tasks}
}
type resultStruct struct {
res int
err error
}
func (f fastestStruct) Execute(param int) (int, error) {
if len(f.tasks) == 0 {
return 0, fmt.Errorf("%d tasks given to execute.", len(f.tasks))
}
resultChan := make(chan resultStruct, len(f.tasks))
for _, t := range f.tasks {
go func(tt Task) {
res, err := tt.Execute(param)
resultChan <- resultStruct{res, err}
}(t)
}
s := <-resultChan
return s.res, s.err
}
type timedStruct struct {
task Task
timeout time.Duration
}
func Timed(task Task, timeout time.Duration) Task {
return timedStruct{task: task, timeout: timeout}
}
func execTask(task Task, param int) <-chan resultStruct {
ch := make(chan resultStruct, 1)
go func() {
res, err := task.Execute(param)
ch <- resultStruct{res: res, err: err}
}()
return ch
}
func (t timedStruct) Execute(param int) (int, error) {
var (
res int
err error
)
select {
case s := <-execTask(t.task, param /*, syncChan /* &wg*/):
res = s.res
err = s.err
case <-time.After(t.timeout):
err = fmt.Errorf("Function timed out.")
}
return res, err
}
type mapReduceStruct struct {
tasks []Task
reduce func(results []int) int
results []int
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
results := make([]int, len(tasks))
return mapReduceStruct{tasks: tasks, reduce: reduce, results: results}
}
func (m mapReduceStruct) Execute(param int) (int, error) {
if len(m.tasks) == 0 {
return 0, fmt.Errorf("No tasks given to execute.")
}
numTasks := len(m.tasks)
resultChan := make(chan resultStruct, len(m.tasks))
- defer close(resultChan)
for _, task := range m.tasks {
go func(t Task) {
res, err := t.Execute(param)
resultChan <- resultStruct{res: res, err: err}
}(task)
}
for i := 0; i < numTasks; i++ {
s := <-resultChan
if s.err != nil {
return 0, fmt.Errorf("Function returned error.")
}
m.results[i] = s.res
}
return m.reduce(m.results), nil
}
type greatestSearcherStruct struct {
mtx sync.Mutex
tasks <-chan Task
errorLimit int
maxRes int
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
var mtx sync.Mutex
return greatestSearcherStruct{
mtx: mtx,
tasks: tasks,
errorLimit: errorLimit,
maxRes: math.MinInt64,
}
}
func (g greatestSearcherStruct) Execute(param int) (int, error) {
cntErrors := 0
cntTasks := 0
var wg sync.WaitGroup
for task := range g.tasks {
cntTasks++
wg.Add(1)
go func(task Task) {
defer wg.Done()
res, err := task.Execute(param)
if err != nil {
g.mtx.Lock()
cntErrors++
g.mtx.Unlock()
} else {
g.mtx.Lock()
if res > g.maxRes {
g.maxRes = res
}
g.mtx.Unlock()
}
}(task)
}
wg.Wait()
- if cntErrors > g.errorLimit || cntTasks == 0 {
+ if cntErrors > g.errorLimit || cntTasks == 0 || cntErrors == cntTasks {
return 0, fmt.Errorf("Exceeded error limit or no tasks given.")
}
return g.maxRes, nil
}

Георги обнови решението на 29.11.2016 15:53 (преди над 1 година)

package main
import (
"fmt"
"math"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
type pipelineStruct struct {
tasks []Task
}
func Pipeline(tasks ...Task) Task {
return pipelineStruct{tasks: tasks}
}
func (p pipelineStruct) Execute(param int) (int, error) {
cntTasks := len(p.tasks)
if cntTasks == 0 {
return 0, fmt.Errorf("%d tasks given to execute.", 0)
}
res := param
var err error
for _, task := range p.tasks {
res, err = task.Execute(res)
if err != nil {
- return 0, fmt.Errorf("Function %d returned error.")
+ return 0, err
}
}
return res, nil
}
type fastestStruct struct {
tasks []Task
}
func Fastest(tasks ...Task) Task {
return fastestStruct{tasks: tasks}
}
type resultStruct struct {
res int
err error
}
func (f fastestStruct) Execute(param int) (int, error) {
if len(f.tasks) == 0 {
return 0, fmt.Errorf("%d tasks given to execute.", len(f.tasks))
}
resultChan := make(chan resultStruct, len(f.tasks))
for _, t := range f.tasks {
go func(tt Task) {
res, err := tt.Execute(param)
resultChan <- resultStruct{res, err}
}(t)
}
s := <-resultChan
return s.res, s.err
}
type timedStruct struct {
task Task
timeout time.Duration
}
func Timed(task Task, timeout time.Duration) Task {
return timedStruct{task: task, timeout: timeout}
}
func execTask(task Task, param int) <-chan resultStruct {
ch := make(chan resultStruct, 1)
go func() {
res, err := task.Execute(param)
ch <- resultStruct{res: res, err: err}
}()
return ch
}
func (t timedStruct) Execute(param int) (int, error) {
var (
res int
err error
)
select {
- case s := <-execTask(t.task, param /*, syncChan /* &wg*/):
+ case s := <-execTask(t.task, param):
res = s.res
err = s.err
case <-time.After(t.timeout):
err = fmt.Errorf("Function timed out.")
}
return res, err
}
type mapReduceStruct struct {
tasks []Task
reduce func(results []int) int
results []int
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
results := make([]int, len(tasks))
return mapReduceStruct{tasks: tasks, reduce: reduce, results: results}
}
func (m mapReduceStruct) Execute(param int) (int, error) {
if len(m.tasks) == 0 {
return 0, fmt.Errorf("No tasks given to execute.")
}
numTasks := len(m.tasks)
resultChan := make(chan resultStruct, len(m.tasks))
for _, task := range m.tasks {
go func(t Task) {
res, err := t.Execute(param)
resultChan <- resultStruct{res: res, err: err}
}(task)
}
for i := 0; i < numTasks; i++ {
s := <-resultChan
if s.err != nil {
- return 0, fmt.Errorf("Function returned error.")
+ return 0, s.err
}
m.results[i] = s.res
}
return m.reduce(m.results), nil
}
type greatestSearcherStruct struct {
mtx sync.Mutex
tasks <-chan Task
errorLimit int
maxRes int
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
var mtx sync.Mutex
return greatestSearcherStruct{
mtx: mtx,
tasks: tasks,
errorLimit: errorLimit,
maxRes: math.MinInt64,
}
}
func (g greatestSearcherStruct) Execute(param int) (int, error) {
cntErrors := 0
cntTasks := 0
var wg sync.WaitGroup
for task := range g.tasks {
cntTasks++
wg.Add(1)
go func(task Task) {
defer wg.Done()
res, err := task.Execute(param)
if err != nil {
g.mtx.Lock()
cntErrors++
g.mtx.Unlock()
} else {
g.mtx.Lock()
if res > g.maxRes {
g.maxRes = res
}
g.mtx.Unlock()
}
}(task)
}
wg.Wait()
if cntErrors > g.errorLimit || cntTasks == 0 || cntErrors == cntTasks {
return 0, fmt.Errorf("Exceeded error limit or no tasks given.")
}
return g.maxRes, nil
}