Решение на Concurrent Retry Executor от Анатоли Бързев

Обратно към всички решения

Към профила на Анатоли Бързев

Резултати

  • 10 точки от тестове
  • 0 бонус точки
  • 10 точки общо
  • 9 успешни тест(а)
  • 0 неуспешни тест(а)

Код

package main
// ConcurrentRetryExecutor runs every function in tasks and streams the
// outcome of each attempt on the returned channel.
//
// At most concurrentLimit tasks execute at the same time; a value that is
// non-positive or larger than len(tasks) is replaced by len(tasks). Each task
// is attempted up to retryLimit times (a negative retryLimit is treated as
// zero, so nothing is executed). An empty string returned by a task counts as
// a failure and triggers the next attempt; the first non-empty result ends
// the retries for that task.
//
// Every attempt, failed or successful, is reported as {index, result}, where
// index is the position of the task in tasks. The channel is closed once all
// tasks have finished. For an empty task list the channel is returned already
// closed and carries no values.
func ConcurrentRetryExecutor(tasks []func() string, concurrentLimit int, retryLimit int) <-chan struct {
	index  int
	result string
} {
	totalTasks := len(tasks)
	if concurrentLimit > totalTasks || concurrentLimit <= 0 {
		concurrentLimit = totalTasks
	}
	if retryLimit < 0 {
		retryLimit = 0
	}

	// Sized for the worst case (every attempt of every task fails) so that
	// workers never block on a slow consumer.
	resultChan := make(chan struct {
		index  int
		result string
	}, totalTasks*retryLimit)

	// Nothing to run: hand back a closed channel instead of inventing a
	// result for a task that does not exist.
	if totalTasks == 0 {
		close(resultChan)
		return resultChan
	}

	// Counting semaphore: a slot is taken before a task starts and released
	// when it finishes, which caps the number of running tasks.
	throttlingChan := make(chan struct{}, concurrentLimit)

	// Each worker signals here exactly once when it is done.
	doneChan := make(chan struct{}, totalTasks)

	go func() {
		for idx, task := range tasks {
			throttlingChan <- struct{}{}

			go func(idx int, task func() string) {
				for attempt := 0; attempt < retryLimit; attempt++ {
					taskResult := task()

					resultChan <- struct {
						index  int
						result string
					}{idx, taskResult}

					// A non-empty result means success; stop retrying.
					if taskResult != "" {
						break
					}
				}

				<-throttlingChan
				doneChan <- struct{}{}
			}(idx, task)
		}

		// Wait for every worker before closing, so no send hits a closed
		// channel.
		for i := 0; i < totalTasks; i++ {
			<-doneChan
		}
		close(resultChan)
	}()

	return resultChan
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161115-21147-1j4fci8	0.003s
PASS
ok  	_/tmp/d20161115-21147-1j4fci8	0.026s
PASS
ok  	_/tmp/d20161115-21147-1j4fci8	0.026s
PASS
ok  	_/tmp/d20161115-21147-1j4fci8	0.003s
PASS
ok  	_/tmp/d20161115-21147-1j4fci8	0.003s
PASS
ok  	_/tmp/d20161115-21147-1j4fci8	0.072s
PASS
ok  	_/tmp/d20161115-21147-1j4fci8	0.341s
PASS
ok  	_/tmp/d20161115-21147-1j4fci8	0.361s
PASS
ok  	_/tmp/d20161115-21147-1j4fci8	0.203s

История (3 версии и 3 коментара)

Анатоли обнови решението на 14.11.2016 10:54 (преди над 1 година)

+package main
+
+import (
+ "fmt"
+)
+
+func ConcurrentRetryExecutor(tasks []func() string, concurrentLimit int, retryLimit int) <-chan struct {
+ index int
+ result string
+} {
+
+ totalTasks := len(tasks)
+
+ if concurrentLimit > totalTasks || concurrentLimit <= 0 {
+ concurrentLimit = totalTasks
+ }
+
+ if retryLimit < 0 {
+ retryLimit = 0
+ }
+
+ //the results will be sent here
+ resultChan := make(chan struct {
+ index int
+ result string
+ }, totalTasks*retryLimit+1)
+
+ //no tasks to exec
+ if concurrentLimit == 0 {
+ resultChan <- struct {
+ index int
+ result string
+ }{0, "No task to exec"}
+
+ close(resultChan)
+ return resultChan
+ }
+
+ //used to implement concurrent limit
+ throttlingChan := make(chan bool, concurrentLimit)
+
+ for i := 0; i < concurrentLimit; i++ {
+ throttlingChan <- true
+ }
+
+ //used to wait all routines to finish before exit
+ doneChan := make(chan bool, totalTasks)
+
+ //task executor
+ go func() {
+ for idx, task := range tasks {
+ <-throttlingChan
+
+ //task
+ go func(idx int, task func() string) {
+ taskResult := ""
+
+ for i := 0; i <= retryLimit; i++ {
+ //execute
+ taskResult = task()
+
+ //send the response
+ resultChan <- struct {
+ index int
+ result string
+ }{idx, taskResult}
+
+ //finish if successfull task
+ if taskResult != "" {
+ break
+ }
+ }
+
+ throttlingChan <- true
+ doneChan <- true
+ }(idx, task)
+ }
+
+ //wait for the last routines to end
+ for i := 0; i < totalTasks; i++ {
+ <-doneChan
+ }
+
+ //cleanup
+ close(throttlingChan)
+ close(resultChan)
+ close(doneChan)
+ }()
+
+ return resultChan
+}
+
+ results := ConcurrentRetryExecutor([]func() string{}, 2, 3)
+
+ for result := range results {
+ if result.result == "" {
+ fmt.Printf("Task %d returned an error!\n", result.index+1)
+ } else {
+ fmt.Printf("Task %d successfully returned '%s'\n", result.index+1, result.result)
+ }
+ }
+
+ fmt.Println("All done!")
+}

