Skip to content

Commit 16d9ec4

Browse files
authored
Merge pull request #870 from devlights/add-gate-example
2 parents f3a0eaa + 35e6051 commit 16d9ec4

File tree

4 files changed

+221
-0
lines changed

4 files changed

+221
-0
lines changed
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
# https://taskfile.dev
2+
3+
version: '3'
4+
5+
tasks:
6+
default:
7+
cmds:
8+
- go run .
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
package main
2+
3+
import (
4+
"sync"
5+
"sync/atomic"
6+
)
7+
8+
// CountdownLatch は、C#のCountdownEventやJavaのCountDownLatchと同様の機能を提供する構造体です.
9+
type CountdownLatch struct {
10+
count atomic.Int32
11+
mutex sync.Mutex
12+
cond *sync.Cond
13+
}
14+
15+
// NewCountdownLatch は、指定されたカウント数でCountdownLatchを初期化します.
16+
func NewCountdownLatch(initialCount int) *CountdownLatch {
17+
if initialCount < 0 {
18+
panic("初期カウントは0以上である必要があります")
19+
}
20+
21+
var (
22+
latch CountdownLatch
23+
)
24+
latch.count.Store(int32(initialCount))
25+
latch.cond = sync.NewCond(&latch.mutex)
26+
27+
return &latch
28+
}
29+
30+
// Signal は、カウントを1減らします.
31+
// 戻り値として、カウントダウンが満了したかどうかを返します.
32+
func (me *CountdownLatch) Signal() bool {
33+
return me.SignalCount(1)
34+
}
35+
36+
// SignalCount は、指定された数だけカウントを減らします.
37+
// 戻り値として、カウントダウンが満了したかどうかを返します.
38+
func (me *CountdownLatch) SignalCount(count int) bool {
39+
if count <= 0 {
40+
return false
41+
}
42+
43+
me.mutex.Lock()
44+
defer me.mutex.Unlock()
45+
46+
newCount := me.count.Add(-int32(count))
47+
if newCount <= 0 {
48+
me.cond.Broadcast()
49+
return true
50+
}
51+
52+
return false
53+
}
54+
55+
// Wait は、カウントが0になるまでブロックします.
56+
func (me *CountdownLatch) Wait() {
57+
me.mutex.Lock()
58+
defer me.mutex.Unlock()
59+
60+
for me.count.Load() > 0 {
61+
me.cond.Wait()
62+
}
63+
}
64+
65+
// CurrentCount は、現在のカウント値を返します.
66+
func (me *CountdownLatch) CurrentCount() int {
67+
me.mutex.Lock()
68+
defer me.mutex.Unlock()
69+
70+
return int(me.count.Load())
71+
}
72+
73+
// Reset は、カウントを指定された値にリセットします.
74+
// リセットすることになるため、強制的にカウント満了したことになり、待機している非同期処理が存在する場合は解除されます.
75+
func (me *CountdownLatch) Reset(count int) {
76+
if count < 0 {
77+
panic("リセットカウントは0以上である必要があります")
78+
}
79+
80+
me.mutex.Lock()
81+
defer me.mutex.Unlock()
82+
83+
me.cond.Broadcast()
84+
85+
me.count.Store(int32(count))
86+
}

examples/singleapp/gate/gate.go

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package main
2+
3+
type (
4+
Gate struct {
5+
latch *CountdownLatch
6+
}
7+
)
8+
9+
func NewGate() *Gate {
10+
var (
11+
latch = NewCountdownLatch(1)
12+
gate = Gate{latch}
13+
)
14+
15+
return &gate
16+
}
17+
18+
func (me *Gate) Await() {
19+
if me.latch.CurrentCount() < 1 {
20+
return
21+
}
22+
23+
me.latch.Wait()
24+
}
25+
26+
func (me *Gate) Open() {
27+
if me.latch.CurrentCount() < 1 {
28+
return
29+
}
30+
31+
me.latch.Signal()
32+
}

examples/singleapp/gate/main.go

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"errors"
6+
"log"
7+
"sync"
8+
"time"
9+
)
10+
11+
const (
12+
MainTimeout = 20 * time.Second
13+
ProcTimeout = 10 * time.Second
14+
)
15+
16+
var (
17+
ErrMainTooSlow = errors.New("(MAIN) TOO SLOW")
18+
ErrProcTooSlow = errors.New("(PROC) TOO SLOW")
19+
)
20+
21+
func init() {
22+
log.SetFlags(0)
23+
}
24+
25+
func main() {
26+
var (
27+
rootCtx = context.Background()
28+
mainCtx, mainCxl = context.WithTimeoutCause(rootCtx, MainTimeout, ErrMainTooSlow)
29+
procCtx = run(mainCtx)
30+
err error
31+
)
32+
defer mainCxl()
33+
34+
select {
35+
case <-mainCtx.Done():
36+
err = context.Cause(mainCtx)
37+
case <-procCtx.Done():
38+
if err = context.Cause(procCtx); errors.Is(err, context.Canceled) {
39+
err = nil
40+
}
41+
}
42+
43+
if err != nil {
44+
log.Fatal(err)
45+
}
46+
}
47+
48+
func run(pCtx context.Context) context.Context {
49+
var (
50+
ctx, cxl = context.WithCancelCause(pCtx)
51+
)
52+
53+
go func() {
54+
cxl(proc(ctx))
55+
}()
56+
go func() {
57+
<-time.After(ProcTimeout)
58+
cxl(ErrProcTooSlow)
59+
}()
60+
61+
return ctx
62+
}
63+
64+
func proc(_ context.Context) error {
65+
var (
66+
gate = NewGate()
67+
wg sync.WaitGroup
68+
)
69+
70+
// 10個のゴルーチンがゲート前に待機する
71+
for i := range 10 {
72+
wg.Add(1)
73+
go func(i int) {
74+
defer wg.Done()
75+
76+
log.Printf("[%2d] 待機開始", i)
77+
gate.Await()
78+
log.Printf("[%2d] 待機解除", i)
79+
}(i)
80+
}
81+
82+
// 何か準備処理などを行っているとする
83+
<-time.After(time.Second)
84+
log.Println("-------------------------------------")
85+
86+
// ゲートを開き、待機解除したゴルーチン達が全完了するのを待つ
87+
gate.Open()
88+
wg.Wait()
89+
90+
// 一度開いたゲートは開きっぱなしになるため、開いた後のAwait呼び出しは即座に返る.
91+
gate.Await()
92+
gate.Await()
93+
94+
return nil
95+
}

0 commit comments

Comments
 (0)