Решение на Concurrent Tasks от Александър Ангелов

Обратно към всички решения

Към профила на Александър Ангелов

Резултати

  • 12 точки от тестове
  • 0 бонус точки
  • 12 точки общо
  • 12 успешни тест(а)
  • 1 неуспешен тест

Код

package main
import (
"fmt"
"time"
)
// Task is a unit of work that turns an integer input into an integer
// result, reporting failure through the returned error.
type Task interface {
	Execute(input int) (int, error)
}
// Pipeline stuff

// pipelineTask chains tasks so that each one receives the previous task's
// result as its input.
type pipelineTask struct {
	tasks []Task
}

// Execute hands input to the first task and every result to the following
// task, in order, and returns the last task's result. It returns 0 and an
// error when the pipeline holds no tasks or as soon as any task fails.
func (p pipelineTask) Execute(input int) (int, error) {
	if len(p.tasks) == 0 {
		return 0, fmt.Errorf("No tasks to execute in Pipeline Task")
	}

	current := input
	for i := range p.tasks {
		next, err := p.tasks[i].Execute(current)
		if err != nil {
			return 0, err
		}
		current = next
	}
	return current, nil
}
// Pipeline returns a Task that runs the given tasks one after another,
// feeding each task's result to the next one.
func Pipeline(tasks ...Task) Task {
	return pipelineTask{tasks: tasks}
}
// Fastest stuff

// fastestTask races its tasks against each other on the same input.
type fastestTask struct {
	tasks []Task
}

// Execute starts every task in its own goroutine with the same input and
// returns the result and error of whichever task finishes first. It returns
// 0 and an error when there are no tasks.
func (f fastestTask) Execute(in int) (int, error) {
	if len(f.tasks) == 0 {
		return 0, fmt.Errorf("No input tasks")
	}

	type outcome struct {
		value int
		err   error
	}
	// One slot per task: the tasks that lose the race can still deliver
	// their outcome and exit although nobody reads it.
	finished := make(chan outcome, len(f.tasks))
	for i := range f.tasks {
		runner := f.tasks[i]
		go func() {
			value, err := runner.Execute(in)
			finished <- outcome{value: value, err: err}
		}()
	}

	winner := <-finished
	return winner.value, winner.err
}
// Fastest returns a Task that runs all given tasks concurrently and yields
// the outcome of the one that finishes first.
func Fastest(tasks ...Task) Task {
	return fastestTask{tasks: tasks}
}
// Timed stuff

// timedTask bounds the time the caller waits for the wrapped task.
type timedTask struct {
	task    Task
	timeout time.Duration
}

// Execute runs the wrapped task in a goroutine and returns its result and
// error if it finishes before the timeout elapses; otherwise it returns 0
// and a timeout error. The wrapped task is not interrupted on timeout: it
// keeps running and its outcome is discarded.
func (t timedTask) Execute(in int) (int, error) {
	type outcome struct {
		value int
		err   error
	}
	// Buffered so the worker can always deliver its outcome and exit, even
	// after a timeout has made this function stop listening.
	done := make(chan outcome, 1)
	go func() {
		value, err := t.task.Execute(in)
		done <- outcome{value: value, err: err}
	}()

	// An explicit timer (rather than time.After) is released as soon as the
	// task wins the race instead of lingering until the timeout fires.
	timer := time.NewTimer(t.timeout)
	defer timer.Stop()

	select {
	case finished := <-done:
		return finished.value, finished.err
	case <-timer.C:
		return 0, fmt.Errorf("Timed out waiting to task")
	}
}
// Timed returns a Task that runs task but gives up waiting for it once
// timeout has elapsed.
func Timed(task Task, timeout time.Duration) Task {
	return timedTask{task: task, timeout: timeout}
}
// ConcurrentMapReduce

// concurrentMapReduceTask runs its tasks concurrently on the same input and
// folds their results with reduce.
type concurrentMapReduceTask struct {
	tasks  []Task
	reduce func(results []int) int
}

// Execute starts every task in its own goroutine with the same input,
// gathers the results in completion order and returns reduce applied to
// them. It returns 0 and an error when there are no tasks, or the error of
// the first failed task it observes, without waiting for the remaining
// tasks.
func (c concurrentMapReduceTask) Execute(in int) (int, error) {
	total := len(c.tasks)
	if total == 0 {
		return 0, fmt.Errorf("No input tasks")
	}

	type outcome struct {
		value int
		err   error
	}
	// One slot per task: after an early return the still-running tasks can
	// deliver their outcome and exit without a reader.
	outcomes := make(chan outcome, total)
	for i := range c.tasks {
		worker := c.tasks[i]
		go func() {
			value, err := worker.Execute(in)
			outcomes <- outcome{value: value, err: err}
		}()
	}

	collected := make([]int, 0, total)
	for len(collected) < total {
		next := <-outcomes
		if next.err != nil {
			return 0, next.err
		}
		collected = append(collected, next.value)
	}
	return c.reduce(collected), nil
}
// ConcurrentMapReduce returns a Task that runs all given tasks concurrently
// on the same input and combines their results with reduce.
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
	return concurrentMapReduceTask{tasks: tasks, reduce: reduce}
}
// GreatestSearcher

