Решение на Concurrent Tasks от Николай Бабулков

Обратно към всички решения

Към профила на Николай Бабулков

Резултати

  • 12 точки от тестове
  • 0 бонус точки
  • 12 точки общо
  • 12 успешни тест(а)
  • 1 неуспешни тест(а)

Код

package main
import (
"errors"
"fmt"
"sync"
"time"
)
// Task is a unit of work that takes an int argument and produces either an
// int result or an error.
type Task interface {
	Execute(arg int) (result int, err error)
}
// Result bundles the two return values of a Task execution (the int value and
// the error) so that both can be sent over a single channel.
type Result struct {
// res is the int value produced by the task.
res int
// err is the error produced by the task; nil means the task succeeded.
err error
}
//----------------------------------------------------------------------------
// Pipeline returns a Task that chains the given tasks: the output of each
// task becomes the input of the next one.
func Pipeline(tasks ...Task) Task {
	return Pipeliner{tasks: tasks}
}

// Pipeliner is the Task implementation behind Pipeline. It holds the tasks in
// the order in which they must be executed.
type Pipeliner struct {
	tasks []Task
}

// Execute runs the tasks sequentially. The first task receives arg and every
// following task receives the result of its predecessor. Execution stops at
// the first failing task, whose result and error are returned unchanged. An
// empty pipeline is reported as an error.
func (p Pipeliner) Execute(arg int) (int, error) {
	if len(p.tasks) == 0 {
		return 0, errors.New("No tasks given!")
	}

	current := arg
	for i := range p.tasks {
		out, err := p.tasks[i].Execute(current)
		if err != nil {
			return out, err
		}
		current = out
	}
	return current, nil
}
//----------------------------------------------------------------------------
// Fastest returns a Task that runs all given tasks concurrently and reports
// the outcome of whichever one finishes first.
func Fastest(tasks ...Task) Task {
	return Timer{tasks: tasks}
}

// Timer is the Task implementation behind Fastest.
type Timer struct {
	tasks []Task
}

// Execute starts every task in its own goroutine with the same argument and
// returns the result and error of the first task to finish, whether it
// succeeded or failed. With no tasks it returns an error.
func (t Timer) Execute(arg int) (int, error) {
	if len(t.tasks) == 0 {
		return 0, errors.New("No tasks given!")
	}

	var (
		first  sync.Once
		winner = make(chan Result)
	)

	// race runs one candidate; sync.Once guarantees that only the first
	// finisher publishes its outcome, later finishers simply return.
	race := func(candidate Task) {
		value, err := candidate.Execute(arg)
		first.Do(func() {
			winner <- Result{value, err}
		})
	}

	for _, candidate := range t.tasks {
		go race(candidate)
	}

	outcome := <-winner
	return outcome.res, outcome.err
}
//----------------------------------------------------------------------------
// Timed returns a Task that runs task but gives up waiting for it once
// timeout has elapsed.
func Timed(task Task, timeout time.Duration) Task {
	return Limiter{task, timeout}
}

// Limiter is the Task implementation behind Timed.
type Limiter struct {
	task  Task
	limit time.Duration
}

// Execute runs the wrapped task in a separate goroutine and waits for it for
// at most l.limit. If the task finishes in time its result and error are
// returned unchanged; otherwise 0 and a timeout error are returned.
//
// The wrapped task cannot be interrupted, so after a timeout it keeps running
// in the background until it completes on its own.
func (l Limiter) Execute(arg int) (int, error) {
	// The channel has room for exactly one value, so the worker can always
	// deliver its outcome and exit, even when nobody is listening any more.
	// This avoids both a leaked goroutine and the deadlock a hand-rolled
	// "who goes first" handshake is prone to when the task finishes right at
	// the deadline.
	resultChan := make(chan Result, 1)
	go func() {
		res, err := l.task.Execute(arg)
		resultChan <- Result{res, err}
	}()

	timer := time.NewTimer(l.limit)
	defer timer.Stop() // release the timer promptly when the task wins

	select {
	case result := <-resultChan:
		return result.res, result.err
	case <-timer.C:
		return 0, fmt.Errorf("Task didn't finish for %s!", l.limit)
	}
}
//----------------------------------------------------------------------------
// ConcurrentMapReduce returns a Task that runs all tasks concurrently with the
// same argument and combines their results with reduce.
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
	return MapReducer{reduce: reduce, tasks: tasks}
}

