Решение на Concurrent Tasks от Димитър Влаховски

Обратно към всички решения

Към профила на Димитър Влаховски

Резултати

  • 11 точки от тестове
  • 0 бонус точки
  • 11 точки общо
  • 11 успешни тест(а)
  • 2 неуспешни тест(а)

Код

package main
import (
"fmt"
"math"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
type ValErr struct {
value int
err error
}
//-------------------------------------
type PipelinedTasks struct {
tasks []Task
}
func Pipeline(tasks ...Task) Task {
p := new(PipelinedTasks)
p.tasks = tasks
return p
}
func (p PipelinedTasks) Execute(value int) (int, error) {
if len(p.tasks) == 0 {
return 0, fmt.Errorf("no tasks!")
}
var err error
for _, task := range p.tasks {
value, err = task.Execute(value)
if err != nil {
return 0, err
}
}
return value, nil
}
//-------------------------------------
type FastestTask struct {
tasks []Task
}
func Fastest(tasks ...Task) Task {
f := new(FastestTask)
f.tasks = tasks
return f
}
func (f FastestTask) Execute(value int) (int, error) {
if len(f.tasks) == 0 {
return 0, fmt.Errorf("no tasks!")
}
ch := make(chan ValErr)
for _, task := range f.tasks {
go func(task Task) {
res, err := task.Execute(value)
select {
case ch <- ValErr{res, err}:
default:
}
}(task)
}
valErr := <-ch
return valErr.value, valErr.err
}
//------------------------------------------------------
type TimedTask struct {
task Task
timeout time.Duration
}
func Timed(task Task, timeout time.Duration) Task {
t := new(TimedTask)
t.task = task
t.timeout = timeout
return t
}
func (t TimedTask) Execute(value int) (int, error) {
ch := make(chan ValErr)
go func(task Task) {
val, err := task.Execute(value)
select {
case ch <- ValErr{val, err}:
default:
}
}(t.task)
select {
case valErr := <-ch:
return valErr.value, valErr.err
case <-time.After(t.timeout):
return 0, fmt.Errorf("timed-out")
}
}
//--------------------------
type MapReduceTasks struct {
tasks []Task
reduce func(results []int) int
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
t := new(MapReduceTasks)
t.tasks = tasks
t.reduce = reduce
return t
}
func (m MapReduceTasks) Execute(value int) (int, error) {
if len(m.tasks) == 0 {
return 0, fmt.Errorf("no tasks!")
}
ch := make(chan ValErr)
errCh := make(chan struct{}, len(m.tasks))
for _, task := range m.tasks {
go func(task Task) {
res, err := task.Execute(value)
select {
case ch <- ValErr{res, err}:
case <-errCh:
return
}
}(task)
}
values := make([]int, 0)
for valErr := range ch {
if valErr.err != nil {
for i := 0; i < len(m.tasks); i++ {
errCh <- struct{}{}
}
return 0, valErr.err
}
values = append(values, valErr.value)
if len(values) == len(m.tasks) {
close(ch)
}
}
return m.reduce(values), nil
}
//---------------------------------------------------
type GreatestTask struct {
errorLimit int
tasks <-chan Task
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
g := new(GreatestTask)
g.tasks = tasks
g.errorLimit = errorLimit
return g
}
func (g GreatestTask) Execute(value int) (int, error) {
ch := make(chan ValErr)
flag := false
go func() {
var wg sync.WaitGroup
for task := range g.tasks {
wg.Add(1)
go func(task Task) {
val, err := task.Execute(value)
ch <- ValErr{val, err}
wg.Done()
}(task)
}
wg.Wait()
close(ch)
}()
errorCount := 0
max := math.MinInt64
for valErr := range ch {
flag = true
if valErr.err != nil {
errorCount++
if errorCount > g.errorLimit {
return 0, fmt.Errorf("error limit reached!")
}
}
if valErr.value > max {
max = valErr.value
}
}
if flag == false {
return 0, fmt.Errorf("no tasks passed!")
}
return max, nil
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161129-30451-1vbqw6z	0.003s
PASS
ok  	_/tmp/d20161129-30451-1vbqw6z	0.003s
PASS
ok  	_/tmp/d20161129-30451-1vbqw6z	0.003s
PASS
ok  	_/tmp/d20161129-30451-1vbqw6z	0.103s
PASS
ok  	_/tmp/d20161129-30451-1vbqw6z	0.203s
PASS
ok  	_/tmp/d20161129-30451-1vbqw6z	0.134s
PASS
ok  	_/tmp/d20161129-30451-1vbqw6z	0.203s
PASS
ok  	_/tmp/d20161129-30451-1vbqw6z	0.002s
PASS
ok  	_/tmp/d20161129-30451-1vbqw6z	0.003s
PASS
ok  	_/tmp/d20161129-30451-1vbqw6z	0.003s
PASS
ok  	_/tmp/d20161129-30451-1vbqw6z	0.048s
--- FAIL: TestGreatestSearcherErrors (0.05s)
    --- FAIL: TestGreatestSearcherErrors/only_failure (0.00s)
    	solution_test.go:335: Expected error did not occur instead got 0
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-1vbqw6z	0.048s
panic: test timed out after 1s

goroutine 20 [running]:
panic(0x4f2680, 0xc420096010)
	/usr/local/go/src/runtime/panic.go:500 +0x1a1
testing.startAlarm.func1()
	/usr/local/go/src/testing/testing.go:918 +0x10b
created by time.goFunc
	/usr/local/go/src/time/sleep.go:154 +0x44

goroutine 1 [chan receive]:
testing.(*T).Run(0xc4200720c0, 0x51d1e3, 0xb, 0x52bea8, 0xc42003bd01)
	/usr/local/go/src/testing/testing.go:647 +0x316
testing.RunTests.func1(0xc4200720c0)
	/usr/local/go/src/testing/testing.go:793 +0x6d
testing.tRunner(0xc4200720c0, 0xc42003be20)
	/usr/local/go/src/testing/testing.go:610 +0x81
testing.RunTests(0x52bf00, 0x5ac980, 0xd, 0xd, 0x0)
	/usr/local/go/src/testing/testing.go:799 +0x2f5
testing.(*M).Run(0xc42003bee8, 0xc42000e3a0)
	/usr/local/go/src/testing/testing.go:743 +0x85
main.main()
	_/tmp/d20161129-30451-1vbqw6z/_test/_testmain.go:78 +0xc6

goroutine 6 [chan receive]:
_/tmp/d20161129-30451-1vbqw6z.FastestTask.Execute(0xc42000a480, 0x3, 0x3, 0xffffffffffffffd3, 0xffffffffffffffd3, 0x0, 0x0)
	/tmp/d20161129-30451-1vbqw6z/solution.go:75 +0x101
_/tmp/d20161129-30451-1vbqw6z.(*FastestTask).Execute(0xc42000e3e0, 0xffffffffffffffd3, 0xffffffffffffffd3, 0x0, 0x0)
	<autogenerated>:3 +0x6e
_/tmp/d20161129-30451-1vbqw6z.PipelinedTasks.Execute(0xc42001a0a0, 0x5, 0x5, 0xffffffffffffffb5, 0x5015c0, 0xc42000e460, 0xc42000e460)
	/tmp/d20161129-30451-1vbqw6z/solution.go:38 +0x60
_/tmp/d20161129-30451-1vbqw6z.(*PipelinedTasks).Execute(0xc42000e460, 0xffffffffffffffb5, 0x5, 0x599380, 0xc42000e460)
	<autogenerated>:2 +0x6e
_/tmp/d20161129-30451-1vbqw6z.TestThemAll(0xc420072180)
	/tmp/d20161129-30451-1vbqw6z/solution_test.go:374 +0x671
testing.tRunner(0xc420072180, 0x52bea8)
	/usr/local/go/src/testing/testing.go:610 +0x81
created by testing.(*T).Run
	/usr/local/go/src/testing/testing.go:646 +0x2ec

goroutine 7 [chan send]:
_/tmp/d20161129-30451-1vbqw6z.TestThemAll.func1(0xc420056360)
	/tmp/d20161129-30451-1vbqw6z/solution_test.go:343 +0x8e
created by _/tmp/d20161129-30451-1vbqw6z.TestThemAll
	/tmp/d20161129-30451-1vbqw6z/solution_test.go:349 +0x75

goroutine 18 [sleep]:
time.Sleep(0x6fc23ac00)
	/usr/local/go/src/runtime/time.go:59 +0xe1
_/tmp/d20161129-30451-1vbqw6z.lazyAdder.Execute(0x12c, 0x7530, 0xffffffffffffffd3, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-1vbqw6z/solution_test.go:33 +0x32
_/tmp/d20161129-30451-1vbqw6z.(*lazyAdder).Execute(0xc4200106b0, 0xffffffffffffffd3, 0x0, 0x0, 0x0)
	<autogenerated>:8 +0x65
_/tmp/d20161129-30451-1vbqw6z.FastestTask.Execute.func1(0xffffffffffffffd3, 0xc42008c000, 0x599740, 0xc4200106b0)
	/tmp/d20161129-30451-1vbqw6z/solution.go:67 +0x3f
created by _/tmp/d20161129-30451-1vbqw6z.FastestTask.Execute
	/tmp/d20161129-30451-1vbqw6z/solution.go:72 +0xae

goroutine 19 [sleep]:
time.Sleep(0x2540be400)
	/usr/local/go/src/runtime/time.go:59 +0xe1
_/tmp/d20161129-30451-1vbqw6z.lazyAdder.Execute(0x64, 0x2710, 0xffffffffffffffd3, 0x0, 0x0, 0x0)
	/tmp/d20161129-30451-1vbqw6z/solution_test.go:33 +0x32
_/tmp/d20161129-30451-1vbqw6z.(*lazyAdder).Execute(0xc4200106c0, 0xffffffffffffffd3, 0x0, 0x0, 0x0)
	<autogenerated>:8 +0x65
_/tmp/d20161129-30451-1vbqw6z.FastestTask.Execute.func1(0xffffffffffffffd3, 0xc42008c000, 0x599740, 0xc4200106c0)
	/tmp/d20161129-30451-1vbqw6z/solution.go:67 +0x3f
created by _/tmp/d20161129-30451-1vbqw6z.FastestTask.Execute
	/tmp/d20161129-30451-1vbqw6z/solution.go:72 +0xae
exit status 2
FAIL	_/tmp/d20161129-30451-1vbqw6z	1.005s

История (2 версии и 1 коментар)

Димитър обнови решението на 28.11.2016 23:22 (преди над 1 година)

+package main
+
+import (
+ "fmt"
+ "math"
+ "sync"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type ValErr struct {
+ value int
+ err error
+}
+
+//-------------------------------------
+type PipelinedTasks struct {
+ tasks []Task
+}
+
+func Pipeline(tasks ...Task) Task {
+ p := new(PipelinedTasks)
+ p.tasks = tasks
+ return p
+}
+
+func (p PipelinedTasks) Execute(value int) (int, error) {
+ if len(p.tasks) == 0 {
+ return 0, fmt.Errorf("no tasks!")
+ }
+
+ var err error
+
+ for _, task := range p.tasks {
+ value, err = task.Execute(value)
+ if err != nil {
+ return 0, err
+ }
+ }
+
+ return value, nil
+}
+
+//-------------------------------------
+type FastestTask struct {
+ tasks []Task
+}
+
+func Fastest(tasks ...Task) Task {
+ f := new(FastestTask)
+ f.tasks = tasks
+ return f
+}
+
+func (f FastestTask) Execute(value int) (int, error) {
+ if len(f.tasks) == 0 {
+ return 0, fmt.Errorf("no tasks!")
+ }
+
+ ch := make(chan ValErr)
+
+ for _, task := range f.tasks {
+ go func(task Task) {
+ res, err := task.Execute(value)
+ select {
+ case ch <- ValErr{res, err}:
+ default:
+ }
+ }(task)
+ }
+
+ valErr := <-ch
+ return valErr.value, valErr.err
+}
+
+//------------------------------------------------------
+type TimedTask struct {
+ task Task
+ timeout time.Duration
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ t := new(TimedTask)
+ t.task = task
+ t.timeout = timeout
+ return t
+}
+
+func (t TimedTask) Execute(value int) (int, error) {
+ ch := make(chan ValErr)
+
+ go func(task Task) {
+ val, err := task.Execute(value)
+ select {
+ case ch <- ValErr{val, err}:
+ default:
+ }
+ }(t.task)
+
+ select {
+ case valErr := <-ch:
+ return valErr.value, valErr.err
+ case <-time.After(t.timeout):
+ return 0, fmt.Errorf("timed-out")
+ }
+}
+
+//--------------------------
+type MapReduceTasks struct {
+ tasks []Task
+ reduce func(results []int) int
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ t := new(MapReduceTasks)
+ t.tasks = tasks
+ t.reduce = reduce
+ return t
+}
+
+func (m MapReduceTasks) Execute(value int) (int, error) {
+ if len(m.tasks) == 0 {
+ return 0, fmt.Errorf("no tasks!")
+ }
+
+ ch := make(chan ValErr)
+ errCh := make(chan struct{}, len(m.tasks))
+
+ for _, task := range m.tasks {
+ go func(task Task) {
+ res, err := task.Execute(value)
+ select {
+ case ch <- ValErr{res, err}:
+ case <-errCh:
+ return
+ }
+ }(task)
+ }
+
+ values := make([]int, 0)
+
+ for valErr := range ch {
+ if valErr.err != nil {
+ for i := 0; i < len(m.tasks); i++ {
+ errCh <- struct{}{}
+ }
+ return 0, valErr.err
+ }
+ values = append(values, valErr.value)
+
+ if len(values) == len(m.tasks) {
+ close(ch)
+ }
+ }
+
+ return m.reduce(values), nil
+}
+
+//---------------------------------------------------
+type GreatestTask struct {
+ errorLimit int
+ tasks <-chan Task
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ g := new(GreatestTask)
+ g.tasks = tasks
+ g.errorLimit = errorLimit
+ return g
+}
+
+func (g GreatestTask) Execute(value int) (int, error) {
+ ch := make(chan ValErr)
+ flag := false
+
+ go func() {
+ var wg sync.WaitGroup
+
+ for task := range g.tasks {
+ flag = true
+ wg.Add(1)
+ go func(task Task) {
+ val, err := task.Execute(value)
+ ch <- ValErr{val, err}
+ wg.Done()
+ }(task)
+ }
+ wg.Wait()
+ close(ch)
+ }()
+
+ if flag == false {
+ return 0, fmt.Errorf("no tasks passed!")
+ }
+
+ errorCount := 0
+ max := math.MinInt64
+
+ for valErr := range ch {
+ if valErr.err != nil {
+ errorCount++
+ if errorCount > g.errorLimit {
+ return 0, fmt.Errorf("error limit reached!")
+ }
+ }
+
+ if valErr.value > max {
+ max = valErr.value
+ }
+ }
+
+ return max, nil
+}

Димитър обнови решението на 29.11.2016 00:51 (преди над 1 година)

package main
import (
"fmt"
"math"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
type ValErr struct {
value int
err error
}
//-------------------------------------
type PipelinedTasks struct {
tasks []Task
}
func Pipeline(tasks ...Task) Task {
p := new(PipelinedTasks)
p.tasks = tasks
return p
}
func (p PipelinedTasks) Execute(value int) (int, error) {
if len(p.tasks) == 0 {
return 0, fmt.Errorf("no tasks!")
}
var err error
for _, task := range p.tasks {
value, err = task.Execute(value)
if err != nil {
return 0, err
}
}
return value, nil
}
//-------------------------------------
type FastestTask struct {
tasks []Task
}
func Fastest(tasks ...Task) Task {
f := new(FastestTask)
f.tasks = tasks
return f
}
func (f FastestTask) Execute(value int) (int, error) {
if len(f.tasks) == 0 {
return 0, fmt.Errorf("no tasks!")
}
ch := make(chan ValErr)
for _, task := range f.tasks {
go func(task Task) {
res, err := task.Execute(value)
select {
case ch <- ValErr{res, err}:
default:
}
}(task)
}
valErr := <-ch
return valErr.value, valErr.err
}
//------------------------------------------------------
type TimedTask struct {
task Task
timeout time.Duration
}
func Timed(task Task, timeout time.Duration) Task {
t := new(TimedTask)
t.task = task
t.timeout = timeout
return t
}
func (t TimedTask) Execute(value int) (int, error) {
ch := make(chan ValErr)
go func(task Task) {
val, err := task.Execute(value)
select {
case ch <- ValErr{val, err}:
default:
}
}(t.task)
select {
case valErr := <-ch:
return valErr.value, valErr.err
case <-time.After(t.timeout):
return 0, fmt.Errorf("timed-out")
}
}
//--------------------------
type MapReduceTasks struct {
tasks []Task
reduce func(results []int) int
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
t := new(MapReduceTasks)
t.tasks = tasks
t.reduce = reduce
return t
}
func (m MapReduceTasks) Execute(value int) (int, error) {
if len(m.tasks) == 0 {
return 0, fmt.Errorf("no tasks!")
}
ch := make(chan ValErr)
errCh := make(chan struct{}, len(m.tasks))
for _, task := range m.tasks {
go func(task Task) {
res, err := task.Execute(value)
select {
case ch <- ValErr{res, err}:
case <-errCh:
return
}
}(task)
}
values := make([]int, 0)
for valErr := range ch {
if valErr.err != nil {
for i := 0; i < len(m.tasks); i++ {
errCh <- struct{}{}
}
return 0, valErr.err
}
values = append(values, valErr.value)
if len(values) == len(m.tasks) {
close(ch)
}
}
return m.reduce(values), nil
}
//---------------------------------------------------
type GreatestTask struct {
errorLimit int
tasks <-chan Task
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
g := new(GreatestTask)
g.tasks = tasks
g.errorLimit = errorLimit
return g
}
func (g GreatestTask) Execute(value int) (int, error) {
ch := make(chan ValErr)
flag := false
go func() {
var wg sync.WaitGroup
for task := range g.tasks {
- flag = true
wg.Add(1)
go func(task Task) {
val, err := task.Execute(value)
ch <- ValErr{val, err}
wg.Done()
}(task)
}
wg.Wait()
close(ch)
}()
- if flag == false {
- return 0, fmt.Errorf("no tasks passed!")
- }
-
errorCount := 0
max := math.MinInt64
for valErr := range ch {
+ flag = true
if valErr.err != nil {
errorCount++
if errorCount > g.errorLimit {
return 0, fmt.Errorf("error limit reached!")
}
}
if valErr.value > max {
max = valErr.value
}
+ }
+
+ if flag == false {
+ return 0, fmt.Errorf("no tasks passed!")
}
return max, nil
}