Решение на Concurrent Tasks от Николай Генов

Обратно към всички решения

Към профила на Николай Генов

Резултати

  • 13 точки от тестове
  • 0 бонус точки
  • 13 точки общо
  • 13 успешни тест(а)
  • 0 неуспешни тест(а)

Код

// Package main is a solution by Nikolay Genov to the third problem
// in the course on the Go Programming Language at FMI, Sofia University.
// For more information about the problem, see
// https://github.com/fmi/go-homework/tree/master/tasks/03
package main
import (
"fmt"
"math"
"sync"
"time"
)
// Task is a unit of work that takes an integer argument and produces either
// an integer result or an error.
type Task interface {
	// Execute runs the task with the given argument.
	Execute(arg int) (result int, err error)
}
// Pipeline bundles the given tasks, in the order supplied, into a single Task.
func Pipeline(tasks ...Task) Task {
	p := pipeline{tasks: tasks}
	return &p
}

// pipeline stores the tasks handed to Pipeline.
type pipeline struct {
	tasks []Task
}
// Execute runs the tasks one after another: the first task receives addend and
// every following task receives the result of the previous one. The result of
// the last task is returned. An error is returned when there are no tasks, or
// as soon as any task fails, in which case the remaining tasks are not run.
func (p pipeline) Execute(addend int) (int, error) {
	if len(p.tasks) == 0 {
		return 0, fmt.Errorf("No tasks given")
	}

	current := addend
	for i := 0; i < len(p.tasks); i++ {
		next, err := p.tasks[i].Execute(current)
		if err != nil {
			return 0, err
		}
		current = next
	}
	return current, nil
}
// Fastest bundles the given tasks into a single Task backed by a *fastest.
func Fastest(tasks ...Task) Task {
	f := fastest{tasks: tasks}
	return &f
}

// fastest stores the tasks handed to Fastest.
type fastest struct {
	tasks []Task
}
// taskResult pairs the value and the error returned by one Task execution so
// that both can be sent over a single channel.
type taskResult struct {
	val int   // value returned by the task
	err error // error returned by the task
}
// Execute starts every task concurrently with the same addend and returns the
// value and error of whichever task finishes first. It returns an error when
// there are no tasks.
func (f fastest) Execute(addend int) (int, error) {
	if len(f.tasks) == 0 {
		return 0, fmt.Errorf("No tasks given")
	}

	var (
		first  sync.Once
		winner = make(chan taskResult)
	)
	for _, task := range f.tasks {
		go func(task Task) {
			val, err := task.Execute(addend)
			// Only the first task to finish reports its result; later
			// finishers skip the send and simply exit.
			first.Do(func() { winner <- taskResult{val: val, err: err} })
		}(task)
	}

	result := <-winner
	return result.val, result.err
}
// Timed wraps task in a Task whose Execute gives up waiting after timeout.
func Timed(task Task, timeout time.Duration) Task {
	return &timed{task, timeout}
}

// timed is the Task returned by Timed.
type timed struct {
	task    Task          // wrapped task
	timeout time.Duration // how long Execute waits for the wrapped task
}

// Execute runs the wrapped task with addend in its own goroutine and returns
// the task's value and error if it finishes within the timeout. Otherwise it
// returns 0 and a timeout error; the wrapped task is not interrupted and keeps
// running in the background until it returns on its own.
func (t timed) Execute(addend int) (int, error) {
	// One buffer slot lets the worker always deliver its result and exit,
	// even after the caller has stopped listening. The earlier version
	// guarded an unbuffered send with a mutex, which could deadlock: the
	// worker held the lock while blocked on the send, and the timeout branch
	// then blocked forever trying to take the same lock.
	c := make(chan taskResult, 1)
	go func() {
		val, err := t.task.Execute(addend)
		c <- taskResult{val, err}
	}()

	// Stop the timer on return so it is released promptly when the task wins.
	timer := time.NewTimer(t.timeout)
	defer timer.Stop()

	select {
	case result := <-c:
		return result.val, result.err
	case <-timer.C:
		return 0, fmt.Errorf("Execution timed out after %v", t.timeout)
	}
}
// ConcurrentMapReduce bundles the reduce function and the given tasks into a
// single Task backed by a *concurrentMapReduce.
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
	cmr := concurrentMapReduce{
		tasks:  tasks,
		reduce: reduce,
	}
	return &cmr
}

