Решение на Concurrent Tasks от Диан Тодоров

Обратно към всички решения

Към профила на Диан Тодоров

Резултати

  • 10 точки от тестове
  • 0 бонус точки
  • 10 точки общо
  • 10 успешни тест(а)
  • 3 неуспешни тест(а)

Код

package main
import (
"errors"
"sync"
"time"
)
// Task ...
type Task interface {
Execute(int) (int, error)
}
type pipelineManager struct {
tasks []Task
}
func (pe pipelineManager) Execute(x int) (int, error) {
if len(pe.tasks) == 0 {
return 0, errors.New("No tasks")
}
res := x
for _, t := range pe.tasks {
r, err := t.Execute(res)
res = r
if err != nil {
return 0, err
}
}
return res, nil
}
// Pipeline ...
func Pipeline(tasks ...Task) Task {
return pipelineManager{tasks: tasks}
}
// ======================================
type fastest struct {
tasks []Task
}
type result struct {
data int
err error
}
func (f fastest) Execute(x int) (int, error) {
if len(f.tasks) == 0 {
return 0, errors.New("No tasks")
}
ch := make(chan result, 1)
for _, task := range f.tasks {
go func(t Task) {
res, err := t.Execute(x)
select {
case ch <- result{res, err}:
default:
}
}(task)
}
finalResult := <-ch
return finalResult.data, finalResult.err
}
// Fastest ..
func Fastest(tasks ...Task) Task {
return fastest{tasks}
}
// ======================================
type timed struct {
task Task
timeout time.Duration
}
func (t timed) Execute(x int) (int, error) {
ch := make(chan result)
go func() {
res, err := t.task.Execute(x)
ch <- result{res, err}
}()
select {
case res := <-ch:
return res.data, res.err
case <-time.After(t.timeout):
return 0, errors.New("main: timed out on Execute")
}
}
// Timed ...
func Timed(task Task, timeout time.Duration) Task {
return timed{task: task, timeout: timeout}
}
// ======================================
type concurrentMapReduce struct {
reduce func(results []int) int
tasks []Task
}
func (c concurrentMapReduce) Execute(x int) (int, error) {
if len(c.tasks) == 0 {
return 0, errors.New("No tasks")
}
ch := make(chan result, 1)
for _, task := range c.tasks {
go func(t Task) {
res, err := t.Execute(x)
select {
case ch <- result{res, err}:
default:
}
}(task)
}
results := make([]int, 0)
for i := 0; i < len(c.tasks); i++ {
r := <-ch
if r.err != nil {
return 0, r.err
}
results = append(results, r.data)
}
return c.reduce(results), nil
}
// ConcurrentMapReduce ...
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return concurrentMapReduce{reduce: reduce, tasks: tasks}
}
// ======================================
type greatestSearcher struct {
tasks <-chan Task
errorLimit int
}
// Execute is hedious.
func (g greatestSearcher) Execute(x int) (int, error) {
ch := make(chan result)
finish := make(chan struct{})
final := make(chan result)
go func() {
results := make([]result, 0)
for {
select {
case res := <-ch:
results = append(results, res)
case <-finish:
close(ch)
errorCount := 0
finalResult := result{}
if len(results) == 0 {
finalResult = result{0, errors.New("No Tasks")}
final <- finalResult
}
max := results[0].data
for _, r := range results[1:] {
if r.data > max {
max = r.data
}
if r.err != nil {
errorCount++
}
}
if errorCount > g.errorLimit {
finalResult = result{0, errors.New("ErrorLimit reached")}
} else {
finalResult = result{max, nil}
}
final <- finalResult
}
}
}()
var wg sync.WaitGroup
for task := range g.tasks {
wg.Add(1)
go func(t Task) {
res, err := t.Execute(x)
ch <- result{res, err}
wg.Done()
}(task)
}
wg.Wait()
finish <- struct{}{}
y := <-final
return y.data, y.err
}
// GreatestSearcher ...
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return greatestSearcher{errorLimit: errorLimit, tasks: tasks}
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161129-30451-101nr7r	0.013s
PASS
ok  	_/tmp/d20161129-30451-101nr7r	0.003s
PASS
ok  	_/tmp/d20161129-30451-101nr7r	0.003s
PASS
ok  	_/tmp/d20161129-30451-101nr7r	0.113s
PASS
ok  	_/tmp/d20161129-30451-101nr7r	0.214s
PASS
ok  	_/tmp/d20161129-30451-101nr7r	0.134s
--- FAIL: TestTimedDoesntLeaveGoroutineHanging (0.20s)
	solution_test.go:216: Expected that there will be as many goroutines as at the start(3) after Timed task has finished after it has timeouted but got 4
		
		BEFORE:
		goroutine profile: total 2
		1 @ 0x42a8fa 0x42a9ee 0x40469f 0x404365 0x46c456 0x46f4cd 0x46c0f1 0x46d165 0x46c785 0x401276 0x42a494 0x459f51
		#	0x46c455	testing.(*T).Run+0x315		/usr/local/go/src/testing/testing.go:647
		#	0x46f4cc	testing.RunTests.func1+0x6c	/usr/local/go/src/testing/testing.go:793
		#	0x46c0f0	testing.tRunner+0x80		/usr/local/go/src/testing/testing.go:610
		#	0x46d164	testing.RunTests+0x2f4		/usr/local/go/src/testing/testing.go:799
		#	0x46c784	testing.(*M).Run+0x84		/usr/local/go/src/testing/testing.go:743
		#	0x401275	main.main+0xc5			_/tmp/d20161129-30451-101nr7r/_test/_testmain.go:78
		#	0x42a493	runtime.main+0x1f3		/usr/local/go/src/runtime/proc.go:183
		
		1 @ 0x4c308f 0x4c2e90 0x4bfb41 0x473446 0x46c0f1 0x459f51
		#	0x4c308e	runtime/pprof.writeRuntimeProfile+0x9e						/usr/local/go/src/runtime/pprof/pprof.go:614
		#	0x4c2e8f	runtime/pprof.writeGoroutine+0x9f						/usr/local/go/src/runtime/pprof/pprof.go:576
		#	0x4bfb40	runtime/pprof.(*Profile).WriteTo+0x340						/usr/local/go/src/runtime/pprof/pprof.go:298
		#	0x473445	_/tmp/d20161129-30451-101nr7r.TestTimedDoesntLeaveGoroutineHanging+0x175	/tmp/d20161129-30451-101nr7r/solution_test.go:191
		#	0x46c0f0	testing.tRunner+0x80								/usr/local/go/src/testing/testing.go:610
		
		
		
		AFTER:
		goroutine profile: total 4
		1 @ 0x42a8fa 0x42a9ee 0x4039a8 0x40376d 0x4753a3 0x459f51
		#	0x4753a2	_/tmp/d20161129-30451-101nr7r.timed.Execute.func1+0x92	/tmp/d20161129-30451-101nr7r/solution.go:81
		
		1 @ 0x42a8fa 0x42a9ee 0x40469f 0x404365 0x46c456 0x46f4cd 0x46c0f1 0x46d165 0x46c785 0x401276 0x42a494 0x459f51
		#	0x46c455	testing.(*T).Run+0x315		/usr/local/go/src/testing/testing.go:647
		#	0x46f4cc	testing.RunTests.func1+0x6c	/usr/local/go/src/testing/testing.go:793
		#	0x46c0f0	testing.tRunner+0x80		/usr/local/go/src/testing/testing.go:610
		#	0x46d164	testing.RunTests+0x2f4		/usr/local/go/src/testing/testing.go:799
		#	0x46c784	testing.(*M).Run+0x84		/usr/local/go/src/testing/testing.go:743
		#	0x401275	main.main+0xc5			_/tmp/d20161129-30451-101nr7r/_test/_testmain.go:78
		#	0x42a493	runtime.main+0x1f3		/usr/local/go/src/runtime/proc.go:183
		
		1 @ 0x42a8fa 0x42a9ee 0x40469f 0x404365 0x4735df 0x46c0f1 0x459f51
		#	0x4735de	_/tmp/d20161129-30451-101nr7r.TestTimedDoesntLeaveGoroutineHanging+0x30e	/tmp/d20161129-30451-101nr7r/solution_test.go:225
		#	0x46c0f0	testing.tRunner+0x80								/usr/local/go/src/testing/testing.go:610
		
		1 @ 0x4c308f 0x4c2e90 0x4bfb41 0x4762e1 0x459f51
		#	0x4c308e	runtime/pprof.writeRuntimeProfile+0x9e						/usr/local/go/src/runtime/pprof/pprof.go:614
		#	0x4c2e8f	runtime/pprof.writeGoroutine+0x9f						/usr/local/go/src/runtime/pprof/pprof.go:576
		#	0x4bfb40	runtime/pprof.(*Profile).WriteTo+0x340						/usr/local/go/src/runtime/pprof/pprof.go:298
		#	0x4762e0	_/tmp/d20161129-30451-101nr7r.TestTimedDoesntLeaveGoroutineHanging.func2+0x1a0	/tmp/d20161129-30451-101nr7r/solution_test.go:212
		
		
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-101nr7r	0.203s
PASS
ok  	_/tmp/d20161129-30451-101nr7r	0.003s
panic: test timed out after 1s