Анатоли обнови решението на 14.11.2016 11:07 (преди над 1 година)

package main
-import (
- "fmt"
-)
-
func ConcurrentRetryExecutor(tasks []func() string, concurrentLimit int, retryLimit int) <-chan struct {
index int
result string
} {
totalTasks := len(tasks)
if concurrentLimit > totalTasks || concurrentLimit <= 0 {
concurrentLimit = totalTasks
}
if retryLimit < 0 {
retryLimit = 0
}
//the results will be sent here
resultChan := make(chan struct {
index int
result string
}, totalTasks*retryLimit+1)
//no tasks to exec
if concurrentLimit == 0 {
resultChan <- struct {
index int
result string
}{0, "No task to exec"}
close(resultChan)
return resultChan
}
//used to implement concurrent limit
throttlingChan := make(chan bool, concurrentLimit)
for i := 0; i < concurrentLimit; i++ {
throttlingChan <- true
}
//used to wait all routines to finish before exit
doneChan := make(chan bool, totalTasks)
//task executor
go func() {
for idx, task := range tasks {
<-throttlingChan
//task
go func(idx int, task func() string) {
taskResult := ""
for i := 0; i <= retryLimit; i++ {
//execute
taskResult = task()
//send the response
resultChan <- struct {
index int
result string
}{idx, taskResult}
//finish if successfull task
if taskResult != "" {
break
}
}
throttlingChan <- true
doneChan <- true
}(idx, task)
}
//wait for the last routines to end
for i := 0; i < totalTasks; i++ {
<-doneChan
}
//cleanup
close(throttlingChan)
close(resultChan)
close(doneChan)
}()
return resultChan
-}
-
-func main() {
- results := ConcurrentRetryExecutor([]func() string{}, 2, 3)
-
- for result := range results {
- if result.result == "" {
- fmt.Printf("Task %d returned an error!\n", result.index+1)
- } else {
- fmt.Printf("Task %d successfully returned '%s'\n", result.index+1, result.result)
- }
- }
-
- fmt.Println("All done!")
}

Няма нужда от всичките тези проверки на входните аргументи и неуказани връщани резултати.

И по-важно: retryLimit е колко пъти да бъде опитано, а не колко пъти да бъде опитано, след като фейлне веднъж.

Анатоли обнови решението на 14.11.2016 11:52 (преди над 1 година)

package main
func ConcurrentRetryExecutor(tasks []func() string, concurrentLimit int, retryLimit int) <-chan struct {
index int
result string
} {
totalTasks := len(tasks)
if concurrentLimit > totalTasks || concurrentLimit <= 0 {
concurrentLimit = totalTasks
}
if retryLimit < 0 {
retryLimit = 0
}
//the results will be sent here
resultChan := make(chan struct {
index int
result string
}, totalTasks*retryLimit+1)
//no tasks to exec
if concurrentLimit == 0 {
resultChan <- struct {
index int
result string
}{0, "No task to exec"}
close(resultChan)
return resultChan
}
//used to implement concurrent limit
throttlingChan := make(chan bool, concurrentLimit)
for i := 0; i < concurrentLimit; i++ {
throttlingChan <- true
}
//used to wait all routines to finish before exit
doneChan := make(chan bool, totalTasks)
//task executor
go func() {
for idx, task := range tasks {
<-throttlingChan
//task
go func(idx int, task func() string) {
taskResult := ""
- for i := 0; i <= retryLimit; i++ {
+ for i := 0; i < retryLimit; i++ {
//execute
taskResult = task()
//send the response
resultChan <- struct {
index int
result string
}{idx, taskResult}
//finish if successfull task
if taskResult != "" {
break
}
}
throttlingChan <- true
doneChan <- true
}(idx, task)
}
//wait for the last routines to end
for i := 0; i < totalTasks; i++ {
<-doneChan
}
//cleanup
close(throttlingChan)
close(resultChan)
close(doneChan)
}()
return resultChan
}