// concurrentMapReduce stores the arguments handed to ConcurrentMapReduce.
type concurrentMapReduce struct {
	tasks  []Task
	reduce func(results []int) int
}
// Execute runs every task concurrently with the same addend and, if all of
// them succeed, returns reduce applied to their results. The results are
// passed to reduce in completion order, not in task order. It returns an error
// when there are no tasks, or the first error reported by any task; in the
// error case reduce is not called.
func (cmr concurrentMapReduce) Execute(addend int) (int, error) {
	if len(cmr.tasks) == 0 {
		return 0, fmt.Errorf("No tasks given")
	}

	// One buffer slot per task: every worker can deliver its result and exit
	// even if this method has already returned because of an earlier error.
	// With an unbuffered channel the remaining workers would block forever on
	// their send once the receiver is gone.
	c := make(chan taskResult, len(cmr.tasks))
	for _, task := range cmr.tasks {
		go func(task Task) {
			val, err := task.Execute(addend)
			c <- taskResult{val, err}
		}(task)
	}

	// Exactly one result arrives per task, so counting receives replaces the
	// WaitGroup-and-close handshake.
	results := make([]int, 0, len(cmr.tasks))
	for range cmr.tasks {
		result := <-c
		if result.err != nil {
			return 0, result.err
		}
		results = append(results, result.val)
	}
	return cmr.reduce(results), nil
}
// GreatestSearcher bundles the error limit and the channel of tasks into a
// single Task backed by a *greatestSearcher.
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
	g := greatestSearcher{
		tasks:      tasks,
		errorLimit: errorLimit,
	}
	return &g
}

// greatestSearcher stores the arguments handed to GreatestSearcher.
type greatestSearcher struct {
	tasks      <-chan Task
	errorLimit int
}
// Execute runs every task received from the tasks channel concurrently with
// the same addend and, once the channel is closed and all tasks have finished,
// returns the greatest successful result. It returns an error as soon as more
// than errorLimit tasks have failed, when the channel delivered no tasks at
// all, or when none of the tasks succeeded.
func (g greatestSearcher) Execute(addend int) (int, error) {
	// Closing done on return tells the helper goroutine and any workers still
	// running to stop instead of blocking forever on a result nobody will
	// read. This matters on the early return below.
	done := make(chan struct{})
	defer close(done)

	errorCount := 0
	// NOTE: this constant only fits in an int on platforms where int is 64 bits.
	greatest := math.MinInt64
	hasSuccessfulTask, hasFailedTask := false, false

	for result := range concurentHelper(g.tasks, done, addend) {
		if result.err != nil {
			hasFailedTask = true
			errorCount++
			if errorCount > g.errorLimit {
				return 0, fmt.Errorf("There are [%v] errors, but the limit is [%v]", errorCount, g.errorLimit)
			}
			continue
		}
		hasSuccessfulTask = true
		if result.val > greatest {
			greatest = result.val
		}
	}

	switch {
	case !hasFailedTask && !hasSuccessfulTask:
		return 0, fmt.Errorf("No task were given")
	case !hasSuccessfulTask:
		return 0, fmt.Errorf("No successful tasks were executed")
	}
	return greatest, nil
}

