diff --git a/cmd/cx-delivery/delivery.go b/cmd/cx-delivery/delivery.go new file mode 100644 index 0000000..9e44f02 --- /dev/null +++ b/cmd/cx-delivery/delivery.go @@ -0,0 +1,53 @@ +package main + +import ( + "encoding/json" + + "github.com/spf13/viper" + . "github.com/valmi-io/cx-pipeline/internal/log" + util "github.com/valmi-io/cx-pipeline/internal/util" +) + +type ChannelTopic struct { + LinkID string `json:"link_id"` + WriteKey string `json:"write_key"` + storefront string + channel string +} + +func delivery(msg string) { + Log.Info().Msgf("processing msg %v", msg) + + var event map[string]interface{} + if unmarshalErr := json.Unmarshal([]byte(msg), &event); unmarshalErr != nil { + Log.Error().Msgf("Error Unmarshalling event: %v", unmarshalErr) + return + } + + jsonPayload := `{"channel_in": ["postgres"], "channel_not_in": [""]}` + data, _, err := util.PostUrl( + viper.GetString("APP_BACKEND_URL")+"/api/v1/superuser/channeltopics", + []byte(jsonPayload), + util.SetConfigAuth, + nil) + if err != nil { + Log.Error().Msgf("Error fetching processor destination") + } + + var channelTopics []ChannelTopic + if unmarshalErr := json.Unmarshal([]byte(data), &channelTopics); unmarshalErr != nil { + Log.Error().Msgf("Error Unmarshalling event: %v", unmarshalErr) + return + } + + // TODO: send the msg to only particular store write_key(delivery) + for _, ct := range channelTopics { + headerItems := map[string]string{"Content-Type": "application/json", "X-Write-Key": ct.WriteKey} + eventBytes, _ := json.Marshal(event) + _, _, err = util.PostUrl("http://localhost:3049/api/s/s2s/event", eventBytes, nil, headerItems) + if err != nil { + Log.Error().Msgf("error sending request to Jitsu: %v", err) + } + } + +} diff --git a/cmd/cx-delivery/main.go b/cmd/cx-delivery/main.go new file mode 100644 index 0000000..19fd159 --- /dev/null +++ b/cmd/cx-delivery/main.go @@ -0,0 +1,50 @@ +package main + +import ( + "os" + "os/signal" + "syscall" + + "github.com/spf13/viper" + "github.com/valmi-io/cx-pipeline/internal/configstore" + "github.com/valmi-io/cx-pipeline/internal/env" + . "github.com/valmi-io/cx-pipeline/internal/log" + . "github.com/valmi-io/cx-pipeline/internal/msgbroker" +) + +func main() { + Log.Info().Msgf("delivery agent started") + env.InitConfig() + Log.Info().Msg(viper.GetString("APP_BACKEND_URL")) + Log.Info().Msg(viper.GetString("KAFKA_BROKER")) + + // initialize ConfigStore + jsonPayload := `{"channel_in": ["processor"], "channel_not_in": ["x", "y"]}` + cs, err := configstore.Init(jsonPayload) + if err != nil { + Log.Fatal().Msg(err.Error()) + } + + // initialize Broker + topicMan, err := InitBroker(delivery) + if err != nil { + Log.Fatal().Msg(err.Error()) + } + + // Connect ConfigStore and Broker + cs.AttachTopicMan(topicMan) + + cleanupChan := make(chan bool) + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + cleanupChan <- true + }() + + <-cleanupChan + Log.Info().Msg("Received an interrupt signal, shutting down...") + cs.Close() // close ConfigStore to stop refreshing IFTTTs + topicMan.Close() // close broker topic management +} diff --git a/cmd/cx-processor/main.go b/cmd/cx-processor/main.go index ac226c4..e9064d3 100644 --- a/cmd/cx-processor/main.go +++ b/cmd/cx-processor/main.go @@ -26,7 +26,8 @@ func main() { Log.Info().Msg(viper.GetString("KAFKA_BROKER")) // initialize ConfigStore - cs, err := configstore.Init() + jsonPayload := `{"channel_in": ["chatbox"], "channel_not_in": ["x", "y"]}` + cs, err := configstore.Init(jsonPayload) if err != nil { Log.Fatal().Msg(err.Error()) } diff --git a/cmd/cx-processor/processor.go b/cmd/cx-processor/processor.go index c59c22d..10531d7 100644 --- a/cmd/cx-processor/processor.go +++ b/cmd/cx-processor/processor.go @@ -1,9 +1,54 @@ package main import ( + "encoding/json" + + "github.com/spf13/viper" . "github.com/valmi-io/cx-pipeline/internal/log" + util "github.com/valmi-io/cx-pipeline/internal/util" ) +type ChannelTopic struct { + LinkID string `json:"link_id"` + WriteKey string `json:"write_key"` + storefront string + channel string +} + func processor(msg string) { Log.Info().Msgf("processing msg %v", msg) + + var event map[string]interface{} + if unmarshalErr := json.Unmarshal([]byte(msg), &event); unmarshalErr != nil { + Log.Error().Msgf("Error Unmarshalling event: %v", unmarshalErr) + return + } + event["processed"] = true + + jsonPayload := `{"channel_in": ["processor"], "channel_not_in": [""]}` + data, _, err := util.PostUrl( + viper.GetString("APP_BACKEND_URL")+"/api/v1/superuser/channeltopics", + []byte(jsonPayload), + util.SetConfigAuth, + nil) + if err != nil { + Log.Error().Msgf("Error fetching processor destination") + } + + var channelTopics []ChannelTopic + if unmarshalErr := json.Unmarshal([]byte(data), &channelTopics); unmarshalErr != nil { + Log.Error().Msgf("Error Unmarshalling event: %v", unmarshalErr) + return + } + + // TODO: send the msg to only particular store write_key(processor) + for _, ct := range channelTopics { + headerItems := map[string]string{"Content-Type": "application/json", "X-Write-Key": ct.WriteKey} + eventBytes, _ := json.Marshal(event) + _, _, err = util.PostUrl("http://localhost:3049/api/s/s2s/event", eventBytes, nil, headerItems) + if err != nil { + Log.Error().Msgf("error sending request to Jitsu: %v", err) + } + } + } diff --git a/internal/configstore/channel_topics.go b/internal/configstore/channel_topics.go index 866d110..4cac63d 100644 --- a/internal/configstore/channel_topics.go +++ b/internal/configstore/channel_topics.go @@ -26,12 +26,12 @@ type ChannelTopics struct { topicMan *TopicMan } -func fetchChannelTopics(currentCT *ChannelTopics) []ChannelTopic { - jsonPayload := `{"channel_in": ["chatbox"], "channel_not_in": ["x", "y"]}` +func fetchChannelTopics(currentCT *ChannelTopics, jsonPayload string) []ChannelTopic { data, respCode, err := util.PostUrl( viper.GetString("APP_BACKEND_URL")+"/api/v1/superuser/channeltopics", []byte(jsonPayload), - util.SetConfigAuth) + util.SetConfigAuth, + nil) Log.Info().Msg(data) Log.Info().Msgf("%v", respCode) @@ -86,11 +86,10 @@ func matchChannelState(newCT []ChannelTopic, currentCT []ChannelTopic, topicMan // var i int = 0 -func initChannelTopics(wg *sync.WaitGroup) (*ChannelTopics, error) { +func initChannelTopics(wg *sync.WaitGroup, jsonPayload string) (*ChannelTopics, error) { d, _ := time.ParseDuration(viper.GetString("CONFIG_REFRESH_INTERVAL")) ticker := time.NewTicker(d) channelTopics := &ChannelTopics{done: make(chan bool)} - wg.Add(1) go func() { defer wg.Done() @@ -102,7 +101,7 @@ func initChannelTopics(wg *sync.WaitGroup) (*ChannelTopics, error) { return case t := <-ticker.C: Log.Debug().Msgf("ChannelTopics Refresh Tick at %v", t) - newChannels := fetchChannelTopics(channelTopics) + newChannels := fetchChannelTopics(channelTopics, jsonPayload) channelTopics.mu.Lock() channelTopics.Channels = newChannels channelTopics.mu.Unlock() diff --git a/internal/configstore/configstore.go b/internal/configstore/configstore.go index 6afdcdb..92b3a57 100644 --- a/internal/configstore/configstore.go +++ b/internal/configstore/configstore.go @@ -19,9 +19,9 @@ type ConfigStore struct { wg *sync.WaitGroup } -func Init() (*ConfigStore, error) { +func Init(jsonPayload string) (*ConfigStore, error) { var wg sync.WaitGroup - channelTopics, error := initChannelTopics(&wg) + channelTopics, error := initChannelTopics(&wg, jsonPayload) if error != nil { return nil, error } diff --git a/internal/configstore/storefront_ifttt.go b/internal/configstore/storefront_ifttt.go index c0a1815..38ecd5d 100644 --- a/internal/configstore/storefront_ifttt.go +++ b/internal/configstore/storefront_ifttt.go @@ -48,7 +48,6 @@ func initStoreFrontIfttts(wg *sync.WaitGroup) (*StorefrontIfttts, error) { d, _ := time.ParseDuration(viper.GetString("CONFIG_REFRESH_INTERVAL")) ticker := time.NewTicker(d) storefrontIfttts := &StorefrontIfttts{done: make(chan bool)} - wg.Add(1) go func() { defer wg.Done() diff --git a/internal/util/http.go b/internal/util/http.go index 3531ac3..3e68e94 100644 --- a/internal/util/http.go +++ b/internal/util/http.go @@ -51,7 +51,7 @@ func GetUrl(url string, setAuth func(*http.Request)) (string, int, error) { } -func PostUrl(url string, body []byte, setAuth func(*http.Request)) (string, int, error) { +func PostUrl(url string, body []byte, setAuth func(*http.Request), headerItems map[string]string) (string, int, error) { client := http.Client{Timeout: 5 * time.Second} req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(body)) @@ -64,6 +64,10 @@ func PostUrl(url string, body []byte, setAuth func(*http.Request)) (string, int, setAuth(req) } + for headerKey, headerValue := range headerItems { + req.Header.Set(headerKey, headerValue) + } + resp, err := client.Do(req) if err != nil { panic(err)