// MapReducer is the Task implementation behind ConcurrentMapReduce.
type MapReducer struct {
	reduce func(results []int) int
	tasks  []Task
}

// Execute runs every task in its own goroutine. When all of them succeed it
// returns reduce applied to the results, which are stored in task order. As
// soon as one task fails it returns 0 and an error naming the index of the
// failing task, without waiting for the remaining tasks. With no tasks it
// returns an error.
func (mr MapReducer) Execute(arg int) (int, error) {
	if len(mr.tasks) == 0 {
		return 0, errors.New("No tasks given!")
	}

	var (
		pending sync.WaitGroup
		outputs = make([]int, len(mr.tasks))
		verdict = make(chan Result)
		token   = make(chan struct{}, 1)
	)

	// Exactly one goroutine may publish the final verdict: whoever manages to
	// take the single token out of the channel.
	token <- struct{}{}
	claim := func() bool {
		select {
		case <-token:
			return true
		default:
			return false
		}
	}

	pending.Add(len(mr.tasks))
	for i, task := range mr.tasks {
		go func(slot int, job Task) {
			defer pending.Done()

			value, err := job.Execute(arg)
			if err == nil {
				// Each goroutine writes only to its own slot.
				outputs[slot] = value
				return
			}
			if claim() {
				verdict <- Result{0, fmt.Errorf("Error in task %d", slot)}
			}
		}(i, task)
	}

	// Publish the reduced value once every task is done, unless a failure
	// has already claimed the token.
	go func() {
		pending.Wait()
		if claim() {
			verdict <- Result{mr.reduce(outputs), nil}
		}
	}()

	final := <-verdict
	return final.res, final.err
}
//----------------------------------------------------------------------------
// GreatestSearcher returns a Task that executes every task received from the
// tasks channel concurrently and yields the greatest of their results, while
// tolerating at most errorLimit failing tasks.
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
	return AsyncExecuter{errorLimit, tasks}
}

// AsyncExecuter is the Task implementation behind GreatestSearcher.
type AsyncExecuter struct {
	errorLimit int
	tasks      <-chan Task
}

// Execute reads tasks from the channel until it is closed, running each one
// in its own goroutine with arg, and waits for all of them to finish.
//
// It returns the greatest result among the successful tasks. It returns 0 and
// an error when no tasks were received, when every task failed, or when more
// than errorLimit tasks failed.
func (ae AsyncExecuter) Execute(arg int) (int, error) {
	var (
		wg sync.WaitGroup

		// mu guards greatest, found and failed, which are updated by the
		// task goroutines.
		mu       sync.Mutex
		greatest int
		found    bool // whether greatest holds a real result yet
		failed   int

		total int // only touched by this goroutine
	)

	for task := range ae.tasks {
		wg.Add(1)
		total++
		go func(task Task) {
			defer wg.Done()

			res, err := task.Execute(arg)

			// Compare and update under one lock so that a smaller result
			// can never overwrite a greater one written concurrently.
			mu.Lock()
			defer mu.Unlock()
			if err != nil {
				failed++
				return
			}
			if !found || res > greatest {
				greatest, found = res, true
			}
		}(task)
	}
	wg.Wait()

	// All workers are done, so the shared state can be read without locking.
	switch {
	case total == 0:
		return 0, fmt.Errorf("No tasks given!")
	case failed >= total:
		return 0, fmt.Errorf("All tasks failed!")
	case failed > ae.errorLimit:
		return 0, fmt.Errorf("The error limit of %d was exceeded!", ae.errorLimit)
	}
	return greatest, nil
}

