Решение на Concurrent Retry Executor от Диан Тодоров

Обратно към всички решения

Към профила на Диан Тодоров

Резултати

  • 10 точки от тестове
  • 0 бонус точки
  • 10 точки общо
  • 9 успешни тест(а)
  • 0 неуспешни тест(а)

Код

package main
func ConcurrentRetryExecutor(tasks []func() string, concurrentLimit int, retryLimit int) <-chan struct {
index int
result string
} {
res := make(chan struct {
index int
result string
})
var done = make(chan struct{}, len(tasks))
go func() {
for i := 0; i < len(tasks); i++ {
<-done
}
close(res)
}()
var sem = make(chan struct{}, concurrentLimit)
go func() {
for i, t := range tasks {
sem <- struct{}{}
go func(task func() string, index int) {
taskRes := task()
res <- struct {
index int
result string
}{index: index, result: taskRes}
var tries = 1
for taskRes == "" && tries < retryLimit {
taskRes = task()
res <- struct {
index int
result string
}{index: index, result: taskRes}
tries = tries + 1
}
<-sem
done <- struct{}{}
}(t, i)
}
}()
return res
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161115-21147-1z9tvr	0.003s
PASS
ok  	_/tmp/d20161115-21147-1z9tvr	0.037s
PASS
ok  	_/tmp/d20161115-21147-1z9tvr	0.024s
PASS
ok  	_/tmp/d20161115-21147-1z9tvr	0.004s
PASS
ok  	_/tmp/d20161115-21147-1z9tvr	0.004s
PASS
ok  	_/tmp/d20161115-21147-1z9tvr	0.073s
PASS
ok  	_/tmp/d20161115-21147-1z9tvr	0.370s
PASS
ok  	_/tmp/d20161115-21147-1z9tvr	0.655s
PASS
ok  	_/tmp/d20161115-21147-1z9tvr	0.203s

История (6 версии и 13 коментара)

Диан обнови решението на 13.11.2016 13:27 (преди над 1 година)

+package main
+
+import (
+ "fmt"
+ "time"
+)
+
+// ConcurrentRetryExecutor concurrently and sequentially executes a list of tasks
+// and returns the resulst of their execution as a channel.
+func ConcurrentRetryExecutor(tasks []func() string, concurrentLimit int, retryLimit int) <-chan struct {
+ index int
+ result string
+} {
+ res := make(chan struct {
+ index int
+ result string
+ })
+ var sem = make(chan struct{}, concurrentLimit)
+ var done = make(chan struct{}, len(tasks))
+
+ go func(d <-chan struct{}, r chan<- struct {
+ index int
+ result string
+ }) {
+ for i := 0; i < len(tasks); i++ {
+ <-d
+ }
+ close(r)
+ }(done, res)
+
+ for i, t := range tasks {
+
+ go func(task func() string, index int, d chan struct{}) {
+ sem <- struct{}{}
+ var tries int
+ for taskRes := task(); tries == 0 || taskRes == "" && tries < retryLimit; taskRes = task() {
+ res <- struct {
+ index int
+ result string
+ }{index: index, result: taskRes}
+ tries = tries + 1
+ }
+ d <- struct{}{}
+ <-sem
+ }(t, i, done)
+ }
+
+ return res
+}
+
+func main() {
+ first := func() string {
+ time.Sleep(2 * time.Second)
+ return "first"
+ }
+ second := func() string {
+ time.Sleep(1 * time.Second)
+ return "second"
+ }
+ third := func() string {
+ time.Sleep(600 * time.Millisecond)
+ return "" // always a failure :(
+ }
+ fourth := func() string {
+ time.Sleep(700 * time.Millisecond)
+ return "am I last?"
+ }
+
+ fmt.Println("Starting concurrent executor!")
+ tasks := []func() string{first, second, third, fourth}
+ results := ConcurrentRetryExecutor(tasks, 2, 3)
+ for result := range results {
+ fmt.Println("Waiting")
+ if result.result == "" {
+ fmt.Printf("Task %d returned an error!\n", result.index+1)
+ } else {
+ fmt.Printf("Task %d successfully returned '%s'\n", result.index+1, result.result)
+ }
+ }
+ fmt.Println("All done!")
+ time.Sleep(time.Duration(1000 * time.Second))
+}

Диан обнови решението на 13.11.2016 13:28 (преди над 1 година)

package main
-import (
- "fmt"
- "time"
-)
-
// ConcurrentRetryExecutor concurrently and sequentially executes a list of tasks
// and returns the resulst of their execution as a channel.
func ConcurrentRetryExecutor(tasks []func() string, concurrentLimit int, retryLimit int) <-chan struct {
index int
result string
} {
res := make(chan struct {
index int
result string
})
var sem = make(chan struct{}, concurrentLimit)
var done = make(chan struct{}, len(tasks))
go func(d <-chan struct{}, r chan<- struct {
index int
result string
}) {
for i := 0; i < len(tasks); i++ {
<-d
}
close(r)
}(done, res)
for i, t := range tasks {
go func(task func() string, index int, d chan struct{}) {
sem <- struct{}{}
var tries int
for taskRes := task(); tries == 0 || taskRes == "" && tries < retryLimit; taskRes = task() {
res <- struct {
index int
result string
}{index: index, result: taskRes}
tries = tries + 1
}
d <- struct{}{}
<-sem
}(t, i, done)
}
return res
}
func main() {
- first := func() string {
- time.Sleep(2 * time.Second)
- return "first"
- }
- second := func() string {
- time.Sleep(1 * time.Second)
- return "second"
- }
- third := func() string {
- time.Sleep(600 * time.Millisecond)
- return "" // always a failure :(
- }
- fourth := func() string {
- time.Sleep(700 * time.Millisecond)
- return "am I last?"
- }
- fmt.Println("Starting concurrent executor!")
- tasks := []func() string{first, second, third, fourth}
- results := ConcurrentRetryExecutor(tasks, 2, 3)
- for result := range results {
- fmt.Println("Waiting")
- if result.result == "" {
- fmt.Printf("Task %d returned an error!\n", result.index+1)
- } else {
- fmt.Printf("Task %d successfully returned '%s'\n", result.index+1, result.result)
- }
- }
- fmt.Println("All done!")
- time.Sleep(time.Duration(1000 * time.Second))
}

Диан обнови решението на 13.11.2016 13:29 (преди над 1 година)

package main
// ConcurrentRetryExecutor concurrently and sequentially executes a list of tasks
// and returns the resulst of their execution as a channel.
func ConcurrentRetryExecutor(tasks []func() string, concurrentLimit int, retryLimit int) <-chan struct {
index int
result string
} {
res := make(chan struct {
index int
result string
})
var sem = make(chan struct{}, concurrentLimit)
var done = make(chan struct{}, len(tasks))
- go func(d <-chan struct{}, r chan<- struct {
+ go func(d <-chan struct{}, r chan struct {
index int
result string
}) {
for i := 0; i < len(tasks); i++ {
<-d
}
close(r)
}(done, res)
for i, t := range tasks {
go func(task func() string, index int, d chan struct{}) {
sem <- struct{}{}
var tries int
for taskRes := task(); tries == 0 || taskRes == "" && tries < retryLimit; taskRes = task() {
res <- struct {
index int
result string
}{index: index, result: taskRes}
tries = tries + 1
}
d <- struct{}{}
<-sem
}(t, i, done)
}
return res
}
}

Вчера до уточних в условието:

Функцията трябва да връща канала без да изчаква извършването на задачите - за това имаме канала.

Ти пускаш толкова горотини колкото има задачи и се надяваш че ще се изпълнят в последователността която си ги пуснал - това не е задължително така и би довело до не последователно изпълняване на задачите.

Диан обнови решението на 13.11.2016 14:40 (преди над 1 година)

package main
// ConcurrentRetryExecutor concurrently and sequentially executes a list of tasks
// and returns the resulst of their execution as a channel.
func ConcurrentRetryExecutor(tasks []func() string, concurrentLimit int, retryLimit int) <-chan struct {
index int
result string
} {
res := make(chan struct {
index int
result string
})
var sem = make(chan struct{}, concurrentLimit)
var done = make(chan struct{}, len(tasks))
go func(d <-chan struct{}, r chan struct {
index int
result string
}) {
for i := 0; i < len(tasks); i++ {
<-d
}
close(r)
}(done, res)
for i, t := range tasks {
-
go func(task func() string, index int, d chan struct{}) {
sem <- struct{}{}
var tries int
for taskRes := task(); tries == 0 || taskRes == "" && tries < retryLimit; taskRes = task() {
res <- struct {
index int
result string
}{index: index, result: taskRes}
tries = tries + 1
}
d <- struct{}{}
<-sem
}(t, i, done)
}
return res
-}
-
-func main() {
-
}

Не разбирам, дори не знам какво не разбирам.

Въпроса ми е какво значи

Напишете функция, която конкурентно и последователно изпълнява списък от подадени ѝ задачи и връща техните резултати като канал:

Как едновременно последователно(sequentially, one after another) и едноременно конкурентно?

последователно в случая означава повече че ако имаш 10 задачи и конкурентност 2. Трябва да пуснеш първа и втора да се изпълняват да изчакаш да завърши поне една от тях и да пуснеш трета. Когато завърши една от двете четвърта и т.н. Възможно е едната от първите две задачи да завърши последна от десетте, но трябва да бъде пусната като една от първите две.

Предполагайки че не разбираш цитираната част от условието: има се предвид че ConcurrentRetryExecutor се очаква да върне канала без да трябва първо да е изпълнена която и да е от задачите. Точно в твоя случай това реално се случва и аз навярно по инерция съм го цитирал и при теб, защото на пръв поглед ми е изглеждало че ще блокираш - което към момента но мога да видя къде би било :( Извинявай

Добре прочитайки коментара и вуловието за пореден път отсях следните три условия

  • Условие 1: да върна канала дирекнто:

    • За да го направя, си правя отделна горотина, в която да слушам, че всички задачи са приключили, и в която да затворя канала, така не блокирам.
  • Условие 2: в даден момент от времето да се изпълняват concurrentLimit борй задачи

    • За да го направя, пускам всяка от задачите в отделна горотина, но си имплементирам и семафор (буфериран канал със размер concurrentLimit), който ми гарантира, че в даден момент от времето ще имам фиксирания concurrentLimit брой конкурентно изпълняващи се горотини. (това го видях в effective go, примера за семафори)
  • Условие 3: Да се изпълнват последователно

    • Тази част само не изпълнявам май. Трябва да гарантирам ред на изпълнение на задачите. Тоест да направя така, че ако concurrentLimit=2, тогава първо със сигурност да се изпълнят Task1 и Таsk2 и след това като една от тях приключи със сигурност да се изпълни Task3 и след това Task4 и така нататък? Така ли?

Благодаря, че се занимаваш да ми обясняваш

Диан обнови решението на 14.11.2016 22:03 (преди над 1 година)

package main
-// ConcurrentRetryExecutor concurrently and sequentially executes a list of tasks
-// and returns the resulst of their execution as a channel.
+import (
+ "fmt"
+ "time"
+)
+
func ConcurrentRetryExecutor(tasks []func() string, concurrentLimit int, retryLimit int) <-chan struct {
index int
result string
} {
res := make(chan struct {
index int
result string
})
- var sem = make(chan struct{}, concurrentLimit)
- var done = make(chan struct{}, len(tasks))
- go func(d <-chan struct{}, r chan struct {
- index int
- result string
- }) {
+ var done = make(chan struct{}, len(tasks))
+ go func() {
for i := 0; i < len(tasks); i++ {
- <-d
+ <-done
}
- close(r)
- }(done, res)
+ close(res)
+ }()
- for i, t := range tasks {
- go func(task func() string, index int, d chan struct{}) {
+ var sem = make(chan struct{}, concurrentLimit)
+ go func() {
+ for i, t := range tasks {
sem <- struct{}{}
- var tries int
- for taskRes := task(); tries == 0 || taskRes == "" && tries < retryLimit; taskRes = task() {
+ go func(task func() string, index int) {
+
+ taskRes := task()
res <- struct {
index int
result string
}{index: index, result: taskRes}
- tries = tries + 1
- }
- d <- struct{}{}
- <-sem
- }(t, i, done)
- }
+ var tries = 1
+ for taskRes == "" && tries < retryLimit {
+ taskRes = task()
+ res <- struct {
+ index int
+ result string
+ }{index: index, result: taskRes}
+ tries = tries + 1
+ }
+ <-sem
+ done <- struct{}{}
+ }(t, i)
+ }
+ }()
+
return res
+}
+func main() {
+ first := func() string {
+ time.Sleep(2 * time.Second)
+ return "first"
+ }
+ second := func() string {
+ time.Sleep(1 * time.Second)
+ return "second"
+ }
+ third := func() string {
+ time.Sleep(600 * time.Millisecond)
+ return "" // always a failure :(
+ }
+ fourth := func() string {
+ time.Sleep(700 * time.Millisecond)
+ return "am I last?"
+ }
+
+ fmt.Println("Starting concurrent executor!")
+ tasks := []func() string{first, second, third, fourth}
+ results := ConcurrentRetryExecutor(tasks, 2, 3)
+ for result := range results {
+ if result.result == "" {
+ fmt.Printf("Task %d returned an error!\n", result.index+1)
+ } else {
+ fmt.Printf("Task %d successfully returned '%s'\n", result.index+1, result.result)
+ }
+ }
+
+ fmt.Println("All done!")
+ time.Sleep(10000 * time.Second)
}

Диан обнови решението на 14.11.2016 22:04 (преди над 1 година)

package main
-import (
- "fmt"
- "time"
-)
-
func ConcurrentRetryExecutor(tasks []func() string, concurrentLimit int, retryLimit int) <-chan struct {
index int
result string
} {
res := make(chan struct {
index int
result string
})
var done = make(chan struct{}, len(tasks))
go func() {
for i := 0; i < len(tasks); i++ {
<-done
}
close(res)
}()
var sem = make(chan struct{}, concurrentLimit)
go func() {
for i, t := range tasks {
sem <- struct{}{}
go func(task func() string, index int) {
taskRes := task()
res <- struct {
index int
result string
}{index: index, result: taskRes}
var tries = 1
for taskRes == "" && tries < retryLimit {
taskRes = task()
res <- struct {
index int
result string
}{index: index, result: taskRes}
tries = tries + 1
}
<-sem
done <- struct{}{}
}(t, i)
}
}()
return res
-}
-func main() {
+}
- first := func() string {
- time.Sleep(2 * time.Second)
- return "first"
- }
- second := func() string {
- time.Sleep(1 * time.Second)
- return "second"
- }
- third := func() string {
- time.Sleep(600 * time.Millisecond)
- return "" // always a failure :(
- }
- fourth := func() string {
- time.Sleep(700 * time.Millisecond)
- return "am I last?"
- }
-
- fmt.Println("Starting concurrent executor!")
- tasks := []func() string{first, second, third, fourth}
- results := ConcurrentRetryExecutor(tasks, 2, 3)
- for result := range results {
- if result.result == "" {
- fmt.Printf("Task %d returned an error!\n", result.index+1)
- } else {
- fmt.Printf("Task %d successfully returned '%s'\n", result.index+1, result.result)
- }
- }
-
- fmt.Println("All done!")
- time.Sleep(10000 * time.Second)
-}