// concurentHelper starts one goroutine per task received from tasks, each
// executing its task with addend, and returns a channel carrying their
// results. The channel is closed after tasks has been closed and every started
// task has reported, or after done has been closed and the started tasks have
// returned. Once done is closed no further tasks are read and pending results
// are dropped.
func concurentHelper(tasks <-chan Task, done <-chan struct{}, addend int) <-chan taskResult {
	c := make(chan taskResult)
	go func() {
		// Deferred calls run in reverse order: wait for the workers first,
		// then close the result channel.
		defer close(c)
		var workers sync.WaitGroup
		defer workers.Wait()

		for {
			select {
			case task, ok := <-tasks:
				if !ok {
					return
				}
				workers.Add(1)
				go func(task Task) {
					defer workers.Done()
					val, err := task.Execute(addend)
					select {
					case c <- taskResult{val, err}:
					case <-done:
						// The consumer is gone; drop the result.
					}
				}(task)
			case <-done:
				return
			}
		}
	}()
	return c
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161129-30451-tw5nz	0.003s
PASS
ok  	_/tmp/d20161129-30451-tw5nz	0.003s
PASS
ok  	_/tmp/d20161129-30451-tw5nz	0.003s
PASS
ok  	_/tmp/d20161129-30451-tw5nz	0.103s
PASS
ok  	_/tmp/d20161129-30451-tw5nz	0.204s
PASS
ok  	_/tmp/d20161129-30451-tw5nz	0.134s
PASS
ok  	_/tmp/d20161129-30451-tw5nz	0.203s
PASS
ok  	_/tmp/d20161129-30451-tw5nz	0.003s
PASS
ok  	_/tmp/d20161129-30451-tw5nz	0.003s
PASS
ok  	_/tmp/d20161129-30451-tw5nz	0.003s
PASS
ok  	_/tmp/d20161129-30451-tw5nz	0.049s
PASS
ok  	_/tmp/d20161129-30451-tw5nz	0.048s
PASS
ok  	_/tmp/d20161129-30451-tw5nz	0.123s

История (4 версии и 3 коментара)

Николай обнови решението на 29.11.2016 02:12 (преди над 1 година)

+package main
+
+import (
+ "fmt"
+ "math"
+ "sync"
+ "time"
+)
+
+type Task interface {
+ Execute(int) (int, error)
+}
+
+func Pipeline(tasks ...Task) Task {
+ return &pipeline{tasks}
+}
+
+type pipeline struct {
+ tasks []Task
+}
+
+func (p pipeline) Execute(addend int) (int, error) {
+ var err error
+ if len(p.tasks) == 0 {
+ return 0, fmt.Errorf("No tasks given")
+ }
+ val := addend
+ for _, task := range p.tasks {
+ if val, err = task.Execute(val); err != nil {
+ return 0, err
+ }
+ }
+ return val, nil
+}
+
+func Fastest(tasks ...Task) Task {
+ return &fastest{tasks}
+}
+
+type fastest struct {
+ tasks []Task
+}
+
+type taskResult struct {
+ val int
+ err error
+}
+
+func (f fastest) Execute(addend int) (int, error) {
+ if len(f.tasks) == 0 {
+ return 0, fmt.Errorf("No tasks given")
+ }
+ c := make(chan taskResult)
+ executeTask := func(i int) {
+ val, err := f.tasks[i].Execute(addend)
+ c <- taskResult{val, err}
+ }
+ for i := range f.tasks {
+ go executeTask(i)
+ }
+ fastestTaskResult := <-c
+ return fastestTaskResult.val, fastestTaskResult.err
+}
+
+func Timed(task Task, timeout time.Duration) Task {
+ return &timed{task, timeout}
+}
+
+type timed struct {
+ task Task
+ timeout time.Duration
+}
+
+func (t timed) Execute(addend int) (int, error) {
+ c := make(chan taskResult)
+ go func() {
+ val, err := t.task.Execute(addend)
+ c <- taskResult{val, err}
+ }()
+ select {
+ case result := <-c:
+ close(c)
+ return result.val, result.err
+ case <-time.After(t.timeout):
+ return 0, fmt.Errorf("Execution timed out after %v", t.timeout)
+ }
+}
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ return &concurrentMapReduce{tasks, reduce}
+}
+
+type concurrentMapReduce struct {
+ tasks []Task
+ reduce func(results []int) int
+}
+
+func (cmr concurrentMapReduce) Execute(addend int) (int, error) {
+ if len(cmr.tasks) == 0 {
+ return 0, fmt.Errorf("No tasks given")
+ }
+ group := sync.WaitGroup{}
+ c := make(chan taskResult)
+ results := make([]int, 0)
+
+ executeTask := func(i int) {
+ val, err := cmr.tasks[i].Execute(addend)
+ c <- taskResult{val, err}
+ }
+ for i := range cmr.tasks {
+ group.Add(1)
+ go executeTask(i)
+ }
+ go func() {
+ group.Wait()
+ close(c)
+ }()
+ for result := range c {
+ val, err := result.val, result.err
+ if err == nil {
+ results = append(results, val)
+ } else {
+ return 0, err
+ }
+ group.Done()
+ }
+ return cmr.reduce(results), nil
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ return &greatestSearcher{tasks, errorLimit}
+}
+
+type greatestSearcher struct {
+ tasks <-chan Task
+ errorLimit int
+}
+
+func (g greatestSearcher) Execute(addend int) (int, error) {
+ errorCount := 0
+ max := math.MinInt64
+ hasSuccessfulTask, hasFailedTask := false, false
+ group := sync.WaitGroup{}
+
+ c := concurentHelper(g.tasks, &group, addend)
+
+ for taskResult := range c {
+ val, err := taskResult.val, taskResult.err
+ if err == nil {
+ hasSuccessfulTask = true
+ if val > max {
+ max = val
+ }
+ } else {
+ hasFailedTask = true
+ errorCount++
+ if errorCount > g.errorLimit {
+ return 0, fmt.Errorf("There are [%v] errors, but the limit is [%v]", errorCount, g.errorLimit)
+ }
+ }
+ group.Done()
+ }
+ if !hasFailedTask && !hasSuccessfulTask {
+ return 0, fmt.Errorf("No task were given")
+ } else if !hasSuccessfulTask {
+ return 0, fmt.Errorf("No successful tasks were executed")
+ }
+ return max, nil
+}
+
+func concurentHelper(tasks <-chan Task, group *sync.WaitGroup, addend int) <-chan taskResult {
+ c := make(chan taskResult)
+ go func() {
+ defer close(c)
+ defer group.Wait()
+ for task := range tasks {
+ group.Add(1)
+ go func(task Task) {
+ val, err := task.Execute(addend)
+ c <- taskResult{val, err}
+ }(task)
+ }
+ }()
+ return c
+}