Лог от изпълнението

PASS
ok  	_/tmp/d20161129-30451-1rk4wcv	0.003s
PASS
ok  	_/tmp/d20161129-30451-1rk4wcv	0.003s
PASS
ok  	_/tmp/d20161129-30451-1rk4wcv	0.003s
PASS
ok  	_/tmp/d20161129-30451-1rk4wcv	0.103s
PASS
ok  	_/tmp/d20161129-30451-1rk4wcv	0.203s
PASS
ok  	_/tmp/d20161129-30451-1rk4wcv	0.134s
PASS
ok  	_/tmp/d20161129-30451-1rk4wcv	0.204s
PASS
ok  	_/tmp/d20161129-30451-1rk4wcv	0.003s
PASS
ok  	_/tmp/d20161129-30451-1rk4wcv	0.003s
--- FAIL: TestGreatestSearcherSimple (0.00s)
	solution_test.go:289: Received result 42 when expecting 2
FAIL
exit status 1
FAIL	_/tmp/d20161129-30451-1rk4wcv	0.003s
PASS
ok  	_/tmp/d20161129-30451-1rk4wcv	0.048s
PASS
ok  	_/tmp/d20161129-30451-1rk4wcv	0.048s
PASS
ok  	_/tmp/d20161129-30451-1rk4wcv	0.123s

История (2 версии и 1 коментар)

Николай обнови решението на 26.11.2016 16:38 (преди над 1 година)

