Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 45 additions & 11 deletions wpool/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@ package wpool
import (
"context"
"fmt"
"sync"
)

func worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan Job, results chan<- Result) {
defer wg.Done()
func worker(ctx context.Context, jobs <-chan Job, results chan<- Result, sem semaphore) {
defer sem.release()
for {
select {
case job, ok := <-jobs:
Expand All @@ -26,35 +25,70 @@ func worker(ctx context.Context, wg *sync.WaitGroup, jobs <-chan Job, results ch
}
}

type semaphore interface {
acquire()
release()
wait()
close()
}

type slot struct{}
type slots chan slot

type execution struct {
slots slots
}

func newExecutionSlots(capacity int) execution {
slots := make(chan slot, capacity)
return execution{slots: slots}
}

func (e execution) acquire() {
e.slots <- slot{}
}

func (e execution) release() {
<-e.slots
}

func (e execution) wait() {
for i := 0; i < cap(e.slots); i++ {
e.slots <- slot{}
}
}

func (e execution) close() {
close(e.slots)
}

type WorkerPool struct {
workersCount int
jobs chan Job
results chan Result
Done chan struct{}
}

func New(wcount int) WorkerPool {
return WorkerPool{
workersCount: wcount,
jobs: make(chan Job, wcount),
results: make(chan Result, wcount),
Done: make(chan struct{}),
}
}

func (wp WorkerPool) Run(ctx context.Context) {
var wg sync.WaitGroup
eSlots := newExecutionSlots(wp.workersCount)
defer eSlots.close()

for i := 0; i < wp.workersCount; i++ {
wg.Add(1)
eSlots.acquire()
// fan out worker goroutines
//reading from jobs channel and
//pushing calcs into results channel
go worker(ctx, &wg, wp.jobs, wp.results)
go worker(ctx, wp.jobs, wp.results, eSlots)
}

wg.Wait()
close(wp.Done)
eSlots.wait()
close(wp.results)
}

Expand All @@ -63,7 +97,7 @@ func (wp WorkerPool) Results() <-chan Result {
}

func (wp WorkerPool) GenerateFrom(jobsBulk []Job) {
for i, _ := range jobsBulk {
for i := range jobsBulk {
wp.jobs <- jobsBulk[i]
}
close(wp.jobs)
Expand Down
20 changes: 11 additions & 9 deletions wpool/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func TestWorkerPool(t *testing.T) {
select {
case r, ok := <-wp.Results():
if !ok {
continue
return
}

i, err := strconv.ParseInt(string(r.Descriptor.ID), 10, 64)
Expand All @@ -39,8 +39,6 @@ func TestWorkerPool(t *testing.T) {
if val != int(i)*2 {
t.Fatalf("wrong value %v; expected %v", val, int(i)*2)
}
case <-wp.Done:
return
default:
}
}
Expand All @@ -56,12 +54,14 @@ func TestWorkerPool_TimeOut(t *testing.T) {

for {
select {
case r := <-wp.Results():
case r, ok := <-wp.Results():
if !ok {
return
}

if r.Err != nil && r.Err != context.DeadlineExceeded {
t.Fatalf("expected error: %v; got: %v", context.DeadlineExceeded, r.Err)
}
case <-wp.Done:
return
default:
}
}
Expand All @@ -77,12 +77,14 @@ func TestWorkerPool_Cancel(t *testing.T) {

for {
select {
case r := <-wp.Results():
case r, ok := <-wp.Results():
if !ok {
return
}

if r.Err != nil && r.Err != context.Canceled {
t.Fatalf("expected error: %v; got: %v", context.Canceled, r.Err)
}
case <-wp.Done:
return
default:
}
}
Expand Down