Skip to content

Commit 89e7596

Browse files
authored
Merge pull request #873 from devlights/add-cond-producer-consumer-watcher
2 parents c640cd6 + fdb75d4 commit 89e7596

File tree

3 files changed

+185
-0
lines changed

3 files changed

+185
-0
lines changed

examples/async/cond/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
out.txt

examples/async/cond/Taskfile.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
# https://taskfile.dev
2+
3+
version: '3'
4+
5+
tasks:
6+
default:
7+
cmds:
8+
- go run . -debug=false | tee out.txt
9+
ignore_error: true

examples/async/cond/main.go

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
package main
2+
3+
import (
4+
"flag"
5+
"log"
6+
"math/rand/v2"
7+
"os"
8+
"os/signal"
9+
"sync"
10+
"time"
11+
)
12+
13+
const (
14+
MAX_ITEM_COUNT = 10
15+
MAX_PRODUCERS = 2
16+
MAX_CONSUMERS = MAX_ITEM_COUNT / 2
17+
WATCH_INTERVAL = 1 * time.Second
18+
)
19+
20+
type (
21+
Args struct {
22+
debug bool
23+
}
24+
)
25+
26+
var (
27+
args Args
28+
)
29+
30+
func init() {
31+
flag.BoolVar(&args.debug, "debug", false, "debug mode")
32+
}
33+
34+
func main() {
35+
//
36+
// Producer-Consumer-Watcher のサンプル
37+
// 各非同期処理のランデブーポイントの制御に
38+
// *sync.Cond を利用。
39+
//
40+
41+
log.SetFlags(log.Lmicroseconds)
42+
log.SetOutput(os.Stdout)
43+
44+
flag.Parse()
45+
46+
if err := run(); err != nil {
47+
log.Fatal(err)
48+
}
49+
}
50+
51+
func run() error {
52+
var (
53+
ch = make(chan int, MAX_ITEM_COUNT*MAX_PRODUCERS)
54+
sig = make(chan os.Signal, 1)
55+
done = make(chan struct{})
56+
)
57+
defer close(ch)
58+
59+
signal.Notify(sig, os.Interrupt)
60+
go func() {
61+
<-sig
62+
log.Printf("<<Interrupt>>")
63+
close(done)
64+
}()
65+
66+
var (
67+
producer = sync.NewCond(&sync.Mutex{})
68+
consumer = sync.NewCond(&sync.Mutex{})
69+
)
70+
71+
// 役割: 生産者と消費者を監視し、必要であれば叩き起こす
72+
go watch(1, ch, producer, consumer)
73+
74+
// 役割: 生産を行う
75+
for i := range MAX_PRODUCERS {
76+
go produce(i+1, ch, (i+1)*10000, producer, consumer)
77+
}
78+
79+
// 役割: 消費を行う
80+
for i := range MAX_CONSUMERS {
81+
go consume(i+1, ch, consumer)
82+
}
83+
84+
// ゴルーチンの終了待機などについては割愛
85+
<-done
86+
log.Printf("<<DONE>>")
87+
88+
return nil
89+
}
90+
91+
func watch(id int, ch <-chan int, producer, consumer *sync.Cond) {
92+
for {
93+
func() {
94+
producer.L.Lock()
95+
defer producer.L.Unlock()
96+
97+
if len(ch) == 0 {
98+
producer.Broadcast()
99+
log.Printf("[W][%02d] >>> 0個です。生産しなさい。", id)
100+
}
101+
}()
102+
103+
<-time.After(WATCH_INTERVAL)
104+
105+
func() {
106+
consumer.L.Lock()
107+
defer consumer.L.Unlock()
108+
109+
if len(ch) != 0 {
110+
consumer.Broadcast()
111+
log.Printf("[W][%02d] >>> 生産されています。消費しなさい。", id)
112+
}
113+
}()
114+
}
115+
}
116+
117+
func produce(id int, ch chan<- int, start int, producer, consumer *sync.Cond) {
118+
var (
119+
count int
120+
)
121+
for i := start; ; {
122+
func() {
123+
producer.L.Lock()
124+
defer producer.L.Unlock()
125+
126+
for len(ch) > cap(ch)/2 {
127+
dbg("[P][%02d] <<< 消費されるまで待機します。(残:%d)", id, len(ch))
128+
producer.Wait()
129+
}
130+
}()
131+
132+
func() {
133+
consumer.L.Lock()
134+
defer consumer.L.Unlock()
135+
136+
count = rand.IntN(MAX_ITEM_COUNT)
137+
for c := range count {
138+
ch <- i + (c + 1)
139+
}
140+
141+
log.Printf("[P][%02d] >>> %d個生産しました。(残:%d)", id, count, len(ch))
142+
consumer.Broadcast()
143+
144+
i += count
145+
}()
146+
147+
// 次のタスク着手まで少し休憩
148+
<-time.After(time.Duration(rand.IntN(500)) * time.Millisecond)
149+
}
150+
}
151+
152+
func consume(id int, ch <-chan int, consumer *sync.Cond) {
153+
for {
154+
func() {
155+
consumer.L.Lock()
156+
defer consumer.L.Unlock()
157+
158+
for len(ch) == 0 {
159+
dbg("[C][%02d] <<< 生産されるまで待機します。(残:%d)", id, len(ch))
160+
consumer.Wait()
161+
}
162+
163+
log.Printf("[C][%02d] >>> 消費しました (%v)(残:%d)", id, <-ch, len(ch))
164+
}()
165+
166+
// 次のタスク着手まで少し休憩
167+
<-time.After(time.Duration(rand.IntN(1000)) * time.Millisecond)
168+
}
169+
}
170+
171+
func dbg(format string, v ...any) {
172+
if args.debug {
173+
log.Printf(format, v...)
174+
}
175+
}

0 commit comments

Comments
 (0)