Skip to content

Commit bfab895

Browse files
committed
Add examples/singleapp/cyclic_barrier
1 parent 16d9ec4 commit bfab895

File tree

3 files changed

+222
-0
lines changed

3 files changed

+222
-0
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# https://taskfile.dev
2+
3+
version: '3'
4+
5+
tasks:
6+
default:
7+
cmds:
8+
- go run .
Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"sync"
6+
)
7+
8+
// CyclicBarrier は、指定された数のgoroutineが特定のポイントで待ち合わせることができる同期プリミティブです.
9+
// すべてのgoroutineが到達するか、コンテキストがキャンセルされるまでブロックします.
10+
type (
11+
CyclicBarrier struct {
12+
parties int // 待ち合わせが必要なgoroutineの数
13+
waiting int // 現在待機中のgoroutine数
14+
mutex sync.Mutex // 内部状態を保護するためのmutex
15+
cond *sync.Cond // 条件変数
16+
ctx context.Context
17+
cancel context.CancelFunc
18+
barrier chan struct{} // バリアチャネル
19+
}
20+
)
21+
22+
// NewCyclicBarrier は、新しいCyclicBarrierを作成します.
23+
// partiesには、同期が必要なgoroutineの数を指定します.
24+
func NewCyclicBarrier(parties int) *CyclicBarrier {
25+
if parties <= 0 {
26+
panic("parties must be greater than 0")
27+
}
28+
29+
var (
30+
ctx, cancel = context.WithCancel(context.Background())
31+
barrier = &CyclicBarrier{
32+
parties: parties,
33+
ctx: ctx,
34+
cancel: cancel,
35+
barrier: make(chan struct{}),
36+
}
37+
)
38+
barrier.cond = sync.NewCond(&barrier.mutex)
39+
40+
return barrier
41+
}
42+
43+
// Await は、他のgoroutineが到達するのを待機します.
44+
// すべてのgoroutineが到達すると、バリアが解放され、カウンターがリセットされます.
45+
// コンテキストがキャンセルされた場合はエラーを返します.
46+
func (me *CyclicBarrier) Await() error {
47+
me.mutex.Lock()
48+
defer me.mutex.Unlock()
49+
50+
// コンテキストが既にキャンセルされているかチェック
51+
if me.ctx.Err() != nil {
52+
return me.ctx.Err()
53+
}
54+
55+
var (
56+
generation = me.barrier // 現在の世代を記録(バリア条件が満了した場合、次のチャネルに切り替わるため)
57+
)
58+
me.waiting++
59+
if me.waiting == me.parties {
60+
// 最後のgoroutineが到達
61+
me.waiting = 0
62+
63+
close(me.barrier)
64+
me.barrier = make(chan struct{}) // 新しい世代のためのチャネルを作成
65+
me.cond.Broadcast() // 待機解除
66+
67+
return nil
68+
}
69+
70+
// 他のgoroutineを待つ
71+
for generation == me.barrier && me.ctx.Err() == nil {
72+
me.cond.Wait()
73+
}
74+
75+
if me.ctx.Err() != nil {
76+
return me.ctx.Err()
77+
}
78+
79+
return nil
80+
}
81+
82+
// Reset は、バリアをリセットし、待機中のすべてのgoroutineをキャンセルします.
83+
func (me *CyclicBarrier) Reset() {
84+
me.mutex.Lock()
85+
defer me.mutex.Unlock()
86+
87+
// 現在待機しているgoroutineを解除
88+
me.cancel()
89+
90+
var (
91+
ctx, cancel = context.WithCancel(context.Background())
92+
)
93+
me.ctx = ctx
94+
me.cancel = cancel
95+
me.waiting = 0
96+
97+
// 世代入れ替え
98+
close(me.barrier)
99+
me.barrier = make(chan struct{})
100+
me.cond.Broadcast()
101+
}
102+
103+
// GetNumberWaiting は、現在待機中のgoroutineの数を返します.
104+
func (me *CyclicBarrier) GetNumberWaiting() int {
105+
me.mutex.Lock()
106+
defer me.mutex.Unlock()
107+
108+
return me.waiting
109+
}
110+
111+
// GetParties は、同期に必要なgoroutineの数を返します.
112+
func (me *CyclicBarrier) GetParties() int {
113+
return me.parties
114+
}
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"errors"
6+
"log"
7+
"sync"
8+
"time"
9+
)
10+
11+
const (
12+
MainTimeout = 20 * time.Second
13+
ProcTimeout = 10 * time.Second
14+
)
15+
16+
var (
17+
ErrMainTooSlow = errors.New("(MAIN) TOO SLOW")
18+
ErrProcTooSlow = errors.New("(PROC) TOO SLOW")
19+
)
20+
21+
func init() {
22+
log.SetFlags(log.Ltime)
23+
}
24+
25+
func main() {
26+
var (
27+
rootCtx = context.Background()
28+
mainCtx, mainCxl = context.WithTimeoutCause(rootCtx, MainTimeout, ErrMainTooSlow)
29+
procCtx = run(mainCtx)
30+
err error
31+
)
32+
defer mainCxl()
33+
34+
select {
35+
case <-mainCtx.Done():
36+
err = context.Cause(mainCtx)
37+
case <-procCtx.Done():
38+
if err = context.Cause(procCtx); errors.Is(err, context.Canceled) {
39+
err = nil
40+
}
41+
}
42+
43+
if err != nil {
44+
log.Fatal(err)
45+
}
46+
}
47+
48+
func run(pCtx context.Context) context.Context {
49+
var (
50+
ctx, cxl = context.WithCancelCause(pCtx)
51+
)
52+
53+
go func() {
54+
cxl(proc(ctx))
55+
}()
56+
go func() {
57+
<-time.After(ProcTimeout)
58+
cxl(ErrProcTooSlow)
59+
}()
60+
61+
return ctx
62+
}
63+
64+
func proc(_ context.Context) error {
65+
const (
66+
WORKER_COUNT = 3
67+
)
68+
var (
69+
barrier = NewCyclicBarrier(WORKER_COUNT)
70+
wg sync.WaitGroup
71+
)
72+
73+
// 3つのワーカーを起動し、全員揃ったら先に進むを繰り返す
74+
for i := 0; i < WORKER_COUNT; i++ {
75+
wg.Add(1)
76+
go worker(i+1, &wg, barrier)
77+
}
78+
79+
wg.Wait()
80+
81+
return nil
82+
}
83+
84+
func worker(id int, wg *sync.WaitGroup, barrier *CyclicBarrier) {
85+
defer wg.Done()
86+
87+
for i := 0; i < 3; i++ {
88+
log.Printf("Worker-[%2d] 準備作業 %2d週目", id, i+1)
89+
time.Sleep(time.Duration(id) * time.Second)
90+
91+
log.Printf("Worker-[%2d] 待機開始", id)
92+
{
93+
if err := barrier.Await(); err != nil {
94+
log.Printf("Worker-[%2d] エラー: %v", id, err)
95+
return
96+
}
97+
}
98+
log.Printf("Worker-[%2d] 待機解除", id)
99+
}
100+
}

0 commit comments

Comments
 (0)