+package main
+
+import (
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+ "unsafe"
+)
+
+//
+// Help functions and types
+//
+type Task interface {
+ Execute(int) (int, error)
+}
+
+type Result struct {
+ res int
+ err error
+}
+
+type SyncInt struct {
+ sync.RWMutex
+ num int
+}
+
+func CreateSyncInt(start int) SyncInt {
+ return SyncInt{num: start}
+}
+
+func (ai *SyncInt) Increment() {
+ ai.Lock()
+ ai.num++
+ ai.Unlock()
+}
+
+func (ai *SyncInt) SetMax(val int) {
+ if val > ai.Get() {
+ ai.Lock()
+ ai.num = val
+ ai.Unlock()
+ }
+}
+
+func (ai *SyncInt) Get() (ret int) {
+ ai.RLock()
+ ret = ai.num
+ ai.RUnlock()
+ return
+}
+
+//----------------------------------------------------------------------------
+
+func Pipeline(tasks ...Task) Task {
+ return Pipeliner{tasks}
+}
+
+type Pipeliner struct {
+ tasks []Task
+}
+
+func (p Pipeliner) Execute(arg int) (int, error) {
+ if len(p.tasks) == 0 {
+ return 0, errors.New("No tasks given!")
+ }
+
+ finChan := make(chan Result)
+ prevChan, nextChan := finChan, finChan
+
+ for i := len(p.tasks) - 1; i >= 0; i-- {
+ nextChan = make(chan Result)
+ go chainTask(prevChan, nextChan, p.tasks[i])
+ prevChan = nextChan
+ }
+
+ go func(feedChan chan Result) {
+ feedChan <- Result{arg, nil}
+ }(nextChan)
+
+ result := <-finChan
+ return result.res, result.err
+}
+
+func chainTask(prevChan, nextChan chan Result, task Task) {
+ result := <-nextChan
+
+ if result.err != nil {
+ prevChan <- result
+ } else {
+ res, err := task.Execute(result.res)
+ prevChan <- Result{res, err}
+ }
+}
+
+//----------------------------------------------------------------------------
+
+func Fastest(tasks ...Task) Task {
+ return Timer{tasks}
+}
+
+type Timer struct {
+ tasks []Task
+}
+
+func (t Timer) Execute(arg int) (int, error) {
+ if len(t.tasks) == 0 {
+ return 0, errors.New("No tasks given!")
+ }
+
+ var once sync.Once
+ fastResChan := make(chan Result)
+
+ for _, task := range t.tasks {
+ go func(task Task) {
+ res, err := task.Execute(arg)
+ once.Do(func() {
+ fastResChan <- Result{res, err}
+ })
+ }(task)
+ }
+
+ result := <-fastResChan
+ return result.res, result.err
+}
+
+//----------------------------------------------------------------------------
+
+func Timed(task Task, timeout time.Duration) Task {
+ return Limiter{task, timeout}
+}
+
+type Limiter struct {
+ task Task
+ limit time.Duration
+}
+
+func (l Limiter) Execute(arg int) (int, error) {
+ resultChan := make(chan Result)
+
+ go func() {
+ res, err := l.task.Execute(arg)
+ resultChan <- Result{res, err}
+ }()
+
+ select {
+ case result := <-resultChan:
+ return result.res, result.err
+ case <-time.After(l.limit):
+ return 0, fmt.Errorf("Task didn't finish for %s!", l.limit)
+ }
+}
+
+//----------------------------------------------------------------------------
+
+func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
+ return MapReducer{reduce, tasks}
+}
+
+type MapReducer struct {
+ reduce func(results []int) int
+ tasks []Task
+}
+
+func (mr MapReducer) Execute(arg int) (int, error) {
+ if len(mr.tasks) == 0 {
+ return 0, errors.New("No tasks given!")
+ }
+
+ results := make([]int, len(mr.tasks))
+ resultChan := make(chan Result)
+ errCnt := CreateSyncInt(0)
+
+ var taskWg sync.WaitGroup
+ for ind, task := range mr.tasks {
+ taskWg.Add(1)
+ go func(ind int, task Task) {
+ res, err := task.Execute(arg)
+ if err == nil {
+ results[ind] = res
+ } else {
+ resultChan <- Result{0, fmt.Errorf("Error in task %d", ind)}
+ if errCnt.Get() == 0 {
+ errCnt.Increment()
+ }
+ }
+ taskWg.Done()
+ }(ind, task)
+ }
+
+ go func() {
+ taskWg.Wait()
+ if errCnt.Get() == 0 {
+ resultChan <- Result{mr.reduce(results), nil}
+ }
+ }()
+
+ result := <-resultChan
+ return result.res, result.err
+}
+
+//----------------------------------------------------------------------------
+
+func MinInt() int {
+ // 'i' is used to take the size of an int for the current build
+ var i int
+ return -(1 << ((uint(unsafe.Sizeof(i)) * 8) - 1))
+}
+
+func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
+ return AsyncExecuter{errorLimit, tasks}
+}
+
+type AsyncExecuter struct {
+ errorLimit int
+ tasks <-chan Task
+}
+
+func (ae AsyncExecuter) Execute(arg int) (int, error) {
+ var taskWg sync.WaitGroup
+ errorCountSync, maxSync := CreateSyncInt(0), CreateSyncInt(MinInt())
+
+ taskCnt := 0
+ for task := range ae.tasks {
+ taskWg.Add(1)
+ taskCnt++
+ go func(task Task) {
+ res, err := task.Execute(arg)
+ if err == nil {
+ maxSync.SetMax(res)
+ } else {
+ errorCountSync.Increment()
+ }
+ taskWg.Done()
+ }(task)
+ }
+
+ taskWg.Wait()
+ max, errorCnt := maxSync.Get(), errorCountSync.Get()
+ var err error = nil
+ if taskCnt == 0 {
+ err = fmt.Errorf("No tasks given!")
+ } else if taskCnt <= errorCnt {
+ err = fmt.Errorf("All tasks failed!")
+ } else if ae.errorLimit < errorCnt {
+ err = fmt.Errorf("The error limit of %d was exceeded!", ae.errorLimit)
+ }
+ return max, err
+}
  • Виж пак условието за Pipeline(), реално всичките тези горутини и канали не са ли лек overkill там?
  • Какво ще стане с горутината, която пускаш в Execute() на Timed(), ако все пак се стигне до timeout? Или в тези, които пускаш в ConcurrentMapReduce(), след като си прочел един резултат от resultChan и си го върнал?
  • Вместо MinInt() (unsafe!), помисли за по-добър начин да укажеш "нищо" или "няма стойност". Или разгледай math пакета от стандартната библиотека и ползвай MinInt нещата от там :)

