Решение на Concurrent Retry Executor от Георги Костадинов

Обратно към всички решения

Към профила на Георги Костадинов

Резултати

  • 2 точки от тестове
  • 0 бонус точки
  • 2 точки общо
  • 2 успешни тест(а)
  • 7 неуспешни тест(а)

Код

package main
import "sync"
type Job struct {
index int
task func() string
}
type JobResult struct {
index int
result string
}
func worker(jobs <-chan Job, output chan<- JobResult, quit <-chan bool, retryLimit int, wgJobs *sync.WaitGroup, wgWorkers *sync.WaitGroup) {
defer wgWorkers.Done()
for {
select {
case job, ok := <-jobs:
if !ok {
return
}
id := job.index
for retry := 0; retry < retryLimit; retry++ {
taskResult := job.task()
output <- JobResult{id, taskResult}
if taskResult != "" {
break
}
}
wgJobs.Done()
case <-quit:
return
}
}
}
func ConcurrentRetryExecutor(tasks []func() string, concurrentLimit int, retryLimit int) <-chan JobResult {
var wgWorkers sync.WaitGroup
var wgJobs sync.WaitGroup
tasksCount := len(tasks)
jobs := make(chan Job, 128)
output := make(chan JobResult)
quit := make(chan bool)
go func() {
wgWorkers.Wait()
close(jobs)
}()
go func() {
wgJobs.Wait()
close(output)
}()
for i := 0; i < tasksCount; i++ {
wgJobs.Add(1)
jobs <- Job{i, tasks[i]}
}
for i := 0; i < concurrentLimit; i++ {
wgWorkers.Add(1)
go worker(jobs, output, quit, retryLimit, &wgJobs, &wgWorkers)
}
return output
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161115-21147-kaoy06	0.003s
--- FAIL: TestBasic1Concurrency (0.00s)
panic: send on closed channel [recovered]
	panic: send on closed channel

goroutine 20 [running]:
panic(0x4f7180, 0xc42005e4c0)
	/usr/local/go/src/runtime/panic.go:500 +0x1a1
testing.tRunner.func1(0xc420082180)
	/usr/local/go/src/testing/testing.go:579 +0x25d
panic(0x4f7180, 0xc42005e4c0)
	/usr/local/go/src/runtime/panic.go:458 +0x243
_/tmp/d20161115-21147-kaoy06.ConcurrentRetryExecutor(0xc4200a0000, 0x400, 0x400, 0x1, 0x5, 0x400)
	/tmp/d20161115-21147-kaoy06/solution.go:64 +0x1c8
_/tmp/d20161115-21147-kaoy06.testBasic(0xc420082180, 0x400, 0x1, 0x5, 0x5256c0)
	/tmp/d20161115-21147-kaoy06/solution_test.go:53 +0xcb
_/tmp/d20161115-21147-kaoy06.TestBasic1Concurrency(0xc420082180)
	/tmp/d20161115-21147-kaoy06/solution_test.go:118 +0x52
testing.tRunner(0xc420082180, 0x5256c8)
	/usr/local/go/src/testing/testing.go:610 +0x81
created by testing.(*T).Run
	/usr/local/go/src/testing/testing.go:646 +0x2ec
exit status 2
FAIL	_/tmp/d20161115-21147-kaoy06	0.006s
--- FAIL: TestBasic10Concurrency (0.00s)
panic: send on closed channel [recovered]
	panic: send on closed channel

goroutine 6 [running]:
panic(0x4f7180, 0xc420010600)
	/usr/local/go/src/runtime/panic.go:500 +0x1a1
testing.tRunner.func1(0xc420070180)
	/usr/local/go/src/testing/testing.go:579 +0x25d
panic(0x4f7180, 0xc420010600)
	/usr/local/go/src/runtime/panic.go:458 +0x243
_/tmp/d20161115-21147-kaoy06.ConcurrentRetryExecutor(0xc42008e000, 0x800, 0x800, 0xa, 0x5, 0x800)
	/tmp/d20161115-21147-kaoy06/solution.go:64 +0x1c8
_/tmp/d20161115-21147-kaoy06.testBasic(0xc420070180, 0x800, 0xa, 0x5, 0x5256b0)
	/tmp/d20161115-21147-kaoy06/solution_test.go:53 +0xcb
_/tmp/d20161115-21147-kaoy06.TestBasic10Concurrency(0xc420070180)
	/tmp/d20161115-21147-kaoy06/solution_test.go:127 +0x52
testing.tRunner(0xc420070180, 0x5256b8)
	/usr/local/go/src/testing/testing.go:610 +0x81
created by testing.(*T).Run
	/usr/local/go/src/testing/testing.go:646 +0x2ec
exit status 2
FAIL	_/tmp/d20161115-21147-kaoy06	0.006s
--- FAIL: TestBasicFail1Concurrency (0.00s)
panic: send on closed channel [recovered]
	panic: send on closed channel

goroutine 20 [running]:
panic(0x4f7180, 0xc42005e4e0)
	/usr/local/go/src/runtime/panic.go:500 +0x1a1
testing.tRunner.func1(0xc420082180)
	/usr/local/go/src/testing/testing.go:579 +0x25d
panic(0x4f7180, 0xc42005e4e0)
	/usr/local/go/src/runtime/panic.go:458 +0x243
_/tmp/d20161115-21147-kaoy06.ConcurrentRetryExecutor(0xc4200861e0, 0x14, 0x14, 0x1, 0x5, 0x14)
	/tmp/d20161115-21147-kaoy06/solution.go:64 +0x1c8
_/tmp/d20161115-21147-kaoy06.testBasic(0xc420082180, 0x14, 0x1, 0x5, 0xc4200a7ef8)
	/tmp/d20161115-21147-kaoy06/solution_test.go:53 +0xcb
_/tmp/d20161115-21147-kaoy06.testBasicFail(0xc420082180, 0x14, 0x1, 0x5)
	/tmp/d20161115-21147-kaoy06/solution_test.go:164 +0xfb
_/tmp/d20161115-21147-kaoy06.TestBasicFail1Concurrency(0xc420082180)
	/tmp/d20161115-21147-kaoy06/solution_test.go:131 +0x46
testing.tRunner(0xc420082180, 0x5256e0)
	/usr/local/go/src/testing/testing.go:610 +0x81
created by testing.(*T).Run
	/usr/local/go/src/testing/testing.go:646 +0x2ec
exit status 2
FAIL	_/tmp/d20161115-21147-kaoy06	0.005s
--- FAIL: TestBasicFail10Concurrency (0.00s)
panic: send on closed channel [recovered]
	panic: send on closed channel

goroutine 6 [running]:
panic(0x4f7180, 0xc420010620)
	/usr/local/go/src/runtime/panic.go:500 +0x1a1
testing.tRunner.func1(0xc420072180)
	/usr/local/go/src/testing/testing.go:579 +0x25d
panic(0x4f7180, 0xc420010620)
	/usr/local/go/src/runtime/panic.go:458 +0x243
_/tmp/d20161115-21147-kaoy06.ConcurrentRetryExecutor(0xc4200741e0, 0x14, 0x14, 0xa, 0x5, 0x14)
	/tmp/d20161115-21147-kaoy06/solution.go:64 +0x1c8
_/tmp/d20161115-21147-kaoy06.testBasic(0xc420072180, 0x14, 0xa, 0x5, 0xc42009bef8)
	/tmp/d20161115-21147-kaoy06/solution_test.go:53 +0xcb
_/tmp/d20161115-21147-kaoy06.testBasicFail(0xc420072180, 0x14, 0xa, 0x5)
	/tmp/d20161115-21147-kaoy06/solution_test.go:164 +0xfb
_/tmp/d20161115-21147-kaoy06.TestBasicFail10Concurrency(0xc420072180)
	/tmp/d20161115-21147-kaoy06/solution_test.go:135 +0x46
testing.tRunner(0xc420072180, 0x5256d8)
	/usr/local/go/src/testing/testing.go:610 +0x81
created by testing.(*T).Run
	/usr/local/go/src/testing/testing.go:646 +0x2ec
exit status 2
FAIL	_/tmp/d20161115-21147-kaoy06	0.005s
--- FAIL: TestBasicFail1024Concurrency (0.00s)
panic: send on closed channel [recovered]
	panic: send on closed channel

goroutine 6 [running]:
panic(0x4f7180, 0xc420010630)
	/usr/local/go/src/runtime/panic.go:500 +0x1a1
testing.tRunner.func1(0xc420072180)
	/usr/local/go/src/testing/testing.go:579 +0x25d
panic(0x4f7180, 0xc420010630)
	/usr/local/go/src/runtime/panic.go:458 +0x243
_/tmp/d20161115-21147-kaoy06.ConcurrentRetryExecutor(0xc42009a000, 0x1400, 0x1400, 0x400, 0x3, 0x1400)
	/tmp/d20161115-21147-kaoy06/solution.go:64 +0x1c8
_/tmp/d20161115-21147-kaoy06.testBasic(0xc420072180, 0x1400, 0x400, 0x3, 0xc4201d7ef8)
	/tmp/d20161115-21147-kaoy06/solution_test.go:53 +0xcb
_/tmp/d20161115-21147-kaoy06.testBasicFail(0xc420072180, 0x1400, 0x400, 0x3)
	/tmp/d20161115-21147-kaoy06/solution_test.go:164 +0xfb
_/tmp/d20161115-21147-kaoy06.TestBasicFail1024Concurrency(0xc420072180)
	/tmp/d20161115-21147-kaoy06/solution_test.go:139 +0x46
testing.tRunner(0xc420072180, 0x5256d0)
	/usr/local/go/src/testing/testing.go:610 +0x81
created by testing.(*T).Run
	/usr/local/go/src/testing/testing.go:646 +0x2ec
exit status 2
FAIL	_/tmp/d20161115-21147-kaoy06	0.008s
--- FAIL: TestPerformance (0.00s)
panic: send on closed channel [recovered]
	panic: send on closed channel

goroutine 6 [running]:
panic(0x4f7180, 0xc420010670)
	/usr/local/go/src/runtime/panic.go:500 +0x1a1
testing.tRunner.func1(0xc420082180)
	/usr/local/go/src/testing/testing.go:579 +0x25d
panic(0x4f7180, 0xc420010670)
	/usr/local/go/src/runtime/panic.go:458 +0x243
_/tmp/d20161115-21147-kaoy06.ConcurrentRetryExecutor(0xc42009c000, 0x2800, 0x2800, 0x400, 0x32, 0x2800)
	/tmp/d20161115-21147-kaoy06/solution.go:64 +0x1c8
_/tmp/d20161115-21147-kaoy06.TestPerformance(0xc420082180)
	/tmp/d20161115-21147-kaoy06/solution_test.go:195 +0x18a
testing.tRunner(0xc420082180, 0x525738)
	/usr/local/go/src/testing/testing.go:610 +0x81
created by testing.(*T).Run
	/usr/local/go/src/testing/testing.go:646 +0x2ec
exit status 2
FAIL	_/tmp/d20161115-21147-kaoy06	0.005s
panic: send on closed channel

goroutine 8 [running]:
panic(0x4f7180, 0xc420092040)
	/usr/local/go/src/runtime/panic.go:500 +0x1a1
_/tmp/d20161115-21147-kaoy06.ConcurrentRetryExecutor(0xc42008c000, 0x400, 0x400, 0xa, 0x5, 0x0)
	/tmp/d20161115-21147-kaoy06/solution.go:64 +0x1c8
_/tmp/d20161115-21147-kaoy06.TestConcurrentCalls.func2(0xc420054420, 0xf, 0xc42000e3e0, 0xa, 0x5, 0xc420072180)
	/tmp/d20161115-21147-kaoy06/solution_test.go:230 +0xc6
created by _/tmp/d20161115-21147-kaoy06.TestConcurrentCalls
	/tmp/d20161115-21147-kaoy06/solution_test.go:238 +0x193
exit status 2
FAIL	_/tmp/d20161115-21147-kaoy06	0.006s
PASS
ok  	_/tmp/d20161115-21147-kaoy06	0.203s

История (1 версия и 0 коментара)

Георги обнови решението на 15.11.2016 17:25 (преди над 1 година)

+package main
+
+import "sync"
+
+type Job struct {
+ index int
+ task func() string
+}
+
+type JobResult struct {
+ index int
+ result string
+}
+
+func worker(jobs <-chan Job, output chan<- JobResult, quit <-chan bool, retryLimit int, wgJobs *sync.WaitGroup, wgWorkers *sync.WaitGroup) {
+ defer wgWorkers.Done()
+ for {
+ select {
+ case job, ok := <-jobs:
+ if !ok {
+ return
+ }
+
+ id := job.index
+ for retry := 0; retry < retryLimit; retry++ {
+ taskResult := job.task()
+
+ output <- JobResult{id, taskResult}
+
+ if taskResult != "" {
+ break
+ }
+ }
+
+ wgJobs.Done()
+ case <-quit:
+ return
+ }
+ }
+}
+
+func ConcurrentRetryExecutor(tasks []func() string, concurrentLimit int, retryLimit int) <-chan JobResult {
+ var wgWorkers sync.WaitGroup
+ var wgJobs sync.WaitGroup
+
+ tasksCount := len(tasks)
+
+ jobs := make(chan Job, 128)
+ output := make(chan JobResult)
+ quit := make(chan bool)
+
+ go func() {
+ wgWorkers.Wait()
+ close(jobs)
+ }()
+
+ go func() {
+ wgJobs.Wait()
+ close(output)
+ }()
+
+ for i := 0; i < tasksCount; i++ {
+ wgJobs.Add(1)
+ jobs <- Job{i, tasks[i]}
+ }
+
+ for i := 0; i < concurrentLimit; i++ {
+ wgWorkers.Add(1)
+ go worker(jobs, output, quit, retryLimit, &wgJobs, &wgWorkers)
+ }
+
+ return output
+}