diff --git a/network/nsq_client.go b/network/nsq_client.go index 7e8bd92..95f2f93 100644 --- a/network/nsq_client.go +++ b/network/nsq_client.go @@ -10,7 +10,6 @@ import ( "strconv" "strings" - "github.com/nsqio/nsq/nsqd" "github.com/rs/zerolog" ) @@ -23,11 +22,78 @@ type NSQClient struct { // to NSQ's /stats endpoint, including the number of items in each // topic and queue. type NSQStatsData struct { - Version string `json:"version"` - Health string `json:"health"` - StartTime uint64 `json:"start_time"` - Topics []*nsqd.TopicStats `json:"topics"` - Info *NSQInfo `json:"info"` + Version string `json:"version"` + Health string `json:"health"` + StartTime uint64 `json:"start_time"` + Topics []NSQTopicStats `json:"topics"` + Info *NSQInfo `json:"info"` +} + +type NSQTopicStats struct { + TopicName string `json:"topic_name"` + Channels []NSQChannelStats `json:"channels"` + Depth int64 `json:"depth"` + BackendDepth int64 `json:"backend_depth"` + MessageCount uint64 `json:"message_count"` + MessageBytes uint64 `json:"message_bytes"` + Paused bool `json:"paused"` + + E2eProcessingLatency QuantileResult `json:"e2e_processing_latency"` +} + +type NSQChannelStats struct { + ChannelName string `json:"channel_name"` + Depth int64 `json:"depth"` + BackendDepth int64 `json:"backend_depth"` + InFlightCount int `json:"in_flight_count"` + DeferredCount int `json:"deferred_count"` + MessageCount uint64 `json:"message_count"` + RequeueCount uint64 `json:"requeue_count"` + TimeoutCount uint64 `json:"timeout_count"` + ClientCount int `json:"client_count"` + Clients []NSQClientV2Stats `json:"clients"` + Paused bool `json:"paused"` + + E2eProcessingLatency QuantileResult `json:"e2e_processing_latency"` +} + +type QuantileResult struct { + Count int `json:"count"` + Percentiles []map[string]float64 `json:"percentiles"` +} + +type NSQClientV2Stats struct { + ClientID string `json:"client_id"` + Hostname string `json:"hostname"` + Version string `json:"version"` + RemoteAddress string `json:"remote_address"` + State int32 `json:"state"` + ReadyCount int64 `json:"ready_count"` + InFlightCount int64 `json:"in_flight_count"` + MessageCount uint64 `json:"message_count"` + FinishCount uint64 `json:"finish_count"` + RequeueCount uint64 `json:"requeue_count"` + ConnectTime int64 `json:"connect_ts"` + SampleRate int32 `json:"sample_rate"` + Deflate bool `json:"deflate"` + Snappy bool `json:"snappy"` + UserAgent string `json:"user_agent"` + Authed bool `json:"authed,omitempty"` + AuthIdentity string `json:"auth_identity,omitempty"` + AuthIdentityURL string `json:"auth_identity_url,omitempty"` + + PubCounts []PubCount `json:"pub_counts,omitempty"` + + TLS bool `json:"tls"` + CipherSuite string `json:"tls_cipher_suite"` + TLSVersion string `json:"tls_version"` + TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"` + TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mutual"` +} + +type PubCount struct { + Topic string `json:"topic"` + Count uint64 `json:"count"` } type NSQInfo struct { @@ -39,16 +105,16 @@ type NSQInfo struct { StartTime int64 `json:"start_time"` } -func (data *NSQStatsData) GetTopic(name string) *nsqd.TopicStats { +func (data *NSQStatsData) GetTopic(name string) *NSQTopicStats { for _, topic := range data.Topics { if topic.TopicName == name { - return topic + return &topic } } return nil } -func (data *NSQStatsData) GetChannel(topicName, channelName string) *nsqd.ChannelStats { +func (data *NSQStatsData) GetChannel(topicName, channelName string) *NSQChannelStats { topic := data.GetTopic(topicName) if topic != nil { for _, channel := range topic.Channels { @@ -64,7 +130,7 @@ func (data *NSQStatsData) ClientIsRunning(hostname string) bool { for _, topic := range data.Topics { for _, channel := range topic.Channels { for _, client := range channel.Clients { - if strings.Contains(client.String(), hostname) { + if strings.Contains(client.Hostname, hostname) { return true } }