-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathwaiting_buffer.go
More file actions
120 lines (99 loc) · 2.24 KB
/
waiting_buffer.go
File metadata and controls
120 lines (99 loc) · 2.24 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package threads
import (
"io"
"sync"
"time"
)
// WaitingBuffer is a FIFO byte buffer that can be written to from one goroutine and read from
// another. Reads wait until the destination is completely filled or the buffer is closed. Data
// that has been read is discarded.
//
// A WaitingBuffer contains a mutex and must not be copied after first use; always pass it by
// pointer.
type WaitingBuffer struct {
	buffer    [][]byte   // pending chunks, oldest first; the head chunk may be partially consumed
	isClosed  bool       // set by Close; rejects further writes and lets Read return io.EOF
	byteCount uint64     // total number of bytes accepted by Write (never decremented by reads)
	lock      sync.Mutex // guards all fields above
}

// NewWaitingBuffer returns an empty, open WaitingBuffer.
func NewWaitingBuffer() *WaitingBuffer {
	return &WaitingBuffer{}
}

// ByteCount returns the total number of bytes that have been written to the buffer so far,
// including bytes that have already been read.
//
// It uses a pointer receiver so that it locks the buffer's own mutex; a value receiver would
// copy the struct (and its mutex) and read the counter without any synchronization.
func (wb *WaitingBuffer) ByteCount() uint64 {
	wb.lock.Lock()
	defer wb.lock.Unlock()

	return wb.byteCount
}

// Read fills b with buffered data, waiting for more writes until b is completely full or the
// buffer is closed.
//
// It returns len(b) and a nil error once b is full. If the buffer is closed and drained before
// b could be filled, it returns the number of bytes copied so far (possibly zero) together with
// io.EOF. A zero-length b returns (0, nil) immediately without waiting.
func (wb *WaitingBuffer) Read(b []byte) (int, error) {
	if len(b) == 0 {
		// Nothing to fill; never block a zero-length read.
		return 0, nil
	}

	filled := 0
	for {
		wb.lock.Lock()
		n, err := wb.read(b[filled:])
		filled += n
		closed := wb.isClosed
		wb.lock.Unlock()

		switch {
		case err == nil:
			// read reports nil only when the remaining destination was filled.
			return filled, nil
		case err != io.EOF:
			return filled, err
		case closed:
			// Drained and no more writes can arrive.
			return filled, io.EOF
		}

		// Buffer is drained but still open: release the lock and poll again so a writer can
		// make progress.
		// NOTE(review): this is a 1ns sleep, i.e. effectively a tight poll; a sync.Cond
		// signalled from Write/Close would avoid the polling entirely.
		time.Sleep(time.Nanosecond)
	}
}

// read copies as much buffered data as fits into b, discarding what it consumes. The caller
// must hold wb.lock.
//
// It returns the number of bytes copied and a nil error if b was filled completely, or io.EOF
// if the buffer ran out of data first.
func (wb *WaitingBuffer) read(b []byte) (int, error) {
	copied := 0
	for {
		if len(wb.buffer) == 0 {
			return copied, io.EOF
		}

		head := wb.buffer[0]
		n := copy(b[copied:], head)
		copied += n

		if n == len(head) {
			// Chunk fully consumed: clear the slot so the bytes can be garbage collected,
			// then drop it from the queue.
			wb.buffer[0] = nil
			wb.buffer = wb.buffer[1:]
		} else {
			// Chunk partially consumed: keep only the unread tail.
			wb.buffer[0] = head[n:]
		}

		if copied == len(b) {
			// Destination is full.
			return copied, nil
		}
	}
}

// Write appends a copy of b to the buffer, so the caller may reuse b afterwards.
//
// It returns len(b) and a nil error on success. If the buffer has been closed, nothing is
// stored and it returns 0 and io.ErrClosedPipe.
func (wb *WaitingBuffer) Write(b []byte) (n int, err error) {
	wb.lock.Lock()
	defer wb.lock.Unlock()

	if wb.isClosed {
		return 0, io.ErrClosedPipe
	}
	if len(b) == 0 {
		// Nothing to store; avoid queueing an empty chunk.
		return 0, nil
	}

	// Copy the bytes so later changes to the caller's slice do not affect buffered data.
	chunk := make([]byte, len(b))
	copy(chunk, b)

	wb.buffer = append(wb.buffer, chunk)
	wb.byteCount += uint64(len(b))

	return len(b), nil
}

// Close marks the buffer as closed. Subsequent writes fail with io.ErrClosedPipe, and reads
// return io.EOF once the remaining buffered data has been consumed. It always returns nil.
func (wb *WaitingBuffer) Close() error {
	wb.lock.Lock()
	defer wb.lock.Unlock()

	wb.isClosed = true
	return nil
}