Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 76 additions & 10 deletions network/nsq_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"strconv"
"strings"

"github.com/nsqio/nsq/nsqd"
"github.com/rs/zerolog"
)

Expand All @@ -23,11 +22,78 @@ type NSQClient struct {
// to NSQ's /stats endpoint, including the number of items in each
// topic and queue.
type NSQStatsData struct {
Version string `json:"version"`
Health string `json:"health"`
StartTime uint64 `json:"start_time"`
Topics []*nsqd.TopicStats `json:"topics"`
Info *NSQInfo `json:"info"`
Version string `json:"version"`
Health string `json:"health"`
StartTime uint64 `json:"start_time"`
Topics []NSQTopicStats `json:"topics"`
Info *NSQInfo `json:"info"`
}

type NSQTopicStats struct {
TopicName string `json:"topic_name"`
Channels []NSQChannelStats `json:"channels"`
Depth int64 `json:"depth"`
BackendDepth int64 `json:"backend_depth"`
MessageCount uint64 `json:"message_count"`
MessageBytes uint64 `json:"message_bytes"`
Paused bool `json:"paused"`

E2eProcessingLatency QuantileResult `json:"e2e_processing_latency"`
}

type NSQChannelStats struct {
ChannelName string `json:"channel_name"`
Depth int64 `json:"depth"`
BackendDepth int64 `json:"backend_depth"`
InFlightCount int `json:"in_flight_count"`
DeferredCount int `json:"deferred_count"`
MessageCount uint64 `json:"message_count"`
RequeueCount uint64 `json:"requeue_count"`
TimeoutCount uint64 `json:"timeout_count"`
ClientCount int `json:"client_count"`
Clients []NSQClientV2Stats `json:"clients"`
Paused bool `json:"paused"`

E2eProcessingLatency QuantileResult `json:"e2e_processing_latency"`
}

type QuantileResult struct {
Count int `json:"count"`
Percentiles []map[string]float64 `json:"percentiles"`
}

type NSQClientV2Stats struct {
ClientID string `json:"client_id"`
Hostname string `json:"hostname"`
Version string `json:"version"`
RemoteAddress string `json:"remote_address"`
State int32 `json:"state"`
ReadyCount int64 `json:"ready_count"`
InFlightCount int64 `json:"in_flight_count"`
MessageCount uint64 `json:"message_count"`
FinishCount uint64 `json:"finish_count"`
RequeueCount uint64 `json:"requeue_count"`
ConnectTime int64 `json:"connect_ts"`
SampleRate int32 `json:"sample_rate"`
Deflate bool `json:"deflate"`
Snappy bool `json:"snappy"`
UserAgent string `json:"user_agent"`
Authed bool `json:"authed,omitempty"`
AuthIdentity string `json:"auth_identity,omitempty"`
AuthIdentityURL string `json:"auth_identity_url,omitempty"`

PubCounts []PubCount `json:"pub_counts,omitempty"`

TLS bool `json:"tls"`
CipherSuite string `json:"tls_cipher_suite"`
TLSVersion string `json:"tls_version"`
TLSNegotiatedProtocol string `json:"tls_negotiated_protocol"`
TLSNegotiatedProtocolIsMutual bool `json:"tls_negotiated_protocol_is_mutual"`
}

type PubCount struct {
Topic string `json:"topic"`
Count uint64 `json:"count"`
}

type NSQInfo struct {
Expand All @@ -39,16 +105,16 @@ type NSQInfo struct {
StartTime int64 `json:"start_time"`
}

func (data *NSQStatsData) GetTopic(name string) *nsqd.TopicStats {
func (data *NSQStatsData) GetTopic(name string) *NSQTopicStats {
for _, topic := range data.Topics {
if topic.TopicName == name {
return topic
return &topic
}
}
return nil
}

func (data *NSQStatsData) GetChannel(topicName, channelName string) *nsqd.ChannelStats {
func (data *NSQStatsData) GetChannel(topicName, channelName string) *NSQChannelStats {
topic := data.GetTopic(topicName)
if topic != nil {
for _, channel := range topic.Channels {
Expand All @@ -64,7 +130,7 @@ func (data *NSQStatsData) ClientIsRunning(hostname string) bool {
for _, topic := range data.Topics {
for _, channel := range topic.Channels {
for _, client := range channel.Clients {
if strings.Contains(client.String(), hostname) {
if strings.Contains(client.Hostname, hostname) {
return true
}
}
Expand Down