goroutine 17 [running]:
panic(0x4f26e0, 0xc42008c030)
	/usr/local/go/src/runtime/panic.go:500 +0x1a1
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:918 +0x10b
created by time.goFunc
	/usr/local/go/src/time/sleep.go:154 +0x44

goroutine 1 [chan receive]:
testing.(*T).Run(0xc4200700c0, 0x520ce6, 0x1d, 0x52bea0, 0xc42003bd01)
	/usr/local/go/src/testing/testing.go:647 +0x316
testing.RunTests.func1(0xc4200700c0)
	/usr/local/go/src/testing/testing.go:793 +0x6d
testing.tRunner(0xc4200700c0, 0xc42003be20)
	/usr/local/go/src/testing/testing.go:610 +0x81
testing.RunTests(0x52bfa0, 0x5ac980, 0xd, 0xd, 0x0)
	/usr/local/go/src/testing/testing.go:799 +0x2f5
testing.(*M).Run(0xc42003bee8, 0xc42000e3a0)
	/usr/local/go/src/testing/testing.go:743 +0x85
main.main()
	_/tmp/d20161129-30451-101nr7r/_test/_testmain.go:78 +0xc6

goroutine 6 [chan receive]:
_/tmp/d20161129-30451-101nr7r.concurrentMapReduce.Execute(0x52bf70, 0xc42000a480, 0x3, 0x3, 0x2c, 0xc42000a480, 0x3, 0x3)
	/tmp/d20161129-30451-101nr7r/solution.go:118 +0x181
