Решение на Concurrent Tasks от Константин Тодоров

Обратно към всички решения

Към профила на Константин Тодоров

Резултати

  • 11 точки от тестове
  • 0 бонус точки
  • 11 точки общо
  • 11 успешни тест(а)
  • 2 неуспешни тест(а)

Код

package main
import (
"errors"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
type TaskResult struct {
result int
err error
}
type asyncTask struct {
tasks []Task
concurrent bool
timeout time.Duration
reduceFunc func(results []int) int
errorLimit int
taskChannel <-chan Task
}
func CreateAsyncTask(tasks []Task, concurrent bool, timeout time.Duration, reduceFunc func(results []int) int, errorLimit int, taskChannel <-chan Task) Task {
var newTask = new(asyncTask)
newTask.tasks = tasks
newTask.concurrent = concurrent
newTask.timeout = timeout
newTask.reduceFunc = reduceFunc
newTask.errorLimit = errorLimit
newTask.taskChannel = taskChannel
return newTask
}

Това че всичко е един тип не ми харесва - хаби място когато половината полета не се ползват и прави долния метод ненужно сложен като между временно не печелиш нищо освен че ще трябва да се грижиш долния метод да не стане грешен всеки път когато добавяш нов "тип" задача

func (a asyncTask) Execute(number int) (int, error) {
if (a.tasks == nil || len(a.tasks) == 0) && a.taskChannel == nil {
return 0, errors.New("No tasks given")
}
var taskResult TaskResult
if a.taskChannel != nil {
taskResult = a.StartTaskExecutorChannel(number)
} else if a.reduceFunc != nil {
taskResult = a.ExecuteAndReduce(number)
} else if a.concurrent {
taskResult = a.ExecuteFastest(number)
} else if a.timeout != 0 {
taskResult = a.ExecuteTasksWithTimeout(number)
} else {
taskResult = a.ExecuteTasks(number)
}
return taskResult.result, taskResult.err
}
func (a asyncTask) ExecuteTasks(number int) TaskResult {
var currentResult = number
var err error
var nextResult = 0
for i := 0; i < len(a.tasks); i++ {
nextResult, err = a.tasks[i].Execute(currentResult)
currentResult = nextResult
// If error occured, we leave without running the next tasks
if err != nil {
return TaskResult{currentResult, err}
}
}
return TaskResult{currentResult, nil}
}
func (a asyncTask) ExecuteTasksWithTimeout(number int) TaskResult {
var result = number
var err error
for i := 0; i < len(a.tasks); i++ {
select {
case taskResult := <-fetchResult(a.tasks[i], number):
result = taskResult.result
err = taskResult.err
case <-time.After(a.timeout):
result = 0
err = errors.New("Timed out")
}
// If error occured, we leave without running the next tasks
if err != nil {
return TaskResult{result, err}
}
}
return TaskResult{result, nil}
}
func (a asyncTask) ExecuteAndReduce(number int) TaskResult {
var wg sync.WaitGroup
var syncMtx sync.Mutex
var errorMtx sync.Mutex
var result = 0
var err error
var results = []int{}
for i := 0; i < len(a.tasks); i++ {
wg.Add(1)
// Add every task to the wait group, so we can finish only when there are no running tasks
go func(index int) {
currentResult, currentErr := a.tasks[index].Execute(number)
if currentErr != nil {
errorMtx.Lock()
err = currentErr
errorMtx.Unlock()
wg.Done()
return
}
// Add the result to the current list of results of all tasks
syncMtx.Lock()
results = append(results, currentResult)
if len(results) == len(a.tasks) { // Then it's the last task
result = a.reduceFunc(results)
err = nil
}
syncMtx.Unlock()
wg.Done()
}(i)
}
wg.Wait()
return TaskResult{result, err}
}
func (a asyncTask) ExecuteFastest(number int) TaskResult {
var wg sync.WaitGroup
var syncMtx sync.Mutex
var result = 0
var err error
var finished = false
wg.Add(1)
for i := 0; i < len(a.tasks); i++ {
// Add every task to the wait group, so we can finish only when there are no running tasks
go func(index int) {
currentResult, currentErr := a.tasks[index].Execute(number)
syncMtx.Lock()
// We only want the result from the fastest task
if !finished {
result = currentResult
err = currentErr
finished = true
wg.Done()
}
syncMtx.Unlock()
}(i)
}
wg.Wait()
return TaskResult{result, err}
}
func fetchResult(task Task, parameter int) <-chan TaskResult {
ch := make(chan TaskResult)
go func() {
response, err := task.Execute(parameter)
ch <- TaskResult{response, err}
}()
return ch
}
func (a asyncTask) StartTaskExecutorChannel(number int) TaskResult {
var maxTaskResult = 0
var errorsOccured = 0
var wg sync.WaitGroup
var maxResultMtx sync.Mutex
var errorMtx sync.Mutex
wg.Add(1)
go func() {
for taskForExecuting := range a.taskChannel {
// Add every task to the wait group, so we can finish only when there are no running tasks
wg.Add(1)
go func(task Task) {
currentResult, currentErr := task.Execute(number)
if currentErr != nil {
errorMtx.Lock()
errorsOccured++
errorMtx.Unlock()
}
maxResultMtx.Lock()
if currentErr == nil && currentResult > maxTaskResult {
maxTaskResult = currentResult
}
maxResultMtx.Unlock()
wg.Done()
}(taskForExecuting)
}
wg.Done()
}()
wg.Wait()
if errorsOccured > a.errorLimit {
return TaskResult{0, errors.New("Max number of errors occured!")}
}
return TaskResult{maxTaskResult, nil}
}
func Pipeline(tasks ...Task) Task {
return CreateAsyncTask(tasks, false, 0, nil, 0, nil)
}
func Fastest(tasks ...Task) Task {
return CreateAsyncTask(tasks, true, 0, nil, 0, nil)
}
func Timed(task Task, timeout time.Duration) Task {
return CreateAsyncTask([]Task{task}, false, timeout, nil, 0, nil)
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return CreateAsyncTask(tasks, true, 0, reduce, 0, nil)
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return CreateAsyncTask(nil, false, 0, nil, errorLimit, tasks)
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161129-30451-1rm2t3u	0.003s
PASS
ok  	_/tmp/d20161129-30451-1rm2t3u	0.003s
PASS
ok  	_/tmp/d20161129-30451-1rm2t3u	0.003s
PASS
ok  	_/tmp/d20161129-30451-1rm2t3u	0.103s
PASS
ok  	_/tmp/d20161129-30451-1rm2t3u	0.203s
PASS
ok  	_/tmp/d20161129-30451-1rm2t3u	0.135s
--- FAIL: TestTimedDoesntLeaveGoroutineHanging (0.20s)
	solution_test.go:216: Expected that there will be as many goroutines as at the start(3) after Timed task has finished after it has timeouted but got 4
		
		BEFORE:
		goroutine profile: total 2
		1 @ 0x42a8fa 0x42a9ee 0x40469f 0x404365 0x46c456 0x46f4cd 0x46c0f1 0x46d165 0x46c785 0x401276 0x42a494 0x459f51
		#	0x46c455	testing.(*T).Run+0x315		/usr/local/go/src/testing/testing.go:647
		#	0x46f4cc	testing.RunTests.func1+0x6c	/usr/local/go/src/testing/testing.go:793
		#	0x46c0f0	testing.tRunner+0x80		/usr/local/go/src/testing/testing.go:610
		#	0x46d164	testing.RunTests+0x2f4		/usr/local/go/src/testing/testing.go:799
		#	0x46c784	testing.(*M).Run+0x84		/usr/local/go/src/testing/testing.go:743
		#	0x401275	main.main+0xc5			_/tmp/d20161129-30451-1rm2t3u/_test/_testmain.go:78
		#	0x42a493	runtime.main+0x1f3		/usr/local/go/src/runtime/proc.go:183
		
		1 @ 0x4c382f 0x4c3630 0x4c02e1 0x473b33 0x46c0f1 0x459f51
		#	0x4c382e	runtime/pprof.writeRuntimeProfile+0x9e						/usr/local/go/src/runtime/pprof/pprof.go:614
		#	0x4c362f	runtime/pprof.writeGoroutine+0x9f						/usr/local/go/src/runtime/pprof/pprof.go:576
		#	0x4c02e0	runtime/pprof.(*Profile).WriteTo+0x340						/usr/local/go/src/runtime/pprof/pprof.go:298
		#	0x473b32	_/tmp/d20161129-30451-1rm2t3u.TestTimedDoesntLeaveGoroutineHanging+0x172	/tmp/d20161129-30451-1rm2t3u/solution_test.go:191
		#	0x46c0f0	testing.tRunner+0x80								/usr/local/go/src/testing/testing.go:610
		
		
		
		AFTER:
		goroutine profile: total 4
		1 @ 0x42a8fa 0x42a9ee 0x4039a8 0x40376d 0x4761f3 0x459f51
		#	0x4761f2	_/tmp/d20161129-30451-1rm2t3u.fetchResult.func1+0x92	/tmp/d20161129-30451-1rm2t3u/solution.go:178
		
		1 @ 0x42a8fa 0x42a9ee 0x40469f 0x404365 0x46c456 0x46f4cd 0x46c0f1 0x46d165 0x46c785 0x401276 0x42a494 0x459f51
		#	0x46c455	testing.(*T).Run+0x315		/usr/local/go/src/testing/testing.go:647
		#	0x46f4cc	testing.RunTests.func1+0x6c	/usr/local/go/src/testing/testing.go:793
		#	0x46c0f0	testing.tRunner+0x80		/usr/local/go/src/testing/testing.go:610
		#	0x46d164	testing.RunTests+0x2f4		/usr/local/go/src/testing/testing.go:799
		#	0x46c784	testing.(*M).Run+0x84		/usr/local/go/src/testing/testing.go:743
		#	0x401275	main.main+0xc5			_/tmp/d20161129-30451-1rm2t3u/_test/_testmain.go:78
		#	0x42a493	runtime.main+0x1f3		/usr/local/go/src/runtime/proc.go:183
		
		1 @ 0x42a8fa 0x42a9ee 0x40469f 0x404365 0x473d1b 0x46c0f1 0x459f51
		#	0x473d1a	_/tmp/d20161129-30451-1rm2t3u.TestTimedDoesntLeaveGoroutineHanging+0x35a	/tmp/d20161129-30451-1rm2t3u/solution_test.go:225
		#	0x46c0f0	testing.tRunner+0x80								/usr/local/go/src/testing/testing.go:610
		
		1 @ 0x4c382f 0x4c3630 0x4c02e1 0x476da1 0x459f51
		#	0x4c382e	runtime/pprof.writeRuntimeProfile+0x9e						/usr/local/go/src/runtime/pprof/pprof.go:614
		#	0x4c362f	runtime/pprof.writeGoroutine+0x9f						/usr/local/go/src/runtime/pprof/pprof.go:576
		#	0x4c02e0	runtime/pprof.(*Profile).WriteTo+0x340						/usr/local/go/src/runtime/pprof/pprof.go:298
		#	0x476da0	_/tmp/d20161129-30451-1rm2t3u.TestTimedDoesntLeaveGoroutineHanging.func2+0x1a0	/tmp/d20161129-30451-1rm2t3u/solution_test.go:212
		
		
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-1rm2t3u	0.204s
PASS
ok  	_/tmp/d20161129-30451-1rm2t3u	0.003s
PASS
ok  	_/tmp/d20161129-30451-1rm2t3u	0.003s
PASS
ok  	_/tmp/d20161129-30451-1rm2t3u	0.004s
PASS
ok  	_/tmp/d20161129-30451-1rm2t3u	0.048s
--- FAIL: TestGreatestSearcherErrors (0.05s)
    --- FAIL: TestGreatestSearcherErrors/close_immediately (0.00s)
    	solution_test.go:322: Expected error did not occur instead got 0
    --- FAIL: TestGreatestSearcherErrors/only_failure (0.00s)
    	solution_test.go:335: Expected error did not occur instead got 0
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-1rm2t3u	0.048s
PASS
ok  	_/tmp/d20161129-30451-1rm2t3u	0.123s

История (2 версии и 3 коментара)

Константин обнови решението на 27.11.2016 22:54 (преди над 1 година)

+package main
+
+import (
+ "errors"
+ "sync"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type TaskResult struct {
+ result int
+ err error
+}
+
+type asyncTask struct {
+ tasks []Task
+ concurrent bool
+ timeout time.Duration
+ reduceFunc func(results []int) int
+
+ errorLimit int
+ taskChannel <-chan Task
+}
+
+func CreateAsyncTask(tasks []Task, concurrent bool, timeout time.Duration, reduceFunc func(results []int) int, errorLimit int, taskChannel <-chan Task) Task {
+ var newTask = new(asyncTask)
+ newTask.tasks = tasks
+ newTask.concurrent = concurrent
+ newTask.timeout = timeout
+ newTask.reduceFunc = reduceFunc
+ newTask.errorLimit = errorLimit
+ newTask.taskChannel = taskChannel
+ return newTask
+}

Това че всичко е един тип не ми харесва - хаби място когато половината полета не се ползват и прави долния метод ненужно сложен като между временно не печелиш нищо освен че ще трябва да се грижиш долния метод да не стане грешен всеки път когато добавяш нов "тип" задача

+
+func (a asyncTask) Execute(number int) (int, error) {
+ if (a.tasks == nil || len(a.tasks) == 0) && a.taskChannel == nil {
+ return 0, errors.New("No tasks given")
+ }
+
+ var taskResult TaskResult
+
+ if a.taskChannel != nil {
+ taskResult = a.StartTaskExecutorChannel(number)
+ } else if a.concurrent {
+ taskResult = a.ExecuteTasksAsync(number)
+ } else {
+ taskResult = a.ExecuteTasks(number)
+ }
+
+ return taskResult.result, taskResult.err
+}
+
+func (a asyncTask) ExecuteTasks(number int) TaskResult {
+ var currentResult = number
+ var err error
+ var nextResult = 0
+
+ for i := 0; i < len(a.tasks); i++ {
+ // If there is set timeout, then we must use it
+ // Otherwise, wait forever...
+ if a.timeout != 0 {
+ select {
+ case taskResult := <-fetchResult(a.tasks[i], currentResult):
+ nextResult = taskResult.result
+ err = taskResult.err
+ case <-time.After(a.timeout):
+ return TaskResult{0, errors.New("Timed out")}
+ }
+ } else {
+ nextResult, err = a.tasks[i].Execute(currentResult)
+ }
+
+ currentResult = nextResult
+
+ // If error occured, we leave without running the next tasks
+ if err != nil {
+ return TaskResult{currentResult, err}
+ }
+ }
+
+ return TaskResult{currentResult, nil}
+}
+
+func (a asyncTask) ExecuteTasksAsync(number int) TaskResult {
+ var wg sync.WaitGroup
+ var syncMtx sync.Mutex
+ var errorMtx sync.Mutex
+
+ var result = 0
+ var err error
+
+ var results = []int{}
+
+ var finished = false
+
+ for i := 0; i < len(a.tasks); i++ {
+ // Add every task to the wait group, so we can finish only when there are no running tasks
+ wg.Add(1)
+ go func(index int) {
+ currentResult, currentErr := a.tasks[index].Execute(number)
+ if a.reduceFunc != nil {
+ if currentErr != nil {
+ errorMtx.Lock()
+ err = currentErr
+ errorMtx.Unlock()
+ wg.Done()
+ return
+ }
+
+ // Add the result to the current list of results of all tasks
+ syncMtx.Lock()
+ results = append(results, currentResult)
+ if len(results) == len(a.tasks) { // Then it's the last task
+ result = a.reduceFunc(results)
+ err = nil
+ }
+ syncMtx.Unlock()
+
+ } else {
+ syncMtx.Lock()
+ // We only want the result from the fastest task
+ if !finished {
+ result = currentResult
+ err = currentErr
+ finished = true
+ }
+ syncMtx.Unlock()
+
+ }
+
+ wg.Done()
+ }(i)
+ }
+
+ wg.Wait()
+ return TaskResult{result, err}
+}
+
+func fetchResult(task Task, parameter int) <-chan TaskResult {
+ ch := make(chan TaskResult)
+
+ go func() {
+ if response, err := task.Execute(parameter); err == nil {
+ ch <- TaskResult{response, err}
+ }
+ }()
+
+ return ch
+}
+
+func (a asyncTask) StartTaskExecutorChannel(number int) TaskResult {
+ var maxTaskResult = 0
+ var errorsOccured = 0
+
+ var wg sync.WaitGroup
+
+ var maxResultMtx sync.Mutex
+ var errorMtx sync.Mutex
+
+ wg.Add(1)
+ go func() {
+ for taskForExecuting := range a.taskChannel {
+ // Add every task to the wait group, so we can finish only when there are no running tasks
+ wg.Add(1)
+ go func(task Task) {
+ currentResult, currentErr := task.Execute(number)
+
+ if currentErr != nil {
+ errorMtx.Lock()
+ errorsOccured++
+ errorMtx.Unlock()
+ }
+
+ maxResultMtx.Lock()
+ if currentErr == nil && currentResult > maxTaskResult {
+ maxTaskResult = currentResult
+ }
+ maxResultMtx.Unlock()
+ wg.Done()
+
+ }(taskForExecuting)
+ }
+ wg.Done()
+ }()
+
+ wg.Wait()
+ if errorsOccured > a.errorLimit {
+ return TaskResult{0, errors.New("Max number of errors occured!")}
+ }
+
+ return TaskResult{maxTaskResult, nil}
+}
+
+func Pipeline(tasks ...Task) Task {
+ newTask := CreateAsyncTask(tasks, false, 0, nil, 0, nil)
+ return newTask
+}
+
+func Fastest(tasks ...Task) Task {
+ newTask := CreateAsyncTask(tasks, true, 0, nil, 0, nil)
+ return newTask
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ newTask := CreateAsyncTask([]Task{task}, false, timeout, nil, 0, nil)
+ return newTask
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ newTask := CreateAsyncTask(tasks, true, 0, reduce, 0, nil)
+ return newTask
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ newTask := CreateAsyncTask(nil, false, 0, nil, errorLimit, tasks)
+ return newTask
+}

Константин обнови решението на 28.11.2016 21:03 (преди над 1 година)

package main
import (
"errors"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
type TaskResult struct {
result int
err error
}
type asyncTask struct {
tasks []Task
concurrent bool
timeout time.Duration
reduceFunc func(results []int) int
errorLimit int
taskChannel <-chan Task
}
func CreateAsyncTask(tasks []Task, concurrent bool, timeout time.Duration, reduceFunc func(results []int) int, errorLimit int, taskChannel <-chan Task) Task {
var newTask = new(asyncTask)
newTask.tasks = tasks
newTask.concurrent = concurrent
newTask.timeout = timeout
newTask.reduceFunc = reduceFunc
newTask.errorLimit = errorLimit
newTask.taskChannel = taskChannel
return newTask
}
func (a asyncTask) Execute(number int) (int, error) {
if (a.tasks == nil || len(a.tasks) == 0) && a.taskChannel == nil {
return 0, errors.New("No tasks given")
}
var taskResult TaskResult
if a.taskChannel != nil {
taskResult = a.StartTaskExecutorChannel(number)
+ } else if a.reduceFunc != nil {
+ taskResult = a.ExecuteAndReduce(number)
} else if a.concurrent {
- taskResult = a.ExecuteTasksAsync(number)
+ taskResult = a.ExecuteFastest(number)
+ } else if a.timeout != 0 {
+ taskResult = a.ExecuteTasksWithTimeout(number)
} else {
taskResult = a.ExecuteTasks(number)
}
return taskResult.result, taskResult.err
}
func (a asyncTask) ExecuteTasks(number int) TaskResult {
var currentResult = number
var err error
var nextResult = 0
for i := 0; i < len(a.tasks); i++ {
- // If there is set timeout, then we must use it
- // Otherwise, wait forever...
- if a.timeout != 0 {
- select {
- case taskResult := <-fetchResult(a.tasks[i], currentResult):
- nextResult = taskResult.result
- err = taskResult.err
- case <-time.After(a.timeout):
- return TaskResult{0, errors.New("Timed out")}
- }
- } else {
- nextResult, err = a.tasks[i].Execute(currentResult)
- }
+ nextResult, err = a.tasks[i].Execute(currentResult)
currentResult = nextResult
// If error occured, we leave without running the next tasks
if err != nil {
return TaskResult{currentResult, err}
}
}
return TaskResult{currentResult, nil}
}
-func (a asyncTask) ExecuteTasksAsync(number int) TaskResult {
+func (a asyncTask) ExecuteTasksWithTimeout(number int) TaskResult {
+ var result = number
+ var err error
+
+ for i := 0; i < len(a.tasks); i++ {
+ select {
+ case taskResult := <-fetchResult(a.tasks[i], number):
+ result = taskResult.result
+ err = taskResult.err
+ case <-time.After(a.timeout):
+ result = 0
+ err = errors.New("Timed out")
+ }
+
+ // If error occured, we leave without running the next tasks
+ if err != nil {
+ return TaskResult{result, err}
+ }
+ }
+
+ return TaskResult{result, nil}
+}
+
+func (a asyncTask) ExecuteAndReduce(number int) TaskResult {
var wg sync.WaitGroup
var syncMtx sync.Mutex
var errorMtx sync.Mutex
var result = 0
var err error
var results = []int{}
- var finished = false
-
for i := 0; i < len(a.tasks); i++ {
- // Add every task to the wait group, so we can finish only when there are no running tasks
wg.Add(1)
+ // Add every task to the wait group, so we can finish only when there are no running tasks
go func(index int) {
currentResult, currentErr := a.tasks[index].Execute(number)
- if a.reduceFunc != nil {
- if currentErr != nil {
- errorMtx.Lock()
- err = currentErr
- errorMtx.Unlock()
- wg.Done()
- return
- }
+ if currentErr != nil {
+ errorMtx.Lock()
+ err = currentErr
+ errorMtx.Unlock()
+ wg.Done()
+ return
+ }
- // Add the result to the current list of results of all tasks
- syncMtx.Lock()
- results = append(results, currentResult)
- if len(results) == len(a.tasks) { // Then it's the last task
- result = a.reduceFunc(results)
- err = nil
- }
- syncMtx.Unlock()
-
- } else {
- syncMtx.Lock()
- // We only want the result from the fastest task
- if !finished {
- result = currentResult
- err = currentErr
- finished = true
- }
- syncMtx.Unlock()
-
+ // Add the result to the current list of results of all tasks
+ syncMtx.Lock()
+ results = append(results, currentResult)
+ if len(results) == len(a.tasks) { // Then it's the last task
+ result = a.reduceFunc(results)
+ err = nil
}
+ syncMtx.Unlock()
wg.Done()
}(i)
}
wg.Wait()
return TaskResult{result, err}
}
+func (a asyncTask) ExecuteFastest(number int) TaskResult {
+ var wg sync.WaitGroup
+ var syncMtx sync.Mutex
+
+ var result = 0
+ var err error
+
+ var finished = false
+
+ wg.Add(1)
+ for i := 0; i < len(a.tasks); i++ {
+ // Add every task to the wait group, so we can finish only when there are no running tasks
+ go func(index int) {
+ currentResult, currentErr := a.tasks[index].Execute(number)
+ syncMtx.Lock()
+ // We only want the result from the fastest task
+ if !finished {
+ result = currentResult
+ err = currentErr
+ finished = true
+ wg.Done()
+ }
+ syncMtx.Unlock()
+ }(i)
+ }
+
+ wg.Wait()
+ return TaskResult{result, err}
+}
+
func fetchResult(task Task, parameter int) <-chan TaskResult {
ch := make(chan TaskResult)
go func() {
- if response, err := task.Execute(parameter); err == nil {
- ch <- TaskResult{response, err}
- }
+ response, err := task.Execute(parameter)
+ ch <- TaskResult{response, err}
}()
return ch
}
func (a asyncTask) StartTaskExecutorChannel(number int) TaskResult {
var maxTaskResult = 0
var errorsOccured = 0
var wg sync.WaitGroup
var maxResultMtx sync.Mutex
var errorMtx sync.Mutex
wg.Add(1)
go func() {
for taskForExecuting := range a.taskChannel {
// Add every task to the wait group, so we can finish only when there are no running tasks
wg.Add(1)
go func(task Task) {
currentResult, currentErr := task.Execute(number)
if currentErr != nil {
errorMtx.Lock()
errorsOccured++
errorMtx.Unlock()
}
maxResultMtx.Lock()
if currentErr == nil && currentResult > maxTaskResult {
maxTaskResult = currentResult
}
maxResultMtx.Unlock()
wg.Done()
}(taskForExecuting)
}
wg.Done()
}()
wg.Wait()
if errorsOccured > a.errorLimit {
return TaskResult{0, errors.New("Max number of errors occured!")}
}
return TaskResult{maxTaskResult, nil}
}
func Pipeline(tasks ...Task) Task {
- newTask := CreateAsyncTask(tasks, false, 0, nil, 0, nil)
- return newTask
+ return CreateAsyncTask(tasks, false, 0, nil, 0, nil)
}
func Fastest(tasks ...Task) Task {
- newTask := CreateAsyncTask(tasks, true, 0, nil, 0, nil)
- return newTask
+ return CreateAsyncTask(tasks, true, 0, nil, 0, nil)
}
func Timed(task Task, timeout time.Duration) Task {
- newTask := CreateAsyncTask([]Task{task}, false, timeout, nil, 0, nil)
- return newTask
+ return CreateAsyncTask([]Task{task}, false, timeout, nil, 0, nil)
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
- newTask := CreateAsyncTask(tasks, true, 0, reduce, 0, nil)
- return newTask
+ return CreateAsyncTask(tasks, true, 0, reduce, 0, nil)
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
- newTask := CreateAsyncTask(nil, false, 0, nil, errorLimit, tasks)
- return newTask
+ return CreateAsyncTask(nil, false, 0, nil, errorLimit, tasks)
}