From efda556b659104ed2a49a23e9502379e3ed6df70 Mon Sep 17 00:00:00 2001 From: chaitanya6416 Date: Thu, 25 Jul 2024 10:21:54 +0530 Subject: [PATCH 1/8] feat: delivery-agent --- cmd/cx-delivery/main.go | 65 ++++++++++++++++++++++++ cmd/cx-processor/processor.go | 45 ++++++++++++++++ internal/configstore/channel_topics.go | 9 +++- internal/configstore/storefront_ifttt.go | 6 ++- internal/util/http.go | 6 ++- 5 files changed, 127 insertions(+), 4 deletions(-) create mode 100644 cmd/cx-delivery/main.go diff --git a/cmd/cx-delivery/main.go b/cmd/cx-delivery/main.go new file mode 100644 index 0000000..17736ac --- /dev/null +++ b/cmd/cx-delivery/main.go @@ -0,0 +1,65 @@ +package main + +import ( + "encoding/json" + + "github.com/spf13/viper" + . "github.com/valmi-io/cx-pipeline/internal/log" + . "github.com/valmi-io/cx-pipeline/internal/msgbroker" + util "github.com/valmi-io/cx-pipeline/internal/util" +) + +type ChannelTopic struct { + LinkID string `json:"link_id"` + WriteKey string `json:"write_key"` + storefront string + channel string +} + +func Deliver() { + Log.Info().Msgf("delivery agent started") + + // subscribe to topics where channel=processor + jsonPayload := `{"channel_in": ["processor"], "channel_not_in": [""]}` + data, _, err := util.PostUrl( + viper.GetString("APP_BACKEND_URL")+"/api/v1/superuser/channeltopics", + []byte(jsonPayload), + util.SetConfigAuth, + nil) + if err != nil { + Log.Error().Msgf("Error fetching processor destination") + } + + var topicsToSubscribe []ChannelTopic + if err = json.Unmarshal([]byte(data), &topicsToSubscribe); err != nil { + Log.Error().Msgf("Error Unmarshalling topicsToSubscribe by delivery agent") + } + + // get write keys for channel=postgres + jsonPayload = `{"channel_in": ["postgres"], "channel_not_in": [""]}` + data, _, err = util.PostUrl( + viper.GetString("APP_BACKEND_URL")+"/api/v1/superuser/channeltopics", + []byte(jsonPayload), + util.SetConfigAuth, + nil) + if err != nil { + Log.Error().Msgf("Error fetching processor destination") + } + + var channelTopics []ChannelTopic + if unmarshalErr := json.Unmarshal([]byte(data), &channelTopics); unmarshalErr != nil { + Log.Error().Msgf("Error Unmarshalling event: %v", unmarshalErr) + return + } + + for _, ct := range channelTopics { + headerItems := map[string]string{"Content-Type": "application/json", "X-Write-Key": ct.WriteKey} + Log.Info().Msgf("************************* making 3049 with %v", headerItems) + eventBytes, _ := json.Marshal(event) + _, _, err = util.PostUrl("http://localhost:3049/api/s/s2s/event", eventBytes, nil, headerItems) + if err != nil { + Log.Error().Msgf("error sending request to Jitsu: %v", err) + } + } + +} diff --git a/cmd/cx-processor/processor.go b/cmd/cx-processor/processor.go index c59c22d..0a59eec 100644 --- a/cmd/cx-processor/processor.go +++ b/cmd/cx-processor/processor.go @@ -1,9 +1,54 @@ package main import ( + "encoding/json" + + "github.com/spf13/viper" . "github.com/valmi-io/cx-pipeline/internal/log" + util "github.com/valmi-io/cx-pipeline/internal/util" ) +type ChannelTopic struct { + LinkID string `json:"link_id"` + WriteKey string `json:"write_key"` + storefront string + channel string +} + func processor(msg string) { Log.Info().Msgf("processing msg %v", msg) + + var event map[string]interface{} + if unmarshalErr := json.Unmarshal([]byte(msg), &event); unmarshalErr != nil { + Log.Error().Msgf("Error Unmarshalling event: %v", unmarshalErr) + return + } + event["processed"] = true + + jsonPayload := `{"channel_in": ["processor"], "channel_not_in": [""]}` + data, _, err := util.PostUrl( + viper.GetString("APP_BACKEND_URL")+"/api/v1/superuser/channeltopics", + []byte(jsonPayload), + util.SetConfigAuth, + nil) + if err != nil { + Log.Error().Msgf("Error fetching processor destination") + } + + var channelTopics []ChannelTopic + if unmarshalErr := json.Unmarshal([]byte(data), &channelTopics); unmarshalErr != nil { + Log.Error().Msgf("Error Unmarshalling event: %v", unmarshalErr) + return + } + + for _, ct := range channelTopics { + headerItems := map[string]string{"Content-Type": "application/json", "X-Write-Key": ct.WriteKey} + Log.Info().Msgf("************************* making 3049 with %v", headerItems) + eventBytes, _ := json.Marshal(event) + _, _, err = util.PostUrl("http://localhost:3049/api/s/s2s/event", eventBytes, nil, headerItems) + if err != nil { + Log.Error().Msgf("error sending request to Jitsu: %v", err) + } + } + } diff --git a/internal/configstore/channel_topics.go b/internal/configstore/channel_topics.go index 866d110..e5ef8d6 100644 --- a/internal/configstore/channel_topics.go +++ b/internal/configstore/channel_topics.go @@ -31,7 +31,8 @@ func fetchChannelTopics(currentCT *ChannelTopics) []ChannelTopic { data, respCode, err := util.PostUrl( viper.GetString("APP_BACKEND_URL")+"/api/v1/superuser/channeltopics", []byte(jsonPayload), - util.SetConfigAuth) + util.SetConfigAuth, + nil) Log.Info().Msg(data) Log.Info().Msgf("%v", respCode) @@ -90,7 +91,7 @@ func initChannelTopics(wg *sync.WaitGroup) (*ChannelTopics, error) { d, _ := time.ParseDuration(viper.GetString("CONFIG_REFRESH_INTERVAL")) ticker := time.NewTicker(d) channelTopics := &ChannelTopics{done: make(chan bool)} - + firstTime := true wg.Add(1) go func() { defer wg.Done() @@ -101,12 +102,16 @@ func initChannelTopics(wg *sync.WaitGroup) (*ChannelTopics, error) { Log.Info().Msg("received done") return case t := <-ticker.C: + if !firstTime { + continue + } Log.Debug().Msgf("ChannelTopics Refresh Tick at %v", t) newChannels := fetchChannelTopics(channelTopics) channelTopics.mu.Lock() channelTopics.Channels = newChannels channelTopics.mu.Unlock() Log.Debug().Msgf("ChannelTopics Refresh Tick at %+v", channelTopics) + firstTime = false //testing // top := "in.id.clyszkfc70002zpa9ooq25gq1-5lef-ldkv-sP2mEg.m.batch.t.events" // if i != 0 { diff --git a/internal/configstore/storefront_ifttt.go b/internal/configstore/storefront_ifttt.go index c0a1815..03ab80b 100644 --- a/internal/configstore/storefront_ifttt.go +++ b/internal/configstore/storefront_ifttt.go @@ -48,7 +48,7 @@ func initStoreFrontIfttts(wg *sync.WaitGroup) (*StorefrontIfttts, error) { d, _ := time.ParseDuration(viper.GetString("CONFIG_REFRESH_INTERVAL")) ticker := time.NewTicker(d) storefrontIfttts := &StorefrontIfttts{done: make(chan bool)} - + firstTime := true wg.Add(1) go func() { defer wg.Done() @@ -59,6 +59,9 @@ func initStoreFrontIfttts(wg *sync.WaitGroup) (*StorefrontIfttts, error) { ticker.Stop() return case t := <-ticker.C: + if !firstTime { + continue + } Log.Debug().Msgf("StorefrontIfttts Refresh Tick at %v", t) newStorefrontIfttts, err := fetchIfttts() if err != nil { @@ -68,6 +71,7 @@ func initStoreFrontIfttts(wg *sync.WaitGroup) (*StorefrontIfttts, error) { storefrontIfttts.mu.Lock() storefrontIfttts.StoreIfttts = newStorefrontIfttts storefrontIfttts.mu.Unlock() + firstTime = false } } }() diff --git a/internal/util/http.go b/internal/util/http.go index 3531ac3..3e68e94 100644 --- a/internal/util/http.go +++ b/internal/util/http.go @@ -51,7 +51,7 @@ func GetUrl(url string, setAuth func(*http.Request)) (string, int, error) { } -func PostUrl(url string, body []byte, setAuth func(*http.Request)) (string, int, error) { +func PostUrl(url string, body []byte, setAuth func(*http.Request), headerItems map[string]string) (string, int, error) { client := http.Client{Timeout: 5 * time.Second} req, err := http.NewRequest(http.MethodPost, url, bytes.NewBuffer(body)) @@ -64,6 +64,10 @@ func PostUrl(url string, body []byte, setAuth func(*http.Request)) (string, int, setAuth(req) } + for headerKey, headerValue := range headerItems { + req.Header.Set(headerKey, headerValue) + } + resp, err := client.Do(req) if err != nil { panic(err) From ab7f05fb9d66560d47e4d6118748fbee2d8e8210 Mon Sep 17 00:00:00 2001 From: chaitanya6416 Date: Thu, 25 Jul 2024 10:29:22 +0530 Subject: [PATCH 2/8] feat: init topicMan --- cmd/cx-delivery/main.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/cx-delivery/main.go b/cmd/cx-delivery/main.go index 17736ac..fab9069 100644 --- a/cmd/cx-delivery/main.go +++ b/cmd/cx-delivery/main.go @@ -35,6 +35,8 @@ func Deliver() { Log.Error().Msgf("Error Unmarshalling topicsToSubscribe by delivery agent") } + // init topicMan, but topicMan needs processorFunc() :( + // get write keys for channel=postgres jsonPayload = `{"channel_in": ["postgres"], "channel_not_in": [""]}` data, _, err = util.PostUrl( From 000257b93edbff47193af62aaf2476b59027fe94 Mon Sep 17 00:00:00 2001 From: supradeep2819 Date: Thu, 25 Jul 2024 17:25:19 +0530 Subject: [PATCH 3/8] feat: added service delivery agent --- cmd/cx-delivery/delivery.go | 53 ++++++++++++++++ cmd/cx-delivery/main.go | 121 ++++++++++++++++++++++-------------- 2 files changed, 128 insertions(+), 46 deletions(-) create mode 100644 cmd/cx-delivery/delivery.go diff --git a/cmd/cx-delivery/delivery.go b/cmd/cx-delivery/delivery.go new file mode 100644 index 0000000..93ac408 --- /dev/null +++ b/cmd/cx-delivery/delivery.go @@ -0,0 +1,53 @@ +package main + +import ( + "encoding/json" + + "github.com/spf13/viper" + . "github.com/valmi-io/cx-pipeline/internal/log" + util "github.com/valmi-io/cx-pipeline/internal/util" +) + +type ChannelTopic struct { + LinkID string `json:"link_id"` + WriteKey string `json:"write_key"` + storefront string + channel string +} + +func delivery(msg string) { + Log.Info().Msgf("processing msg %v", msg) + + var event map[string]interface{} + if unmarshalErr := json.Unmarshal([]byte(msg), &event); unmarshalErr != nil { + Log.Error().Msgf("Error Unmarshalling event: %v", unmarshalErr) + return + } + + jsonPayload := `{"channel_in": ["postgres"], "channel_not_in": [""]}` + data, _, err := util.PostUrl( + viper.GetString("APP_BACKEND_URL")+"/api/v1/superuser/channeltopics", + []byte(jsonPayload), + util.SetConfigAuth, + nil) + if err != nil { + Log.Error().Msgf("Error fetching processor destination") + } + + var channelTopics []ChannelTopic + if unmarshalErr := json.Unmarshal([]byte(data), &channelTopics); unmarshalErr != nil { + Log.Error().Msgf("Error Unmarshalling event: %v", unmarshalErr) + return + } + + for _, ct := range channelTopics { + headerItems := map[string]string{"Content-Type": "application/json", "X-Write-Key": ct.WriteKey} + Log.Info().Msgf("************************* making 3049 with %v", headerItems) + eventBytes, _ := json.Marshal(event) + _, _, err = util.PostUrl("http://localhost:3049/api/s/s2s/event", eventBytes, nil, headerItems) + if err != nil { + Log.Error().Msgf("error sending request to Jitsu: %v", err) + } + } + +} diff --git a/cmd/cx-delivery/main.go b/cmd/cx-delivery/main.go index fab9069..0fa793f 100644 --- a/cmd/cx-delivery/main.go +++ b/cmd/cx-delivery/main.go @@ -1,67 +1,96 @@ package main import ( - "encoding/json" + "os" + "os/signal" + "syscall" "github.com/spf13/viper" + "github.com/valmi-io/cx-pipeline/internal/configstore" + "github.com/valmi-io/cx-pipeline/internal/env" . "github.com/valmi-io/cx-pipeline/internal/log" . "github.com/valmi-io/cx-pipeline/internal/msgbroker" - util "github.com/valmi-io/cx-pipeline/internal/util" ) -type ChannelTopic struct { - LinkID string `json:"link_id"` - WriteKey string `json:"write_key"` - storefront string - channel string -} - -func Deliver() { +func main() { Log.Info().Msgf("delivery agent started") - // subscribe to topics where channel=processor - jsonPayload := `{"channel_in": ["processor"], "channel_not_in": [""]}` - data, _, err := util.PostUrl( - viper.GetString("APP_BACKEND_URL")+"/api/v1/superuser/channeltopics", - []byte(jsonPayload), - util.SetConfigAuth, - nil) - if err != nil { - Log.Error().Msgf("Error fetching processor destination") - } + // // subscribe to topics where channel=processor + // jsonPayload := `{"channel_in": ["processor"], "channel_not_in": [""]}` + // data, _, err := util.PostUrl( + // viper.GetString("APP_BACKEND_URL")+"/api/v1/superuser/channeltopics", + // []byte(jsonPayload), + // util.SetConfigAuth, + // nil) + // if err != nil { + // Log.Error().Msgf("Error fetching processor destination") + // } - var topicsToSubscribe []ChannelTopic - if err = json.Unmarshal([]byte(data), &topicsToSubscribe); err != nil { - Log.Error().Msgf("Error Unmarshalling topicsToSubscribe by delivery agent") - } + // var topicsToSubscribe []ChannelTopic + // if err = json.Unmarshal([]byte(data), &topicsToSubscribe); err != nil { + // Log.Error().Msgf("Error Unmarshalling topicsToSubscribe by delivery agent") + // } + + // // init topicMan, but topicMan needs processorFunc() :( - // init topicMan, but topicMan needs processorFunc() :( + // // get write keys for channel=postgres + // jsonPayload = `{"channel_in": ["postgres"], "channel_not_in": [""]}` + // data, _, err = util.PostUrl( + // viper.GetString("APP_BACKEND_URL")+"/api/v1/superuser/channeltopics", + // []byte(jsonPayload), + // util.SetConfigAuth, + // nil) + // if err != nil { + // Log.Error().Msgf("Error fetching processor destination") + // } - // get write keys for channel=postgres - jsonPayload = `{"channel_in": ["postgres"], "channel_not_in": [""]}` - data, _, err = util.PostUrl( - viper.GetString("APP_BACKEND_URL")+"/api/v1/superuser/channeltopics", - []byte(jsonPayload), - util.SetConfigAuth, - nil) + // var channelTopics []ChannelTopic + // if unmarshalErr := json.Unmarshal([]byte(data), &channelTopics); unmarshalErr != nil { + // Log.Error().Msgf("Error Unmarshalling event: %v", unmarshalErr) + // return + // } + + // for _, ct := range channelTopics { + // headerItems := map[string]string{"Content-Type": "application/json", "X-Write-Key": ct.WriteKey} + // Log.Info().Msgf("************************* making 3049 with %v", headerItems) + // eventBytes, _ := json.Marshal(event) + // _, _, err = util.PostUrl("http://localhost:3049/api/s/s2s/event", eventBytes, nil, headerItems) + // if err != nil { + // Log.Error().Msgf("error sending request to Jitsu: %v", err) + // } + // } + // initialize environment & config variables + env.InitConfig() + Log.Info().Msg(viper.GetString("APP_BACKEND_URL")) + Log.Info().Msg(viper.GetString("KAFKA_BROKER")) + + // initialize ConfigStore + jsonPayload := `{"channel_in": ["processor"], "channel_not_in": ["x", "y"]}` + cs, err := configstore.Init(jsonPayload) if err != nil { - Log.Error().Msgf("Error fetching processor destination") + Log.Fatal().Msg(err.Error()) } - var channelTopics []ChannelTopic - if unmarshalErr := json.Unmarshal([]byte(data), &channelTopics); unmarshalErr != nil { - Log.Error().Msgf("Error Unmarshalling event: %v", unmarshalErr) - return + // initialize Broker + topicMan, err := InitBroker(delivery) + if err != nil { + Log.Fatal().Msg(err.Error()) } - for _, ct := range channelTopics { - headerItems := map[string]string{"Content-Type": "application/json", "X-Write-Key": ct.WriteKey} - Log.Info().Msgf("************************* making 3049 with %v", headerItems) - eventBytes, _ := json.Marshal(event) - _, _, err = util.PostUrl("http://localhost:3049/api/s/s2s/event", eventBytes, nil, headerItems) - if err != nil { - Log.Error().Msgf("error sending request to Jitsu: %v", err) - } - } + // Connect ConfigStore and Broker + cs.AttachTopicMan(topicMan) + + cleanupChan := make(chan bool) + + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + cleanupChan <- true + }() + <-cleanupChan + Log.Info().Msg("Received an interrupt signal, shutting down...") + cs.Close() // close ConfigStore to stop refreshing IFTTTs + topicMan.Close() // close broker topic management } From 4daa30a2055938236587c46d9c73313ab832bfe0 Mon Sep 17 00:00:00 2001 From: supradeep2819 Date: Thu, 25 Jul 2024 17:26:21 +0530 Subject: [PATCH 4/8] feat: made changes for config store to make work for both processor and delivery agent --- cmd/cx-processor/main.go | 3 ++- internal/configstore/channel_topics.go | 7 +++---- internal/configstore/configstore.go | 4 ++-- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/cmd/cx-processor/main.go b/cmd/cx-processor/main.go index ac226c4..e9064d3 100644 --- a/cmd/cx-processor/main.go +++ b/cmd/cx-processor/main.go @@ -26,7 +26,8 @@ func main() { Log.Info().Msg(viper.GetString("KAFKA_BROKER")) // initialize ConfigStore - cs, err := configstore.Init() + jsonPayload := `{"channel_in": ["chatbox"], "channel_not_in": ["x", "y"]}` + cs, err := configstore.Init(jsonPayload) if err != nil { Log.Fatal().Msg(err.Error()) } diff --git a/internal/configstore/channel_topics.go b/internal/configstore/channel_topics.go index e5ef8d6..f997960 100644 --- a/internal/configstore/channel_topics.go +++ b/internal/configstore/channel_topics.go @@ -26,8 +26,7 @@ type ChannelTopics struct { topicMan *TopicMan } -func fetchChannelTopics(currentCT *ChannelTopics) []ChannelTopic { - jsonPayload := `{"channel_in": ["chatbox"], "channel_not_in": ["x", "y"]}` +func fetchChannelTopics(currentCT *ChannelTopics, jsonPayload string) []ChannelTopic { data, respCode, err := util.PostUrl( viper.GetString("APP_BACKEND_URL")+"/api/v1/superuser/channeltopics", []byte(jsonPayload), @@ -87,7 +86,7 @@ func matchChannelState(newCT []ChannelTopic, currentCT []ChannelTopic, topicMan // var i int = 0 -func initChannelTopics(wg *sync.WaitGroup) (*ChannelTopics, error) { +func initChannelTopics(wg *sync.WaitGroup, jsonPayload string) (*ChannelTopics, error) { d, _ := time.ParseDuration(viper.GetString("CONFIG_REFRESH_INTERVAL")) ticker := time.NewTicker(d) channelTopics := &ChannelTopics{done: make(chan bool)} @@ -106,7 +105,7 @@ func initChannelTopics(wg *sync.WaitGroup) (*ChannelTopics, error) { continue } Log.Debug().Msgf("ChannelTopics Refresh Tick at %v", t) - newChannels := fetchChannelTopics(channelTopics) + newChannels := fetchChannelTopics(channelTopics, jsonPayload) channelTopics.mu.Lock() channelTopics.Channels = newChannels channelTopics.mu.Unlock() diff --git a/internal/configstore/configstore.go b/internal/configstore/configstore.go index 6afdcdb..92b3a57 100644 --- a/internal/configstore/configstore.go +++ b/internal/configstore/configstore.go @@ -19,9 +19,9 @@ type ConfigStore struct { wg *sync.WaitGroup } -func Init() (*ConfigStore, error) { +func Init(jsonPayload string) (*ConfigStore, error) { var wg sync.WaitGroup - channelTopics, error := initChannelTopics(&wg) + channelTopics, error := initChannelTopics(&wg, jsonPayload) if error != nil { return nil, error } From 996ea935a71980c6fa6887aacfbcd6540aaa42fe Mon Sep 17 00:00:00 2001 From: supradeep2819 Date: Thu, 25 Jul 2024 17:37:05 +0530 Subject: [PATCH 5/8] feat: removed checks for testing --- internal/configstore/channel_topics.go | 5 ----- internal/configstore/storefront_ifttt.go | 5 ----- 2 files changed, 10 deletions(-) diff --git a/internal/configstore/channel_topics.go b/internal/configstore/channel_topics.go index f997960..4cac63d 100644 --- a/internal/configstore/channel_topics.go +++ b/internal/configstore/channel_topics.go @@ -90,7 +90,6 @@ func initChannelTopics(wg *sync.WaitGroup, jsonPayload string) (*ChannelTopics, d, _ := time.ParseDuration(viper.GetString("CONFIG_REFRESH_INTERVAL")) ticker := time.NewTicker(d) channelTopics := &ChannelTopics{done: make(chan bool)} - firstTime := true wg.Add(1) go func() { defer wg.Done() @@ -101,16 +100,12 @@ func initChannelTopics(wg *sync.WaitGroup, jsonPayload string) (*ChannelTopics, Log.Info().Msg("received done") return case t := <-ticker.C: - if !firstTime { - continue - } Log.Debug().Msgf("ChannelTopics Refresh Tick at %v", t) newChannels := fetchChannelTopics(channelTopics, jsonPayload) channelTopics.mu.Lock() channelTopics.Channels = newChannels channelTopics.mu.Unlock() Log.Debug().Msgf("ChannelTopics Refresh Tick at %+v", channelTopics) - firstTime = false //testing // top := "in.id.clyszkfc70002zpa9ooq25gq1-5lef-ldkv-sP2mEg.m.batch.t.events" // if i != 0 { diff --git a/internal/configstore/storefront_ifttt.go b/internal/configstore/storefront_ifttt.go index 03ab80b..38ecd5d 100644 --- a/internal/configstore/storefront_ifttt.go +++ b/internal/configstore/storefront_ifttt.go @@ -48,7 +48,6 @@ func initStoreFrontIfttts(wg *sync.WaitGroup) (*StorefrontIfttts, error) { d, _ := time.ParseDuration(viper.GetString("CONFIG_REFRESH_INTERVAL")) ticker := time.NewTicker(d) storefrontIfttts := &StorefrontIfttts{done: make(chan bool)} - firstTime := true wg.Add(1) go func() { defer wg.Done() @@ -59,9 +58,6 @@ func initStoreFrontIfttts(wg *sync.WaitGroup) (*StorefrontIfttts, error) { ticker.Stop() return case t := <-ticker.C: - if !firstTime { - continue - } Log.Debug().Msgf("StorefrontIfttts Refresh Tick at %v", t) newStorefrontIfttts, err := fetchIfttts() if err != nil { @@ -71,7 +67,6 @@ func initStoreFrontIfttts(wg *sync.WaitGroup) (*StorefrontIfttts, error) { storefrontIfttts.mu.Lock() storefrontIfttts.StoreIfttts = newStorefrontIfttts storefrontIfttts.mu.Unlock() - firstTime = false } } }() From 3263e7bea74193a148bfc3124c66e466354ddcf8 Mon Sep 17 00:00:00 2001 From: supradeep2819 Date: Thu, 25 Jul 2024 17:50:06 +0530 Subject: [PATCH 6/8] feat: removed unnecessary comments --- cmd/cx-delivery/main.go | 46 ----------------------------------------- 1 file changed, 46 deletions(-) diff --git a/cmd/cx-delivery/main.go b/cmd/cx-delivery/main.go index 0fa793f..19fd159 100644 --- a/cmd/cx-delivery/main.go +++ b/cmd/cx-delivery/main.go @@ -14,52 +14,6 @@ import ( func main() { Log.Info().Msgf("delivery agent started") - - // // subscribe to topics where channel=processor - // jsonPayload := `{"channel_in": ["processor"], "channel_not_in": [""]}` - // data, _, err := util.PostUrl( - // viper.GetString("APP_BACKEND_URL")+"/api/v1/superuser/channeltopics", - // []byte(jsonPayload), - // util.SetConfigAuth, - // nil) - // if err != nil { - // Log.Error().Msgf("Error fetching processor destination") - // } - - // var topicsToSubscribe []ChannelTopic - // if err = json.Unmarshal([]byte(data), &topicsToSubscribe); err != nil { - // Log.Error().Msgf("Error Unmarshalling topicsToSubscribe by delivery agent") - // } - - // // init topicMan, but topicMan needs processorFunc() :( - - // // get write keys for channel=postgres - // jsonPayload = `{"channel_in": ["postgres"], "channel_not_in": [""]}` - // data, _, err = util.PostUrl( - // viper.GetString("APP_BACKEND_URL")+"/api/v1/superuser/channeltopics", - // []byte(jsonPayload), - // util.SetConfigAuth, - // nil) - // if err != nil { - // Log.Error().Msgf("Error fetching processor destination") - // } - - // var channelTopics []ChannelTopic - // if unmarshalErr := json.Unmarshal([]byte(data), &channelTopics); unmarshalErr != nil { - // Log.Error().Msgf("Error Unmarshalling event: %v", unmarshalErr) - // return - // } - - // for _, ct := range channelTopics { - // headerItems := map[string]string{"Content-Type": "application/json", "X-Write-Key": ct.WriteKey} - // Log.Info().Msgf("************************* making 3049 with %v", headerItems) - // eventBytes, _ := json.Marshal(event) - // _, _, err = util.PostUrl("http://localhost:3049/api/s/s2s/event", eventBytes, nil, headerItems) - // if err != nil { - // Log.Error().Msgf("error sending request to Jitsu: %v", err) - // } - // } - // initialize environment & config variables env.InitConfig() Log.Info().Msg(viper.GetString("APP_BACKEND_URL")) Log.Info().Msg(viper.GetString("KAFKA_BROKER")) From ca353c3114ddcf4b5016d2d56dd195d1267e73c7 Mon Sep 17 00:00:00 2001 From: supradeep2819 Date: Thu, 25 Jul 2024 18:00:08 +0530 Subject: [PATCH 7/8] feat: removed unecessary comments --- cmd/cx-delivery/delivery.go | 1 - cmd/cx-processor/processor.go | 1 - 2 files changed, 2 deletions(-) diff --git a/cmd/cx-delivery/delivery.go b/cmd/cx-delivery/delivery.go index 93ac408..b32cff0 100644 --- a/cmd/cx-delivery/delivery.go +++ b/cmd/cx-delivery/delivery.go @@ -42,7 +42,6 @@ func delivery(msg string) { for _, ct := range channelTopics { headerItems := map[string]string{"Content-Type": "application/json", "X-Write-Key": ct.WriteKey} - Log.Info().Msgf("************************* making 3049 with %v", headerItems) eventBytes, _ := json.Marshal(event) _, _, err = util.PostUrl("http://localhost:3049/api/s/s2s/event", eventBytes, nil, headerItems) if err != nil { diff --git a/cmd/cx-processor/processor.go b/cmd/cx-processor/processor.go index 0a59eec..d3c71eb 100644 --- a/cmd/cx-processor/processor.go +++ b/cmd/cx-processor/processor.go @@ -43,7 +43,6 @@ func processor(msg string) { for _, ct := range channelTopics { headerItems := map[string]string{"Content-Type": "application/json", "X-Write-Key": ct.WriteKey} - Log.Info().Msgf("************************* making 3049 with %v", headerItems) eventBytes, _ := json.Marshal(event) _, _, err = util.PostUrl("http://localhost:3049/api/s/s2s/event", eventBytes, nil, headerItems) if err != nil { From 47a8bcc3120227c3935ab68ba22e62e2123efa96 Mon Sep 17 00:00:00 2001 From: chaitanya6416 Date: Thu, 25 Jul 2024 18:04:09 +0530 Subject: [PATCH 8/8] feat: add TODO comment --- cmd/cx-delivery/delivery.go | 1 + cmd/cx-processor/processor.go | 1 + 2 files changed, 2 insertions(+) diff --git a/cmd/cx-delivery/delivery.go b/cmd/cx-delivery/delivery.go index b32cff0..9e44f02 100644 --- a/cmd/cx-delivery/delivery.go +++ b/cmd/cx-delivery/delivery.go @@ -40,6 +40,7 @@ func delivery(msg string) { return } + // TODO: send the msg to only particular store write_key(delivery) for _, ct := range channelTopics { headerItems := map[string]string{"Content-Type": "application/json", "X-Write-Key": ct.WriteKey} eventBytes, _ := json.Marshal(event) diff --git a/cmd/cx-processor/processor.go b/cmd/cx-processor/processor.go index d3c71eb..10531d7 100644 --- a/cmd/cx-processor/processor.go +++ b/cmd/cx-processor/processor.go @@ -41,6 +41,7 @@ func processor(msg string) { return } + // TODO: send the msg to only particular store write_key(processor) for _, ct := range channelTopics { headerItems := map[string]string{"Content-Type": "application/json", "X-Write-Key": ct.WriteKey} eventBytes, _ := json.Marshal(event)