_/tmp/d20161129-30451-101nr7r.(*concurrentMapReduce).Execute(0xc42000e460, 0x2c, 0x3, 0x3, 0x5995c0)
	<autogenerated>:5 +0x77
_/tmp/d20161129-30451-101nr7r.TestConcurrentMapReduceSimple(0xc420070180)
	/tmp/d20161129-30451-101nr7r/solution_test.go:250 +0x18f
testing.tRunner(0xc420070180, 0x52bea0)
	/usr/local/go/src/testing/testing.go:610 +0x81
created by testing.(*T).Run
	/usr/local/go/src/testing/testing.go:646 +0x2ec
exit status 2
FAIL	_/tmp/d20161129-30451-101nr7r	1.005s
PASS
ok  	_/tmp/d20161129-30451-101nr7r	0.003s
PASS
ok  	_/tmp/d20161129-30451-101nr7r	0.048s
panic: runtime error: index out of range

goroutine 23 [running]:
panic(0x4fc8e0, 0xc4200100f0)
	/usr/local/go/src/runtime/panic.go:500 +0x1a1
_/tmp/d20161129-30451-101nr7r.greatestSearcher.Execute.func1(0xc420096180, 0xc4200961e0, 0xc420096240, 0xc420096120, 0x1)
	/tmp/d20161129-30451-101nr7r/solution.go:159 +0x3ae
created by _/tmp/d20161129-30451-101nr7r.greatestSearcher.Execute
	/tmp/d20161129-30451-101nr7r/solution.go:177 +0xe3
exit status 2
FAIL	_/tmp/d20161129-30451-101nr7r	0.051s
PASS
ok  	_/tmp/d20161129-30451-101nr7r	0.129s

История (2 версии и 2 коментара)

Диан обнови решението на 29.11.2016 09:36 (преди над 1 година)

