diff --git a/build/dev/azure/config.json b/build/dev/azure/config.json index a8419d68..5681d17c 100644 --- a/build/dev/azure/config.json +++ b/build/dev/azure/config.json @@ -41,6 +41,15 @@ } ] }, + // test destination + { + "Name": "destination-test", + "Subscriptions": [ + { + "Name": "destination-test-sub" + } + ] + }, // tests { @@ -53,6 +62,17 @@ "Rules": [] } ] + }, + { + "Name": "TestDestinationAzureServiceBusSuite-topic", + "Properties": {}, + "Subscriptions": [ + { + "Name": "TestDestinationAzureServiceBusSuite-subscription", + "Properties": {}, + "Rules": [] + } + ] } ] } diff --git a/cmd/destinations/azureservicebus/main.go b/cmd/destinations/azureservicebus/main.go new file mode 100644 index 00000000..1ece3f36 --- /dev/null +++ b/cmd/destinations/azureservicebus/main.go @@ -0,0 +1,124 @@ +package main + +import ( + "context" + "encoding/json" + "fmt" + "log" + "os" + "os/signal" + "syscall" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" +) + +// local +const ( + TOPIC_NAME = "destination-test" + SUBSCRIPTION_NAME = "destination-test-sub" + CONNECTION_STRING = "Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;" +) + +func main() { + if err := run(); err != nil { + panic(err) + } +} + +func run() error { + // Create client + client, err := azservicebus.NewClientFromConnectionString(CONNECTION_STRING, nil) + if err != nil { + return fmt.Errorf("failed to create client: %w", err) + } + defer client.Close(context.Background()) + + // Create receiver for the subscription + receiver, err := client.NewReceiverForSubscription(TOPIC_NAME, SUBSCRIPTION_NAME, nil) + if err != nil { + return fmt.Errorf("failed to create receiver: %w", err) + } + defer receiver.Close(context.Background()) + + // Set up signal handling for graceful shutdown + termChan := make(chan os.Signal, 1) + signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM) + + // Start consuming messages + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go func() { + for { + messages, err := receiver.ReceiveMessages(ctx, 1, nil) + if err != nil { + if ctx.Err() != nil { + // Context cancelled, exit gracefully + return + } + log.Printf("[x] Error receiving messages: %v", err) + continue + } + + for _, msg := range messages { + // Log message details + log.Printf("[x] Received message:") + log.Printf(" Message ID: %s", msg.MessageID) + log.Printf(" Sequence Number: %d", *msg.SequenceNumber) + + // Log application properties (metadata) + if len(msg.ApplicationProperties) > 0 { + log.Printf(" Metadata:") + for k, v := range msg.ApplicationProperties { + log.Printf(" %s: %v", k, v) + } + } + + // Log message body + var data interface{} + if err := json.Unmarshal(msg.Body, &data); err == nil { + // Pretty print JSON + pretty, _ := json.MarshalIndent(data, " ", " ") + log.Printf(" Body (JSON):\n %s", string(pretty)) + } else { + // Raw body + log.Printf(" Body (Raw): %s", string(msg.Body)) + } + + // Complete the message + if err := receiver.CompleteMessage(ctx, msg, nil); err != nil { + log.Printf("[x] Error completing message: %v", err) + } + } + } + }() + + // Log configuration + log.Printf("[*] Azure Service Bus Consumer") + log.Printf("[*] Topic: %s", TOPIC_NAME) + log.Printf("[*] Subscription: %s", SUBSCRIPTION_NAME) + log.Printf("[*] Namespace: %s", extractNamespace(CONNECTION_STRING)) + log.Printf("[*] Ready to receive messages. Press Ctrl+C to exit.") + + // Wait for termination signal + <-termChan + log.Printf("[*] Shutting down...") + cancel() + + return nil +} + +func extractNamespace(connStr string) string { + // Simple extraction for display purposes + start := len("Endpoint=sb://") + if len(connStr) > start { + end := start + for end < len(connStr) && connStr[end] != '.' { + end++ + } + if end > start { + return connStr[start:end] + } + } + return "unknown" +} diff --git a/contributing/destinations/azure_servicebus/configuration.md b/contributing/destinations/azure_servicebus/configuration.md new file mode 100644 index 00000000..c42a7813 --- /dev/null +++ b/contributing/destinations/azure_servicebus/configuration.md @@ -0,0 +1,169 @@ +# AzureServiceBus Destination configuration + +Here's a rough document explaining how AzureServiceBus works and how the destination is implemented with Outpost. + +# PubSub vs Queue + +Azure ServiceBus supports both PubSub (Topic & Subscription) and Queue. From the Publisher (Azure's term is Sender) perspective, it doesn't really care whether it's publishing to a Topic or to a Queue. So, from the destination config, all we need is a single "name" field. + +## Message + +Whether it's publishing to Topic or Queue, the Publisher needs to send an Azure's Message. Here's the full Golang SDK Message struct: + +```golang +// Message is a message with a body and commonly used properties. +// Properties that are pointers are optional. +type Message struct { + // ApplicationProperties can be used to store custom metadata for a message. + ApplicationProperties map[string]any + + // Body corresponds to the first []byte array in the Data section of an AMQP message. + Body []byte + + // ContentType describes the payload of the message, with a descriptor following + // the format of Content-Type, specified by RFC2045 (ex: "application/json"). + ContentType *string + + // CorrelationID allows an application to specify a context for the message for the purposes of + // correlation, for example reflecting the MessageID of a message that is being + // replied to. + CorrelationID *string + + // MessageID is an application-defined value that uniquely identifies + // the message and its payload. The identifier is a free-form string. + // + // If enabled, the duplicate detection feature identifies and removes further submissions + // of messages with the same MessageId. + MessageID *string + + // PartitionKey is used with a partitioned entity and enables assigning related messages + // to the same internal partition. This ensures that the submission sequence order is correctly + // recorded. The partition is chosen by a hash function in Service Bus and cannot be chosen + // directly. + // + // For session-aware entities, the ReceivedMessage.SessionID overrides this value. + PartitionKey *string + + // ReplyTo is an application-defined value specify a reply path to the receiver of the message. When + // a sender expects a reply, it sets the value to the absolute or relative path of the queue or topic + // it expects the reply to be sent to. + ReplyTo *string + + // ReplyToSessionID augments the ReplyTo information and specifies which SessionId should + // be set for the reply when sent to the reply entity. + ReplyToSessionID *string + + // ScheduledEnqueueTime specifies a time when a message will be enqueued. The message is transferred + // to the broker but will not available until the scheduled time. + ScheduledEnqueueTime *time.Time + + // SessionID is used with session-aware entities and associates a message with an application-defined + // session ID. Note that an empty string is a valid session identifier. + // Messages with the same session identifier are subject to summary locking and enable + // exact in-order processing and demultiplexing. For session-unaware entities, this value is ignored. + SessionID *string + + // Subject enables an application to indicate the purpose of the message, similar to an email subject line. + Subject *string + + // TimeToLive is the duration after which the message expires, starting from the instant the + // message has been accepted and stored by the broker, found in the ReceivedMessage.EnqueuedTime + // property. + // + // When not set explicitly, the assumed value is the DefaultTimeToLive for the queue or topic. + // A message's TimeToLive cannot be longer than the entity's DefaultTimeToLive is silently + // adjusted if it does. + TimeToLive *time.Duration + + // To is reserved for future use in routing scenarios but is not currently used by Service Bus. + // Applications can use this value to indicate the logical destination of the message. + To *string +} +``` + +Here are a few notable configuration, especially on the destination level that we may want to support: + +- MessageID --> MessageIDTemplate, similar to AWS Kinesis Parition Key approach +- CorrelationID --> CorrelationIDTemplate, similar to AWS Kinesis Parition Key approach +- PartitionKey --> PartitionKeyTemplate, similar to AWS Kinesis Parition Key approach + +- ScheduledEnqueueTime +- TimeToLive + +The current implementation doesn't support any of these. So when create destination, it's super straightforward: + +```golang +type Config struct { + Name string +} +``` + +If we want to support these, we can either add them to Config, such as `Config.TTL`, or we can also add a suffix like `Config.MessageTTL` to specify that these config would apply to the Message. + +## Authentication + +For authentication, we currently support "connection_string" which by default have access to the full Namespace. + +## Creating Topic/Queue-Specific Access Policy + +### For a Topic (Send-only access): + +Create a Send-only policy for a specific topic + +az servicebus topic authorization-rule create \ + --resource-group outpost-demo-rg \ + --namespace-name outpost-demo-sb-${RANDOM_SUFFIX} \ + --topic-name events \ + --name SendOnlyPolicy \ + --rights Send + +Get the Topic-Specific Connection String: + +az servicebus topic authorization-rule keys list \ + --resource-group outpost-demo-rg \ + --namespace-name outpost-demo-sb-${RANDOM_SUFFIX} \ + --topic-name events \ + --name SendOnlyPolicy \ + --query primaryConnectionString \ + --output tsv + +This returns a connection string that can only send to the events topic: +Endpoint=sb://outpost-demo-sb-a3f2b1.servicebus.windows.net/;SharedAccessKeyName=Send +OnlyPolicy;SharedAccessKey=xyz789...;EntityPath=events + +### For Queues (similar approach): + +Create a Send-only policy for a specific queue +az servicebus queue authorization-rule create \ + --resource-group outpost-demo-rg \ + --namespace-name outpost-demo-sb-${RANDOM_SUFFIX} \ + --queue-name myqueue \ + --name SendOnlyPolicy \ + --rights Send + +Available Permission Rights: + +- Send - Can only send messages +- Listen - Can only receive messages +- Manage - Full control (send, receive, manage) + +You can combine multiple rights: +--rights Send Listen # Can both send and receive + +Benefits of Entity-Level Access: + +1. Security: Limits blast radius if credentials are compromised +2. Principle of Least Privilege: Outpost only needs Send permission +3. Audit Trail: Can track which policy is being used +4. Rotation: Can rotate entity-specific keys without affecting other services + +Important Notes: + +- Entity-level connection strings include EntityPath parameter +- These policies are scoped to a single topic/queue +- Perfect for production where you want to limit Outpost to only sending to specific +topics +- The connection string format is the same, just with limited scope + +This is the recommended approach for production use - give Outpost only the minimum +permissions it needs (Send) and only to the specific topic/queue it should access. \ No newline at end of file diff --git a/contributing/destinations/azure_servicebus/test-destination.md b/contributing/destinations/azure_servicebus/test-destination.md new file mode 100644 index 00000000..1309ec75 --- /dev/null +++ b/contributing/destinations/azure_servicebus/test-destination.md @@ -0,0 +1,68 @@ +# Test AzureServiceBus Destination + +Assuming you have an Azure account & an authenticated Azure CLI, here are the steps you can do to set up a test ServiceBus Topic & Subscription for testing. + +Here are the resources you'll set up: + +1. Resource Group - A container that holds related Azure resources + - Name: outpost-demo-rg +2. Service Bus Namespace - The messaging service container (must be globally unique) + - Name: outpost-demo-sb-[RANDOM] (e.g., outpost-demo-sb-a3f2b1) +3. Topic - A message distribution mechanism (pub/sub pattern) + - Name: destination-test +4. Subscription - A receiver for messages sent to the topic + - Name: destination-test-sub + +Step 1: Create a Resource Group + +az group create \ + --name outpost-demo-rg \ + --location eastus + +Step 2: Create a Service Bus Namespace + +# Generate a random suffix for uniqueness +RANDOM_SUFFIX=$(openssl rand -hex 3) + +# Create the namespace +az servicebus namespace create \ + --resource-group outpost-demo-rg \ + --name outpost-demo-sb-${RANDOM_SUFFIX} \ + --location eastus \ + --sku Standard + +Note: The namespace name must be globally unique. The Standard SKU is required for topics (Basic only +supports queues). + +Step 3: Create a Topic + +az servicebus topic create \ + --resource-group outpost-demo-rg \ + --namespace-name outpost-demo-sb-${RANDOM_SUFFIX} \ + --name destination-test + +Step 4: Create a Subscription + +az servicebus topic subscription create \ + --resource-group outpost-demo-rg \ + --namespace-name outpost-demo-sb-${RANDOM_SUFFIX} \ + --topic-name destination-test \ + --name destination-test-sub + +Step 5: Get the Connection String + +az servicebus namespace authorization-rule keys list \ + --resource-group outpost-demo-rg \ + --namespace-name outpost-demo-sb-${RANDOM_SUFFIX} \ + --name RootManageSharedAccessKey \ + --query primaryConnectionString \ + --output tsv + +Example Output: +Endpoint=sb://outpost-demo-sb-a3f2b1.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;Sh +aredAccessKey=abcd1234... + +You can then create an Azure destination with: +- name: "destination-test" +- connection string: "Endpoint=sb://outpost-demo-sb-a3f2b1.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;Sh +aredAccessKey=abcd1234..." diff --git a/docs/pages/concepts.mdx b/docs/pages/concepts.mdx index 9772e694..7c8c60eb 100644 --- a/docs/pages/concepts.mdx +++ b/docs/pages/concepts.mdx @@ -29,10 +29,10 @@ Outpost consists of 3 services that can either be run as a single deployment or - Redis 6.0+ or wire-compatible alternative (RBD or AOF strongly recommended) - One of the supported message queues: + - Azure Service Bus - AWS SQS - - RabbitMQ - GCP Pub/Sub - - Azure Service Bus + - RabbitMQ ### Log Service @@ -46,11 +46,12 @@ Required for log storage. Event destination types belonging to Outpost tenants where events are delivered. - **Webhooks** -- **RabbitMQ** -- **AWS SQS** - **Hookdeck Event Gateway** +- **Azure Service Bus** +- **AWS Kinesis** +- **AWS SQS** +- **RabbitMQ** - **[Amazon EventBridge (planned)](https://github.com/hookdeck/outpost/issues/201)** -- **[Azure Service Bus (planned)](https://github.com/hookdeck/outpost/issues/241)** - **[GCP Pub/Sub (planned)](https://github.com/hookdeck/outpost/issues/140)** - **[Kafka (planned)](https://github.com/hookdeck/outpost/issues/141)** diff --git a/docs/pages/features/destinations.mdx b/docs/pages/features/destinations.mdx index 9a880735..7ff52157 100644 --- a/docs/pages/features/destinations.mdx +++ b/docs/pages/features/destinations.mdx @@ -6,14 +6,14 @@ Outpost supports multiple event destination types. Each tenant can have multiple - Webhooks - Hookdeck Event Gateway -- AWS SQS - AWS Kinesis +- AWS SQS +- Azure Service Bus - RabbitMQ (AMQP) Plans for additional event destination types include: - GCP Pub/Sub -- Azure Service Bus - Amazon EventBridge - Kafka diff --git a/docs/pages/references/roadmap.mdx b/docs/pages/references/roadmap.mdx index 06a9d488..eb2dd83d 100644 --- a/docs/pages/references/roadmap.mdx +++ b/docs/pages/references/roadmap.mdx @@ -8,14 +8,9 @@ title: "Outpost Roadmap" - [Postgres partition management](https://github.com/hookdeck/outpost/issues/249) -### Publish Message Queues - -- [Azure Service Bus](https://github.com/hookdeck/outpost/issues/139) - ### Destination Types - [GCP Pub/Sub](https://github.com/hookdeck/outpost/issues/140) -- [Azure Service Bus](https://github.com/hookdeck/outpost/issues/241) - [Amazon EventBridge](https://github.com/hookdeck/outpost/issues/201) - [Kafka](https://github.com/hookdeck/outpost/issues/141) - [S3](https://github.com/orgs/hookdeck/projects/21/views/1?filterQuery=s3&pane=issue&itemId=113373337&issue=hookdeck%7Coutpost%7C418) @@ -28,11 +23,21 @@ title: "Outpost Roadmap" ## Previous Notable Milestones +### v0.4.0 + +### Publish Message Queues + +- ✅ [Azure Service Bus](https://github.com/hookdeck/outpost/pull/435) + +### Destination Types + +- ✅ [Azure Service Bus](https://github.com/hookdeck/outpost/issues/241) + ### v0.3.0 #### Internal Message Queues -- ✅ [Azure Service Bus](https://github.com/orgs/hookdeck/projects/21?pane=issue&itemId=101269468&issue=hookdeck%7Coutpost%7C139) +- ✅ [Azure Service Bus](https://github.com/hookdeck/outpost/pull/425) ### v0.1.0 diff --git a/internal/destregistry/metadata/providers/azure_servicebus/instructions.md b/internal/destregistry/metadata/providers/azure_servicebus/instructions.md new file mode 100644 index 00000000..5b47db13 --- /dev/null +++ b/internal/destregistry/metadata/providers/azure_servicebus/instructions.md @@ -0,0 +1,162 @@ +# Azure Service Bus Configuration Instructions + +The Service Bus destination can be a topic or a queue. + +## Configuration + +- **Queue or Topic Name**: The name of the Service Bus topic or queue. +- **Connection String**: The connection string for the Service Bus instance. + +--- + +## How to set up Azure Service Bus as an event destination using the Azure CLI + +Here are the resources you'll need to set up: + +1. Resource Group - A container that holds related Azure resources +2. Service Bus Namespace - The messaging service container (must be globally unique) +3. Topic or a Queue: + - Topic: A message distribution mechanism (pub/sub pattern) + - Queue: A message queue for point-to-point communication + +### Prerequisites + +Assuming you have an Azure account and an authenticated Azure CLI, here are the steps to set up a Service Bus Topic or Queue as an Event Destination. + +Install [Azure CLI](https://docs.microsoft.com/en-us/cli/azure/install-azure-cli). + +Log in to Azure: + +```bash +az login +``` + +### 1. Create Resource Group + +Set variables: + +```bash +RESOURCE_GROUP="outpost-rg" +LOCATION="eastus" +``` + +Create resource group: + +```bash +az group create --name $RESOURCE_GROUP --location $LOCATION +``` + +#### 2. Create Service Bus Namespace + +Generate a unique namespace name (must be globally unique). + +```bash +RANDOM_SUFFIX=$(openssl rand -hex 4) +NAMESPACE_NAME="outpost-servicebus-$RANDOM_SUFFIX" +``` + +Create Service Bus namespace: + +```bash +az servicebus namespace create \ + --resource-group $RESOURCE_GROUP \ + --name $NAMESPACE_NAME \ + --location $LOCATION \ + --sku Standard +``` + +### 3. Create Topic/Queue-Specific Access Policy + +Choose either a Topic or a Queue based on your requirements. Below are examples for both. + +#### For a Topic + +Set a topic name: + +```bash +TOPIC_NAME="destination-test" +``` + +Create a topic: + +```bash +az servicebus topic create \ + --resource-group $RESOURCE_GROUP \ + --namespace-name $NAMESPACE_NAME \ + --name $TOPIC_NAME +``` + +Create a send-only policy for the topic. + +```bash +az servicebus topic authorization-rule create \ + --resource-group $RESOURCE_GROUP \ + --namespace-name $NAMESPACE_NAME \ + --topic-name $TOPIC_NAME \ + --name SendOnlyPolicy \ + --rights Send +``` + +Get the Topic-Specific Connection String: + +```bash +az servicebus topic authorization-rule keys list \ + --resource-group $RESOURCE_GROUP \ + --namespace-name $NAMESPACE_NAME \ + --topic-name $TOPIC_NAME \ + --name SendOnlyPolicy \ + --query primaryConnectionString \ + --output tsv +``` + +This returns a connection string that can only send to the Service Bus topic. Use this connection string in the **Connection String** field of the Event Destination configuration. + +``` +Endpoint=sb://outpost-sb-a3f2b1.servicebus.windows.net/;SharedAccessKeyName=SendOnlyPolicy;SharedAccessKey=xyz789...;EntityPath=events +``` + +#### For a Queue + +Set a queue name: + +```bash +QUEUE_NAME="myqueue" +``` + +Create a queue: + +```bash +az servicebus queue create \ + --resource-group $RESOURCE_GROUP \ + --namespace-name $NAMESPACE_NAME \ + --name $QUEUE_NAME +``` + +Create a send-only policy for the queue. + +```bash +az servicebus queue authorization-rule create \ + --resource-group $RESOURCE_GROUP \ + --namespace-name $NAMESPACE_NAME \ + --queue-name $QUEUE_NAME \ + --name SendOnlyPolicy \ + --rights Send +``` + +Get the Queue-Specific Connection String: + +```bash +az servicebus queue authorization-rule keys list \ + --resource-group $RESOURCE_GROUP \ + --namespace-name $NAMESPACE_NAME \ + --queue-name $QUEUE_NAME \ + --name SendOnlyPolicy \ + --query primaryConnectionString \ + --output tsv +``` + +This returns a connection string that can only send to the Service Bus queue. Use this connection string in the **Connection String** field of the Event Destination configuration. + +``` +Endpoint=sb://outpost-sb-a3f2b1.servicebus.windows.net/;SharedAccessKeyName=SendOnlyPolicy;SharedAccessKey=xyz789...;EntityPath=myqueue +``` diff --git a/internal/destregistry/metadata/providers/azure_servicebus/metadata.json b/internal/destregistry/metadata/providers/azure_servicebus/metadata.json new file mode 100644 index 00000000..41aa704b --- /dev/null +++ b/internal/destregistry/metadata/providers/azure_servicebus/metadata.json @@ -0,0 +1,27 @@ +{ + "type": "azure_servicebus", + "config_fields": [ + { + "key": "name", + "type": "text", + "label": "Queue or Topic Name", + "description": "The name of the Azure Service Bus queue or topic to publish to", + "required": true, + "pattern": "^[a-zA-Z0-9]([a-zA-Z0-9._-]*[a-zA-Z0-9])?$" + } + ], + "credential_fields": [ + { + "key": "connection_string", + "type": "text", + "label": "Connection String", + "description": "Azure Service Bus connection string with Send permissions", + "required": true, + "sensitive": true + } + ], + "label": "Azure Service Bus", + "link": "https://azure.microsoft.com/en-us/services/service-bus/", + "description": "Send events to Azure Service Bus topics for reliable cloud messaging between applications and services.", + "icon": "" +} diff --git a/internal/destregistry/providers/default.go b/internal/destregistry/providers/default.go index 4f14de0a..9b643c14 100644 --- a/internal/destregistry/providers/default.go +++ b/internal/destregistry/providers/default.go @@ -4,6 +4,7 @@ import ( "github.com/hookdeck/outpost/internal/destregistry" "github.com/hookdeck/outpost/internal/destregistry/providers/destawskinesis" "github.com/hookdeck/outpost/internal/destregistry/providers/destawssqs" + "github.com/hookdeck/outpost/internal/destregistry/providers/destazureservicebus" "github.com/hookdeck/outpost/internal/destregistry/providers/desthookdeck" "github.com/hookdeck/outpost/internal/destregistry/providers/destrabbitmq" "github.com/hookdeck/outpost/internal/destregistry/providers/destwebhook" @@ -84,6 +85,12 @@ func RegisterDefault(registry destregistry.Registry, opts RegisterDefaultDestina } registry.RegisterProvider("aws_kinesis", awsKinesis) + azureServiceBus, err := destazureservicebus.New(loader) + if err != nil { + return err + } + registry.RegisterProvider("azure_servicebus", azureServiceBus) + rabbitmq, err := destrabbitmq.New(loader) if err != nil { return err diff --git a/internal/destregistry/providers/destazureservicebus/destazureservicebus.go b/internal/destregistry/providers/destazureservicebus/destazureservicebus.go new file mode 100644 index 00000000..5f687606 --- /dev/null +++ b/internal/destregistry/providers/destazureservicebus/destazureservicebus.go @@ -0,0 +1,222 @@ +package destazureservicebus + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + "github.com/hookdeck/outpost/internal/destregistry" + "github.com/hookdeck/outpost/internal/destregistry/metadata" + "github.com/hookdeck/outpost/internal/models" +) + +type AzureServiceBusDestination struct { + *destregistry.BaseProvider +} + +type AzureServiceBusDestinationConfig struct { + Name string +} + +type AzureServiceBusDestinationCredentials struct { + ConnectionString string +} + +var _ destregistry.Provider = (*AzureServiceBusDestination)(nil) + +func New(loader metadata.MetadataLoader) (*AzureServiceBusDestination, error) { + base, err := destregistry.NewBaseProvider(loader, "azure_servicebus") + if err != nil { + return nil, err + } + + return &AzureServiceBusDestination{ + BaseProvider: base, + }, nil +} + +func (d *AzureServiceBusDestination) Validate(ctx context.Context, destination *models.Destination) error { + // Just use base validation - let Azure SDK handle connection string validation at runtime + return d.BaseProvider.Validate(ctx, destination) +} + +func (d *AzureServiceBusDestination) CreatePublisher(ctx context.Context, destination *models.Destination) (destregistry.Publisher, error) { + cfg, creds, err := d.resolveMetadata(ctx, destination) + if err != nil { + return nil, err + } + + return &AzureServiceBusPublisher{ + BasePublisher: &destregistry.BasePublisher{}, + connectionString: creds.ConnectionString, + queueOrTopic: cfg.Name, + }, nil +} + +func (d *AzureServiceBusDestination) ComputeTarget(destination *models.Destination) destregistry.DestinationTarget { + name, ok := destination.Config["name"] + if !ok { + return destregistry.DestinationTarget{} + } + + // Try to extract namespace from connection string + if connStr, ok := destination.Credentials["connection_string"]; ok { + namespace := parseNamespaceFromConnectionString(connStr) + if namespace != "" { + return destregistry.DestinationTarget{ + Target: fmt.Sprintf("%s/%s", namespace, name), + TargetURL: "", + } + } + } + + // Fallback to just the name if we can't parse namespace + return destregistry.DestinationTarget{ + Target: name, + TargetURL: "", + } +} + +func (d *AzureServiceBusDestination) Preprocess(newDestination *models.Destination, originalDestination *models.Destination, opts *destregistry.PreprocessDestinationOpts) error { + // No preprocessing needed for Azure Service Bus + return nil +} + +func (d *AzureServiceBusDestination) resolveMetadata(ctx context.Context, destination *models.Destination) (*AzureServiceBusDestinationConfig, *AzureServiceBusDestinationCredentials, error) { + if err := d.BaseProvider.Validate(ctx, destination); err != nil { + return nil, nil, err + } + + return &AzureServiceBusDestinationConfig{ + Name: destination.Config["name"], + }, &AzureServiceBusDestinationCredentials{ + ConnectionString: destination.Credentials["connection_string"], + }, nil +} + +type AzureServiceBusPublisher struct { + *destregistry.BasePublisher + connectionString string + queueOrTopic string + client *azservicebus.Client + sender *azservicebus.Sender +} + +func (p *AzureServiceBusPublisher) ensureSender() (*azservicebus.Sender, error) { + if p.client == nil { + client, err := azservicebus.NewClientFromConnectionString(p.connectionString, nil) + if err != nil { + return nil, fmt.Errorf("failed to create Azure Service Bus client: %w", err) + } + p.client = client + } + + if p.sender == nil { + sender, err := p.client.NewSender(p.queueOrTopic, nil) + if err != nil { + return nil, fmt.Errorf("failed to create sender for queue or topic %s: %w", p.queueOrTopic, err) + } + p.sender = sender + } + + return p.sender, nil +} + +func (p *AzureServiceBusPublisher) Format(ctx context.Context, event *models.Event) (*azservicebus.Message, error) { + dataBytes, err := json.Marshal(event.Data) + if err != nil { + return nil, fmt.Errorf("failed to marshal event data: %w", err) + } + + messageMetadata := map[string]any{} + metadata := p.BasePublisher.MakeMetadata(event, time.Now()) + for k, v := range metadata { + messageMetadata[k] = v + } + + message := &azservicebus.Message{ + Body: dataBytes, + ApplicationProperties: messageMetadata, + } + + return message, nil +} + +func (p *AzureServiceBusPublisher) Publish(ctx context.Context, event *models.Event) (*destregistry.Delivery, error) { + if err := p.BasePublisher.StartPublish(); err != nil { + return nil, err + } + defer p.BasePublisher.FinishPublish() + + message, err := p.Format(ctx, event) + if err != nil { + return nil, err + } + + sender, err := p.ensureSender() + if err != nil { + return nil, err + } + + if err := sender.SendMessage(ctx, message, nil); err != nil { + return &destregistry.Delivery{ + Status: "failed", + Code: "ERR", + Response: map[string]interface{}{ + "error": err.Error(), + }, + }, destregistry.NewErrDestinationPublishAttempt(err, "azure_servicebus", map[string]interface{}{ + "error": err.Error(), + }) + } + + return &destregistry.Delivery{ + Status: "success", + Code: "OK", + Response: map[string]interface{}{}, + }, nil +} + +func (p *AzureServiceBusPublisher) Close() error { + p.BasePublisher.StartClose() + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if p.sender != nil { + if err := p.sender.Close(ctx); err != nil { + return err + } + } + + if p.client != nil { + if err := p.client.Close(ctx); err != nil { + return err + } + } + + return nil +} + +// parseNamespaceFromConnectionString extracts the namespace from an Azure Service Bus connection string. +// Connection strings typically have the format: +// Endpoint=sb://namespace.servicebus.windows.net/;SharedAccessKeyName=...;SharedAccessKey=... +func parseNamespaceFromConnectionString(connStr string) string { + // Split by semicolons to get individual components + parts := strings.Split(connStr, ";") + for _, part := range parts { + if strings.HasPrefix(part, "Endpoint=") { + endpoint := strings.TrimPrefix(part, "Endpoint=") + // Remove protocol prefix + endpoint = strings.TrimPrefix(endpoint, "sb://") + // Extract namespace (everything before first dot) + if idx := strings.Index(endpoint, "."); idx > 0 { + return endpoint[:idx] + } + } + } + return "" +} diff --git a/internal/destregistry/providers/destazureservicebus/destazureservicebus_publish_test.go b/internal/destregistry/providers/destazureservicebus/destazureservicebus_publish_test.go new file mode 100644 index 00000000..053e6011 --- /dev/null +++ b/internal/destregistry/providers/destazureservicebus/destazureservicebus_publish_test.go @@ -0,0 +1,183 @@ +package destazureservicebus_test + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus" + "github.com/hookdeck/outpost/internal/destregistry/providers/destazureservicebus" + testsuite "github.com/hookdeck/outpost/internal/destregistry/testing" + "github.com/hookdeck/outpost/internal/models" + "github.com/hookdeck/outpost/internal/util/testinfra" + "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" +) + +// AzureServiceBusConsumer implements testsuite.MessageConsumer +type AzureServiceBusConsumer struct { + client *azservicebus.Client + receiver *azservicebus.Receiver + msgChan chan testsuite.Message + done chan struct{} +} + +func NewAzureServiceBusConsumer(connectionString, topic, subscription string) (*AzureServiceBusConsumer, error) { + client, err := azservicebus.NewClientFromConnectionString(connectionString, nil) + if err != nil { + return nil, err + } + + receiver, err := client.NewReceiverForSubscription(topic, subscription, nil) + if err != nil { + return nil, err + } + + c := &AzureServiceBusConsumer{ + client: client, + receiver: receiver, + msgChan: make(chan testsuite.Message), + done: make(chan struct{}), + } + go c.consume() + return c, nil +} + +func (c *AzureServiceBusConsumer) consume() { + ctx := context.Background() + for { + select { + case <-c.done: + return + default: + messages, err := c.receiver.ReceiveMessages(ctx, 1, nil) + if err != nil { + continue + } + + for _, msg := range messages { + // Parse custom properties as metadata + metadata := make(map[string]string) + for k, v := range msg.ApplicationProperties { + if strVal, ok := v.(string); ok { + metadata[k] = strVal + } + } + + // Parse the message body + var data interface{} + if err := json.Unmarshal(msg.Body, &data); err != nil { + // If JSON parsing fails, use raw bytes + data = msg.Body + } + + dataBytes, _ := json.Marshal(data) + + c.msgChan <- testsuite.Message{ + Data: dataBytes, + Metadata: metadata, + Raw: msg, + } + + // Complete the message + if err := c.receiver.CompleteMessage(ctx, msg, nil); err != nil { + // Log error but continue + continue + } + } + } + } +} + +func (c *AzureServiceBusConsumer) Consume() <-chan testsuite.Message { + return c.msgChan +} + +func (c *AzureServiceBusConsumer) Close() error { + close(c.done) + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if c.receiver != nil { + c.receiver.Close(ctx) + } + if c.client != nil { + c.client.Close(ctx) + } + return nil +} + +type AzureServiceBusAsserter struct{} + +func (a *AzureServiceBusAsserter) AssertMessage(t testsuite.TestingT, msg testsuite.Message, event models.Event) { + // Phase 1: Basic assertion structure + metadata := msg.Metadata + + // Verify system metadata + assert.NotEmpty(t, metadata["timestamp"], "timestamp should be present") + assert.Equal(t, event.ID, metadata["event-id"], "event-id should match") + assert.Equal(t, event.Topic, metadata["topic"], "topic should match") + + // Verify custom metadata + for k, v := range event.Metadata { + assert.Equal(t, v, metadata[k], "metadata key %s should match expected value", k) + } +} + +type AzureServiceBusSuite struct { + testsuite.PublisherSuite + consumer *AzureServiceBusConsumer +} + +func TestDestinationAzureServiceBusSuite(t *testing.T) { + suite.Run(t, new(AzureServiceBusSuite)) +} + +func (s *AzureServiceBusSuite) SetupSuite() { + t := s.T() + t.Cleanup(testinfra.Start(t)) + mqConfig := testinfra.GetMQAzureConfig(t, "TestDestinationAzureServiceBusSuite") + + // Create consumer + consumer, err := NewAzureServiceBusConsumer( + mqConfig.AzureServiceBus.ConnectionString, + mqConfig.AzureServiceBus.Topic, + mqConfig.AzureServiceBus.Subscription, + ) + require.NoError(t, err) + s.consumer = consumer + + // Create provider + provider, err := destazureservicebus.New(testutil.Registry.MetadataLoader()) + require.NoError(t, err) + + // Create destination + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("azure_servicebus"), + testutil.DestinationFactory.WithConfig(map[string]string{ + "name": mqConfig.AzureServiceBus.Topic, + }), + testutil.DestinationFactory.WithCredentials(map[string]string{ + "connection_string": mqConfig.AzureServiceBus.ConnectionString, + }), + ) + + // Initialize publisher suite with custom asserter + cfg := testsuite.Config{ + Provider: provider, + Dest: &destination, + Consumer: s.consumer, + Asserter: &AzureServiceBusAsserter{}, + } + s.InitSuite(cfg) +} + +func (s *AzureServiceBusSuite) TearDownSuite() { + if s.consumer != nil { + _ = s.consumer.Close() + } +} diff --git a/internal/destregistry/providers/destazureservicebus/destazureservicebus_test.go b/internal/destregistry/providers/destazureservicebus/destazureservicebus_test.go new file mode 100644 index 00000000..ba9015e5 --- /dev/null +++ b/internal/destregistry/providers/destazureservicebus/destazureservicebus_test.go @@ -0,0 +1,250 @@ +package destazureservicebus_test + +import ( + "context" + "strings" + "testing" + + "github.com/hookdeck/outpost/internal/destregistry" + "github.com/hookdeck/outpost/internal/destregistry/providers/destazureservicebus" + "github.com/hookdeck/outpost/internal/models" + "github.com/hookdeck/outpost/internal/util/testutil" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestComputeTarget(t *testing.T) { + provider, err := destazureservicebus.New(testutil.Registry.MetadataLoader()) + require.NoError(t, err) + + tests := []struct { + name string + config map[string]string + credentials map[string]string + expectedTarget string + }{ + { + name: "with valid connection string and name", + config: map[string]string{ + "name": "my-queue", + }, + credentials: map[string]string{ + "connection_string": "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234", + }, + expectedTarget: "mynamespace/my-queue", + }, + { + name: "with different namespace format", + config: map[string]string{ + "name": "my-topic", + }, + credentials: map[string]string{ + "connection_string": "Endpoint=sb://test-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xyz789", + }, + expectedTarget: "test-namespace/my-topic", + }, + { + name: "with missing name config", + config: map[string]string{}, + credentials: map[string]string{ + "connection_string": "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234", + }, + expectedTarget: "", + }, + { + name: "with invalid connection string format", + config: map[string]string{ + "name": "my-queue", + }, + credentials: map[string]string{ + "connection_string": "invalid-connection-string", + }, + expectedTarget: "my-queue", // Falls back to just the name + }, + { + name: "with missing connection string", + config: map[string]string{ + "name": "my-queue", + }, + credentials: map[string]string{}, + expectedTarget: "my-queue", // Falls back to just the name + }, + { + name: "with connection string missing Endpoint", + config: map[string]string{ + "name": "my-queue", + }, + credentials: map[string]string{ + "connection_string": "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234", + }, + expectedTarget: "my-queue", // Falls back to just the name + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("azure_servicebus"), + testutil.DestinationFactory.WithConfig(tt.config), + testutil.DestinationFactory.WithCredentials(tt.credentials), + ) + result := provider.ComputeTarget(&destination) + assert.Equal(t, tt.expectedTarget, result.Target) + assert.Empty(t, result.TargetURL) // TargetURL should always be empty for now + }) + } +} + +func TestParseNamespaceFromConnectionString(t *testing.T) { + tests := []struct { + name string + connectionString string + expectedNamespace string + }{ + { + name: "standard connection string", + connectionString: "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234", + expectedNamespace: "mynamespace", + }, + { + name: "connection string with hyphenated namespace", + connectionString: "Endpoint=sb://my-test-namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xyz789", + expectedNamespace: "my-test-namespace", + }, + { + name: "connection string with different order", + connectionString: "SharedAccessKeyName=RootManageSharedAccessKey;Endpoint=sb://namespace123.servicebus.windows.net/;SharedAccessKey=key123", + expectedNamespace: "namespace123", + }, + { + name: "invalid connection string", + connectionString: "invalid-string", + expectedNamespace: "", + }, + { + name: "missing endpoint", + connectionString: "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234", + expectedNamespace: "", + }, + { + name: "malformed endpoint", + connectionString: "Endpoint=invalid-endpoint;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234", + expectedNamespace: "", + }, + { + name: "empty connection string", + connectionString: "", + expectedNamespace: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // We need to test the parseNamespaceFromConnectionString function + // Since it's not exported, we test it indirectly through ComputeTarget + provider, err := destazureservicebus.New(testutil.Registry.MetadataLoader()) + require.NoError(t, err) + + dest := models.Destination{ + Type: "azure_servicebus", + Config: map[string]string{ + "name": "test-entity", + }, + Credentials: map[string]string{ + "connection_string": tt.connectionString, + }, + } + + result := provider.ComputeTarget(&dest) + if tt.expectedNamespace == "" { + assert.Equal(t, "test-entity", result.Target) + } else { + assert.Equal(t, tt.expectedNamespace+"/test-entity", result.Target) + } + }) + } +} + +func TestValidate(t *testing.T) { + provider, err := destazureservicebus.New(testutil.Registry.MetadataLoader()) + require.NoError(t, err) + + tests := []struct { + name string + config map[string]string + credentials map[string]string + wantErr bool + errContains string + }{ + { + name: "valid destination", + config: map[string]string{ + "name": "my-queue", + }, + credentials: map[string]string{ + "connection_string": "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234", + }, + wantErr: false, + }, + { + name: "missing name", + config: map[string]string{}, + credentials: map[string]string{ + "connection_string": "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234", + }, + wantErr: true, + errContains: "config.name", + }, + { + name: "missing connection string", + config: map[string]string{ + "name": "my-queue", + }, + credentials: map[string]string{}, + wantErr: true, + errContains: "credentials.connection_string", + }, + { + name: "invalid name pattern", + config: map[string]string{ + "name": "my queue with spaces", // Invalid - should fail pattern validation + }, + credentials: map[string]string{ + "connection_string": "Endpoint=sb://mynamespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=abcd1234", + }, + wantErr: true, + errContains: "config.name", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + destination := testutil.DestinationFactory.Any( + testutil.DestinationFactory.WithType("azure_servicebus"), + testutil.DestinationFactory.WithConfig(tt.config), + testutil.DestinationFactory.WithCredentials(tt.credentials), + ) + ctx := context.Background() + err := provider.Validate(ctx, &destination) + if tt.wantErr { + require.Error(t, err) + if tt.errContains != "" { + var validationErr *destregistry.ErrDestinationValidation + require.ErrorAs(t, err, &validationErr) + require.NotEmpty(t, validationErr.Errors) + // Check that at least one error contains the expected field + found := false + for _, e := range validationErr.Errors { + if strings.Contains(e.Field, tt.errContains) { + found = true + break + } + } + assert.True(t, found, "Expected error field containing %q, but got %+v", tt.errContains, validationErr.Errors) + } + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/internal/portal/src/common/ConfigurationModal/ConfigurationModal.scss b/internal/portal/src/common/ConfigurationModal/ConfigurationModal.scss index 031b2d01..23ba9b4a 100644 --- a/internal/portal/src/common/ConfigurationModal/ConfigurationModal.scss +++ b/internal/portal/src/common/ConfigurationModal/ConfigurationModal.scss @@ -35,6 +35,18 @@ line-height: var(--line-height-m); } + h3 { + font-size: var(--font-size-s); + font-weight: 600; + line-height: var(--line-height-s); + } + + h4 { + font-size: var(--font-size-xs); + font-weight: 600; + line-height: var(--line-height-xs); + } + p, li { font-size: var(--font-size-m); line-height: var(--line-height-m); diff --git a/internal/util/testinfra/azure.go b/internal/util/testinfra/azure.go index 7fe3de4f..68549403 100644 --- a/internal/util/testinfra/azure.go +++ b/internal/util/testinfra/azure.go @@ -71,6 +71,13 @@ func GetMQAzureConfig(t *testing.T, testName string) mqs.QueueConfig { Subscription: "TestIntegrationMQ_AzureServiceBus-subscription", }, }, + "TestDestinationAzureServiceBusSuite": { + AzureServiceBus: &mqs.AzureServiceBusConfig{ + ConnectionString: connString, + Topic: "TestDestinationAzureServiceBusSuite-topic", + Subscription: "TestDestinationAzureServiceBusSuite-subscription", + }, + }, } if cfg, ok := azureSBMap[testName]; !ok {