Решение на Concurrent Retry Executor от Александър Александров

Обратно към всички решения

Към профила на Александър Александров

Резултати

  • 10 точки от тестове
  • 0 бонус точки
  • 10 точки общо
  • 9 успешни тест(а)
  • 0 неуспешни тест(а)

Код

package main
func ConcurrentRetryExecutor(tasks []func() string, concurrentLimit int, retryLimit int) <-chan struct {
index int
result string
} {
// create the results channel
resultsCh := make(chan struct {
index int
result string
}, concurrentLimit)
// concurrently handle all the tasks
go handleTasks(tasks, concurrentLimit, retryLimit, resultsCh)
return resultsCh
}
func handleTasks(
tasks []func() string,
concurrentLimit int,
retryLimit int,
resultsCh chan struct {
index int
result string
}) {
// create a limited size channel for the tasks
tasksCh := make(chan func() string, concurrentLimit)
// create a channel to use for synchronisation
syncCh := make(chan struct{}, len(tasks))
for index, task := range tasks {
// add a task to the tasks channel
// execute a task from the channel concurrently
tasksCh <- task
go handleOneTask(syncCh, tasksCh, task, index, 0, retryLimit, resultsCh)
}
// depending on the size of the tasks slice
// remove from syncCh channel until all tasks
// are removed and close the channel
for i := 0; i < len(tasks); i++ {
<-syncCh
}
close(resultsCh)
}
func handleOneTask(
syncCh chan struct{},
tasksCh chan func() string,
task func() string,
taskIndex int,
retryCount int,
retryLimit int,
resultsCh chan struct {
index int
result string
}) {
// stop retrying this task
// remove it from the tasks channel
// and add a struct to the syncCh
if retryCount >= retryLimit {
<-tasksCh
syncCh <- struct{}{}
return
}
// get the task's result and send it to the channel
result := task()
resultsCh <- struct {
index int
result string
}{index: taskIndex, result: result}
// if the result is "" retry the task
if result == "" {
go handleOneTask(syncCh, tasksCh, task, taskIndex, retryCount+1, retryLimit, resultsCh)
return
}
// if the task is ok remove it from the tasks channel
<-tasksCh
syncCh <- struct{}{}
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161115-21147-zljbsu	0.002s
PASS
ok  	_/tmp/d20161115-21147-zljbsu	0.026s
PASS
ok  	_/tmp/d20161115-21147-zljbsu	0.027s
PASS
ok  	_/tmp/d20161115-21147-zljbsu	0.003s
PASS
ok  	_/tmp/d20161115-21147-zljbsu	0.003s
PASS
ok  	_/tmp/d20161115-21147-zljbsu	0.077s
PASS
ok  	_/tmp/d20161115-21147-zljbsu	0.165s
PASS
ok  	_/tmp/d20161115-21147-zljbsu	0.265s
PASS
ok  	_/tmp/d20161115-21147-zljbsu	0.203s

История (2 версии и 4 коментара)

Александър обнови решението на 14.11.2016 00:42 (преди над 1 година)

+package main
+
+import (
+ "sync"
+)
+
+func ConcurrentRetryExecutor(tasks []func() string, concurrentLimit int, retryLimit int) <-chan struct {
+ index int
+ result string
+} {
+
+ // create the results channel
+ resultsCh := make(chan struct {
+ index int
+ result string
+ }, concurrentLimit)
+ // concurrently handle all the tasks
+ go handleTasks(tasks, concurrentLimit, retryLimit, resultsCh)
+
+ return resultsCh
+}
+
+func handleTasks(
+ tasks []func() string,
+ concurrentLimit int,
+ retryLimit int,
+ resultsCh chan struct {
+ index int
+ result string
+ }) {
+ // create a limited size channel for the tasks
+ tasksCh := make(chan func() string, concurrentLimit)
+ // create a wait group to sync the goroutines as to close
+ // the resultsCh when all tasks are finished
+ var wg sync.WaitGroup
+
+ for index, task := range tasks {
+ // add a task to the tasks channel
+ // and add a counter to the wait group
+ // execute a task from the channel concurrently
+ tasksCh <- task
+ wg.Add(1)
+ go handleOneTask(&wg, tasksCh, task, index, 0, retryLimit, resultsCh)
+ }
+
+ // wait for all tasks to add their results to the resultsCh
+ // and close the channel after that
+ wg.Wait()
+ close(resultsCh)
+}
+
+func handleOneTask(
+ wg *sync.WaitGroup,
+ tasksCh chan func() string,
+ task func() string,
+ taskIndex int,
+ retryCount int,
+ retryLimit int,
+ resultsCh chan struct {
+ index int
+ result string
+ }) {
+
+ // stop retrying this task
+ if retryCount >= retryLimit {
+ wg.Done()
+ return
+ }
+
+ // get the task's result and send it to the channel
+ result := task()
+ resultsCh <- struct {
+ index int
+ result string
+ }{index: taskIndex, result: result}
+
+ // if the result is "" retry the task
+ if result == "" {
+ wg.Add(1)
+ go handleOneTask(wg, tasksCh, task, taskIndex, retryCount+1, retryLimit, resultsCh)
+ wg.Done()
+ return
+ }
+
+ // if the task is ok remove it from the tasks channel
+ <-tasksCh
+ wg.Done()
+
+}

Благодаря за коментарите :)