// greatestSearcherTask runs every task received from a channel and reports
// the greatest result, tolerating a limited number of failed tasks.
type greatestSearcherTask struct {
	errorLimit int
	tasks      <-chan Task
}

// Execute starts each task read from the tasks channel in its own goroutine
// with the same input, and returns the greatest result once the channel has
// been closed and every started task has finished.
//
// It returns 0 and an error when no task was received, when more than
// errorLimit tasks failed, or when no task produced a result.
func (g greatestSearcherTask) Execute(in int) (int, error) {
	type outcome struct {
		value int
		err   error
	}
	outcomes := make(chan outcome)

	var (
		pending     int  // tasks started but not yet collected
		failures    int  // tasks that returned an error
		greatest    int  // only meaningful once hasResult is true
		hasResult   bool // at least one task succeeded
		hadAnyTasks bool
	)

	// Waiting on both channels in one select keeps results flowing while
	// more tasks arrive. Setting incoming to nil after the close disables
	// that case, so the loop blocks instead of spinning on a closed channel.
	incoming := g.tasks
	for incoming != nil || pending > 0 {
		select {
		case task, ok := <-incoming:
			if !ok {
				incoming = nil
				continue
			}
			hadAnyTasks = true
			pending++
			go func(task Task) {
				value, err := task.Execute(in)
				outcomes <- outcome{value: value, err: err}
			}(task)
		case finished := <-outcomes:
			pending--
			if finished.err != nil {
				failures++
				continue
			}
			// Seeding from the first success keeps negative maxima correct.
			if !hasResult || finished.value > greatest {
				greatest = finished.value
				hasResult = true
			}
		}
	}

	if !hadAnyTasks {
		return 0, fmt.Errorf("No input tasks")
	}
	if failures > g.errorLimit {
		return 0, fmt.Errorf("Too much errors generated from tasks")
	}
	if !hasResult {
		return 0, fmt.Errorf("No task produced a result")
	}
	return greatest, nil
}
// GreatestSearcher returns a Task that runs every task received on tasks
// and yields the greatest result, allowing at most errorLimit failed tasks.
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
	return greatestSearcherTask{errorLimit: errorLimit, tasks: tasks}
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161129-30451-15rykrp	0.002s
PASS
ok  	_/tmp/d20161129-30451-15rykrp	0.003s
PASS
ok  	_/tmp/d20161129-30451-15rykrp	0.003s
PASS
ok  	_/tmp/d20161129-30451-15rykrp	0.103s
PASS
ok  	_/tmp/d20161129-30451-15rykrp	0.204s
PASS
ok  	_/tmp/d20161129-30451-15rykrp	0.134s
PASS
ok  	_/tmp/d20161129-30451-15rykrp	0.203s
PASS
ok  	_/tmp/d20161129-30451-15rykrp	0.003s
PASS
ok  	_/tmp/d20161129-30451-15rykrp	0.003s
PASS
ok  	_/tmp/d20161129-30451-15rykrp	0.005s
PASS
ok  	_/tmp/d20161129-30451-15rykrp	0.048s
--- FAIL: TestGreatestSearcherErrors (0.05s)
    --- FAIL: TestGreatestSearcherErrors/only_failure (0.00s)
    	solution_test.go:335: Expected error did not occur instead got 0
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-15rykrp	0.048s
PASS
ok  	_/tmp/d20161129-30451-15rykrp	0.123s

История (1 версия и 0 коментара)

Александър обнови решението на 27.11.2016 15:53 (преди над 1 година)

