Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 73 additions & 68 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,72 +5,77 @@ const Version = "heplify-server 1.59.4"
var Setting HeplifyServer

type HeplifyServer struct {
HEPAddr string `default:"0.0.0.0:9060"`
HEPTCPAddr string `default:""`
HEPTLSAddr string `default:""`
HEPWSAddr string `default:""`
ESAddr string `default:""`
ESDiscovery bool `default:"true"`
HEPv2Enable bool `default:"true"`
ESUser string `default:""`
ESPass string `default:""`
LokiURL string `default:""`
LokiBulk int `default:"400"`
LokiTimer int `default:"4"`
LokiBuffer int `default:"100000"`
LokiHEPFilter []int `default:"1,5,100"`
LokiIPPortLabels bool `default:"false"`
ForceHEPPayload []int `default:""`
PromAddr string `default:":9096"`
PromTargetIP string `default:""`
PromTargetName string `default:""`
DBShema string `default:"homer5"`
DBDriver string `default:"mysql"`
DBAddr string `default:"localhost:3306"`
DBSSLMode string `default:"disable"`
DBUser string `default:"root"`
DBPass string `default:""`
DBDataTable string `default:"homer_data"`
DBConfTable string `default:"homer_configuration"`
DBBulk int `default:"400"`
DBTimer int `default:"4"`
DBBuffer int `default:"400000"`
DBWorker int `default:"8"`
DBRotate bool `default:"true"`
DBPartLog string `default:"2h"`
DBPartIsup string `default:"6h"`
DBPartSip string `default:"2h"`
DBPartQos string `default:"6h"`
DBDropDays int `default:"14"`
DBDropDaysCall int `default:"0"`
DBDropDaysRegister int `default:"0"`
DBDropDaysDefault int `default:"0"`
DBDropOnStart bool `default:"false"`
DBUsageProtection bool `default:"false"`
DBUsageScheme string `default:"percentage"`
DBPercentageUsage string `default:"80%"`
DBMaxSize string `default:"20GB"`
DBProcDropLimit int `default:"2"`
Dedup bool `default:"false"`
DiscardMethod []string `default:""`
CensorMethod []string `default:""`
AlegIDs []string `default:""`
ForceALegID bool `default:"false"`
CustomHeader []string `default:""`
IgnoreCaseCH bool `default:"false"`
SIPHeader []string `default:"ruri_user,ruri_domain,from_user,from_tag,to_user,callid,cseq,method,user_agent"`
LogDbg string `default:""`
LogLvl string `default:"info"`
LogStd bool `default:"false"`
LogSys bool `default:"false"`
Config string `default:"./heplify-server.toml"`
ConfigHTTPAddr string `default:""`
ConfigHTTPPW string `default:""`
Version bool `default:"false"`
ScriptEnable bool `default:"false"`
ScriptEngine string `default:"lua"`
ScriptFolder string `default:""`
ScriptHEPFilter []int `default:"1,5,100"`
TLSCertFolder string `default:"."`
TLSMinVersion string `default:"1.2"`
HEPAddr string `default:"0.0.0.0:9060"`
HEPTCPAddr string `default:""`
HEPTLSAddr string `default:""`
HEPWSAddr string `default:""`
ESAddr string `default:""`
ESDiscovery bool `default:"true"`
HEPv2Enable bool `default:"true"`
ESUser string `default:""`
ESPass string `default:""`
LokiURL string `default:""`
LokiBulk int `default:"400"`
LokiTimer int `default:"4"`
LokiBuffer int `default:"100000"`
LokiHEPFilter []int `default:"1,5,100"`
LokiIPPortLabels bool `default:"false"`
ForceHEPPayload []int `default:""`
PromAddr string `default:":9096"`
PromTargetIP string `default:""`
PromTargetName string `default:""`
DBShema string `default:"homer5"`
DBDriver string `default:"mysql"`
DBAddr string `default:"localhost:3306"`
DBSSLMode string `default:"disable"`
DBUser string `default:"root"`
DBPass string `default:""`
DBDataTable string `default:"homer_data"`
DBConfTable string `default:"homer_configuration"`
DBBulk int `default:"400"`
DBTimer int `default:"4"`
DBBuffer int `default:"400000"`
DBWorker int `default:"8"`
DBRotate bool `default:"true"`
DBPartLog string `default:"2h"`
DBPartIsup string `default:"6h"`
DBPartSip string `default:"2h"`
DBPartQos string `default:"6h"`
DBDropDays int `default:"14"`
DBDropDaysCall int `default:"0"`
DBDropDaysRegister int `default:"0"`
DBDropDaysDefault int `default:"0"`
DBDropOnStart bool `default:"false"`
DBUsageProtection bool `default:"false"`
DBUsageScheme string `default:"percentage"`
DBPercentageUsage string `default:"80%"`
DBMaxSize string `default:"20GB"`
DBProcDropLimit int `default:"2"`
Dedup bool `default:"false"`
DiscardMethod []string `default:""`
CensorMethod []string `default:""`
AlegIDs []string `default:""`
ForceALegID bool `default:"false"`
CustomHeader []string `default:""`
IgnoreCaseCH bool `default:"false"`
SIPHeader []string `default:"ruri_user,ruri_domain,from_user,from_tag,to_user,callid,cseq,method,user_agent"`
LogDbg string `default:""`
LogLvl string `default:"info"`
LogStd bool `default:"false"`
LogSys bool `default:"false"`
Config string `default:"./heplify-server.toml"`
ConfigHTTPAddr string `default:""`
ConfigHTTPPW string `default:""`
Version bool `default:"false"`
ScriptEnable bool `default:"false"`
ScriptEngine string `default:"lua"`
ScriptFolder string `default:""`
ScriptHEPFilter []int `default:"1,5,100"`
TLSCertFolder string `default:"."`
TLSMinVersion string `default:"1.2"`
KafkaBroker string `default:""`
KafkaTopic string `default:""`
KafkaDeliveryTimeout int `default:"60000"`
KafkaBatchNum int `default:"10"`
KafkaLinger int `default:"5000"`
}
75 changes: 75 additions & 0 deletions example/kafka_config/heplify-server.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
HEPAddr = "0.0.0.0:9060"
HEPTCPAddr = ""
HEPTLSAddr = "0.0.0.0:9060"
HEPWSAddr = "0.0.0.0:3000"
ESAddr = ""
ESDiscovery = true
LokiURL = ""
LokiBulk = 200
LokiTimer = 4
LokiBuffer = 100000
LokiHEPFilter = [1,5,100]
ForceHEPPayload = []
PromAddr = ""
PromTargetIP = ""
PromTargetName = ""
DBShema = "homer7"
DBDriver = "postgres"
DBAddr = "localhost:5432"
DBUser = "postgres"
DBPass = ""
DBDataTable = "homer_data"
DBConfTable = "homer_config"
DBBulk = 200
DBTimer = 4
DBBuffer = 400000
DBWorker = 8
DBRotate = true
DBPartLog = "2h"
DBPartSip = "1h"
DBPartQos = "6h"
DBDropDays = 14
DBDropDaysCall = 0
DBDropDaysRegister = 0
DBDropDaysDefault = 0
DBDropOnStart = false
DBUsageProtection = true
DBUsageScheme = "percentage"
DBPercentageUsage = "80%"
DBMaxSize = "30MB"
Dedup = false
DiscardMethod = []
AlegIDs = []
CustomHeader = []
SIPHeader = []
LogDbg = ""
LogLvl = "info"
LogStd = false
LogSys = false
Config = "./heplify-server.toml"
ConfigHTTPAddr = ""
KafkaBroker = ""
KafkaTopic = ""
KafkaDeliveryTimeout = 60000
KafkaBatchNum = 10
KafkaLinger = 5000
# Examples:
# -------------------------------------
# ESAddr = "http://127.0.0.1:9200"
# DBShema = "homer7"
# DBDriver = "postgres"
# LokiURL = "http://localhost:3100/api/prom/push"
# LokiHEPFilter = [1,5,100]
# PromAddr = "0.0.0.0:8899"
# PromTargetIP = "10.1.2.111,10.1.2.4,10.1.2.5,10.1.2.6,10.12.44.222"
# PromTargetName = "sbc_access,sbc_core,kamailio,asterisk,pstn_gateway"
# AlegIDs = ["X-CID","P-Charging-Vector,icid-value=\"?(.*?)(?:\"|;|$)","X-BroadWorks-Correlation-Info"]
# DiscardMethod = ["OPTIONS","NOTIFY"]
# CustomHeader = ["X-CustomerIP","X-Billing"]
# SIPHeader = ["callid","callid_aleg","method","ruri_user","ruri_domain","from_user","from_domain","from_tag","to_user","to_domain","to_tag","via","contact_user"]
# LogDbg = "hep,sql,loki"
# LogLvl = "warning"
# ConfigHTTPAddr = "0.0.0.0:9876"
# -------------------------------------
# To hot reload PromTargetIP and PromTargetName run:
# killall -HUP heplify-server
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ require (
github.com/antonmedv/expr v1.8.8
github.com/buger/jsonparser v1.1.1
github.com/cespare/xxhash/v2 v2.1.1
github.com/confluentinc/confluent-kafka-go v1.9.2
github.com/fortytw2/leaktest v1.3.0 // indirect
github.com/go-sql-driver/mysql v1.5.0
github.com/gobwas/httphead v0.0.0-20180130184737-2c6c146eadee // indirect
github.com/gobwas/pool v0.2.0 // indirect
github.com/gobwas/ws v1.0.3
github.com/gogo/protobuf v1.3.2
github.com/golang/snappy v0.0.1
github.com/golang/snappy v0.0.4
github.com/lib/pq v1.7.0
github.com/mailru/easyjson v0.7.1 // indirect
github.com/negbie/cert v0.0.0-20190324145947-d1018a8fb00f
Expand All @@ -22,14 +23,13 @@ require (
github.com/olivere/elastic v6.2.33+incompatible
github.com/pelletier/go-toml v1.8.0
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.7.0
github.com/prometheus/client_golang v1.7.1
github.com/prometheus/common v0.10.0
github.com/robfig/cron/v3 v3.0.1
github.com/sipcapture/golua v0.0.0-20200610090950-538d24098d76
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.7.1
github.com/valyala/bytebufferpool v1.0.0
github.com/valyala/fasttemplate v1.1.1
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9
google.golang.org/genproto v0.0.0-20200619004808-3e7fca5c55db // indirect
google.golang.org/grpc v1.29.1
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
google.golang.org/grpc v1.46.0
)
78 changes: 78 additions & 0 deletions remotelog/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package remotelog

import (
"context"
"encoding/json"
"time"

"github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/negbie/logp"
"github.com/sipcapture/heplify-server/config"
"github.com/sipcapture/heplify-server/decoder"
)

type Kafka struct {
producer *kafka.Producer
topic string
ctx context.Context
}

func (k *Kafka) setup() error {
k.ctx = context.Background()
k.topic = config.Setting.KafkaTopic
return k.createProducer()
}

func (k *Kafka) start(hCh chan *decoder.HEP) {
logp.Info("start kafka...")
go func() {
for e := range k.producer.Events() {
switch ev := e.(type) {
case *kafka.Message:
if ev.TopicPartition.Error != nil {
logp.Err("Delivery failed: %v, payload: %s", ev.TopicPartition, string(ev.Value))
}
}
}
}()

ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()
for {
select {
case pkt, ok := <-hCh:
if !ok {
return
}

v, err := json.Marshal(pkt)
if err != nil {
logp.Err("%v", err)
}
k.producer.ProduceChannel() <- &kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &k.topic, Partition: kafka.PartitionAny},
Value: v,
}
case <-ticker.C:
if k.producer == nil {
k.createProducer()
}
}
}
}

func (k *Kafka) createProducer() error {
var err error
if len(config.Setting.KafkaBroker) > 0 && len(config.Setting.KafkaTopic) > 0 {
k.producer, err = kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": config.Setting.KafkaBroker,
"delivery.timeout.ms": config.Setting.KafkaDeliveryTimeout,
"linger.ms": config.Setting.KafkaLinger,
"batch.num.messages": config.Setting.KafkaBatchNum,
})
}
if err != nil {
return err
}
return nil
}
1 change: 1 addition & 0 deletions remotelog/remotelog.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func New(name string) *Remotelog {
var register = map[string]RemoteHandler{
"elasticsearch": new(Elasticsearch),
"loki": new(Loki),
"kafka": new(Kafka),
}

return &Remotelog{
Expand Down
Loading