+package main
+
+import (
+ "fmt"
+ "sync"
+ "time"
+)
+
+// Task ...
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type pipelineManager struct {
+ tasks []Task
+}
+
+func (pe pipelineManager) Execute(x int) (int, error) {
+ if len(pe.tasks) == 0 {
+ return 0, nil
+ }
+ res := x
+ for _, t := range pe.tasks {
+ r, err := t.Execute(res)
+ res = r
+ if err != nil {
+ return 0, err
+ }
+ }
+ return res, nil
+}
+
+// Pipeline ...
+func Pipeline(tasks ...Task) Task {
+ return pipelineManager{tasks: tasks}
+}
+
+// ======================================
+type fastest struct {
+ tasks []Task
+}
+
+type result struct {
+ data int
+ err error
+}
+
+func (f fastest) Execute(x int) (int, error) {
+ if len(f.tasks) == 0 {
+ return 0, nil
+ }
+ ch := make(chan result, 1)
+ for _, task := range f.tasks {
+ go func(t Task) {
+ res, err := t.Execute(x)
+ select {
+ case ch <- result{res, err}:
+ default:
+ }
+ }(task)
+ }
+ finalResult := <-ch
+ return finalResult.data, finalResult.err
+}
+
+// Fastest ..
+func Fastest(tasks ...Task) Task {
+ return fastest{tasks}
+}
+
+// ======================================
+type timed struct {
+ task Task
+ timeout time.Duration
+}
+
+func (t timed) Execute(x int) (int, error) {
+ ch := make(chan result)
+ go func() {
+ res, err := t.task.Execute(x)
+ ch <- result{res, err}
+ }()
+ select {
+ case res := <-ch:
+ return res.data, res.err
+ case <-time.After(t.timeout):
+ return 0, fmt.Errorf("main: timed out on Execute")
+ }
+}
+
+// Timed ...
+func Timed(task Task, timeout time.Duration) Task {
+ return timed{task: task, timeout: timeout}
+}
+
+// ======================================
+type concurrentMapReduce struct {
+ reduce func(results []int) int
+ tasks []Task
+}
+
+func (c concurrentMapReduce) Execute(x int) (int, error) {
+ if len(c.tasks) == 0 {
+ return 0, nil
+ }
+ ch := make(chan result, 1)
+ for _, task := range c.tasks {
+ go func(t Task) {
+ res, err := t.Execute(x)
+ select {
+ case ch <- result{res, err}:
+ default:
+ }
+ }(task)
+ }
+ results := make([]int, 0)
+ for i := 0; i < len(c.tasks); i++ {
+ r := <-ch
+ if r.err != nil {
+ return 0, r.err
+ }
+ results = append(results, r.data)
+ }
+ return c.reduce(results), nil
+}
+
+// ConcurrentMapReduce ...
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ return concurrentMapReduce{reduce: reduce, tasks: tasks}
+}
+
+// ======================================
+type greatestSearcher struct {
+ tasks <-chan Task
+ errorLimit int
+}
+
+// Execute is hedious.
+func (g greatestSearcher) Execute(x int) (int, error) {
+ ch := make(chan result)
+ finish := make(chan struct{})
+ final := make(chan result)
+
+ go func() {
+ results := make([]result, 0)
+ for {
+ select {
+ case res := <-ch:
+ results = append(results, res)
+ case <-finish:
+ close(ch)
+ errorCount := 0
+ finalResult := result{}
+ if len(results) == 0 {
+ finalResult = result{0, fmt.Errorf("No Tasks")}
+ final <- finalResult
+ }
+
+ max := results[0].data
+ for _, r := range results[1:] {
+ if r.data > max {
+ max = r.data
+ }
+ if r.err != nil {
+ errorCount++
+ }
+ }
+
+ if errorCount > g.errorLimit {
+ finalResult = result{0, fmt.Errorf("ErrorLimit reached")}
+ } else {
+ finalResult = result{max, nil}
+ }
+ final <- finalResult
+ }
+ }
+ }()
+
+ var wg sync.WaitGroup
+ for task := range g.tasks {
+ wg.Add(1)
+ go func(t Task) {
+ res, err := t.Execute(x)
+ ch <- result{res, err}
+ wg.Done()
+ }(task)
+ }
+
+ wg.Wait()
+ finish <- struct{}{}
+ y := <-final
+ return y.data, y.err
+}
+
+// GreatestSearcher ...
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ return greatestSearcher{errorLimit: errorLimit, tasks: tasks}
+}