Не сме преподавали waitGroup-и и не са необходими за задачата - има не сложно решение с канали

Пробвай да намалиш конкурентата бройка в тестовете на 1 и виж какво се случва. С go test -timeout 5s можеш да кажеш тестовете да timeout-ват след 5секунди вместо след 10-те минути по подразбиране.

Александър обнови решението на 14.11.2016 11:11 (преди над 1 година)

package main
-import (
- "sync"
-)
-
func ConcurrentRetryExecutor(tasks []func() string, concurrentLimit int, retryLimit int) <-chan struct {
index int
result string
} {
-
// create the results channel
resultsCh := make(chan struct {
index int
result string
}, concurrentLimit)
+
// concurrently handle all the tasks
go handleTasks(tasks, concurrentLimit, retryLimit, resultsCh)
-
return resultsCh
}
func handleTasks(
tasks []func() string,
concurrentLimit int,
retryLimit int,
resultsCh chan struct {
index int
result string
}) {
+
// create a limited size channel for the tasks
tasksCh := make(chan func() string, concurrentLimit)
- // create a wait group to sync the goroutines as to close
- // the resultsCh when all tasks are finished
- var wg sync.WaitGroup
+ // create a channel to use for synchronisation
+ syncCh := make(chan struct{}, len(tasks))
for index, task := range tasks {
// add a task to the tasks channel
- // and add a counter to the wait group
// execute a task from the channel concurrently
tasksCh <- task
- wg.Add(1)
- go handleOneTask(&wg, tasksCh, task, index, 0, retryLimit, resultsCh)
+ go handleOneTask(syncCh, tasksCh, task, index, 0, retryLimit, resultsCh)
}
- // wait for all tasks to add their results to the resultsCh
- // and close the channel after that
- wg.Wait()
+ // depending on the size of the tasks slice
+ // remove from syncCh channel until all tasks
+ // are removed and close the channel
+ for i := 0; i < len(tasks); i++ {
+ <-syncCh
+ }
+
close(resultsCh)
}
func handleOneTask(
- wg *sync.WaitGroup,
+ syncCh chan struct{},
tasksCh chan func() string,
task func() string,
taskIndex int,
retryCount int,
retryLimit int,
resultsCh chan struct {
index int
result string
}) {
// stop retrying this task
+ // remove it from the tasks channel
+ // and add a struct to the syncCh
if retryCount >= retryLimit {
- wg.Done()
+ <-tasksCh
+ syncCh <- struct{}{}
return
- }
+ }
// get the task's result and send it to the channel
result := task()
resultsCh <- struct {
index int
result string
}{index: taskIndex, result: result}
// if the result is "" retry the task
if result == "" {
- wg.Add(1)
- go handleOneTask(wg, tasksCh, task, taskIndex, retryCount+1, retryLimit, resultsCh)
- wg.Done()
+ go handleOneTask(syncCh, tasksCh, task, taskIndex, retryCount+1, retryLimit, resultsCh)
return
}
// if the task is ok remove it from the tasks channel
<-tasksCh
- wg.Done()
-
+ syncCh <- struct{}{}
-}
+}

Мерси за коментарите. Замених WaitGroup с канал който ползвам за синхронизация и затваряне. Не съм на моето PC, тъй че го писах на The Go Playground, тестовете уж минават, заедно с различните concurrentLimit. При concurrentLimit=1 и timeout 5s е нормално да fail-не теста.

Три канала за такава задача ми се струват много, но може би е ок за Go.

PS: Забравил съм да затворя другите два канала, тъй че може да сложа нова версия по-късно, въпреки че не вярвам да е проблем GC-то би трябвало да ги премахне.