From 6e646574e815063e1acde2a08f01fb1a7484d116 Mon Sep 17 00:00:00 2001 From: tomwu3 Date: Fri, 18 Aug 2023 14:41:31 +0800 Subject: [PATCH] Support sending data to kafka --- config/config.go | 141 ++++++++++++----------- example/kafka_config/heplify-server.toml | 75 ++++++++++++ go.mod | 12 +- remotelog/kafka.go | 78 +++++++++++++ remotelog/remotelog.go | 1 + server/server.go | 27 +++++ 6 files changed, 260 insertions(+), 74 deletions(-) create mode 100644 example/kafka_config/heplify-server.toml create mode 100644 remotelog/kafka.go diff --git a/config/config.go b/config/config.go index 01fbb4e2..e4fa9c0b 100644 --- a/config/config.go +++ b/config/config.go @@ -5,72 +5,77 @@ const Version = "heplify-server 1.59.4" var Setting HeplifyServer type HeplifyServer struct { - HEPAddr string `default:"0.0.0.0:9060"` - HEPTCPAddr string `default:""` - HEPTLSAddr string `default:""` - HEPWSAddr string `default:""` - ESAddr string `default:""` - ESDiscovery bool `default:"true"` - HEPv2Enable bool `default:"true"` - ESUser string `default:""` - ESPass string `default:""` - LokiURL string `default:""` - LokiBulk int `default:"400"` - LokiTimer int `default:"4"` - LokiBuffer int `default:"100000"` - LokiHEPFilter []int `default:"1,5,100"` - LokiIPPortLabels bool `default:"false"` - ForceHEPPayload []int `default:""` - PromAddr string `default:":9096"` - PromTargetIP string `default:""` - PromTargetName string `default:""` - DBShema string `default:"homer5"` - DBDriver string `default:"mysql"` - DBAddr string `default:"localhost:3306"` - DBSSLMode string `default:"disable"` - DBUser string `default:"root"` - DBPass string `default:""` - DBDataTable string `default:"homer_data"` - DBConfTable string `default:"homer_configuration"` - DBBulk int `default:"400"` - DBTimer int `default:"4"` - DBBuffer int `default:"400000"` - DBWorker int `default:"8"` - DBRotate bool `default:"true"` - DBPartLog string `default:"2h"` - DBPartIsup string `default:"6h"` - DBPartSip string `default:"2h"` - DBPartQos string `default:"6h"` - DBDropDays int `default:"14"` - DBDropDaysCall int `default:"0"` - DBDropDaysRegister int `default:"0"` - DBDropDaysDefault int `default:"0"` - DBDropOnStart bool `default:"false"` - DBUsageProtection bool `default:"false"` - DBUsageScheme string `default:"percentage"` - DBPercentageUsage string `default:"80%"` - DBMaxSize string `default:"20GB"` - DBProcDropLimit int `default:"2"` - Dedup bool `default:"false"` - DiscardMethod []string `default:""` - CensorMethod []string `default:""` - AlegIDs []string `default:""` - ForceALegID bool `default:"false"` - CustomHeader []string `default:""` - IgnoreCaseCH bool `default:"false"` - SIPHeader []string `default:"ruri_user,ruri_domain,from_user,from_tag,to_user,callid,cseq,method,user_agent"` - LogDbg string `default:""` - LogLvl string `default:"info"` - LogStd bool `default:"false"` - LogSys bool `default:"false"` - Config string `default:"./heplify-server.toml"` - ConfigHTTPAddr string `default:""` - ConfigHTTPPW string `default:""` - Version bool `default:"false"` - ScriptEnable bool `default:"false"` - ScriptEngine string `default:"lua"` - ScriptFolder string `default:""` - ScriptHEPFilter []int `default:"1,5,100"` - TLSCertFolder string `default:"."` - TLSMinVersion string `default:"1.2"` + HEPAddr string `default:"0.0.0.0:9060"` + HEPTCPAddr string `default:""` + HEPTLSAddr string `default:""` + HEPWSAddr string `default:""` + ESAddr string `default:""` + ESDiscovery bool `default:"true"` + HEPv2Enable bool `default:"true"` + ESUser string `default:""` + ESPass string `default:""` + LokiURL string `default:""` + LokiBulk int `default:"400"` + LokiTimer int `default:"4"` + LokiBuffer int `default:"100000"` + LokiHEPFilter []int `default:"1,5,100"` + LokiIPPortLabels bool `default:"false"` + ForceHEPPayload []int `default:""` + PromAddr string `default:":9096"` + PromTargetIP string `default:""` + PromTargetName string `default:""` + DBShema string `default:"homer5"` + DBDriver string `default:"mysql"` + DBAddr string `default:"localhost:3306"` + DBSSLMode string `default:"disable"` + DBUser string `default:"root"` + DBPass string `default:""` + DBDataTable string `default:"homer_data"` + DBConfTable string `default:"homer_configuration"` + DBBulk int `default:"400"` + DBTimer int `default:"4"` + DBBuffer int `default:"400000"` + DBWorker int `default:"8"` + DBRotate bool `default:"true"` + DBPartLog string `default:"2h"` + DBPartIsup string `default:"6h"` + DBPartSip string `default:"2h"` + DBPartQos string `default:"6h"` + DBDropDays int `default:"14"` + DBDropDaysCall int `default:"0"` + DBDropDaysRegister int `default:"0"` + DBDropDaysDefault int `default:"0"` + DBDropOnStart bool `default:"false"` + DBUsageProtection bool `default:"false"` + DBUsageScheme string `default:"percentage"` + DBPercentageUsage string `default:"80%"` + DBMaxSize string `default:"20GB"` + DBProcDropLimit int `default:"2"` + Dedup bool `default:"false"` + DiscardMethod []string `default:""` + CensorMethod []string `default:""` + AlegIDs []string `default:""` + ForceALegID bool `default:"false"` + CustomHeader []string `default:""` + IgnoreCaseCH bool `default:"false"` + SIPHeader []string `default:"ruri_user,ruri_domain,from_user,from_tag,to_user,callid,cseq,method,user_agent"` + LogDbg string `default:""` + LogLvl string `default:"info"` + LogStd bool `default:"false"` + LogSys bool `default:"false"` + Config string `default:"./heplify-server.toml"` + ConfigHTTPAddr string `default:""` + ConfigHTTPPW string `default:""` + Version bool `default:"false"` + ScriptEnable bool `default:"false"` + ScriptEngine string `default:"lua"` + ScriptFolder string `default:""` + ScriptHEPFilter []int `default:"1,5,100"` + TLSCertFolder string `default:"."` + TLSMinVersion string `default:"1.2"` + KafkaBroker string `default:""` + KafkaTopic string `default:""` + KafkaDeliveryTimeout int `default:"60000"` + KafkaBatchNum int `default:"10"` + KafkaLinger int `default:"5000"` } diff --git a/example/kafka_config/heplify-server.toml b/example/kafka_config/heplify-server.toml new file mode 100644 index 00000000..99969cb2 --- /dev/null +++ b/example/kafka_config/heplify-server.toml @@ -0,0 +1,75 @@ +HEPAddr = "0.0.0.0:9060" +HEPTCPAddr = "" +HEPTLSAddr = "0.0.0.0:9060" +HEPWSAddr = "0.0.0.0:3000" +ESAddr = "" +ESDiscovery = true +LokiURL = "" +LokiBulk = 200 +LokiTimer = 4 +LokiBuffer = 100000 +LokiHEPFilter = [1,5,100] +ForceHEPPayload = [] +PromAddr = "" +PromTargetIP = "" +PromTargetName = "" +DBShema = "homer7" +DBDriver = "postgres" +DBAddr = "localhost:5432" +DBUser = "postgres" +DBPass = "" +DBDataTable = "homer_data" +DBConfTable = "homer_config" +DBBulk = 200 +DBTimer = 4 +DBBuffer = 400000 +DBWorker = 8 +DBRotate = true +DBPartLog = "2h" +DBPartSip = "1h" +DBPartQos = "6h" +DBDropDays = 14 +DBDropDaysCall = 0 +DBDropDaysRegister = 0 +DBDropDaysDefault = 0 +DBDropOnStart = false +DBUsageProtection = true +DBUsageScheme = "percentage" +DBPercentageUsage = "80%" +DBMaxSize = "30MB" +Dedup = false +DiscardMethod = [] +AlegIDs = [] +CustomHeader = [] +SIPHeader = [] +LogDbg = "" +LogLvl = "info" +LogStd = false +LogSys = false +Config = "./heplify-server.toml" +ConfigHTTPAddr = "" +KafkaBroker = "" +KafkaTopic = "" +KafkaDeliveryTimeout = 60000 +KafkaBatchNum = 10 +KafkaLinger = 5000 +# Examples: +# ------------------------------------- +# ESAddr = "http://127.0.0.1:9200" +# DBShema = "homer7" +# DBDriver = "postgres" +# LokiURL = "http://localhost:3100/api/prom/push" +# LokiHEPFilter = [1,5,100] +# PromAddr = "0.0.0.0:8899" +# PromTargetIP = "10.1.2.111,10.1.2.4,10.1.2.5,10.1.2.6,10.12.44.222" +# PromTargetName = "sbc_access,sbc_core,kamailio,asterisk,pstn_gateway" +# AlegIDs = ["X-CID","P-Charging-Vector,icid-value=\"?(.*?)(?:\"|;|$)","X-BroadWorks-Correlation-Info"] +# DiscardMethod = ["OPTIONS","NOTIFY"] +# CustomHeader = ["X-CustomerIP","X-Billing"] +# SIPHeader = ["callid","callid_aleg","method","ruri_user","ruri_domain","from_user","from_domain","from_tag","to_user","to_domain","to_tag","via","contact_user"] +# LogDbg = "hep,sql,loki" +# LogLvl = "warning" +# ConfigHTTPAddr = "0.0.0.0:9876" +# ------------------------------------- +# To hot reload PromTargetIP and PromTargetName run: +# killall -HUP heplify-server diff --git a/go.mod b/go.mod index 53238abf..f2fccba1 100644 --- a/go.mod +++ b/go.mod @@ -7,13 +7,14 @@ require ( github.com/antonmedv/expr v1.8.8 github.com/buger/jsonparser v1.1.1 github.com/cespare/xxhash/v2 v2.1.1 + github.com/confluentinc/confluent-kafka-go v1.9.2 github.com/fortytw2/leaktest v1.3.0 // indirect github.com/go-sql-driver/mysql v1.5.0 github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee // indirect github.com/gobwas/pool v0.2.0 // indirect github.com/gobwas/ws v1.0.3 github.com/gogo/protobuf v1.3.2 - github.com/golang/snappy v0.0.1 + github.com/golang/snappy v0.0.4 github.com/lib/pq v1.7.0 github.com/mailru/easyjson v0.7.1 // indirect github.com/negbie/cert v0.0.0-20190324145947-d1018a8fb00f @@ -22,14 +23,13 @@ require ( github.com/olivere/elastic v6.2.33+incompatible github.com/pelletier/go-toml v1.8.0 github.com/pkg/errors v0.9.1 // indirect - github.com/prometheus/client_golang v1.7.0 + github.com/prometheus/client_golang v1.7.1 github.com/prometheus/common v0.10.0 github.com/robfig/cron/v3 v3.0.1 github.com/sipcapture/golua v0.0.0-20200610090950-538d24098d76 - github.com/stretchr/testify v1.7.0 + github.com/stretchr/testify v1.7.1 github.com/valyala/bytebufferpool v1.0.0 github.com/valyala/fasttemplate v1.1.1 - golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 - google.golang.org/genproto v0.0.0-20200619004808-3e7fca5c55db // indirect - google.golang.org/grpc v1.29.1 + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c + google.golang.org/grpc v1.46.0 ) diff --git a/remotelog/kafka.go b/remotelog/kafka.go new file mode 100644 index 00000000..c7eb257d --- /dev/null +++ b/remotelog/kafka.go @@ -0,0 +1,78 @@ +package remotelog + +import ( + "context" + "encoding/json" + "time" + + "github.com/confluentinc/confluent-kafka-go/kafka" + "github.com/negbie/logp" + "github.com/sipcapture/heplify-server/config" + "github.com/sipcapture/heplify-server/decoder" +) + +type Kafka struct { + producer *kafka.Producer + topic string + ctx context.Context +} + +func (k *Kafka) setup() error { + k.ctx = context.Background() + k.topic = config.Setting.KafkaTopic + return k.createProducer() +} + +func (k *Kafka) start(hCh chan *decoder.HEP) { + logp.Info("start kafka...") + go func() { + for e := range k.producer.Events() { + switch ev := e.(type) { + case *kafka.Message: + if ev.TopicPartition.Error != nil { + logp.Err("Delivery failed: %v, payload: %s", ev.TopicPartition, string(ev.Value)) + } + } + } + }() + + ticker := time.NewTicker(1 * time.Hour) + defer ticker.Stop() + for { + select { + case pkt, ok := <-hCh: + if !ok { + return + } + + v, err := json.Marshal(pkt) + if err != nil { + logp.Err("%v", err) + } + k.producer.ProduceChannel() <- &kafka.Message{ + TopicPartition: kafka.TopicPartition{Topic: &k.topic, Partition: kafka.PartitionAny}, + Value: v, + } + case <-ticker.C: + if k.producer == nil { + k.createProducer() + } + } + } +} + +func (k *Kafka) createProducer() error { + var err error + if len(config.Setting.KafkaBroker) > 0 && len(config.Setting.KafkaTopic) > 0 { + k.producer, err = kafka.NewProducer(&kafka.ConfigMap{ + "bootstrap.servers": config.Setting.KafkaBroker, + "delivery.timeout.ms": config.Setting.KafkaDeliveryTimeout, + "linger.ms": config.Setting.KafkaLinger, + "batch.num.messages": config.Setting.KafkaBatchNum, + }) + } + if err != nil { + return err + } + return nil +} diff --git a/remotelog/remotelog.go b/remotelog/remotelog.go index 4d325fe5..21200dfa 100644 --- a/remotelog/remotelog.go +++ b/remotelog/remotelog.go @@ -19,6 +19,7 @@ func New(name string) *Remotelog { var register = map[string]RemoteHandler{ "elasticsearch": new(Elasticsearch), "loki": new(Loki), + "kafka": new(Kafka), } return &Remotelog{ diff --git a/server/server.go b/server/server.go index a37425b9..930ce688 100644 --- a/server/server.go +++ b/server/server.go @@ -24,6 +24,7 @@ type HEPInput struct { promCh chan *decoder.HEP esCh chan *decoder.HEP lokiCh chan *decoder.HEP + kafkaCh chan *decoder.HEP wg *sync.WaitGroup buffer *sync.Pool exitUDP chan bool @@ -38,6 +39,7 @@ type HEPInput struct { usePM bool useES bool useLK bool + useKafka bool } type HEPStats struct { @@ -77,6 +79,10 @@ func NewHEPInput() *HEPInput { h.useLK = true h.lokiCh = make(chan *decoder.HEP, config.Setting.LokiBuffer) } + if len(config.Setting.KafkaBroker) > 2 { + h.useKafka = true + h.kafkaCh = make(chan *decoder.HEP, 40000) + } return h } @@ -154,6 +160,16 @@ func (h *HEPInput) Run() { defer d.End() } + if h.useKafka { + k := remotelog.New("kafka") + k.Chan = h.kafkaCh + + if err := k.Run(); err != nil { + logp.Err("%v", err) + } + defer k.End() + } + h.wg.Wait() } @@ -284,6 +300,17 @@ func (h *HEPInput) worker() { } } } + + if h.useKafka { + select { + case h.kafkaCh <- hepPkt: + default: + if time.Since(lastWarn) > 1e9 { + logp.Warn("overflowing kafka channel") + } + lastWarn = time.Now() + } + } } } }