Николай обнови решението на 27.11.2016 14:23 (преди над 1 година)

package main
import (
"errors"
"fmt"
"sync"
"time"
- "unsafe"
)
-//
-// Help functions and types
-//
type Task interface {
Execute(int) (int, error)
}
type Result struct {
res int
err error
}
-type SyncInt struct {
- sync.RWMutex
- num int
-}
-
-func CreateSyncInt(start int) SyncInt {
- return SyncInt{num: start}
-}
-
-func (ai *SyncInt) Increment() {
- ai.Lock()
- ai.num++
- ai.Unlock()
-}
-
-func (ai *SyncInt) SetMax(val int) {
- if val > ai.Get() {
- ai.Lock()
- ai.num = val
- ai.Unlock()
- }
-}
-
-func (ai *SyncInt) Get() (ret int) {
- ai.RLock()
- ret = ai.num
- ai.RUnlock()
- return
-}
-
//----------------------------------------------------------------------------
func Pipeline(tasks ...Task) Task {
return Pipeliner{tasks}
}
type Pipeliner struct {
tasks []Task
}
func (p Pipeliner) Execute(arg int) (int, error) {
if len(p.tasks) == 0 {
return 0, errors.New("No tasks given!")
}
- finChan := make(chan Result)
- prevChan, nextChan := finChan, finChan
-
- for i := len(p.tasks) - 1; i >= 0; i-- {
- nextChan = make(chan Result)
- go chainTask(prevChan, nextChan, p.tasks[i])
- prevChan = nextChan
+ for _, task := range p.tasks {
+ if res, err := task.Execute(arg); err == nil {
+ arg = res
+ } else {
+ return res, err
+ }
}
- go func(feedChan chan Result) {
- feedChan <- Result{arg, nil}
- }(nextChan)
-
- result := <-finChan
- return result.res, result.err
+ return arg, nil
}
-func chainTask(prevChan, nextChan chan Result, task Task) {
- result := <-nextChan
-
- if result.err != nil {
- prevChan <- result
- } else {
- res, err := task.Execute(result.res)
- prevChan <- Result{res, err}
- }
-}
-
//----------------------------------------------------------------------------
func Fastest(tasks ...Task) Task {
return Timer{tasks}
}
type Timer struct {
tasks []Task
}
func (t Timer) Execute(arg int) (int, error) {
if len(t.tasks) == 0 {
return 0, errors.New("No tasks given!")
}
var once sync.Once
fastResChan := make(chan Result)
for _, task := range t.tasks {
go func(task Task) {
res, err := task.Execute(arg)
once.Do(func() {
fastResChan <- Result{res, err}
})
}(task)
}
result := <-fastResChan
return result.res, result.err
}
//----------------------------------------------------------------------------
func Timed(task Task, timeout time.Duration) Task {
return Limiter{task, timeout}
}
type Limiter struct {
task Task
limit time.Duration
}
func (l Limiter) Execute(arg int) (int, error) {
- resultChan := make(chan Result)
+ resultChan, errChan := make(chan Result), make(chan struct{}, 1)
+ errChan <- struct{}{}
go func() {
res, err := l.task.Execute(arg)
- resultChan <- Result{res, err}
+
+ select {
+ case <-errChan:
+ resultChan <- Result{res, err}
+ default:
+ return
+ }
}()
select {
case result := <-resultChan:
return result.res, result.err
case <-time.After(l.limit):
+ <-errChan
return 0, fmt.Errorf("Task didn't finish for %s!", l.limit)
}
}
//----------------------------------------------------------------------------
func ConcurrentMapReduce(reduce func(results []int) int, tasks ...Task) Task {
return MapReducer{reduce, tasks}
}
type MapReducer struct {
reduce func(results []int) int
tasks []Task
}
func (mr MapReducer) Execute(arg int) (int, error) {
if len(mr.tasks) == 0 {
return 0, errors.New("No tasks given!")
}
results := make([]int, len(mr.tasks))
resultChan := make(chan Result)
- errCnt := CreateSyncInt(0)
+ errChan := make(chan struct{}, 1)
+ errChan <- struct{}{}
var taskWg sync.WaitGroup
for ind, task := range mr.tasks {
taskWg.Add(1)
go func(ind int, task Task) {
res, err := task.Execute(arg)
if err == nil {
results[ind] = res
} else {
- resultChan <- Result{0, fmt.Errorf("Error in task %d", ind)}
- if errCnt.Get() == 0 {
- errCnt.Increment()
+ select {
+ case <-errChan:
+ resultChan <- Result{0, fmt.Errorf("Error in task %d", ind)}
+ default:
}
}
taskWg.Done()
}(ind, task)
}
go func() {
taskWg.Wait()
- if errCnt.Get() == 0 {
+ select {
+ case <-errChan:
resultChan <- Result{mr.reduce(results), nil}
+ default:
}
}()
result := <-resultChan
return result.res, result.err
}
//----------------------------------------------------------------------------
-func MinInt() int {
- // 'i' is used to take the size of an int for the current build
- var i int
- return -(1 << ((uint(unsafe.Sizeof(i)) * 8) - 1))
-}
-
func GreatestSearcher(errorLimit int, tasks <-chan Task) Task {
return AsyncExecuter{errorLimit, tasks}
}
type AsyncExecuter struct {
errorLimit int
tasks <-chan Task
}
+func setMaxSync(val int, pMaxSync *int, m *sync.RWMutex) {
+ m.RLock()
+ max := pMaxSync
+ m.RUnlock()
+
+ if max == nil || *max < val {
+ m.Lock()
+ pMaxSync = &val
+ m.Unlock()
+ }
+}
+
+func incrementSync(val *int, m *sync.RWMutex) {
+ m.Lock()
+ *val++
+ m.Unlock()
+}
+
func (ae AsyncExecuter) Execute(arg int) (int, error) {
- var taskWg sync.WaitGroup
- errorCountSync, maxSync := CreateSyncInt(0), CreateSyncInt(MinInt())
+ var (
+ taskWg sync.WaitGroup
+ pMaxSync *int = nil
+ maxMutex sync.RWMutex
+
+ errorCountSync int = 0
+ errCntMutex sync.RWMutex
+ )
+
taskCnt := 0
for task := range ae.tasks {
taskWg.Add(1)
taskCnt++
go func(task Task) {
res, err := task.Execute(arg)
if err == nil {
- maxSync.SetMax(res)
+ setMaxSync(res, pMaxSync, &maxMutex)
} else {
- errorCountSync.Increment()
+ incrementSync(&errorCountSync, &errCntMutex)
}
taskWg.Done()
}(task)
}
taskWg.Wait()
- max, errorCnt := maxSync.Get(), errorCountSync.Get()
+
+ maxMutex.RLock()
+ errCntMutex.RLock()
+ pMax, errorCnt := pMaxSync, errorCountSync
+ maxMutex.RUnlock()
+ errCntMutex.RUnlock()
+
+ max := 42
+ if pMax != nil {
+ max = *pMax
+ }
+
var err error = nil
if taskCnt == 0 {
err = fmt.Errorf("No tasks given!")
} else if taskCnt <= errorCnt {
err = fmt.Errorf("All tasks failed!")
} else if ae.errorLimit < errorCnt {
err = fmt.Errorf("The error limit of %d was exceeded!", ae.errorLimit)
}
return max, err
}