Николай обнови решението на 29.11.2016 02:36 (преди над 1 година)

+// Package main is solution by Nikolay Genov of the third problem
+// in the course for the Go Programming Language in FMI, Sofia University
+// For more information about the current problem, visit this link
+// https://github.com/fmi/go-homework/tree/master/tasks/03
package main
import (
"fmt"
"math"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
func Pipeline(tasks ...Task) Task {
return &pipeline{tasks}
}
type pipeline struct {
tasks []Task
}
func (p pipeline) Execute(addend int) (int, error) {
var err error
if len(p.tasks) == 0 {
return 0, fmt.Errorf("No tasks given")
}
val := addend
for _, task := range p.tasks {
if val, err = task.Execute(val); err != nil {
return 0, err
}
}
return val, nil
}
func Fastest(tasks ...Task) Task {
return &fastest{tasks}
}
type fastest struct {
tasks []Task
}
type taskResult struct {
val int
err error
}
func (f fastest) Execute(addend int) (int, error) {
if len(f.tasks) == 0 {
return 0, fmt.Errorf("No tasks given")
}
c := make(chan taskResult)
executeTask := func(i int) {
val, err := f.tasks[i].Execute(addend)
c <- taskResult{val, err}
}
for i := range f.tasks {
go executeTask(i)
}
fastestTaskResult := <-c
return fastestTaskResult.val, fastestTaskResult.err
}
func Timed(task Task, timeout time.Duration) Task {
return &timed{task, timeout}
}
type timed struct {
task Task
timeout time.Duration
}
func (t timed) Execute(addend int) (int, error) {
c := make(chan taskResult)
go func() {
val, err := t.task.Execute(addend)
c <- taskResult{val, err}
}()
select {
case result := <-c:
close(c)
return result.val, result.err
case <-time.After(t.timeout):
return 0, fmt.Errorf("Execution timed out after %v", t.timeout)
}
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return &concurrentMapReduce{tasks, reduce}
}
type concurrentMapReduce struct {
tasks []Task
reduce func(results []int) int
}
func (cmr concurrentMapReduce) Execute(addend int) (int, error) {
if len(cmr.tasks) == 0 {
return 0, fmt.Errorf("No tasks given")
}
group := sync.WaitGroup{}
c := make(chan taskResult)
results := make([]int, 0)
executeTask := func(i int) {
val, err := cmr.tasks[i].Execute(addend)
c <- taskResult{val, err}
}
for i := range cmr.tasks {
group.Add(1)
go executeTask(i)
}
go func() {
group.Wait()
close(c)
}()
for result := range c {
val, err := result.val, result.err
if err == nil {
results = append(results, val)
} else {
return 0, err
}
group.Done()
}
return cmr.reduce(results), nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return &greatestSearcher{tasks, errorLimit}
}
type greatestSearcher struct {
tasks <-chan Task
errorLimit int
}
func (g greatestSearcher) Execute(addend int) (int, error) {
errorCount := 0
max := math.MinInt64
hasSuccessfulTask, hasFailedTask := false, false
group := sync.WaitGroup{}
c := concurentHelper(g.tasks, &group, addend)
for taskResult := range c {
val, err := taskResult.val, taskResult.err
if err == nil {
hasSuccessfulTask = true
if val > max {
max = val
}
} else {
hasFailedTask = true
errorCount++
if errorCount > g.errorLimit {
return 0, fmt.Errorf("There are [%v] errors, but the limit is [%v]", errorCount, g.errorLimit)
}
}
group.Done()
}
if !hasFailedTask && !hasSuccessfulTask {
return 0, fmt.Errorf("No task were given")
} else if !hasSuccessfulTask {
return 0, fmt.Errorf("No successful tasks were executed")
}
return max, nil
}
func concurentHelper(tasks <-chan Task, group *sync.WaitGroup, addend int) <-chan taskResult {
c := make(chan taskResult)
go func() {
defer close(c)
defer group.Wait()
for task := range tasks {
group.Add(1)
go func(task Task) {
val, err := task.Execute(addend)
c <- taskResult{val, err}
}(task)
}
}()
return c
}

Николай обнови решението на 29.11.2016 09:04 (преди над 1 година)

// Package main is solution by Nikolay Genov of the third problem
// in the course for the Go Programming Language in FMI, Sofia University
// For more information about the current problem, visit this link
// https://github.com/fmi/go-homework/tree/master/tasks/03
package main
import (
"fmt"
"math"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
func Pipeline(tasks ...Task) Task {
return &pipeline{tasks}
}
type pipeline struct {
tasks []Task
}
func (p pipeline) Execute(addend int) (int, error) {
var err error
if len(p.tasks) == 0 {
return 0, fmt.Errorf("No tasks given")
}
val := addend
for _, task := range p.tasks {
if val, err = task.Execute(val); err != nil {
return 0, err
}
}
return val, nil
}
func Fastest(tasks ...Task) Task {
return &fastest{tasks}
}
type fastest struct {
tasks []Task
}
type taskResult struct {
val int
err error
}
func (f fastest) Execute(addend int) (int, error) {
if len(f.tasks) == 0 {
return 0, fmt.Errorf("No tasks given")
}
+ var once sync.Once
c := make(chan taskResult)
executeTask := func(i int) {
val, err := f.tasks[i].Execute(addend)
- c <- taskResult{val, err}
+ once.Do(func() { c <- taskResult{val, err} })
+
}
for i := range f.tasks {
go executeTask(i)
}
fastestTaskResult := <-c
return fastestTaskResult.val, fastestTaskResult.err
}
func Timed(task Task, timeout time.Duration) Task {
return &timed{task, timeout}
}
type timed struct {
task Task
timeout time.Duration
}
func (t timed) Execute(addend int) (int, error) {
c := make(chan taskResult)
go func() {
val, err := t.task.Execute(addend)
c <- taskResult{val, err}
}()
select {
case result := <-c:
close(c)
return result.val, result.err
case <-time.After(t.timeout):
return 0, fmt.Errorf("Execution timed out after %v", t.timeout)
}
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return &concurrentMapReduce{tasks, reduce}
}
type concurrentMapReduce struct {
tasks []Task
reduce func(results []int) int
}
func (cmr concurrentMapReduce) Execute(addend int) (int, error) {
if len(cmr.tasks) == 0 {
return 0, fmt.Errorf("No tasks given")
}
group := sync.WaitGroup{}
c := make(chan taskResult)
results := make([]int, 0)
executeTask := func(i int) {
val, err := cmr.tasks[i].Execute(addend)
c <- taskResult{val, err}
}
for i := range cmr.tasks {
group.Add(1)
go executeTask(i)
}
go func() {
group.Wait()
close(c)
}()
for result := range c {
val, err := result.val, result.err
if err == nil {
results = append(results, val)
} else {
return 0, err
}
group.Done()
}
return cmr.reduce(results), nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return &greatestSearcher{tasks, errorLimit}
}
type greatestSearcher struct {
tasks <-chan Task
errorLimit int
}
func (g greatestSearcher) Execute(addend int) (int, error) {
errorCount := 0
max := math.MinInt64
hasSuccessfulTask, hasFailedTask := false, false
group := sync.WaitGroup{}
c := concurentHelper(g.tasks, &group, addend)
for taskResult := range c {
val, err := taskResult.val, taskResult.err
if err == nil {
hasSuccessfulTask = true
if val > max {
max = val
}
} else {
hasFailedTask = true
errorCount++
if errorCount > g.errorLimit {
return 0, fmt.Errorf("There are [%v] errors, but the limit is [%v]", errorCount, g.errorLimit)
}
}
group.Done()
}
if !hasFailedTask && !hasSuccessfulTask {
return 0, fmt.Errorf("No task were given")
} else if !hasSuccessfulTask {
return 0, fmt.Errorf("No successful tasks were executed")
}
return max, nil
}
func concurentHelper(tasks <-chan Task, group *sync.WaitGroup, addend int) <-chan taskResult {
c := make(chan taskResult)
go func() {
defer close(c)
defer group.Wait()
for task := range tasks {
group.Add(1)
go func(task Task) {
val, err := task.Execute(addend)
c <- taskResult{val, err}
}(task)
}
}()
return c
}

Николай обнови решението на 29.11.2016 09:27 (преди над 1 година)

// Package main is solution by Nikolay Genov of the third problem
// in the course for the Go Programming Language in FMI, Sofia University
// For more information about the current problem, visit this link
// https://github.com/fmi/go-homework/tree/master/tasks/03
package main
import (
"fmt"
"math"
"sync"
"time"
)
type Task interface {
Execute(int) (int, error)
}
func Pipeline(tasks ...Task) Task {
return &pipeline{tasks}
}
type pipeline struct {
tasks []Task
}
func (p pipeline) Execute(addend int) (int, error) {
var err error
if len(p.tasks) == 0 {
return 0, fmt.Errorf("No tasks given")
}
val := addend
for _, task := range p.tasks {
if val, err = task.Execute(val); err != nil {
return 0, err
}
}
return val, nil
}
func Fastest(tasks ...Task) Task {
return &fastest{tasks}
}
type fastest struct {
tasks []Task
}
type taskResult struct {
val int
err error
}
func (f fastest) Execute(addend int) (int, error) {
if len(f.tasks) == 0 {
return 0, fmt.Errorf("No tasks given")
}
var once sync.Once
c := make(chan taskResult)
executeTask := func(i int) {
val, err := f.tasks[i].Execute(addend)
once.Do(func() { c <- taskResult{val, err} })
}
for i := range f.tasks {
go executeTask(i)
}
fastestTaskResult := <-c
return fastestTaskResult.val, fastestTaskResult.err
}
func Timed(task Task, timeout time.Duration) Task {
return &timed{task, timeout}
}
type timed struct {
task Task
timeout time.Duration
}
func (t timed) Execute(addend int) (int, error) {
+ mutex := sync.Mutex{}
c := make(chan taskResult)
+ defer close(c)
+ finished := false
go func() {
val, err := t.task.Execute(addend)
- c <- taskResult{val, err}
+ mutex.Lock()
+ defer mutex.Unlock()
+ if !finished {
+ c <- taskResult{val, err}
+ }
}()
select {
case result := <-c:
- close(c)
return result.val, result.err
case <-time.After(t.timeout):
+ mutex.Lock()
+ finished = true
+ mutex.Unlock()
return 0, fmt.Errorf("Execution timed out after %v", t.timeout)
}
}
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return &concurrentMapReduce{tasks, reduce}
}
type concurrentMapReduce struct {
tasks []Task
reduce func(results []int) int
}
func (cmr concurrentMapReduce) Execute(addend int) (int, error) {
if len(cmr.tasks) == 0 {
return 0, fmt.Errorf("No tasks given")
}
group := sync.WaitGroup{}
c := make(chan taskResult)
results := make([]int, 0)
executeTask := func(i int) {
val, err := cmr.tasks[i].Execute(addend)
c <- taskResult{val, err}
}
for i := range cmr.tasks {
group.Add(1)
go executeTask(i)
}
go func() {
group.Wait()
close(c)
}()
for result := range c {
val, err := result.val, result.err
if err == nil {
results = append(results, val)
} else {
return 0, err
}
group.Done()
}
return cmr.reduce(results), nil
}
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return &greatestSearcher{tasks, errorLimit}
}
type greatestSearcher struct {
tasks <-chan Task
errorLimit int
}
func (g greatestSearcher) Execute(addend int) (int, error) {
errorCount := 0
max := math.MinInt64
hasSuccessfulTask, hasFailedTask := false, false
group := sync.WaitGroup{}
c := concurentHelper(g.tasks, &group, addend)
for taskResult := range c {
val, err := taskResult.val, taskResult.err
if err == nil {
hasSuccessfulTask = true
if val > max {
max = val
}
} else {
hasFailedTask = true
errorCount++
if errorCount > g.errorLimit {
return 0, fmt.Errorf("There are [%v] errors, but the limit is [%v]", errorCount, g.errorLimit)
}
}
group.Done()
}
if !hasFailedTask && !hasSuccessfulTask {
return 0, fmt.Errorf("No task were given")
} else if !hasSuccessfulTask {
return 0, fmt.Errorf("No successful tasks were executed")
}
return max, nil
}
func concurentHelper(tasks <-chan Task, group *sync.WaitGroup, addend int) <-chan taskResult {
c := make(chan taskResult)
go func() {
defer close(c)
defer group.Wait()
for task := range tasks {
group.Add(1)
go func(task Task) {
val, err := task.Execute(addend)
c <- taskResult{val, err}
}(task)
}
}()
return c
}