Решение на Concurrent Retry Executor от Александър Ангелов

Обратно към всички решения

Към профила на Александър Ангелов

Резултати

  • 10 точки от тестове
  • 0 бонус точки
  • 10 точки общо
  • 9 успешни тест(а)
  • 0 неуспешни тест(а)

Код

package main
func ConcurrentRetryExecutor(tasks []func() string, concurrentLimit int, retryLimit int) <-chan struct {
index int
result string
} {
resultChannel := make(chan struct {
index int
result string
}, len(tasks)*retryLimit) // we need so much in order to not block on resultChannel
// The upper size of the buffer should be adjusted based on the usage of the function
// right now it is very conservative size which will not block the executors but potentially
// can waste much memory however without real data and usage case this size will suffice
type fillChannelType struct {
index int
function func() string
}
// channel for syncing the executors
syncChannel := make(chan struct{})
// channel with task which the executors will take from
taskChannel := make(chan fillChannelType, concurrentLimit) // we need only concurrentLimit buffer because we have only concurrentLimit executors which will take tasks
// Fill the task channel
go func() {
for i, f := range tasks {
taskChannel <- fillChannelType{i, f}
}
close(taskChannel)
}()
// start concurrentLimit executors
for i := 0; i < concurrentLimit; i++ {
go func() {
for task := range taskChannel {
for count := 0; count < retryLimit; count++ {
result := task.function()
resultChannel <- struct {
index int
result string
}{task.index, result}
// if we have correct result break out of the retry loop and go to next task
if result != "" {
break
}
}
}
// we have finished all of the task we need to sync with other executor so we can close the resultChannel
syncChannel <- struct{}{}
}()
}
// start the sync goroutine which will wait for all of the executors to finish and will close the resultChannel
go func() {
for i := 0; i < concurrentLimit; i++ {
<-syncChannel
}
close(syncChannel)
close(resultChannel)
}()
return resultChannel
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161115-21147-17wtpoe	0.002s
PASS
ok  	_/tmp/d20161115-21147-17wtpoe	0.024s
PASS
ok  	_/tmp/d20161115-21147-17wtpoe	0.023s
PASS
ok  	_/tmp/d20161115-21147-17wtpoe	0.003s
PASS
ok  	_/tmp/d20161115-21147-17wtpoe	0.003s
PASS
ok  	_/tmp/d20161115-21147-17wtpoe	0.059s
PASS
ok  	_/tmp/d20161115-21147-17wtpoe	0.375s
PASS
ok  	_/tmp/d20161115-21147-17wtpoe	0.130s
PASS
ok  	_/tmp/d20161115-21147-17wtpoe	0.203s

История (1 версия и 0 коментара)

Александър обнови решението на 13.11.2016 17:40 (преди над 1 година)

+package main
+
+func ConcurrentRetryExecutor(tasks []func() string, concurrentLimit int, retryLimit int) <-chan struct {
+ index int
+ result string
+} {
+ resultChannel := make(chan struct {
+ index int
+ result string
+ }, len(tasks)*retryLimit) // we need so much in order to not block on resultChannel
+ // The upper size of the buffer should be adjusted based on the usage of the function
+ // right now it is very conservative size which will not block the executors but potentially
+ // can waste much memory however without real data and usage case this size will suffice
+
+ type fillChannelType struct {
+ index int
+ function func() string
+ }
+
+ // channel for syncing the executors
+ syncChannel := make(chan struct{})
+ // channel with task which the executors will take from
+ taskChannel := make(chan fillChannelType, concurrentLimit) // we need only concurrentLimit buffer because we have only concurrentLimit executors which will take tasks
+
+ // Fill the task channel
+ go func() {
+ for i, f := range tasks {
+ taskChannel <- fillChannelType{i, f}
+ }
+ close(taskChannel)
+ }()
+
+ // start concurrentLimit executors
+ for i := 0; i < concurrentLimit; i++ {
+ go func() {
+ for task := range taskChannel {
+ for count := 0; count < retryLimit; count++ {
+ result := task.function()
+ resultChannel <- struct {
+ index int
+ result string
+ }{task.index, result}
+ // if we have correct result break out of the retry loop and go to next task
+ if result != "" {
+ break
+ }
+ }
+ }
+
+ // we have finished all of the task we need to sync with other executor so we can close the resultChannel
+ syncChannel <- struct{}{}
+ }()
+ }
+ // start the sync goroutine which will wait for all of the executors to finish and will close the resultChannel
+ go func() {
+ for i := 0; i < concurrentLimit; i++ {
+ <-syncChannel
+ }
+
+ close(syncChannel)
+ close(resultChannel)
+ }()
+
+ return resultChannel
+}