From 8913012c21d5210993ac6dce3c99eb8a4a823346 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Fri, 11 Jul 2025 16:03:45 +0700 Subject: [PATCH 01/12] chore: set up boilerplate destazureservicebus --- build/dev/azure/config.json | 11 ++ .../azure_servicebus/instructions.md | 1 + .../providers/azure_servicebus/metadata.json | 27 +++++ .../destazureservicebus.go | 98 +++++++++++++++++ .../destazureservicebus_publish_test.go | 103 ++++++++++++++++++ internal/util/testinfra/azure.go | 7 ++ 6 files changed, 247 insertions(+) create mode 100644 internal/destregistry/metadata/providers/azure_servicebus/instructions.md create mode 100644 internal/destregistry/metadata/providers/azure_servicebus/metadata.json create mode 100644 internal/destregistry/providers/destazureservicebus/destazureservicebus.go create mode 100644 internal/destregistry/providers/destazureservicebus/destazureservicebus_publish_test.go diff --git a/build/dev/azure/config.json b/build/dev/azure/config.json index a8419d68..de7cff6d 100644 --- a/build/dev/azure/config.json +++ b/build/dev/azure/config.json @@ -53,6 +53,17 @@ "Rules": [] } ] + }, + { + "Name": "TestDestinationAzureServiceBusSuite-topic", + "Properties": {}, + "Subscriptions": [ + { + "Name": "TestDestinationAzureServiceBusSuite-subscription", + "Properties": {}, + "Rules": [] + } + ] } ] } diff --git a/internal/destregistry/metadata/providers/azure_servicebus/instructions.md b/internal/destregistry/metadata/providers/azure_servicebus/instructions.md new file mode 100644 index 00000000..2115ebca --- /dev/null +++ b/internal/destregistry/metadata/providers/azure_servicebus/instructions.md @@ -0,0 +1 @@ +# Azure ServiceBus Configuration Instructions diff --git a/internal/destregistry/metadata/providers/azure_servicebus/metadata.json b/internal/destregistry/metadata/providers/azure_servicebus/metadata.json new file mode 100644 index 00000000..532142fe --- /dev/null +++ b/internal/destregistry/metadata/providers/azure_servicebus/metadata.json @@ -0,0 +1,27 @@ +{ + "type": "azure_servicebus", + "config_fields": [ + { + "key": "topic", + "type": "text", + "label": "Topic Name", + "description": "The name of the Azure Service Bus topic to publish to", + "required": true, + "pattern": "^[a-zA-Z0-9]([a-zA-Z0-9._-]*[a-zA-Z0-9])?$" + } + ], + "credential_fields": [ + { + "key": "connection_string", + "type": "text", + "label": "Connection String", + "description": "Azure Service Bus connection string with Send permissions", + "required": true, + "sensitive": true + } + ], + "label": "Azure Service Bus", + "link": "https://azure.microsoft.com/en-us/services/service-bus/", + "description": "Send events to Azure Service Bus topics for reliable cloud messaging between applications and services.", + "icon": "" +} diff --git a/internal/destregistry/providers/destazureservicebus/destazureservicebus.go b/internal/destregistry/providers/destazureservicebus/destazureservicebus.go new file mode 100644 index 00000000..9e6ff462 --- /dev/null +++ b/internal/destregistry/providers/destazureservicebus/destazureservicebus.go @@ -0,0 +1,98 @@ +package destazureservicebus + +import ( + "context" + "fmt" + + "github.com/hookdeck/outpost/internal/destregistry" + "github.com/hookdeck/outpost/internal/destregistry/metadata" + "github.com/hookdeck/outpost/internal/models" +) + +type AzureServiceBusDestination struct { + *destregistry.BaseProvider +} + +type AzureServiceBusDestinationConfig struct { + Topic string +} + +type AzureServiceBusDestinationCredentials struct { + ConnectionString string +} + +var _ destregistry.Provider = (*AzureServiceBusDestination)(nil) + +func New(loader metadata.MetadataLoader) (*AzureServiceBusDestination, error) { + base, err := destregistry.NewBaseProvider(loader, "azure_servicebus") + if err != nil { + return nil, err + } + + return &AzureServiceBusDestination{ + BaseProvider: base, + }, nil +} + +func (d *AzureServiceBusDestination) Validate(ctx context.Context, destination *models.Destination) error { + // For phase 1, just call base validation + return d.BaseProvider.Validate(ctx, destination) +} + +func (d *AzureServiceBusDestination) CreatePublisher(ctx context.Context, destination *models.Destination) (destregistry.Publisher, error) { + cfg, creds, err := d.resolveMetadata(ctx, destination) + if err != nil { + return nil, err + } + + return &AzureServiceBusPublisher{ + BasePublisher: &destregistry.BasePublisher{}, + connectionString: creds.ConnectionString, + topic: cfg.Topic, + }, nil +} + +func (d *AzureServiceBusDestination) ComputeTarget(destination *models.Destination) destregistry.DestinationTarget { + if topic, ok := destination.Config["topic"]; ok { + return destregistry.DestinationTarget{ + Target: topic, + TargetURL: "", + } + } + return destregistry.DestinationTarget{} +} + +func (d *AzureServiceBusDestination) Preprocess(newDestination *models.Destination, originalDestination *models.Destination, opts *destregistry.PreprocessDestinationOpts) error { + // Phase 1: empty implementation + return nil +} + +func (d *AzureServiceBusDestination) resolveMetadata(ctx context.Context, destination *models.Destination) (*AzureServiceBusDestinationConfig, *AzureServiceBusDestinationCredentials, error) { + if err := d.BaseProvider.Validate(ctx, destination); err != nil { + return nil, nil, err + } + + return &AzureServiceBusDestinationConfig{ + Topic: destination.Config["topic"], + }, &AzureServiceBusDestinationCredentials{ + ConnectionString: destination.Credentials["connection_string"], + }, nil +} + +// Publisher implementation +type AzureServiceBusPublisher struct { + *destregistry.BasePublisher + connectionString string + topic string +} + +func (p *AzureServiceBusPublisher) Publish(ctx context.Context, event *models.Event) (*destregistry.Delivery, error) { + // Phase 1: minimal implementation that returns an error + return nil, fmt.Errorf("Azure Service Bus publishing not yet implemented") +} + +func (p *AzureServiceBusPublisher) Close() error { + p.BasePublisher.StartClose() + // Phase 1: empty implementation + return nil +} diff --git a/internal/destregistry/providers/destazureservicebus/destazureservicebus_publish_test.go b/internal/destregistry/providers/destazureservicebus/destazureservicebus_publish_test.go new file mode 100644 index 00000000..ae7c8582 --- /dev/null +++ b/internal/destregistry/providers/destazureservicebus/destazureservicebus_publish_test.go @@ -0,0 +1,103 @@ +package destazureservicebus_test + +import ( + "testing" + + "github.com/hookdeck/outpost/internal/destregistry/providers/destazureservicebus" + testsuite "github.com/hookdeck/outpost/internal/destregistry/testing" + "github.com/hookdeck/outpost/internal/models" + "github.com/hookdeck/outpost/internal/util/testinfra" + "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +// AzureServiceBusConsumer implements testsuite.MessageConsumer +type AzureServiceBusConsumer struct { + msgChan chan testsuite.Message + done chan struct{} +} + +func NewAzureServiceBusConsumer() *AzureServiceBusConsumer { + c := &AzureServiceBusConsumer{ + msgChan: make(chan testsuite.Message), + done: make(chan struct{}), + } + // Phase 1: Mock consumer that doesn't actually consume + return c +} + +func (c *AzureServiceBusConsumer) Consume() <-chan testsuite.Message { + return c.msgChan +} + +func (c *AzureServiceBusConsumer) Close() error { + close(c.done) + return nil +} + +type AzureServiceBusAsserter struct{} + +func (a *AzureServiceBusAsserter) AssertMessage(t testsuite.TestingT, msg testsuite.Message, event models.Event) { + // Phase 1: Basic assertion structure + metadata := msg.Metadata + + // Verify system metadata + assert.NotEmpty(t, metadata["timestamp"], "timestamp should be present") + assert.Equal(t, event.ID, metadata["event-id"], "event-id should match") + assert.Equal(t, event.Topic, metadata["topic"], "topic should match") + + // Verify custom metadata + for k, v := range event.Metadata { + assert.Equal(t, v, metadata[k], "metadata key %s should match expected value", k) + } +} + +type AzureServiceBusSuite struct { + testsuite.PublisherSuite + consumer *AzureServiceBusConsumer +} + +func TestDestinationAzureServiceBusSuite(t *testing.T) { + suite.Run(t, new(AzureServiceBusSuite)) +} + +func (s *AzureServiceBusSuite) SetupSuite() { + t := s.T() + t.Cleanup(testinfra.Start(t)) + mqConfig := testinfra.GetMQAzureConfig(t, "TestDestinationAzureServiceBusSuite") + + // Create consumer + s.consumer = NewAzureServiceBusConsumer() + + // Create provider + provider, err := destazureservicebus.New(testutil.Registry.MetadataLoader()) + require.NoError(t, err) + + // Create destination + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("azure_servicebus"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "topic": mqConfig.AzureServiceBus.Topic, + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "connection_string": mqConfig.AzureServiceBus.ConnectionString, + }), + ) + + // Initialize publisher suite with custom asserter + cfg := testsuite.Config{ + Provider: provider, + Dest: &destination, + Consumer: s.consumer, + Asserter: &AzureServiceBusAsserter{}, + } + s.InitSuite(cfg) +} + +func (s *AzureServiceBusSuite) TearDownSuite() { + if s.consumer != nil { + _ = s.consumer.Close() + } +} diff --git a/internal/util/testinfra/azure.go b/internal/util/testinfra/azure.go index 7fe3de4f..68549403 100644 --- a/internal/util/testinfra/azure.go +++ b/internal/util/testinfra/azure.go @@ -71,6 +71,13 @@ func GetMQAzureConfig(t *testing.T, testName string) mqs.QueueConfig { Subscription: "TestIntegrationMQ_AzureServiceBus-subscription", }, }, + "TestDestinationAzureServiceBusSuite": { + AzureServiceBus: &mqs.AzureServiceBusConfig{ + ConnectionString: connString, + Topic: "TestDestinationAzureServiceBusSuite-topic", + Subscription: "TestDestinationAzureServiceBusSuite-subscription", + }, + }, } if cfg, ok := azureSBMap[testName]; !ok { From 18be47ff8cf2f3a3d53cd67a30714f042cdb26fe Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 15 Jul 2025 09:48:05 +0700 Subject: [PATCH 02/12] feat: naive destazureservicebus implementation --- .../destazureservicebus.go | 99 ++++++++++++++++++- .../destazureservicebus_publish_test.go | 96 ++++++++++++++++-- 2 files changed, 183 insertions(+), 12 deletions(-) diff --git a/internal/destregistry/providers/destazureservicebus/destazureservicebus.go b/internal/destregistry/providers/destazureservicebus/destazureservicebus.go index 9e6ff462..3996c72b 100644 --- a/internal/destregistry/providers/destazureservicebus/destazureservicebus.go +++ b/internal/destregistry/providers/destazureservicebus/destazureservicebus.go @@ -2,8 +2,11 @@ package destazureservicebus import ( "context" + "encoding/json" "fmt" + "time" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" "github.com/hookdeck/outpost/internal/destregistry" "github.com/hookdeck/outpost/internal/destregistry/metadata" "github.com/hookdeck/outpost/internal/models" @@ -63,7 +66,6 @@ func (d *AzureServiceBusDestination) ComputeTarget(destination *models.Destinati } func (d *AzureServiceBusDestination) Preprocess(newDestination *models.Destination, originalDestination *models.Destination, opts *destregistry.PreprocessDestinationOpts) error { - // Phase 1: empty implementation return nil } @@ -84,15 +86,104 @@ type AzureServiceBusPublisher struct { *destregistry.BasePublisher connectionString string topic string + client *azservicebus.Client + sender *azservicebus.Sender +} + +func (p *AzureServiceBusPublisher) ensureSender() (*azservicebus.Sender, error) { + if p.client == nil { + client, err := azservicebus.NewClientFromConnectionString(p.connectionString, nil) + if err != nil { + return nil, fmt.Errorf("failed to create Azure Service Bus client: %w", err) + } + p.client = client + } + + if p.sender == nil { + sender, err := p.client.NewSender(p.topic, nil) + if err != nil { + return nil, fmt.Errorf("failed to create sender for topic %s: %w", p.topic, err) + } + p.sender = sender + } + + return p.sender, nil +} + +func (p *AzureServiceBusPublisher) Format(ctx context.Context, event *models.Event) (*azservicebus.Message, error) { + dataBytes, err := json.Marshal(event.Data) + if err != nil { + return nil, fmt.Errorf("failed to marshal event data: %w", err) + } + + messageMetadata := map[string]any{} + metadata := p.BasePublisher.MakeMetadata(event, time.Now()) + for k, v := range metadata { + messageMetadata[k] = v + } + + message := &azservicebus.Message{ + Body: dataBytes, + ApplicationProperties: messageMetadata, + } + + return message, nil } func (p *AzureServiceBusPublisher) Publish(ctx context.Context, event *models.Event) (*destregistry.Delivery, error) { - // Phase 1: minimal implementation that returns an error - return nil, fmt.Errorf("Azure Service Bus publishing not yet implemented") + if err := p.BasePublisher.StartPublish(); err != nil { + return nil, err + } + defer p.BasePublisher.FinishPublish() + + // Format the message + message, err := p.Format(ctx, event) + if err != nil { + return nil, err + } + + // Get or create sender and send the message + sender, err := p.ensureSender() + if err != nil { + return nil, err + } + + if err := sender.SendMessage(ctx, message, nil); err != nil { + return &destregistry.Delivery{ + Status: "failed", + Code: "ERR", + Response: map[string]interface{}{ + "error": err.Error(), + }, + }, destregistry.NewErrDestinationPublishAttempt(err, "azure_servicebus", map[string]interface{}{ + "error": err.Error(), + }) + } + + return &destregistry.Delivery{ + Status: "success", + Code: "OK", + Response: map[string]interface{}{}, + }, nil } func (p *AzureServiceBusPublisher) Close() error { p.BasePublisher.StartClose() - // Phase 1: empty implementation + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if p.sender != nil { + if err := p.sender.Close(ctx); err != nil { + return err + } + } + + if p.client != nil { + if err := p.client.Close(ctx); err != nil { + return err + } + } + return nil } diff --git a/internal/destregistry/providers/destazureservicebus/destazureservicebus_publish_test.go b/internal/destregistry/providers/destazureservicebus/destazureservicebus_publish_test.go index ae7c8582..a79138f4 100644 --- a/internal/destregistry/providers/destazureservicebus/destazureservicebus_publish_test.go +++ b/internal/destregistry/providers/destazureservicebus/destazureservicebus_publish_test.go @@ -1,8 +1,12 @@ package destazureservicebus_test import ( + "context" + "encoding/json" "testing" + "time" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" "github.com/hookdeck/outpost/internal/destregistry/providers/destazureservicebus" testsuite "github.com/hookdeck/outpost/internal/destregistry/testing" "github.com/hookdeck/outpost/internal/models" @@ -15,17 +19,77 @@ import ( // AzureServiceBusConsumer implements testsuite.MessageConsumer type AzureServiceBusConsumer struct { - msgChan chan testsuite.Message - done chan struct{} + client *azservicebus.Client + receiver *azservicebus.Receiver + msgChan chan testsuite.Message + done chan struct{} } -func NewAzureServiceBusConsumer() *AzureServiceBusConsumer { +func NewAzureServiceBusConsumer(connectionString, topic, subscription string) (*AzureServiceBusConsumer, error) { + client, err := azservicebus.NewClientFromConnectionString(connectionString, nil) + if err != nil { + return nil, err + } + + receiver, err := client.NewReceiverForSubscription(topic, subscription, nil) + if err != nil { + return nil, err + } + c := &AzureServiceBusConsumer{ - msgChan: make(chan testsuite.Message), - done: make(chan struct{}), + client: client, + receiver: receiver, + msgChan: make(chan testsuite.Message), + done: make(chan struct{}), + } + go c.consume() + return c, nil +} + +func (c *AzureServiceBusConsumer) consume() { + ctx := context.Background() + for { + select { + case <-c.done: + return + default: + messages, err := c.receiver.ReceiveMessages(ctx, 1, nil) + if err != nil { + continue + } + + for _, msg := range messages { + // Parse custom properties as metadata + metadata := make(map[string]string) + for k, v := range msg.ApplicationProperties { + if strVal, ok := v.(string); ok { + metadata[k] = strVal + } + } + + // Parse the message body + var data interface{} + if err := json.Unmarshal(msg.Body, &data); err != nil { + // If JSON parsing fails, use raw bytes + data = msg.Body + } + + dataBytes, _ := json.Marshal(data) + + c.msgChan <- testsuite.Message{ + Data: dataBytes, + Metadata: metadata, + Raw: msg, + } + + // Complete the message + if err := c.receiver.CompleteMessage(ctx, msg, nil); err != nil { + // Log error but continue + continue + } + } + } } - // Phase 1: Mock consumer that doesn't actually consume - return c } func (c *AzureServiceBusConsumer) Consume() <-chan testsuite.Message { @@ -34,6 +98,16 @@ func (c *AzureServiceBusConsumer) Consume() <-chan testsuite.Message { func (c *AzureServiceBusConsumer) Close() error { close(c.done) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if c.receiver != nil { + c.receiver.Close(ctx) + } + if c.client != nil { + c.client.Close(ctx) + } return nil } @@ -69,7 +143,13 @@ func (s *AzureServiceBusSuite) SetupSuite() { mqConfig := testinfra.GetMQAzureConfig(t, "TestDestinationAzureServiceBusSuite") // Create consumer - s.consumer = NewAzureServiceBusConsumer() + consumer, err := NewAzureServiceBusConsumer( + mqConfig.AzureServiceBus.ConnectionString, + mqConfig.AzureServiceBus.Topic, + mqConfig.AzureServiceBus.Subscription, + ) + require.NoError(t, err) + s.consumer = consumer // Create provider provider, err := destazureservicebus.New(testutil.Registry.MetadataLoader()) From 78da1a9423335732e3c8bd40e1a0b1c70ebdacf7 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 15 Jul 2025 10:23:41 +0700 Subject: [PATCH 03/12] chore: rename "topic" to "name" --- .../providers/azure_servicebus/metadata.json | 6 +++--- .../destazureservicebus/destazureservicebus.go | 16 ++++++---------- .../destazureservicebus_publish_test.go | 2 +- 3 files changed, 10 insertions(+), 14 deletions(-) diff --git a/internal/destregistry/metadata/providers/azure_servicebus/metadata.json b/internal/destregistry/metadata/providers/azure_servicebus/metadata.json index 532142fe..153264e4 100644 --- a/internal/destregistry/metadata/providers/azure_servicebus/metadata.json +++ b/internal/destregistry/metadata/providers/azure_servicebus/metadata.json @@ -2,10 +2,10 @@ "type": "azure_servicebus", "config_fields": [ { - "key": "topic", + "key": "name", "type": "text", - "label": "Topic Name", - "description": "The name of the Azure Service Bus topic to publish to", + "label": "Queue or Topic Name", + "description": "The name of the Azure Service Bus queue or topic to publish to", "required": true, "pattern": "^[a-zA-Z0-9]([a-zA-Z0-9._-]*[a-zA-Z0-9])?$" } diff --git a/internal/destregistry/providers/destazureservicebus/destazureservicebus.go b/internal/destregistry/providers/destazureservicebus/destazureservicebus.go index 3996c72b..6c6055cb 100644 --- a/internal/destregistry/providers/destazureservicebus/destazureservicebus.go +++ b/internal/destregistry/providers/destazureservicebus/destazureservicebus.go @@ -17,7 +17,7 @@ type AzureServiceBusDestination struct { } type AzureServiceBusDestinationConfig struct { - Topic string + Name string } type AzureServiceBusDestinationCredentials struct { @@ -38,7 +38,6 @@ func New(loader metadata.MetadataLoader) (*AzureServiceBusDestination, error) { } func (d *AzureServiceBusDestination) Validate(ctx context.Context, destination *models.Destination) error { - // For phase 1, just call base validation return d.BaseProvider.Validate(ctx, destination) } @@ -51,7 +50,7 @@ func (d *AzureServiceBusDestination) CreatePublisher(ctx context.Context, destin return &AzureServiceBusPublisher{ BasePublisher: &destregistry.BasePublisher{}, connectionString: creds.ConnectionString, - topic: cfg.Topic, + queueOrTopic: cfg.Name, }, nil } @@ -75,17 +74,16 @@ func (d *AzureServiceBusDestination) resolveMetadata(ctx context.Context, destin } return &AzureServiceBusDestinationConfig{ - Topic: destination.Config["topic"], + Name: destination.Config["name"], }, &AzureServiceBusDestinationCredentials{ ConnectionString: destination.Credentials["connection_string"], }, nil } -// Publisher implementation type AzureServiceBusPublisher struct { *destregistry.BasePublisher connectionString string - topic string + queueOrTopic string client *azservicebus.Client sender *azservicebus.Sender } @@ -100,9 +98,9 @@ func (p *AzureServiceBusPublisher) ensureSender() (*azservicebus.Sender, error) } if p.sender == nil { - sender, err := p.client.NewSender(p.topic, nil) + sender, err := p.client.NewSender(p.queueOrTopic, nil) if err != nil { - return nil, fmt.Errorf("failed to create sender for topic %s: %w", p.topic, err) + return nil, fmt.Errorf("failed to create sender for queue or topic %s: %w", p.queueOrTopic, err) } p.sender = sender } @@ -136,13 +134,11 @@ func (p *AzureServiceBusPublisher) Publish(ctx context.Context, event *models.Ev } defer p.BasePublisher.FinishPublish() - // Format the message message, err := p.Format(ctx, event) if err != nil { return nil, err } - // Get or create sender and send the message sender, err := p.ensureSender() if err != nil { return nil, err diff --git a/internal/destregistry/providers/destazureservicebus/destazureservicebus_publish_test.go b/internal/destregistry/providers/destazureservicebus/destazureservicebus_publish_test.go index a79138f4..053e6011 100644 --- a/internal/destregistry/providers/destazureservicebus/destazureservicebus_publish_test.go +++ b/internal/destregistry/providers/destazureservicebus/destazureservicebus_publish_test.go @@ -159,7 +159,7 @@ func (s *AzureServiceBusSuite) SetupSuite() { destination := testutil.DestinationFactory.Any( testutil.DestinationFactory.WithType("azure_servicebus"), testutil.DestinationFactory.WithConfig(map[string]string{ - "topic": mqConfig.AzureServiceBus.Topic, + "name": mqConfig.AzureServiceBus.Topic, }), testutil.DestinationFactory.WithCredentials(map[string]string{ "connection_string": mqConfig.AzureServiceBus.ConnectionString, From bdee9c16e541f4669a402c3feeac1df2b19e6f32 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 15 Jul 2025 10:31:24 +0700 Subject: [PATCH 04/12] feat: extract namespace from connection string --- .../destazureservicebus.go | 45 ++++- .../destazureservicebus_test.go | 168 ++++++++++++++++++ 2 files changed, 208 insertions(+), 5 deletions(-) create mode 100644 internal/destregistry/providers/destazureservicebus/destazureservicebus_test.go diff --git a/internal/destregistry/providers/destazureservicebus/destazureservicebus.go b/internal/destregistry/providers/destazureservicebus/destazureservicebus.go index 6c6055cb..671f02bb 100644 --- a/internal/destregistry/providers/destazureservicebus/destazureservicebus.go +++ b/internal/destregistry/providers/destazureservicebus/destazureservicebus.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "strings" "time" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" @@ -55,13 +56,27 @@ func (d *AzureServiceBusDestination) CreatePublisher(ctx context.Context, destin } func (d *AzureServiceBusDestination) ComputeTarget(destination *models.Destination) destregistry.DestinationTarget { - if topic, ok := destination.Config["topic"]; ok { - return destregistry.DestinationTarget{ - Target: topic, - TargetURL: "", + name, ok := destination.Config["name"] + if !ok { + return destregistry.DestinationTarget{} + } + + // Try to extract namespace from connection string + if connStr, ok := destination.Credentials["connection_string"]; ok { + namespace := parseNamespaceFromConnectionString(connStr) + if namespace != "" { + return destregistry.DestinationTarget{ + Target: fmt.Sprintf("%s/%s", namespace, name), + TargetURL: "", + } } } - return destregistry.DestinationTarget{} + + // Fallback to just the name if we can't parse namespace + return destregistry.DestinationTarget{ + Target: name, + TargetURL: "", + } } func (d *AzureServiceBusDestination) Preprocess(newDestination *models.Destination, originalDestination *models.Destination, opts *destregistry.PreprocessDestinationOpts) error { @@ -183,3 +198,23 @@ func (p *AzureServiceBusPublisher) Close() error { return nil } + +// parseNamespaceFromConnectionString extracts the namespace from an Azure Service Bus connection string. +// Connection strings typically have the format: +// Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=... +func parseNamespaceFromConnectionString(connStr string) string { + // Split by semicolons to get individual components + parts := strings.Split(connStr, ";") + for _, part := range parts { + if strings.HasPrefix(part, "Endpoint=") { + endpoint := strings.TrimPrefix(part, "Endpoint=") + // Remove protocol prefix + endpoint = strings.TrimPrefix(endpoint, "sb://") + // Extract namespace (everything before first dot) + if idx := strings.Index(endpoint, "."); idx > 0 { + return endpoint[:idx] + } + } + } + return "" +} diff --git a/internal/destregistry/providers/destazureservicebus/destazureservicebus_test.go b/internal/destregistry/providers/destazureservicebus/destazureservicebus_test.go new file mode 100644 index 00000000..78c9b98f --- /dev/null +++ b/internal/destregistry/providers/destazureservicebus/destazureservicebus_test.go @@ -0,0 +1,168 @@ +package destazureservicebus_test + +import ( + "testing" + + "github.com/hookdeck/outpost/internal/destregistry/providers/destazureservicebus" + "github.com/hookdeck/outpost/internal/models" + "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestComputeTarget(t *testing.T) { + provider, err := destazureservicebus.New(testutil.Registry.MetadataLoader()) + require.NoError(t, err) + + tests := []struct { + name string + destination models.Destination + expectedTarget string + }{ + { + name: "with valid connection string and name", + destination: models.Destination{ + Config: map[string]string{ + "name": "my-queue", + }, + Credentials: map[string]string{ + "connection_string": "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234", + }, + }, + expectedTarget: "mynamespace/my-queue", + }, + { + name: "with different namespace format", + destination: models.Destination{ + Config: map[string]string{ + "name": "my-topic", + }, + Credentials: map[string]string{ + "connection_string": "Endpoint=sb://test-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xyz789", + }, + }, + expectedTarget: "test-namespace/my-topic", + }, + { + name: "with missing name config", + destination: models.Destination{ + Config: map[string]string{}, + Credentials: map[string]string{ + "connection_string": "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234", + }, + }, + expectedTarget: "", + }, + { + name: "with invalid connection string format", + destination: models.Destination{ + Config: map[string]string{ + "name": "my-queue", + }, + Credentials: map[string]string{ + "connection_string": "invalid-connection-string", + }, + }, + expectedTarget: "my-queue", // Falls back to just the name + }, + { + name: "with missing connection string", + destination: models.Destination{ + Config: map[string]string{ + "name": "my-queue", + }, + Credentials: map[string]string{}, + }, + expectedTarget: "my-queue", // Falls back to just the name + }, + { + name: "with connection string missing Endpoint", + destination: models.Destination{ + Config: map[string]string{ + "name": "my-queue", + }, + Credentials: map[string]string{ + "connection_string": "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234", + }, + }, + expectedTarget: "my-queue", // Falls back to just the name + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := provider.ComputeTarget(&tt.destination) + assert.Equal(t, tt.expectedTarget, result.Target) + assert.Empty(t, result.TargetURL) // TargetURL should always be empty for now + }) + } +} + +func TestParseNamespaceFromConnectionString(t *testing.T) { + tests := []struct { + name string + connectionString string + expectedNamespace string + }{ + { + name: "standard connection string", + connectionString: "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234", + expectedNamespace: "mynamespace", + }, + { + name: "connection string with hyphenated namespace", + connectionString: "Endpoint=sb://my-test-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xyz789", + expectedNamespace: "my-test-namespace", + }, + { + name: "connection string with different order", + connectionString: "SharedAccessKeyName=RootManageSharedAccessKey;Endpoint=sb://namespace123.servicebus.windows.net/;SharedAccessKey=key123", + expectedNamespace: "namespace123", + }, + { + name: "invalid connection string", + connectionString: "invalid-string", + expectedNamespace: "", + }, + { + name: "missing endpoint", + connectionString: "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234", + expectedNamespace: "", + }, + { + name: "malformed endpoint", + connectionString: "Endpoint=invalid-endpoint;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234", + expectedNamespace: "", + }, + { + name: "empty connection string", + connectionString: "", + expectedNamespace: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // We need to test the parseNamespaceFromConnectionString function + // Since it's not exported, we test it indirectly through ComputeTarget + provider, err := destazureservicebus.New(testutil.Registry.MetadataLoader()) + require.NoError(t, err) + + dest := models.Destination{ + Config: map[string]string{ + "name": "test-entity", + }, + Credentials: map[string]string{ + "connection_string": tt.connectionString, + }, + } + + result := provider.ComputeTarget(&dest) + if tt.expectedNamespace == "" { + assert.Equal(t, "test-entity", result.Target) + } else { + assert.Equal(t, tt.expectedNamespace+"/test-entity", result.Target) + } + }) + } +} From c79004519739dbafb2c0700adf815a3750e09956 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 15 Jul 2025 10:47:49 +0700 Subject: [PATCH 05/12] chore: validation --- .../destazureservicebus.go | 2 + .../destazureservicebus_test.go | 164 +++++++++++++----- 2 files changed, 125 insertions(+), 41 deletions(-) diff --git a/internal/destregistry/providers/destazureservicebus/destazureservicebus.go b/internal/destregistry/providers/destazureservicebus/destazureservicebus.go index 671f02bb..5f687606 100644 --- a/internal/destregistry/providers/destazureservicebus/destazureservicebus.go +++ b/internal/destregistry/providers/destazureservicebus/destazureservicebus.go @@ -39,6 +39,7 @@ func New(loader metadata.MetadataLoader) (*AzureServiceBusDestination, error) { } func (d *AzureServiceBusDestination) Validate(ctx context.Context, destination *models.Destination) error { + // Just use base validation - let Azure SDK handle connection string validation at runtime return d.BaseProvider.Validate(ctx, destination) } @@ -80,6 +81,7 @@ func (d *AzureServiceBusDestination) ComputeTarget(destination *models.Destinati } func (d *AzureServiceBusDestination) Preprocess(newDestination *models.Destination, originalDestination *models.Destination, opts *destregistry.PreprocessDestinationOpts) error { + // No preprocessing needed for Azure Service Bus return nil } diff --git a/internal/destregistry/providers/destazureservicebus/destazureservicebus_test.go b/internal/destregistry/providers/destazureservicebus/destazureservicebus_test.go index 78c9b98f..ba9015e5 100644 --- a/internal/destregistry/providers/destazureservicebus/destazureservicebus_test.go +++ b/internal/destregistry/providers/destazureservicebus/destazureservicebus_test.go @@ -1,8 +1,11 @@ package destazureservicebus_test import ( + "context" + "strings" "testing" + "github.com/hookdeck/outpost/internal/destregistry" "github.com/hookdeck/outpost/internal/destregistry/providers/destazureservicebus" "github.com/hookdeck/outpost/internal/models" "github.com/hookdeck/outpost/internal/util/testutil" @@ -16,74 +19,63 @@ func TestComputeTarget(t *testing.T) { tests := []struct { name string - destination models.Destination + config map[string]string + credentials map[string]string expectedTarget string }{ { name: "with valid connection string and name", - destination: models.Destination{ - Config: map[string]string{ - "name": "my-queue", - }, - Credentials: map[string]string{ - "connection_string": "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234", - }, + config: map[string]string{ + "name": "my-queue", + }, + credentials: map[string]string{ + "connection_string": "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234", }, expectedTarget: "mynamespace/my-queue", }, { name: "with different namespace format", - destination: models.Destination{ - Config: map[string]string{ - "name": "my-topic", - }, - Credentials: map[string]string{ - "connection_string": "Endpoint=sb://test-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xyz789", - }, + config: map[string]string{ + "name": "my-topic", + }, + credentials: map[string]string{ + "connection_string": "Endpoint=sb://test-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xyz789", }, expectedTarget: "test-namespace/my-topic", }, { - name: "with missing name config", - destination: models.Destination{ - Config: map[string]string{}, - Credentials: map[string]string{ - "connection_string": "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234", - }, + name: "with missing name config", + config: map[string]string{}, + credentials: map[string]string{ + "connection_string": "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234", }, expectedTarget: "", }, { name: "with invalid connection string format", - destination: models.Destination{ - Config: map[string]string{ - "name": "my-queue", - }, - Credentials: map[string]string{ - "connection_string": "invalid-connection-string", - }, + config: map[string]string{ + "name": "my-queue", + }, + credentials: map[string]string{ + "connection_string": "invalid-connection-string", }, expectedTarget: "my-queue", // Falls back to just the name }, { name: "with missing connection string", - destination: models.Destination{ - Config: map[string]string{ - "name": "my-queue", - }, - Credentials: map[string]string{}, + config: map[string]string{ + "name": "my-queue", }, + credentials: map[string]string{}, expectedTarget: "my-queue", // Falls back to just the name }, { name: "with connection string missing Endpoint", - destination: models.Destination{ - Config: map[string]string{ - "name": "my-queue", - }, - Credentials: map[string]string{ - "connection_string": "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234", - }, + config: map[string]string{ + "name": "my-queue", + }, + credentials: map[string]string{ + "connection_string": "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234", }, expectedTarget: "my-queue", // Falls back to just the name }, @@ -91,7 +83,12 @@ func TestComputeTarget(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - result := provider.ComputeTarget(&tt.destination) + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("azure_servicebus"), + testutil.DestinationFactory.WithConfig(tt.config), + testutil.DestinationFactory.WithCredentials(tt.credentials), + ) + result := provider.ComputeTarget(&destination) assert.Equal(t, tt.expectedTarget, result.Target) assert.Empty(t, result.TargetURL) // TargetURL should always be empty for now }) @@ -149,6 +146,7 @@ func TestParseNamespaceFromConnectionString(t *testing.T) { require.NoError(t, err) dest := models.Destination{ + Type: "azure_servicebus", Config: map[string]string{ "name": "test-entity", }, @@ -166,3 +164,87 @@ func TestParseNamespaceFromConnectionString(t *testing.T) { }) } } + +func TestValidate(t *testing.T) { + provider, err := destazureservicebus.New(testutil.Registry.MetadataLoader()) + require.NoError(t, err) + + tests := []struct { + name string + config map[string]string + credentials map[string]string + wantErr bool + errContains string + }{ + { + name: "valid destination", + config: map[string]string{ + "name": "my-queue", + }, + credentials: map[string]string{ + "connection_string": "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234", + }, + wantErr: false, + }, + { + name: "missing name", + config: map[string]string{}, + credentials: map[string]string{ + "connection_string": "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234", + }, + wantErr: true, + errContains: "config.name", + }, + { + name: "missing connection string", + config: map[string]string{ + "name": "my-queue", + }, + credentials: map[string]string{}, + wantErr: true, + errContains: "credentials.connection_string", + }, + { + name: "invalid name pattern", + config: map[string]string{ + "name": "my queue with spaces", // Invalid - should fail pattern validation + }, + credentials: map[string]string{ + "connection_string": "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234", + }, + wantErr: true, + errContains: "config.name", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("azure_servicebus"), + testutil.DestinationFactory.WithConfig(tt.config), + testutil.DestinationFactory.WithCredentials(tt.credentials), + ) + ctx := context.Background() + err := provider.Validate(ctx, &destination) + if tt.wantErr { + require.Error(t, err) + if tt.errContains != "" { + var validationErr *destregistry.ErrDestinationValidation + require.ErrorAs(t, err, &validationErr) + require.NotEmpty(t, validationErr.Errors) + // Check that at least one error contains the expected field + found := false + for _, e := range validationErr.Errors { + if strings.Contains(e.Field, tt.errContains) { + found = true + break + } + } + assert.True(t, found, "Expected error field containing %q, but got %+v", tt.errContains, validationErr.Errors) + } + } else { + require.NoError(t, err) + } + }) + } +} From 3b2f5d85b926bce6b45e905e6cfc6055dddec72f Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 15 Jul 2025 12:13:26 +0700 Subject: [PATCH 06/12] chore: register destazureservicebus & local test --- build/dev/azure/config.json | 9 ++ cmd/destinations/azureservicebus/main.go | 129 +++++++++++++++++++++ internal/destregistry/providers/default.go | 7 ++ 3 files changed, 145 insertions(+) create mode 100644 cmd/destinations/azureservicebus/main.go diff --git a/build/dev/azure/config.json b/build/dev/azure/config.json index de7cff6d..5681d17c 100644 --- a/build/dev/azure/config.json +++ b/build/dev/azure/config.json @@ -41,6 +41,15 @@ } ] }, + // test destination + { + "Name": "destination-test", + "Subscriptions": [ + { + "Name": "destination-test-sub" + } + ] + }, // tests { diff --git a/cmd/destinations/azureservicebus/main.go b/cmd/destinations/azureservicebus/main.go new file mode 100644 index 00000000..55280c42 --- /dev/null +++ b/cmd/destinations/azureservicebus/main.go @@ -0,0 +1,129 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "os" + "os/signal" + "syscall" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" +) + +const ( + TOPIC_NAME = "destination-test" + SUBSCRIPTION_NAME = "destination-test-sub" +) + +func main() { + if err := run(); err != nil { + panic(err) + } +} + +func run() error { + // Get connection string from environment or use default + connectionString := os.Getenv("AZURE_SERVICEBUS_CONNECTION_STRING") + if connectionString == "" { + // Default for local development - update as needed + connectionString = "Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;" + } + + // Create client + client, err := azservicebus.NewClientFromConnectionString(connectionString, nil) + if err != nil { + return fmt.Errorf("failed to create client: %w", err) + } + defer client.Close(context.Background()) + + // Create receiver for the subscription + receiver, err := client.NewReceiverForSubscription(TOPIC_NAME, SUBSCRIPTION_NAME, nil) + if err != nil { + return fmt.Errorf("failed to create receiver: %w", err) + } + defer receiver.Close(context.Background()) + + // Set up signal handling for graceful shutdown + termChan := make(chan os.Signal, 1) + signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) + + // Start consuming messages + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + for { + messages, err := receiver.ReceiveMessages(ctx, 1, nil) + if err != nil { + if ctx.Err() != nil { + // Context cancelled, exit gracefully + return + } + log.Printf("[x] Error receiving messages: %v", err) + continue + } + + for _, msg := range messages { + // Log message details + log.Printf("[x] Received message:") + log.Printf(" Message ID: %s", msg.MessageID) + log.Printf(" Sequence Number: %d", *msg.SequenceNumber) + + // Log application properties (metadata) + if len(msg.ApplicationProperties) > 0 { + log.Printf(" Metadata:") + for k, v := range msg.ApplicationProperties { + log.Printf(" %s: %v", k, v) + } + } + + // Log message body + var data interface{} + if err := json.Unmarshal(msg.Body, &data); err == nil { + // Pretty print JSON + pretty, _ := json.MarshalIndent(data, " ", " ") + log.Printf(" Body (JSON):\n %s", string(pretty)) + } else { + // Raw body + log.Printf(" Body (Raw): %s", string(msg.Body)) + } + + // Complete the message + if err := receiver.CompleteMessage(ctx, msg, nil); err != nil { + log.Printf("[x] Error completing message: %v", err) + } + } + } + }() + + // Log configuration + log.Printf("[*] Azure Service Bus Consumer") + log.Printf("[*] Topic: %s", TOPIC_NAME) + log.Printf("[*] Subscription: %s", SUBSCRIPTION_NAME) + log.Printf("[*] Namespace: %s", extractNamespace(connectionString)) + log.Printf("[*] Ready to receive messages. Press Ctrl+C to exit.") + + // Wait for termination signal + <-termChan + log.Printf("[*] Shutting down...") + cancel() + + return nil +} + +func extractNamespace(connStr string) string { + // Simple extraction for display purposes + start := len("Endpoint=sb://") + if len(connStr) > start { + end := start + for end < len(connStr) && connStr[end] != '.' { + end++ + } + if end > start { + return connStr[start:end] + } + } + return "unknown" +} diff --git a/internal/destregistry/providers/default.go b/internal/destregistry/providers/default.go index 4f14de0a..9b643c14 100644 --- a/internal/destregistry/providers/default.go +++ b/internal/destregistry/providers/default.go @@ -4,6 +4,7 @@ import ( "github.com/hookdeck/outpost/internal/destregistry" "github.com/hookdeck/outpost/internal/destregistry/providers/destawskinesis" "github.com/hookdeck/outpost/internal/destregistry/providers/destawssqs" + "github.com/hookdeck/outpost/internal/destregistry/providers/destazureservicebus" "github.com/hookdeck/outpost/internal/destregistry/providers/desthookdeck" "github.com/hookdeck/outpost/internal/destregistry/providers/destrabbitmq" "github.com/hookdeck/outpost/internal/destregistry/providers/destwebhook" @@ -84,6 +85,12 @@ func RegisterDefault(registry destregistry.Registry, opts RegisterDefaultDestina } registry.RegisterProvider("aws_kinesis", awsKinesis) + azureServiceBus, err := destazureservicebus.New(loader) + if err != nil { + return err + } + registry.RegisterProvider("azure_servicebus", azureServiceBus) + rabbitmq, err := destrabbitmq.New(loader) if err != nil { return err From 3a1fdd8f3e9aa84f6a349dac24af9c674078aedf Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 15 Jul 2025 12:19:21 +0700 Subject: [PATCH 07/12] chore: azure svg --- .../metadata/providers/azure_servicebus/metadata.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/destregistry/metadata/providers/azure_servicebus/metadata.json b/internal/destregistry/metadata/providers/azure_servicebus/metadata.json index 153264e4..b9699c44 100644 --- a/internal/destregistry/metadata/providers/azure_servicebus/metadata.json +++ b/internal/destregistry/metadata/providers/azure_servicebus/metadata.json @@ -23,5 +23,5 @@ "label": "Azure Service Bus", "link": "https://azure.microsoft.com/en-us/services/service-bus/", "description": "Send events to Azure Service Bus topics for reliable cloud messaging between applications and services.", - "icon": "" + "icon": "" } From 26e61d27d449f2ca1d90a54298a8ba10285f0196 Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 15 Jul 2025 12:53:33 +0700 Subject: [PATCH 08/12] chore: refactor local destination consumer for easier testing --- cmd/destinations/azureservicebus/main.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/cmd/destinations/azureservicebus/main.go b/cmd/destinations/azureservicebus/main.go index 55280c42..1ece3f36 100644 --- a/cmd/destinations/azureservicebus/main.go +++ b/cmd/destinations/azureservicebus/main.go @@ -12,9 +12,11 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" ) +// local const ( TOPIC_NAME = "destination-test" SUBSCRIPTION_NAME = "destination-test-sub" + CONNECTION_STRING = "Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;" ) func main() { @@ -24,15 +26,8 @@ func main() { } func run() error { - // Get connection string from environment or use default - connectionString := os.Getenv("AZURE_SERVICEBUS_CONNECTION_STRING") - if connectionString == "" { - // Default for local development - update as needed - connectionString = "Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;" - } - // Create client - client, err := azservicebus.NewClientFromConnectionString(connectionString, nil) + client, err := azservicebus.NewClientFromConnectionString(CONNECTION_STRING, nil) if err != nil { return fmt.Errorf("failed to create client: %w", err) } @@ -102,7 +97,7 @@ func run() error { log.Printf("[*] Azure Service Bus Consumer") log.Printf("[*] Topic: %s", TOPIC_NAME) log.Printf("[*] Subscription: %s", SUBSCRIPTION_NAME) - log.Printf("[*] Namespace: %s", extractNamespace(connectionString)) + log.Printf("[*] Namespace: %s", extractNamespace(CONNECTION_STRING)) log.Printf("[*] Ready to receive messages. Press Ctrl+C to exit.") // Wait for termination signal From 19d0595dd3dac0debc61bd301406b0fef7c8afcd Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Tue, 15 Jul 2025 23:43:04 +0700 Subject: [PATCH 09/12] docs: azure_servicebus destination design & steps to test --- .../azure_servicebus/configuration.md | 105 ++++++++++++++++++ .../azure_servicebus/test-destination.md | 68 ++++++++++++ 2 files changed, 173 insertions(+) create mode 100644 contributing/destinations/azure_servicebus/configuration.md create mode 100644 contributing/destinations/azure_servicebus/test-destination.md diff --git a/contributing/destinations/azure_servicebus/configuration.md b/contributing/destinations/azure_servicebus/configuration.md new file mode 100644 index 00000000..7d279490 --- /dev/null +++ b/contributing/destinations/azure_servicebus/configuration.md @@ -0,0 +1,105 @@ +# AzureServiceBus Destination configuration + +Here's a rough document explaining how AzureServiceBus works and how the destination is implemented with Outpost. + +# PubSub vs Queue + +Azure ServiceBus supports both PubSub (Topic & Subscription) and Queue. From the Publisher (Azure's term is Sender) perspective, it doesn't really care whether it's publishing to a Topic or to a Queue. So, from the destination config, all we need is a single "name" field. + +## Authentication + +For authentication, we currently support "connection_string" which generally have access to the full Namespace. So if the end-user wants to ensure Outpost only has access to their desired queue or topic, they should create a new Namespace just for Outpost. + +## Message + +Whether it's publishing to Topic or Queue, the Publisher needs to send an Azure's Message. Here's the full Golang SDK Message struct: + +```golang +// Message is a message with a body and commonly used properties. +// Properties that are pointers are optional. +type Message struct { + // ApplicationProperties can be used to store custom metadata for a message. + ApplicationProperties map[string]any + + // Body corresponds to the first []byte array in the Data section of an AMQP message. + Body []byte + + // ContentType describes the payload of the message, with a descriptor following + // the format of Content-Type, specified by RFC2045 (ex: "application/json"). + ContentType *string + + // CorrelationID allows an application to specify a context for the message for the purposes of + // correlation, for example reflecting the MessageID of a message that is being + // replied to. + CorrelationID *string + + // MessageID is an application-defined value that uniquely identifies + // the message and its payload. The identifier is a free-form string. + // + // If enabled, the duplicate detection feature identifies and removes further submissions + // of messages with the same MessageId. + MessageID *string + + // PartitionKey is used with a partitioned entity and enables assigning related messages + // to the same internal partition. This ensures that the submission sequence order is correctly + // recorded. The partition is chosen by a hash function in Service Bus and cannot be chosen + // directly. + // + // For session-aware entities, the ReceivedMessage.SessionID overrides this value. + PartitionKey *string + + // ReplyTo is an application-defined value specify a reply path to the receiver of the message. When + // a sender expects a reply, it sets the value to the absolute or relative path of the queue or topic + // it expects the reply to be sent to. + ReplyTo *string + + // ReplyToSessionID augments the ReplyTo information and specifies which SessionId should + // be set for the reply when sent to the reply entity. + ReplyToSessionID *string + + // ScheduledEnqueueTime specifies a time when a message will be enqueued. The message is transferred + // to the broker but will not available until the scheduled time. + ScheduledEnqueueTime *time.Time + + // SessionID is used with session-aware entities and associates a message with an application-defined + // session ID. Note that an empty string is a valid session identifier. + // Messages with the same session identifier are subject to summary locking and enable + // exact in-order processing and demultiplexing. For session-unaware entities, this value is ignored. + SessionID *string + + // Subject enables an application to indicate the purpose of the message, similar to an email subject line. + Subject *string + + // TimeToLive is the duration after which the message expires, starting from the instant the + // message has been accepted and stored by the broker, found in the ReceivedMessage.EnqueuedTime + // property. + // + // When not set explicitly, the assumed value is the DefaultTimeToLive for the queue or topic. + // A message's TimeToLive cannot be longer than the entity's DefaultTimeToLive is silently + // adjusted if it does. + TimeToLive *time.Duration + + // To is reserved for future use in routing scenarios but is not currently used by Service Bus. + // Applications can use this value to indicate the logical destination of the message. + To *string +} +``` + +Here are a few notable configuration, especially on the destination level that we may want to support: + +- MessageID --> MessageIDTemplate, similar to AWS Kinesis Parition Key approach +- CorrelationID --> CorrelationIDTemplate, similar to AWS Kinesis Parition Key approach +- PartitionKey --> PartitionKeyTemplate, similar to AWS Kinesis Parition Key approach + +- ScheduledEnqueueTime +- TimeToLive + +The current implementation doesn't support any of these. So when create destination, it's super straightforward: + +```golang +type Config struct { + Name string +} +``` + +If we want to support these, we can either add them to Config, such as `Config.TTL`, or we can also add a suffix like `Config.MessageTTL` to specify that these config would apply to the Message. diff --git a/contributing/destinations/azure_servicebus/test-destination.md b/contributing/destinations/azure_servicebus/test-destination.md new file mode 100644 index 00000000..1309ec75 --- /dev/null +++ b/contributing/destinations/azure_servicebus/test-destination.md @@ -0,0 +1,68 @@ +# Test AzureServiceBus Destination + +Assuming you have an Azure account & an authenticated Azure CLI, here are the steps you can do to set up a test ServiceBus Topic & Subscription for testing. + +Here are the resources you'll set up: + +1. Resource Group - A container that holds related Azure resources + - Name: outpost-demo-rg +2. Service Bus Namespace - The messaging service container (must be globally unique) + - Name: outpost-demo-sb-[RANDOM] (e.g., outpost-demo-sb-a3f2b1) +3. Topic - A message distribution mechanism (pub/sub pattern) + - Name: destination-test +4. Subscription - A receiver for messages sent to the topic + - Name: destination-test-sub + +Step 1: Create a Resource Group + +az group create \ + --name outpost-demo-rg \ + --location eastus + +Step 2: Create a Service Bus Namespace + +# Generate a random suffix for uniqueness +RANDOM_SUFFIX=$(openssl rand -hex 3) + +# Create the namespace +az servicebus namespace create \ + --resource-group outpost-demo-rg \ + --name outpost-demo-sb-${RANDOM_SUFFIX} \ + --location eastus \ + --sku Standard + +Note: The namespace name must be globally unique. The Standard SKU is required for topics (Basic only +supports queues). + +Step 3: Create a Topic + +az servicebus topic create \ + --resource-group outpost-demo-rg \ + --namespace-name outpost-demo-sb-${RANDOM_SUFFIX} \ + --name destination-test + +Step 4: Create a Subscription + +az servicebus topic subscription create \ + --resource-group outpost-demo-rg \ + --namespace-name outpost-demo-sb-${RANDOM_SUFFIX} \ + --topic-name destination-test \ + --name destination-test-sub + +Step 5: Get the Connection String + +az servicebus namespace authorization-rule keys list \ + --resource-group outpost-demo-rg \ + --namespace-name outpost-demo-sb-${RANDOM_SUFFIX} \ + --name RootManageSharedAccessKey \ + --query primaryConnectionString \ + --output tsv + +Example Output: +Endpoint=sb://outpost-demo-sb-a3f2b1.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;Sh +aredAccessKey=abcd1234... + +You can then create an Azure destination with: +- name: "destination-test" +- connection string: "Endpoint=sb://outpost-demo-sb-a3f2b1.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;Sh +aredAccessKey=abcd1234..." From b437e54dd44336e6b642a860ea78f139891dfa5a Mon Sep 17 00:00:00 2001 From: Alex Luong Date: Wed, 16 Jul 2025 09:53:08 +0700 Subject: [PATCH 10/12] docs: improve permission consideration for azure connection string --- .../azure_servicebus/configuration.md | 74 +++++++++++++++++-- 1 file changed, 69 insertions(+), 5 deletions(-) diff --git a/contributing/destinations/azure_servicebus/configuration.md b/contributing/destinations/azure_servicebus/configuration.md index 7d279490..c42a7813 100644 --- a/contributing/destinations/azure_servicebus/configuration.md +++ b/contributing/destinations/azure_servicebus/configuration.md @@ -6,10 +6,6 @@ Here's a rough document explaining how AzureServiceBus works and how the destina Azure ServiceBus supports both PubSub (Topic & Subscription) and Queue. From the Publisher (Azure's term is Sender) perspective, it doesn't really care whether it's publishing to a Topic or to a Queue. So, from the destination config, all we need is a single "name" field. -## Authentication - -For authentication, we currently support "connection_string" which generally have access to the full Namespace. So if the end-user wants to ensure Outpost only has access to their desired queue or topic, they should create a new Namespace just for Outpost. - ## Message Whether it's publishing to Topic or Queue, the Publisher needs to send an Azure's Message. Here's the full Golang SDK Message struct: @@ -18,7 +14,7 @@ Whether it's publishing to Topic or Queue, the Publisher needs to send an Azure' // Message is a message with a body and commonly used properties. // Properties that are pointers are optional. type Message struct { - // ApplicationProperties can be used to store custom metadata for a message. + // ApplicationProperties can be used to store custom metadata for a message. ApplicationProperties map[string]any // Body corresponds to the first []byte array in the Data section of an AMQP message. @@ -103,3 +99,71 @@ type Config struct { ``` If we want to support these, we can either add them to Config, such as `Config.TTL`, or we can also add a suffix like `Config.MessageTTL` to specify that these config would apply to the Message. + +## Authentication + +For authentication, we currently support "connection_string" which by default have access to the full Namespace. + +## Creating Topic/Queue-Specific Access Policy + +### For a Topic (Send-only access): + +Create a Send-only policy for a specific topic + +az servicebus topic authorization-rule create \ + --resource-group outpost-demo-rg \ + --namespace-name outpost-demo-sb-${RANDOM_SUFFIX} \ + --topic-name events \ + --name SendOnlyPolicy \ + --rights Send + +Get the Topic-Specific Connection String: + +az servicebus topic authorization-rule keys list \ + --resource-group outpost-demo-rg \ + --namespace-name outpost-demo-sb-${RANDOM_SUFFIX} \ + --topic-name events \ + --name SendOnlyPolicy \ + --query primaryConnectionString \ + --output tsv + +This returns a connection string that can only send to the events topic: +Endpoint=sb://outpost-demo-sb-a3f2b1.servicebus.windows.net/;SharedAccessKeyName=Send +OnlyPolicy;SharedAccessKey=xyz789...;EntityPath=events + +### For Queues (similar approach): + +Create a Send-only policy for a specific queue +az servicebus queue authorization-rule create \ + --resource-group outpost-demo-rg \ + --namespace-name outpost-demo-sb-${RANDOM_SUFFIX} \ + --queue-name myqueue \ + --name SendOnlyPolicy \ + --rights Send + +Available Permission Rights: + +- Send - Can only send messages +- Listen - Can only receive messages +- Manage - Full control (send, receive, manage) + +You can combine multiple rights: +--rights Send Listen # Can both send and receive + +Benefits of Entity-Level Access: + +1. Security: Limits blast radius if credentials are compromised +2. Principle of Least Privilege: Outpost only needs Send permission +3. Audit Trail: Can track which policy is being used +4. Rotation: Can rotate entity-specific keys without affecting other services + +Important Notes: + +- Entity-level connection strings include EntityPath parameter +- These policies are scoped to a single topic/queue +- Perfect for production where you want to limit Outpost to only sending to specific +topics +- The connection string format is the same, just with limited scope + +This is the recommended approach for production use - give Outpost only the minimum +permissions it needs (Send) and only to the specific topic/queue it should access. \ No newline at end of file From ced3740febe2fee6585d9c7ffaf20542ce3ab6d5 Mon Sep 17 00:00:00 2001 From: Phil Leggetter Date: Mon, 21 Jul 2025 16:24:50 +0100 Subject: [PATCH 11/12] chore: add instructions.md for Service Bus destination --- .../azure_servicebus/instructions.md | 163 +++++++++++++++++- .../providers/azure_servicebus/metadata.json | 2 +- .../ConfigurationModal.scss | 12 ++ 3 files changed, 175 insertions(+), 2 deletions(-) diff --git a/internal/destregistry/metadata/providers/azure_servicebus/instructions.md b/internal/destregistry/metadata/providers/azure_servicebus/instructions.md index 2115ebca..5b47db13 100644 --- a/internal/destregistry/metadata/providers/azure_servicebus/instructions.md +++ b/internal/destregistry/metadata/providers/azure_servicebus/instructions.md @@ -1 +1,162 @@ -# Azure ServiceBus Configuration Instructions +# Azure Service Bus Configuration Instructions + +The Service Bus destination can be a topic or a queue. + +## Configuration + +- **Queue or Topic Name**: The name of the Service Bus topic or queue. +- **Connection String**: The connection string for the Service Bus instance. + +--- + +## How to set up Azure Service Bus as an event destination using the Azure CLI + +Here are the resources you'll need to set up: + +1. Resource Group - A container that holds related Azure resources +2. Service Bus Namespace - The messaging service container (must be globally unique) +3. Topic or a Queue: + - Topic: A message distribution mechanism (pub/sub pattern) + - Queue: A message queue for point-to-point communication + +### Prerequisites + +Assuming you have an Azure account and an authenticated Azure CLI, here are the steps to set up a Service Bus Topic or Queue as an Event Destination. + +Install [Azure CLI](https://docs.microsoft.com/en-us/cli/azure/install-azure-cli). + +Log in to Azure: + +```bash +az login +``` + +### 1. Create Resource Group + +Set variables: + +```bash +RESOURCE_GROUP="outpost-rg" +LOCATION="eastus" +``` + +Create resource group: + +```bash +az group create --name $RESOURCE_GROUP --location $LOCATION +``` + +#### 2. Create Service Bus Namespace + +Generate a unique namespace name (must be globally unique). + +```bash +RANDOM_SUFFIX=$(openssl rand -hex 4) +NAMESPACE_NAME="outpost-servicebus-$RANDOM_SUFFIX" +``` + +Create Service Bus namespace: + +```bash +az servicebus namespace create \ + --resource-group $RESOURCE_GROUP \ + --name $NAMESPACE_NAME \ + --location $LOCATION \ + --sku Standard +``` + +### 3. Create Topic/Queue-Specific Access Policy + +Choose either a Topic or a Queue based on your requirements. Below are examples for both. + +#### For a Topic + +Set a topic name: + +```bash +TOPIC_NAME="destination-test" +``` + +Create a topic: + +```bash +az servicebus topic create \ + --resource-group $RESOURCE_GROUP \ + --namespace-name $NAMESPACE_NAME \ + --name $TOPIC_NAME +``` + +Create a send-only policy for the topic. + +```bash +az servicebus topic authorization-rule create \ + --resource-group $RESOURCE_GROUP \ + --namespace-name $NAMESPACE_NAME \ + --topic-name $TOPIC_NAME \ + --name SendOnlyPolicy \ + --rights Send +``` + +Get the Topic-Specific Connection String: + +```bash +az servicebus topic authorization-rule keys list \ + --resource-group $RESOURCE_GROUP \ + --namespace-name $NAMESPACE_NAME \ + --topic-name $TOPIC_NAME \ + --name SendOnlyPolicy \ + --query primaryConnectionString \ + --output tsv +``` + +This returns a connection string that can only send to the Service Bus topic. Use this connection string in the **Connection String** field of the Event Destination configuration. + +``` +Endpoint=sb://outpost-sb-a3f2b1.servicebus.windows.net/;SharedAccessKeyName=SendOnlyPolicy;SharedAccessKey=xyz789...;EntityPath=events +``` + +#### For a Queue + +Set a queue name: + +```bash +QUEUE_NAME="myqueue" +``` + +Create a queue: + +```bash +az servicebus queue create \ + --resource-group $RESOURCE_GROUP \ + --namespace-name $NAMESPACE_NAME \ + --name $QUEUE_NAME +``` + +Create a send-only policy for the queue. + +```bash +az servicebus queue authorization-rule create \ + --resource-group $RESOURCE_GROUP \ + --namespace-name $NAMESPACE_NAME \ + --queue-name $QUEUE_NAME \ + --name SendOnlyPolicy \ + --rights Send +``` + +Get the Queue-Specific Connection String: + +```bash +az servicebus queue authorization-rule keys list \ + --resource-group $RESOURCE_GROUP \ + --namespace-name $NAMESPACE_NAME \ + --queue-name $QUEUE_NAME \ + --name SendOnlyPolicy \ + --query primaryConnectionString \ + --output tsv +``` + +This returns a connection string that can only send to the Service Bus queue. Use this connection string in the **Connection String** field of the Event Destination configuration. + +``` +Endpoint=sb://outpost-sb-a3f2b1.servicebus.windows.net/;SharedAccessKeyName=SendOnlyPolicy;SharedAccessKey=xyz789...;EntityPath=myqueue +``` diff --git a/internal/destregistry/metadata/providers/azure_servicebus/metadata.json b/internal/destregistry/metadata/providers/azure_servicebus/metadata.json index b9699c44..41aa704b 100644 --- a/internal/destregistry/metadata/providers/azure_servicebus/metadata.json +++ b/internal/destregistry/metadata/providers/azure_servicebus/metadata.json @@ -23,5 +23,5 @@ "label": "Azure Service Bus", "link": "https://azure.microsoft.com/en-us/services/service-bus/", "description": "Send events to Azure Service Bus topics for reliable cloud messaging between applications and services.", - "icon": "" + "icon": "" } diff --git a/internal/portal/src/common/ConfigurationModal/ConfigurationModal.scss b/internal/portal/src/common/ConfigurationModal/ConfigurationModal.scss index 031b2d01..23ba9b4a 100644 --- a/internal/portal/src/common/ConfigurationModal/ConfigurationModal.scss +++ b/internal/portal/src/common/ConfigurationModal/ConfigurationModal.scss @@ -35,6 +35,18 @@ line-height: var(--line-height-m); } + h3 { + font-size: var(--font-size-s); + font-weight: 600; + line-height: var(--line-height-s); + } + + h4 { + font-size: var(--font-size-xs); + font-weight: 600; + line-height: var(--line-height-xs); + } + p, li { font-size: var(--font-size-m); line-height: var(--line-height-m); From 4bec835419d12417f8f04efb345c19db1be31691 Mon Sep 17 00:00:00 2001 From: Phil Leggetter Date: Mon, 21 Jul 2025 16:41:52 +0100 Subject: [PATCH 12/12] chore(docs): update Azure Service Bus references --- docs/pages/concepts.mdx | 11 ++++++----- docs/pages/features/destinations.mdx | 4 ++-- docs/pages/references/roadmap.mdx | 17 +++++++++++------ 3 files changed, 19 insertions(+), 13 deletions(-) diff --git a/docs/pages/concepts.mdx b/docs/pages/concepts.mdx index 9772e694..7c8c60eb 100644 --- a/docs/pages/concepts.mdx +++ b/docs/pages/concepts.mdx @@ -29,10 +29,10 @@ Outpost consists of 3 services that can either be run as a single deployment or - Redis 6.0+ or wire-compatible alternative (RBD or AOF strongly recommended) - One of the supported message queues: + - Azure Service Bus - AWS SQS - - RabbitMQ - GCP Pub/Sub - - Azure Service Bus + - RabbitMQ ### Log Service @@ -46,11 +46,12 @@ Required for log storage. Event destination types belonging to Outpost tenants where events are delivered. - **Webhooks** -- **RabbitMQ** -- **AWS SQS** - **Hookdeck Event Gateway** +- **Azure Service Bus** +- **AWS Kinesis** +- **AWS SQS** +- **RabbitMQ** - **[Amazon EventBridge (planned)](https://github.com/hookdeck/outpost/issues/201)** -- **[Azure Service Bus (planned)](https://github.com/hookdeck/outpost/issues/241)** - **[GCP Pub/Sub (planned)](https://github.com/hookdeck/outpost/issues/140)** - **[Kafka (planned)](https://github.com/hookdeck/outpost/issues/141)** diff --git a/docs/pages/features/destinations.mdx b/docs/pages/features/destinations.mdx index 9a880735..7ff52157 100644 --- a/docs/pages/features/destinations.mdx +++ b/docs/pages/features/destinations.mdx @@ -6,14 +6,14 @@ Outpost supports multiple event destination types. Each tenant can have multiple - Webhooks - Hookdeck Event Gateway -- AWS SQS - AWS Kinesis +- AWS SQS +- Azure Service Bus - RabbitMQ (AMQP) Plans for additional event destination types include: - GCP Pub/Sub -- Azure Service Bus - Amazon EventBridge - Kafka diff --git a/docs/pages/references/roadmap.mdx b/docs/pages/references/roadmap.mdx index 06a9d488..eb2dd83d 100644 --- a/docs/pages/references/roadmap.mdx +++ b/docs/pages/references/roadmap.mdx @@ -8,14 +8,9 @@ title: "Outpost Roadmap" - [Postgres partition management](https://github.com/hookdeck/outpost/issues/249) -### Publish Message Queues - -- [Azure Service Bus](https://github.com/hookdeck/outpost/issues/139) - ### Destination Types - [GCP Pub/Sub](https://github.com/hookdeck/outpost/issues/140) -- [Azure Service Bus](https://github.com/hookdeck/outpost/issues/241) - [Amazon EventBridge](https://github.com/hookdeck/outpost/issues/201) - [Kafka](https://github.com/hookdeck/outpost/issues/141) - [S3](https://github.com/orgs/hookdeck/projects/21/views/1?filterQuery=s3&pane=issue&itemId=113373337&issue=hookdeck%7Coutpost%7C418) @@ -28,11 +23,21 @@ title: "Outpost Roadmap" ## Previous Notable Milestones +### v0.4.0 + +### Publish Message Queues + +- ✅ [Azure Service Bus](https://github.com/hookdeck/outpost/pull/435) + +### Destination Types + +- ✅ [Azure Service Bus](https://github.com/hookdeck/outpost/issues/241) + ### v0.3.0 #### Internal Message Queues -- ✅ [Azure Service Bus](https://github.com/orgs/hookdeck/projects/21?pane=issue&itemId=101269468&issue=hookdeck%7Coutpost%7C139) +- ✅ [Azure Service Bus](https://github.com/hookdeck/outpost/pull/425) ### v0.1.0