Skip to content

Commit 2a4d74b

Browse files
committed
work on backpressure
1 parent 580530b commit 2a4d74b

File tree

7 files changed

+74
-17
lines changed

7 files changed

+74
-17
lines changed

.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
1-
.idea
1+
.idea
2+
.vendor

queue/Makefile

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
.PHONY: server client
2+
3+
PORT ?= 8080
4+
CAPACITY ?= 100
5+
WORKERS ?= 4
6+
TOTAL ?= 100
7+
CONCURRENCY ?= 4
8+
ITERATIONS ?= 1
9+
PPROF_PORT ?= 8081
10+
PPROF_FILE ?= mem.pprof
11+
12+
server:
13+
go run main.go -mode=server -addr=:$(PORT) -capacity=$(CAPACITY) -workers=$(WORKERS)
14+
15+
client:
16+
go run main.go -mode=client -addr=:$(PORT) -total=$(TOTAL) -concurrency=$(CONCURRENCY) -iterations=$(ITERATIONS)
17+
18+
profmem:
19+
go tool pprof -http=:$(PPROF_PORT) $(PPROF_FILE)

queue/main.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88
"github.com/nathanverse/go-concurrency-exercises/queue/runner"
99
)
1010

11+
//go run main.go -mode=server -workers=4 -capacity=10 -addr=:8082
12+
1113
func main() {
1214
mode := flag.String("mode", "server", "choose server or client mode")
1315
addr := flag.String("addr", ":8080", "tcp listen address")

queue/mem.pprof

5.16 KB
Binary file not shown.

queue/runner/client.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ func RunClient(cfg ClientConfig) error {
3939

4040
for i := 0; i < cfg.Concurrency; i++ {
4141
wg.Add(1)
42-
go func() {
42+
go func(index int) {
43+
fmt.Printf("Starting goroutine %d\n", index+1)
4344
defer wg.Done()
4445

4546
conn, err := net.Dial("tcp", cfg.Addr)
@@ -54,6 +55,7 @@ func RunClient(cfg ClientConfig) error {
5455

5556
for {
5657
idx := atomic.AddInt64(&sent, 1)
58+
fmt.Printf("Goroutine %d runs task %d\n", index+1, int(idx))
5759
if idx > int64(cfg.Total) {
5860
return
5961
}
@@ -75,13 +77,14 @@ func RunClient(cfg ClientConfig) error {
7577
return
7678
}
7779
if resp.Error != "" {
80+
fmt.Printf("Error received: %s", resp.Error)
7881
recordError(errCh, &once, errors.New(resp.Error))
7982
return
8083
}
8184

8285
atomic.AddInt64(&completed, 1)
8386
}
84-
}()
87+
}(i)
8588
}
8689

8790
wg.Wait()

queue/server/server.go

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,14 @@ import (
44
"encoding/json"
55
"errors"
66
"fmt"
7+
"github.com/nathanverse/go-concurrency-exercises/queue/internal"
8+
"github.com/nathanverse/go-concurrency-exercises/queue/tasks"
79
"github.com/pkg/profile"
810
"io"
911
"net"
1012
"sync"
11-
12-
"github.com/nathanverse/go-concurrency-exercises/queue/internal"
13-
"github.com/nathanverse/go-concurrency-exercises/queue/tasks"
13+
"sync/atomic"
14+
"time"
1415
)
1516

1617
type response struct {
@@ -19,9 +20,11 @@ type response struct {
1920
Error string `json:"error,omitempty"`
2021
}
2122

23+
var waitingGoroutines int64
24+
2225
// Serve listens for TCP connections and forwards incoming tasks to the queue.
2326
func Serve(addr string, queue internal.IQueue, done <-chan struct{}) error {
24-
defer profile.Start(profile.TraceProfile, profile.ProfilePath(".")).Stop()
27+
defer profile.Start(profile.MemProfile, profile.ProfilePath(".")).Stop()
2528

2629
listener, err := net.Listen("tcp", addr)
2730
if err != nil {
@@ -37,6 +40,20 @@ func Serve(addr string, queue internal.IQueue, done <-chan struct{}) error {
3740
listener.Close()
3841
}()
3942

43+
go func() {
44+
for {
45+
select {
46+
case <-done:
47+
{
48+
break
49+
}
50+
default:
51+
time.Sleep(5 * time.Second)
52+
fmt.Println("Goroutines count: ", atomic.LoadInt64(&waitingGoroutines))
53+
}
54+
}
55+
}()
56+
4057
for {
4158
conn, err := listener.Accept()
4259
if err != nil {
@@ -58,18 +75,28 @@ func Serve(addr string, queue internal.IQueue, done <-chan struct{}) error {
5875

5976
wg.Add(1)
6077
go func(c net.Conn) {
61-
defer wg.Done()
78+
idx := atomic.AddInt64(&waitingGoroutines, 1)
79+
defer func() {
80+
wg.Done()
81+
fmt.Printf("Goroutine %d exits\n", idx+1)
82+
atomic.AddInt64(&waitingGoroutines, -1)
83+
}()
84+
fmt.Printf("Goroutine %d accpet connection\n", idx+1)
6285
handleConnection(c, queue, done)
6386
}(conn)
6487
}
6588
}
6689

6790
func handleConnection(conn net.Conn, queue internal.IQueue, done <-chan struct{}) {
68-
defer conn.Close()
91+
isFull := false
92+
93+
defer func() {
94+
conn.Close()
95+
fmt.Println("Connection closed, isFull=", isFull)
96+
}()
6997

7098
decoder := json.NewDecoder(conn)
7199
encoder := json.NewEncoder(conn)
72-
73100
for {
74101
var task tasks.Task
75102
if err := decoder.Decode(&task); err != nil {
@@ -84,6 +111,8 @@ func handleConnection(conn net.Conn, queue internal.IQueue, done <-chan struct{}
84111
ch, err := queue.Put(&task)
85112
if err != nil {
86113
_ = encoder.Encode(response{ID: task.Id, Error: err.Error()})
114+
fmt.Printf("Error putting task to queue: %s, retry \n", err.Error())
115+
isFull = true
87116
continue
88117
}
89118

queue/tasks/base.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
package tasks
22

33
import (
4-
"crypto/sha256"
54
"encoding/json"
65
"fmt"
6+
"time"
77
)
88

99
const (
@@ -48,10 +48,13 @@ type HashTaskOutput struct {
4848

4949
func HashTask(iterations int) []byte {
5050
data := []byte("benchmark")
51-
var sum []byte
52-
for i := 0; i < iterations; i++ {
53-
h := sha256.Sum256(data)
54-
sum = h[:]
55-
}
56-
return sum
51+
//var sum []byte
52+
//for i := 0; i < iterations; i++ {
53+
// h := sha256.Sum256(data)
54+
// sum = h[:]
55+
//}
56+
//return sum
57+
58+
time.Sleep(time.Duration(iterations) * time.Second)
59+
return data
5760
}

0 commit comments

Comments
 (0)