
Commit bf07582

rxie committed
Merge branch 'release/2.0.0' into develop
2 parents 0543263 + 0400d54

File tree

6 files changed: +78 -13 lines changed

Makefile
config.json
driver.go
message_processor.go
partial_message_buffer.go
splunk_logger.go


Makefile

Lines changed: 1 addition & 1 deletion
@@ -1,5 +1,5 @@
 PLUGIN_NAME=splunk/docker-logging-plugin
-PLUGIN_TAG=latest
+PLUGIN_TAG=2.0.0-rc2
 PLUGIN_DIR=./splunk-logging-plugin

 all: clean docker rootfs create

config.json

Lines changed: 42 additions & 0 deletions
@@ -15,6 +15,48 @@
      "description": "Set log level to output for plugin logs",
      "value": "info",
      "settable": ["value"]
+    },
+    {
+      "name": "SPLUNK_LOGGING_DRIVER_FIFO_ERROR_RETRY_TIME",
+      "description": "Set the number of retries when reading the FIFO from Docker fails. -1 means retry forever",
+      "value": "3",
+      "settable": ["value"]
+    },
+    {
+      "name": "SPLUNK_LOGGING_DRIVER_POST_MESSAGES_FREQUENCY",
+      "description": "Set how often messages are sent (if the batch size is not reached)",
+      "value": "5s",
+      "settable": ["value"]
+    },
+    {
+      "name": "SPLUNK_LOGGING_DRIVER_POST_MESSAGES_BATCH_SIZE",
+      "description": "Set the number of messages to batch before the buffer timeout",
+      "value": "1000",
+      "settable": ["value"]
+    },
+    {
+      "name": "SPLUNK_LOGGING_DRIVER_BUFFER_MAX",
+      "description": "Set the maximum number of messages held in the buffer before being sent to Splunk",
+      "value": "10000",
+      "settable": ["value"]
+    },
+    {
+      "name": "SPLUNK_LOGGING_DRIVER_CHANNEL_SIZE",
+      "description": "Set the number of messages allowed to be queued in the channel when reading from the Docker-provided FIFO",
+      "value": "4000",
+      "settable": ["value"]
+    },
+    {
+      "name": "SPLUNK_LOGGING_DRIVER_TEMP_MESSAGES_HOLD_DURATION",
+      "description": "Used when logs are chunked by Docker's 16 KB limit. Set how long the system waits for the next message to arrive.",
+      "value": "100ms",
+      "settable": ["value"]
+    },
+    {
+      "name": "SPLUNK_LOGGING_DRIVER_TEMP_MESSAGES_BUFFER_SIZE",
+      "description": "Used when logs are chunked by Docker's 16 KB limit. Set the largest message the system can reassemble.",
+      "value": "1048576",
+      "settable": ["value"]
     }
   ]
 }
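All of the options above are declared settable, so they can be tuned on an installed plugin without rebuilding it. For example, docker plugin set splunk/docker-logging-plugin:2.0.0-rc2 SPLUNK_LOGGING_DRIVER_FIFO_ERROR_RETRY_TIME=-1 would switch the new FIFO read retry to retry forever (plugin name and tag taken from the Makefile above; substitute whatever alias the plugin is installed under, and note that docker plugin set requires the plugin to be disabled first).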

driver.go

Lines changed: 10 additions & 2 deletions
@@ -51,6 +51,12 @@ type logPair struct {
     info logger.Info
 }

+func (lf *logPair) Close() {
+    lf.stream.Close()
+    lf.splunkl.Close()
+    lf.jsonl.Close()
+}
+
 func newDriver() *driver {
     return &driver{
         logs: make(map[string]*logPair),
@@ -108,7 +114,9 @@ func (d *driver) StartLogging(file string, logCtx logger.Info) error {

     // start to process the logs generated by docker
     logrus.Debug("Start processing messages")
-    mg := &messageProcessor{}
+    mg := &messageProcessor{
+        retryNumber: getAdvancedOptionInt(envVarReadFifoErrorRetryNumber, defaultReadFifoErrorRetryNumber),
+    }
     go mg.process(lf)
     return nil
 }
@@ -118,7 +126,7 @@ func (d *driver) StopLogging(file string) error {
     d.mu.Lock()
     lf, ok := d.logs[file]
     if ok {
-        lf.stream.Close()
+        lf.Close()
         delete(d.logs, file)
     }
     d.mu.Unlock()
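Taken together with logPair.Close() above: StopLogging previously closed only the FIFO stream, leaving the Splunk and JSON file loggers open; it now tears down all three, and consumeLog in message_processor.go (next file) defers the same Close() so both shutdown paths release the loggers.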

message_processor.go

Lines changed: 20 additions & 8 deletions
@@ -20,6 +20,8 @@ import (
     "bytes"
     "encoding/binary"
     "io"
+    "os"
+    "strings"
     "time"
     "unicode/utf8"

@@ -30,6 +32,7 @@ import (
 )

 type messageProcessor struct {
+    retryNumber int
 }

 func (mg messageProcessor) process(lf *logPair) {
@@ -49,21 +52,32 @@ func (mg messageProcessor) consumeLog(lf *logPair) {
     // create a protobuf reader for the log stream
     dec := protoio.NewUint32DelimitedReader(lf.stream, binary.BigEndian, 1e6)
     defer dec.Close()
+    defer lf.Close()
     // a temp buffer for each log entry
     var buf logdriver.LogEntry
+    curRetryNumber := 0
     for {
-        // reads a message from the log stream and put it in a buffer until the EOF
-        // if there is any other error, recreate the stream reader
+        // read a message from the log stream into the buffer
         if err := dec.ReadMsg(&buf); err != nil {
-            if err == io.EOF {
-                logrus.WithField("id", lf.info.ContainerID).WithError(err).Debug("shutting down log logger")
-                lf.stream.Close()
+            // exit the loop if the reader reaches EOF or the fifo is closed by the writer
+            if err == io.EOF || err == os.ErrClosed || strings.Contains(err.Error(), "file already closed") {
+                logrus.WithField("id", lf.info.ContainerID).WithError(err).Info("shutting down loggers")
                 return
             }

-            logrus.WithField("id", lf.info.ContainerID).WithError(err).Debug("Ignoring error")
+            // exit the loop once the retry count exceeds the configured limit
+            if mg.retryNumber != -1 && curRetryNumber > mg.retryNumber {
+                logrus.WithField("id", lf.info.ContainerID).WithField("curRetryNumber", curRetryNumber).WithField("retryNumber", mg.retryNumber).WithError(err).Error("Stop retrying. Shutting down loggers")
+                return
+            }
+
+            // on any other error, retry for robustness; if retryNumber is -1, retry forever
+            curRetryNumber++
+            logrus.WithField("id", lf.info.ContainerID).WithField("curRetryNumber", curRetryNumber).WithField("retryNumber", mg.retryNumber).WithError(err).Error("Encountered error and retrying")
+            time.Sleep(500 * time.Millisecond)
             dec = protoio.NewUint32DelimitedReader(lf.stream, binary.BigEndian, 1e6)
         }
+        curRetryNumber = 0

         if mg.shouldSendMessage(buf.Line) {
             // Append to temp buffer
@@ -85,8 +99,6 @@ func (mg messageProcessor) sendMessage(l logger.Logger, buf *logdriver.LogEntry,
     // Only send if partial bit is not set or temp buffer size reached max or temp buffer timer expired
     // Check for temp buffer timer expiration
     if !buf.Partial || t.shouldFlush(time.Now()) {
-        logrus.WithField("id", containerid).WithField("Buffer partial flag should be false:",
-            buf.Partial).WithField("Temp buffer Length:", t.tBuf.Len()).Debug("Buffer details")
         msg.Line = t.tBuf.Bytes()
         msg.Source = buf.Source
         msg.Partial = buf.Partial
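The retry logic above is easiest to see in isolation. Below is a minimal, self-contained sketch of the same bounded-retry pattern, with a stand-in readMsg function in place of the plugin's protobuf reader; the names are illustrative, not the plugin's API.

package main

import (
    "errors"
    "fmt"
    "io"
    "time"
)

// readWithRetry keeps calling readMsg until it succeeds, returning an error
// only when EOF is seen or the retry budget is exhausted.
// retryLimit == -1 means retry forever, mirroring the driver's convention.
func readWithRetry(readMsg func() error, retryLimit int) error {
    retries := 0
    for {
        err := readMsg()
        if err == nil {
            // in the real loop a successful read resets the counter
            // and processing continues; here we simply return
            return nil
        }
        if errors.Is(err, io.EOF) {
            return err // writer closed the stream: shut down cleanly
        }
        if retryLimit != -1 && retries >= retryLimit {
            return fmt.Errorf("giving up after %d retries: %w", retries, err)
        }
        retries++
        time.Sleep(500 * time.Millisecond) // same backoff as the driver
    }
}

func main() {
    calls := 0
    flaky := func() error {
        calls++
        if calls < 3 {
            return errors.New("transient read failure")
        }
        return nil
    }
    fmt.Println(readWithRetry(flaky, 3)) // succeeds on the third call: <nil>
}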

partial_message_buffer.go

Lines changed: 1 addition & 2 deletions
@@ -50,7 +50,6 @@ func (b *partialMsgBuffer) append(l *logdriver.LogEntry) (err error) {

 func (b *partialMsgBuffer) reset() {
     if b.bufferReset {
-        logrus.Debug("Resetting temp buffer")
         b.tBuf.Reset()
         b.bufferTimer = time.Now()
     }
@@ -67,4 +66,4 @@ func (b *partialMsgBuffer) hasLengthExceeded() bool {

 func (b *partialMsgBuffer) shouldFlush(t time.Time) bool {
     return b.hasLengthExceeded() || b.hasHoldDurationExpired(t)
-}
+}

splunk_logger.go

Lines changed: 4 additions & 0 deletions
@@ -72,6 +72,9 @@ const (
     defaultPartialMsgBufferHoldDuration = 100 * time.Millisecond
     // Maximum buffer size for partial logging
     defaultPartialMsgBufferMaximum = 1024 * 1024
+    // Number of retries if an error happens while reading logs from the docker-provided fifo
+    // -1 means retry forever
+    defaultReadFifoErrorRetryNumber = 3
 )

 const (
@@ -81,6 +84,7 @@ const (
     envVarStreamChannelSize = "SPLUNK_LOGGING_DRIVER_CHANNEL_SIZE"
     envVarPartialMsgBufferHoldDuration = "SPLUNK_LOGGING_DRIVER_TEMP_MESSAGES_HOLD_DURATION"
     envVarPartialMsgBufferMaximum = "SPLUNK_LOGGING_DRIVER_TEMP_MESSAGES_BUFFER_SIZE"
+    envVarReadFifoErrorRetryNumber = "SPLUNK_LOGGING_DRIVER_FIFO_ERROR_RETRY_TIME"
 )

 type splunkLoggerInterface interface {
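driver.go reads this option through getAdvancedOptionInt, whose body is not part of this diff. The following is only a plausible sketch, assuming the helper does an environment lookup with a fallback default:

package main

import (
    "fmt"
    "os"
    "strconv"
)

// getAdvancedOptionInt returns the integer value of the named environment
// variable, falling back to defaultValue when it is unset or unparsable.
// (Sketch only: the real helper in the plugin may differ.)
func getAdvancedOptionInt(envName string, defaultValue int) int {
    raw := os.Getenv(envName)
    if raw == "" {
        return defaultValue
    }
    value, err := strconv.Atoi(raw)
    if err != nil {
        return defaultValue
    }
    return value
}

func main() {
    os.Setenv("SPLUNK_LOGGING_DRIVER_FIFO_ERROR_RETRY_TIME", "-1")
    // -1 disables the retry limit, i.e. retry forever
    fmt.Println(getAdvancedOptionInt("SPLUNK_LOGGING_DRIVER_FIFO_ERROR_RETRY_TIME", 3))
}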