Диан обнови решението на 29.11.2016 14:39 (преди над 1 година)

package main
import (
- "fmt"
+ "errors"
"sync"
"time"
)
// Task ...
type Task interface {
Execute(int) (int, error)
}
type pipelineManager struct {
tasks []Task
}
func (pe pipelineManager) Execute(x int) (int, error) {
if len(pe.tasks) == 0 {
- return 0, nil
+ return 0, errors.New("No tasks")
}
res := x
for _, t := range pe.tasks {
r, err := t.Execute(res)
res = r
if err != nil {
return 0, err
}
}
return res, nil
}
// Pipeline ...
func Pipeline(tasks ...Task) Task {
return pipelineManager{tasks: tasks}
}
// ======================================
type fastest struct {
tasks []Task
}
type result struct {
data int
err error
}
func (f fastest) Execute(x int) (int, error) {
if len(f.tasks) == 0 {
- return 0, nil
+ return 0, errors.New("No tasks")
}
ch := make(chan result, 1)
for _, task := range f.tasks {
go func(t Task) {
res, err := t.Execute(x)
select {
case ch <- result{res, err}:
default:
}
}(task)
}
finalResult := <-ch
return finalResult.data, finalResult.err
}
// Fastest ..
func Fastest(tasks ...Task) Task {
return fastest{tasks}
}
// ======================================
type timed struct {
task Task
timeout time.Duration
}
func (t timed) Execute(x int) (int, error) {
ch := make(chan result)
go func() {
res, err := t.task.Execute(x)
ch <- result{res, err}
}()
select {
case res := <-ch:
return res.data, res.err
case <-time.After(t.timeout):
- return 0, fmt.Errorf("main: timed out on Execute")
+ return 0, errors.New("main: timed out on Execute")
}
}
// Timed ...
func Timed(task Task, timeout time.Duration) Task {
return timed{task: task, timeout: timeout}
}
// ======================================
type concurrentMapReduce struct {
reduce func(results []int) int
tasks []Task
}
func (c concurrentMapReduce) Execute(x int) (int, error) {
if len(c.tasks) == 0 {
- return 0, nil
+ return 0, errors.New("No tasks")
}
ch := make(chan result, 1)
for _, task := range c.tasks {
go func(t Task) {
res, err := t.Execute(x)
select {
case ch <- result{res, err}:
default:
}
}(task)
}
results := make([]int, 0)
for i := 0; i < len(c.tasks); i++ {
r := <-ch
if r.err != nil {
return 0, r.err
}
results = append(results, r.data)
}
return c.reduce(results), nil
}
// ConcurrentMapReduce ...
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return concurrentMapReduce{reduce: reduce, tasks: tasks}
}
// ======================================
type greatestSearcher struct {
tasks <-chan Task
errorLimit int
}
// Execute is hedious.
func (g greatestSearcher) Execute(x int) (int, error) {
ch := make(chan result)
finish := make(chan struct{})
final := make(chan result)
go func() {
results := make([]result, 0)
for {
select {
case res := <-ch:
results = append(results, res)
case <-finish:
close(ch)
errorCount := 0
finalResult := result{}
if len(results) == 0 {
- finalResult = result{0, fmt.Errorf("No Tasks")}
+ finalResult = result{0, errors.New("No Tasks")}
final <- finalResult
}
max := results[0].data
for _, r := range results[1:] {
if r.data > max {
max = r.data
}
if r.err != nil {
errorCount++
}
}
if errorCount > g.errorLimit {
- finalResult = result{0, fmt.Errorf("ErrorLimit reached")}
+ finalResult = result{0, errors.New("ErrorLimit reached")}
} else {
finalResult = result{max, nil}
}
final <- finalResult
}
}
}()
var wg sync.WaitGroup
for task := range g.tasks {
wg.Add(1)
go func(t Task) {
res, err := t.Execute(x)
ch <- result{res, err}
wg.Done()
}(task)
}
wg.Wait()
finish <- struct{}{}
y := <-final
return y.data, y.err
}
// GreatestSearcher ...
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return greatestSearcher{errorLimit: errorLimit, tasks: tasks}
}