Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ splunk_app_gogen.spl
.configcache*
.versioncache*
roveralls*
.idea/*
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
github.com/LawrenceWoodman/roveralls v0.0.0-20171119193843-51b78509b607 // indirect
github.com/Sirupsen/logrus v0.0.0-20160829202321-3ec0642a7fb6
github.com/ahmetb/govvv v0.2.0 // indirect
github.com/aws/aws-sdk-go v1.20.7
github.com/cactus/gostrftime v0.0.0-20160820080632-908b7e9ab9b8
github.com/coccyx/go-s2s v0.0.0-20190615184235-2f7a532dc47c
github.com/coccyx/timeparser v0.0.0-20160911001308-9d8b37af28f5
Expand All @@ -20,6 +21,7 @@ require (
github.com/layeh/gopher-luar v0.0.0-20160805223923-921d03e21a78
github.com/mattn/go-runewidth v0.0.1 // indirect
github.com/mattn/goveralls v0.0.2 // indirect
github.com/meirf/gopart v0.0.0-20180520194036-37e9492a85a8 // indirect
github.com/mgutz/ansi v0.0.0-20150914162238-c286dcecd19f // indirect
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826
github.com/olekukonko/tablewriter v0.0.0-20160923125401-bdcc175572fd
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ github.com/Sirupsen/logrus v0.0.0-20160829202321-3ec0642a7fb6 h1:Tp6VdyWz8sPuNnR
github.com/Sirupsen/logrus v0.0.0-20160829202321-3ec0642a7fb6/go.mod h1:rmk17hk6i8ZSAJkSDa7nOxamrG+SP4P0mm+DAvExv4U=
github.com/ahmetb/govvv v0.2.0 h1:3rzpWlytDIDQfL64/Kbjs88tNJltY9E8sk1tNju6FAU=
github.com/ahmetb/govvv v0.2.0/go.mod h1:4WRFpdWtc/YtKgPFwa1dr5+9hiRY5uKAL08bOlxOR6s=
github.com/aws/aws-sdk-go v1.20.7 h1:fRjZRYJg0wPCA8yaDdb3DeP4rVjjmEiuqYhYoqOaIJg=
github.com/aws/aws-sdk-go v1.20.7/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/cactus/gostrftime v0.0.0-20160820080632-908b7e9ab9b8 h1:obM8OBXqb463/q4xRedAmLhfPc3oE7N84R66sxNxnAg=
github.com/cactus/gostrftime v0.0.0-20160820080632-908b7e9ab9b8/go.mod h1:OUM5bzoyxlhh7aPq8iO7vbl/jsiWvrbn70uCqOVOPkE=
github.com/coccyx/go-s2s v0.0.0-20190413180338-bdc39b6946f5 h1:2GFB4vlbR4HFLl0EBcXrXjC+iBeSQ9Qx3TXcTEFdZ0E=
Expand All @@ -28,6 +30,8 @@ github.com/hhkbp2/go-strftime v0.0.0-20150709091403-d82166ec6782 h1:Evl9i7wBY3bj
github.com/hhkbp2/go-strftime v0.0.0-20150709091403-d82166ec6782/go.mod h1:x8/IOQ5qQ4DKfiTmD9wBhQ40edg5wh7gMRwdLg07mMw=
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869 h1:IPJ3dvxmJ4uczJe5YQdrYB16oTJlGSC/OyZDqUk9xX4=
github.com/jehiah/go-strftime v0.0.0-20171201141054-1d33003b3869/go.mod h1:cJ6Cj7dQo+O6GJNiMx+Pa94qKj+TG8ONdKHgMNIyyag=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/joyt/godate v0.0.0-20150226210126-7151572574a7 h1:2wH5antjhmU3EuWyidm0lJ4B9hGMpl5lNRo+M9uGJ5A=
github.com/joyt/godate v0.0.0-20150226210126-7151572574a7/go.mod h1:R+UgFL3iylLhx9N4w35zZ2HdhDlgorRDx4SxbchWuN0=
github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
Expand All @@ -42,6 +46,8 @@ github.com/mattn/go-runewidth v0.0.1 h1:+EiaBVXhogb1Klb4tRJ7hYnuGK6PkKOZlK04D/GM
github.com/mattn/go-runewidth v0.0.1/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/goveralls v0.0.2 h1:7eJB6EqsPhRVxvwEXGnqdO2sJI0PTsrWoTMXEk9/OQc=
github.com/mattn/goveralls v0.0.2/go.mod h1:8d1ZMHsd7fW6IRPKQh46F2WRpyib5/X4FOpevwGNQEw=
github.com/meirf/gopart v0.0.0-20180520194036-37e9492a85a8 h1:7TJiWD1knYDpOAPyFBoKqoyvlsa+UwDw0kv0jVN5Mrk=
github.com/meirf/gopart v0.0.0-20180520194036-37e9492a85a8/go.mod h1:Uz8uoD6o+eQN19hr6Yro/qKvW+KP6olFq+PK/Nn7gCE=
github.com/mgutz/ansi v0.0.0-20150914162238-c286dcecd19f h1:9RKjtkGo+3aNumDva3Gdia9GAYWSHhlh/QHEsTgBF6Y=
github.com/mgutz/ansi v0.0.0-20150914162238-c286dcecd19f/go.mod h1:01TrycV0kFyexm33Z7vhZRXopbI8J3TDReVlkTgMUxE=
github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 h1:RWengNIwukTxcDr9M+97sNutRR1RKhG96O6jWumTTnw=
Expand Down
95 changes: 95 additions & 0 deletions outputter/kinesis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package outputter

import (
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/kinesis"
config "github.com/coccyx/gogen/internal"
"math/rand"
)

type kinesisout struct {
buf []*kinesis.PutRecordsRequestEntry
client *kinesis.Kinesis
initialized bool
closed bool
endpoint string
}

func (k *kinesisout) Send(item *config.OutQueueItem) error {
if !k.initialized {
k.buf = []*kinesis.PutRecordsRequestEntry{}
sess, err := session.NewSession()
if err != nil {
return err
}
k.client = kinesis.New(sess)
k.endpoint = item.S.Output.Endpoints[rand.Intn(len(item.S.Output.Endpoints))]
}

for _, e := range item.Events {
partkey := e["host"]

evt := e["_raw"]

if evt[len(evt) - 1] != '\n' {
evt = evt + "\n"
}
rec := kinesis.PutRecordsRequestEntry{
PartitionKey: &partkey,
Data: []byte(evt),
}
k.buf = append(k.buf, &rec)
}

if len(k.buf) >= 500 {
return k.flush()
}

return nil
}

func (k *kinesisout) flush() error {
var records []*kinesis.PutRecordsRequestEntry
if len(k.buf) == 0 {
return nil
}
if len(k.buf) > 500 {
records = k.buf[:500]
k.buf = k.buf[500:]
} else {
records = k.buf
k.buf = []*kinesis.PutRecordsRequestEntry{}
}

kinesisRequest := kinesis.PutRecordsInput{
Records: records,
StreamName: &k.endpoint,
}

results, e := k.client.PutRecords(&kinesisRequest)

if e == nil {
if *results.FailedRecordCount > 0 {
print(*results.FailedRecordCount, " records failed.\n")
for i, record := range results.Records {
if record.ErrorCode != nil {
print(i, " failed with error ", *record.ErrorMessage, "\n")
k.buf = append(records[i:i], k.buf...)
}
}
}
}

return e
}

func (k *kinesisout) Close() error {
if !k.closed {
k.closed = true
err := k.flush()
if err != nil {
return err
}
}
return nil
}
2 changes: 2 additions & 0 deletions outputter/outputter.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,8 @@ func setup(generator *rand.Rand, item *config.OutQueueItem, num int) config.Outp
gout[num] = new(network)
case "kafka":
gout[num] = new(kafkaout)
case "kinesis":
gout[num] = new(kinesisout)
default:
gout[num] = new(stdout)
}
Expand Down
202 changes: 202 additions & 0 deletions vendor/github.com/aws/aws-sdk-go/LICENSE.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions vendor/github.com/aws/aws-sdk-go/NOTICE.txt

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading