Решение на Concurrent Tasks от Иван Филипов

Обратно към всички решения

Към профила на Иван Филипов

Резултати

  • 9 точки от тестове
  • 0 бонус точки
  • 9 точки общо
  • 9 успешни тест(а)
  • 4 неуспешни тест(а)

Код

// hw3
package main
import (
"fmt"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
type PipeType struct {
tasks []Task
}
func (m *PipeType) Execute(op int) (int, error) {
var nextVal int
var err error
if len(m.tasks) == 0 {
return 0, fmt.Errorf("no params!")
}
if nextVal, err = m.tasks[0].Execute(op); err != nil {
return 0, err
}
for i := 1; i < len(m.tasks); i++ {
if nextVal, err = m.tasks[i].Execute(nextVal); err != nil {
return 0, err
}
}
return nextVal, err
}
func Pipeline(tasks ...Task) Task {
return &PipeType{tasks}
}
//second part
type FastType struct {
tasks []Task
}
func Fastest(tasks ...Task) Task {
return &FastType{tasks}
}
type Res struct {
r int
e error
}
func makeRes(r int, e error) Res {
return Res{r, e}
}
func (f *FastType) Execute(op int) (int, error) {
if len(f.tasks) == 0 {
return 0, fmt.Errorf("no params!")
}
ch := make(chan Res, 1)
for _, t := range f.tasks {
go func() {
select {
case ch <- makeRes(t.Execute(op)):
default:
}
}()
}
var ret Res = <-ch
return ret.r, ret.e
}
//third part
type TimeType struct {
ts Task
tm time.Duration
}
func Timed(task Task, timeout time.Duration) Task {
return &TimeType{task, timeout}
}
func (t TimeType) Execute(op int) (int, error) {
ch := make(chan Res, 1)
go func() {
ch <- makeRes(t.ts.Execute(op))
}()
select {
case res := <-ch:
return res.r, res.e
case <-time.After(t.tm):
return 0, fmt.Errorf("time out!")
}
}
//fourth part
type MapType struct {
tasks []Task
f func([]int) int
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return &MapType{tasks, reduce}
}
func (m *MapType) Execute(op int) (int, error) {
if len(m.tasks) == 0 {
return 0, fmt.Errorf("no params!")
}
var wg sync.WaitGroup
lock := make(chan struct{}, 1) //for res slice protection
errDetector := make(chan struct{}, 1)
alright := make(chan struct{}, 1)
res := make([]int, 0)
for _, i := range m.tasks {
wg.Add(1)
go func() {
lock <- struct{}{}
if curRes, err := i.Execute(op); err != nil {
errDetector <- struct{}{}
} else {
res = append(res, curRes)
<-lock
wg.Done()
}
}()
}
go func() {
wg.Wait()
alright <- struct{}{}
}()
select {
case <-alright:
//we are sure that all routines have been passed
return m.f(res), nil
case <-errDetector:
return 0, fmt.Errorf("an error has occurred")
}
}
//fifth part
var MIN int = -100000000
type GreatestType struct {
tasks <-chan Task
errLimit int
greatest int
sync.Mutex
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return &GreatestType{tasks, errorLimit, MIN, sync.Mutex{}}
}
func (g *GreatestType) Execute(op int) (int, error) {
var wg sync.WaitGroup
for {
curTask, ok := <-g.tasks
if !ok { //the chan is closed
wg.Wait()
if g.errLimit < 0 {
return 0, fmt.Errorf("error limit exceeded")
}
if g.greatest == MIN { //the channel has been closed without sending any tasks
return 0, fmt.Errorf("no tasks has been sent")
} else {
return g.greatest, nil
}
} else {
wg.Add(1)
go func() {
g.Lock()
defer g.Unlock()
if res, err := curTask.Execute(op); err != nil {
if g.errLimit > 0 {
g.errLimit--
}
} else {
if res > g.greatest {
g.greatest = res
}
}
wg.Done()
}()
}
}
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161129-30451-1r0pgvj	0.003s
PASS
ok  	_/tmp/d20161129-30451-1r0pgvj	0.003s
PASS
ok  	_/tmp/d20161129-30451-1r0pgvj	0.003s
PASS
ok  	_/tmp/d20161129-30451-1r0pgvj	0.103s
panic: test timed out after 1s

goroutine 17 [running]:
panic(0x4f2660, 0xc4200a0010)
	/usr/local/go/src/runtime/panic.go:500 +0x1a1
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:918 +0x10b
created by time.goFunc
	/usr/local/go/src/time/sleep.go:154 +0x44

goroutine 1 [chan receive]:
testing.(*T).Run(0xc4200700c0, 0x520c03, 0x1d, 0x52bd80, 0xc42003bd01)
	/usr/local/go/src/testing/testing.go:647 +0x316
testing.RunTests.func1(0xc4200700c0)
	/usr/local/go/src/testing/testing.go:793 +0x6d
testing.tRunner(0xc4200700c0, 0xc42003be20)
	/usr/local/go/src/testing/testing.go:610 +0x81
testing.RunTests(0x52be40, 0x5ab980, 0xd, 0xd, 0x0)
	/usr/local/go/src/testing/testing.go:799 +0x2f5
testing.(*M).Run(0xc42003bee8, 0xc42000e3a0)
	/usr/local/go/src/testing/testing.go:743 +0x85
main.main()
	_/tmp/d20161129-30451-1r0pgvj/_test/_testmain.go:78 +0xc6

goroutine 6 [chan receive]:
_/tmp/d20161129-30451-1r0pgvj.(*FastType).Execute(0xc42000e7a0, 0x1, 0xc420010558, 0xc420054420, 0xc420070180)
	/tmp/d20161129-30451-1r0pgvj/solution.go:91 +0x166
_/tmp/d20161129-30451-1r0pgvj.TestFastestWaitsForGoroutines(0xc420070180)
	/tmp/d20161129-30451-1r0pgvj/solution_test.go:151 +0x37e
testing.tRunner(0xc420070180, 0x52bd80)
	/usr/local/go/src/testing/testing.go:610 +0x81
created by testing.(*T).Run
	/usr/local/go/src/testing/testing.go:646 +0x2ec

goroutine 7 [chan receive]:
_/tmp/d20161129-30451-1r0pgvj.TestFastestWaitsForGoroutines.func3(0xc420010558, 0xc420054420, 0xc420070180, 0xc420054480, 0xc420012310, 0xc4200544e0)
	/tmp/d20161129-30451-1r0pgvj/solution_test.go:131 +0x81
created by _/tmp/d20161129-30451-1r0pgvj.TestFastestWaitsForGoroutines
	/tmp/d20161129-30451-1r0pgvj/solution_test.go:150 +0x361

goroutine 8 [chan receive]:
_/tmp/d20161129-30451-1r0pgvj.TestFastestWaitsForGoroutines.func2(0x1, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-1r0pgvj/solution_test.go:123 +0x55
_/tmp/d20161129-30451-1r0pgvj.fTask.Execute(0xc42000e760, 0x1, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-1r0pgvj/solution_test.go:40 +0x30
_/tmp/d20161129-30451-1r0pgvj.(*FastType).Execute.func1(0xc420054a20, 0xc4200109c0, 0x1)
	/tmp/d20161129-30451-1r0pgvj/solution.go:84 +0x41
created by _/tmp/d20161129-30451-1r0pgvj.(*FastType).Execute
	/tmp/d20161129-30451-1r0pgvj/solution.go:88 +0xf0

goroutine 9 [chan receive]:
_/tmp/d20161129-30451-1r0pgvj.TestFastestWaitsForGoroutines.func2(0x1, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-1r0pgvj/solution_test.go:123 +0x55
_/tmp/d20161129-30451-1r0pgvj.fTask.Execute(0xc42000e760, 0x1, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-1r0pgvj/solution_test.go:40 +0x30
_/tmp/d20161129-30451-1r0pgvj.(*FastType).Execute.func1(0xc420054a20, 0xc4200109c0, 0x1)
	/tmp/d20161129-30451-1r0pgvj/solution.go:84 +0x41
created by _/tmp/d20161129-30451-1r0pgvj.(*FastType).Execute
	/tmp/d20161129-30451-1r0pgvj/solution.go:88 +0xf0
exit status 2
FAIL	_/tmp/d20161129-30451-1r0pgvj	1.005s
PASS
ok  	_/tmp/d20161129-30451-1r0pgvj	0.134s
PASS
ok  	_/tmp/d20161129-30451-1r0pgvj	0.203s
PASS
ok  	_/tmp/d20161129-30451-1r0pgvj	0.003s
--- FAIL: TestConcurrentMapReduceSimple (0.00s)
	solution_test.go:253: Expected result to be 55 but is 77
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-1r0pgvj	0.003s
PASS
ok  	_/tmp/d20161129-30451-1r0pgvj	0.003s
PASS
ok  	_/tmp/d20161129-30451-1r0pgvj	0.048s
--- FAIL: TestGreatestSearcherErrors (0.05s)
    --- FAIL: TestGreatestSearcherErrors/like_the_example (0.05s)
    	solution_test.go:313: Expected error did not occur instead got 42
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-1r0pgvj	0.048s
panic: test timed out after 1s

goroutine 8 [running]:
panic(0x4f2660, 0xc420010750)
	/usr/local/go/src/runtime/panic.go:500 +0x1a1
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:918 +0x10b
created by time.goFunc
	/usr/local/go/src/time/sleep.go:154 +0x44

goroutine 1 [chan receive]:
testing.(*T).Run(0xc4200700c0, 0x51d1b8, 0xb, 0x52bde8, 0xc42003bd01)
	/usr/local/go/src/testing/testing.go:647 +0x316
testing.RunTests.func1(0xc4200700c0)
	/usr/local/go/src/testing/testing.go:793 +0x6d
testing.tRunner(0xc4200700c0, 0xc42003be20)
	/usr/local/go/src/testing/testing.go:610 +0x81
testing.RunTests(0x52be40, 0x5ab980, 0xd, 0xd, 0x0)
	/usr/local/go/src/testing/testing.go:799 +0x2f5
testing.(*M).Run(0xc42003bee8, 0xc42000e3a0)
	/usr/local/go/src/testing/testing.go:743 +0x85
main.main()
	_/tmp/d20161129-30451-1r0pgvj/_test/_testmain.go:78 +0xc6

goroutine 6 [chan receive]:
_/tmp/d20161129-30451-1r0pgvj.(*FastType).Execute(0xc42000e3e0, 0xffffffffffffffd3, 0xffffffffffffffd3, 0x0, 0x0)
	/tmp/d20161129-30451-1r0pgvj/solution.go:91 +0x166
_/tmp/d20161129-30451-1r0pgvj.(*PipeType).Execute(0xc42000e480, 0xffffffffffffffb5, 0x5, 0x598380, 0xc42000e480)
	/tmp/d20161129-30451-1r0pgvj/solution.go:35 +0xa7
_/tmp/d20161129-30451-1r0pgvj.TestThemAll(0xc420070180)
	/tmp/d20161129-30451-1r0pgvj/solution_test.go:374 +0x678
testing.tRunner(0xc420070180, 0x52bde8)
	/usr/local/go/src/testing/testing.go:610 +0x81
created by testing.(*T).Run
	/usr/local/go/src/testing/testing.go:646 +0x2ec

goroutine 7 [chan send]:
_/tmp/d20161129-30451-1r0pgvj.TestThemAll.func1(0xc420054360)
	/tmp/d20161129-30451-1r0pgvj/solution_test.go:343 +0x8e
created by _/tmp/d20161129-30451-1r0pgvj.TestThemAll
	/tmp/d20161129-30451-1r0pgvj/solution_test.go:349 +0x75

goroutine 17 [sleep]:
time.Sleep(0x2540be400)
	/usr/local/go/src/runtime/time.go:59 +0xe1
_/tmp/d20161129-30451-1r0pgvj.lazyAdder.Execute(0x64, 0x2710, 0xffffffffffffffd3, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-1r0pgvj/solution_test.go:33 +0x32
_/tmp/d20161129-30451-1r0pgvj.(*lazyAdder).Execute(0xc4200106c0, 0xffffffffffffffd3, 0x0, 0x0, 0x0)
	<autogenerated>:6 +0x65
_/tmp/d20161129-30451-1r0pgvj.(*FastType).Execute.func1(0xc42009e000, 0xc4200a2000, 0xffffffffffffffd3)
	/tmp/d20161129-30451-1r0pgvj/solution.go:84 +0x41
created by _/tmp/d20161129-30451-1r0pgvj.(*FastType).Execute
	/tmp/d20161129-30451-1r0pgvj/solution.go:88 +0xf0

goroutine 18 [sleep]:
time.Sleep(0x2540be400)
	/usr/local/go/src/runtime/time.go:59 +0xe1
_/tmp/d20161129-30451-1r0pgvj.lazyAdder.Execute(0x64, 0x2710, 0xffffffffffffffd3, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-1r0pgvj/solution_test.go:33 +0x32
_/tmp/d20161129-30451-1r0pgvj.(*lazyAdder).Execute(0xc4200106c0, 0xffffffffffffffd3, 0x0, 0x0, 0x0)
	<autogenerated>:6 +0x65
_/tmp/d20161129-30451-1r0pgvj.(*FastType).Execute.func1(0xc42009e000, 0xc4200a2000, 0xffffffffffffffd3)
	/tmp/d20161129-30451-1r0pgvj/solution.go:84 +0x41
created by _/tmp/d20161129-30451-1r0pgvj.(*FastType).Execute
	/tmp/d20161129-30451-1r0pgvj/solution.go:88 +0xf0

goroutine 19 [sleep]:
time.Sleep(0x2540be400)
	/usr/local/go/src/runtime/time.go:59 +0xe1
_/tmp/d20161129-30451-1r0pgvj.lazyAdder.Execute(0x64, 0x2710, 0xffffffffffffffd3, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-1r0pgvj/solution_test.go:33 +0x32
_/tmp/d20161129-30451-1r0pgvj.(*lazyAdder).Execute(0xc4200106c0, 0xffffffffffffffd3, 0x0, 0x0, 0x0)
	<autogenerated>:6 +0x65
_/tmp/d20161129-30451-1r0pgvj.(*FastType).Execute.func1(0xc42009e000, 0xc4200a2000, 0xffffffffffffffd3)
	/tmp/d20161129-30451-1r0pgvj/solution.go:84 +0x41
created by _/tmp/d20161129-30451-1r0pgvj.(*FastType).Execute
	/tmp/d20161129-30451-1r0pgvj/solution.go:88 +0xf0
exit status 2
FAIL	_/tmp/d20161129-30451-1r0pgvj	1.008s

История (1 версия и 0 коментара)

Иван обнови решението на 29.11.2016 14:53 (преди над 1 година)

+// hw3
+package main
+
+import (
+ "fmt"
+ "sync"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type PipeType struct {
+ tasks []Task
+}
+
+func (m *PipeType) Execute(op int) (int, error) {
+
+ var nextVal int
+ var err error
+
+ if len(m.tasks) == 0 {
+ return 0, fmt.Errorf("no params!")
+ }
+
+ if nextVal, err = m.tasks[0].Execute(op); err != nil {
+
+ return 0, err
+
+ }
+
+ for i := 1; i < len(m.tasks); i++ {
+
+ if nextVal, err = m.tasks[i].Execute(nextVal); err != nil {
+
+ return 0, err
+
+ }
+
+ }
+
+ return nextVal, err
+
+}
+
+func Pipeline(tasks ...Task) Task {
+
+ return &PipeType{tasks}
+
+}
+
+//second part
+type FastType struct {
+ tasks []Task
+}
+
+func Fastest(tasks ...Task) Task {
+
+ return &FastType{tasks}
+
+}
+
+type Res struct {
+ r int
+ e error
+}
+
+func makeRes(r int, e error) Res {
+ return Res{r, e}
+}
+
+func (f *FastType) Execute(op int) (int, error) {
+
+ if len(f.tasks) == 0 {
+ return 0, fmt.Errorf("no params!")
+ }
+
+ ch := make(chan Res, 1)
+
+ for _, t := range f.tasks {
+ go func() {
+ select {
+ case ch <- makeRes(t.Execute(op)):
+
+ default:
+ }
+ }()
+ }
+
+ var ret Res = <-ch
+
+ return ret.r, ret.e
+
+}
+
+//third part
+
+type TimeType struct {
+ ts Task
+ tm time.Duration
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+
+ return &TimeType{task, timeout}
+
+}
+
+func (t TimeType) Execute(op int) (int, error) {
+
+ ch := make(chan Res, 1)
+
+ go func() {
+
+ ch <- makeRes(t.ts.Execute(op))
+
+ }()
+
+ select {
+
+ case res := <-ch:
+ return res.r, res.e
+
+ case <-time.After(t.tm):
+ return 0, fmt.Errorf("time out!")
+
+ }
+
+}
+
+//fourth part
+type MapType struct {
+ tasks []Task
+
+ f func([]int) int
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+
+ return &MapType{tasks, reduce}
+
+}
+
+func (m *MapType) Execute(op int) (int, error) {
+
+ if len(m.tasks) == 0 {
+ return 0, fmt.Errorf("no params!")
+ }
+
+ var wg sync.WaitGroup
+
+ lock := make(chan struct{}, 1) //for res slice protection
+ errDetector := make(chan struct{}, 1)
+ alright := make(chan struct{}, 1)
+
+ res := make([]int, 0)
+
+ for _, i := range m.tasks {
+
+ wg.Add(1)
+ go func() {
+
+ lock <- struct{}{}
+
+ if curRes, err := i.Execute(op); err != nil {
+
+ errDetector <- struct{}{}
+
+ } else {
+
+ res = append(res, curRes)
+ <-lock
+ wg.Done()
+ }
+
+ }()
+ }
+
+ go func() {
+
+ wg.Wait()
+ alright <- struct{}{}
+ }()
+
+ select {
+
+ case <-alright:
+ //we are sure that all routines have been passed
+ return m.f(res), nil
+
+ case <-errDetector:
+ return 0, fmt.Errorf("an error has occurred")
+
+ }
+
+}
+
+//fifth part
+
+var MIN int = -100000000
+
+type GreatestType struct {
+ tasks <-chan Task
+ errLimit int
+ greatest int
+ sync.Mutex
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+
+ return &GreatestType{tasks, errorLimit, MIN, sync.Mutex{}}
+
+}
+
+func (g *GreatestType) Execute(op int) (int, error) {
+
+ var wg sync.WaitGroup
+
+ for {
+
+ curTask, ok := <-g.tasks
+
+ if !ok { //the chan is closed
+
+ wg.Wait()
+
+ if g.errLimit < 0 {
+ return 0, fmt.Errorf("error limit exceeded")
+ }
+
+ if g.greatest == MIN { //the channel has been closed without sending any tasks
+ return 0, fmt.Errorf("no tasks has been sent")
+ } else {
+ return g.greatest, nil
+ }
+ } else {
+
+ wg.Add(1)
+ go func() {
+
+ g.Lock()
+
+ defer g.Unlock()
+
+ if res, err := curTask.Execute(op); err != nil {
+
+ if g.errLimit > 0 {
+
+ g.errLimit--
+
+ }
+
+ } else {
+
+ if res > g.greatest {
+ g.greatest = res
+ }
+
+ }
+ wg.Done()
+ }()
+ }
+
+ }
+
+}