+package main
+
+import (
+ "fmt"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+// Pipeline stuff
+type pipelineTask struct {
+ tasks []Task
+}
+
+func (p pipelineTask) Execute(input int) (int, error) {
+ if len(p.tasks) == 0 {
+ return 0, fmt.Errorf("No tasks to execute in Pipeline Task")
+ }
+
+ var (
+ output int
+ err error
+ )
+ for _, t := range p.tasks {
+ output, err = t.Execute(input)
+ if err != nil {
+ return 0, err
+ }
+ input = output
+ }
+
+ return output, nil
+}
+
+func Pipeline(tasks ...Task) Task {
+ var t pipelineTask
+ t.tasks = tasks
+ return t
+}
+
+// Fastest stuff
+type fastestTask struct {
+ tasks []Task
+}
+
+func (f fastestTask) Execute(in int) (int, error) {
+ if len(f.tasks) == 0 {
+ return 0, fmt.Errorf("No input tasks")
+ }
+
+ doneSync := make(chan struct{}, 1)
+ setOutputSync := make(chan struct{}, 1)
+ var (
+ output int
+ outputError error
+ )
+
+ for _, t := range f.tasks {
+ go func(task Task) {
+ out, err := task.Execute(in)
+ if _, ok := <-setOutputSync; ok {
+ output = out
+ outputError = err
+ doneSync <- struct{}{}
+ }
+ }(t)
+ }
+ setOutputSync <- struct{}{}
+ <-doneSync
+ close(setOutputSync)
+ close(doneSync)
+ return output, outputError
+}
+
+func Fastest(tasks ...Task) Task {
+ var f fastestTask
+ f.tasks = tasks
+ return f
+}
+
+// Timed stuff
+type timedTask struct {
+ task Task
+ timeout time.Duration
+}
+
+func (t timedTask) Execute(in int) (int, error) {
+ doneSync := make(chan struct{}, 1)
+ var (
+ output int
+ outputError error
+ )
+ go func() {
+ out, err := t.task.Execute(in)
+ output = out
+ outputError = err
+ doneSync <- struct{}{}
+ }()
+
+ select {
+ case <-doneSync:
+ return output, outputError
+ case <-time.After(t.timeout):
+ return 0, fmt.Errorf("Timed out waiting to task")
+ }
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ var t timedTask
+ t.task = task
+ t.timeout = timeout
+ return t
+}
+
+// ConcurrentMapReduce
+type concurrentMapReduceTask struct {
+ tasks []Task
+ reduce func(results []int) int
+}
+
+func (c concurrentMapReduceTask) Execute(in int) (int, error) {
+ numTasks := len(c.tasks)
+ if numTasks == 0 {
+ return 0, fmt.Errorf("No input tasks")
+ }
+
+ results := make([]int, 0, numTasks)
+ errorChannel := make(chan error)
+ resultChannel := make(chan int)
+ syncChannel := make(chan struct{})
+ defer close(syncChannel)
+ defer close(errorChannel)
+ defer close(resultChannel)
+
+ for _, t := range c.tasks {
+ go func(task Task) {
+ output, outputError := task.Execute(in)
+ if _, ok := <-syncChannel; ok {
+ if outputError == nil {
+ resultChannel <- output
+ } else {
+ errorChannel <- outputError
+ }
+ }
+ }(t)
+ }
+
+ for i := 0; i < numTasks; i++ {
+ syncChannel <- struct{}{}
+ select {
+ case res := <-resultChannel:
+ results = append(results, res)
+ case err := <-errorChannel:
+ return 0, err
+ }
+ }
+
+ return c.reduce(results), nil
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ var c concurrentMapReduceTask
+ c.reduce = reduce
+ c.tasks = tasks
+ return c
+}
+
+// GreatestSearcher
+type greatestSearcherTask struct {
+ errorLimit int
+ tasks <-chan Task
+}
+
+func (g greatestSearcherTask) Execute(in int) (int, error) {
+ errorChannel := make(chan struct{})
+ resultChannel := make(chan int)
+ currentErrors := 0
+ maxNumber := 0
+ var (
+ counter int64
+ shouldQuit bool
+ hadAnyTasks bool
+ )
+
+ for !shouldQuit || counter != 0 {
+ select {
+ case <-errorChannel:
+ currentErrors++
+ counter--
+ case res := <-resultChannel:
+ if res > maxNumber {
+ maxNumber = res
+ }
+ counter--
+ default:
+ if t, ok := <-g.tasks; ok {
+ hadAnyTasks = true
+ counter++
+ go func(task Task) {
+ out, err := task.Execute(in)
+ if err != nil {
+ errorChannel <- struct{}{}
+ } else {
+ resultChannel <- out
+ }
+ }(t)
+ } else {
+ shouldQuit = true
+ }
+ }
+ }
+
+ if !hadAnyTasks {
+ return 0, fmt.Errorf("No input tasks")
+ }
+
+ if currentErrors > g.errorLimit {
+ return 0, fmt.Errorf("Too much errors generated from tasks")
+ }
+
+ return maxNumber, nil
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ var g greatestSearcherTask
+ g.errorLimit = errorLimit
+ g.tasks = tasks
+ return g
+}