Skip to content

feat: implement destazureservicebus #437

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jul 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions build/dev/azure/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,15 @@
}
]
},
// test destination
{
"Name": "destination-test",
"Subscriptions": [
{
"Name": "destination-test-sub"
}
]
},

// tests
{
Expand All @@ -53,6 +62,17 @@
"Rules": []
}
]
},
{
"Name": "TestDestinationAzureServiceBusSuite-topic",
"Properties": {},
"Subscriptions": [
{
"Name": "TestDestinationAzureServiceBusSuite-subscription",
"Properties": {},
"Rules": []
}
]
}
]
}
Expand Down
124 changes: 124 additions & 0 deletions cmd/destinations/azureservicebus/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package main

import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"syscall"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
)

// local
const (
TOPIC_NAME = "destination-test"
SUBSCRIPTION_NAME = "destination-test-sub"
CONNECTION_STRING = "Endpoint=sb://localhost;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;"
)
Comment on lines +16 to +20
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After provisioning your resources using the document above, you can replace these constants with your real credentials for testing.

$ go run ./cmd/destinations/azureservicebus

You can also run the Azure emulator and test against it locally too.

# to run Azure emulator
$ make up/azure


func main() {
if err := run(); err != nil {
panic(err)
}
}

func run() error {
// Create client
client, err := azservicebus.NewClientFromConnectionString(CONNECTION_STRING, nil)
if err != nil {
return fmt.Errorf("failed to create client: %w", err)
}
defer client.Close(context.Background())

// Create receiver for the subscription
receiver, err := client.NewReceiverForSubscription(TOPIC_NAME, SUBSCRIPTION_NAME, nil)
if err != nil {
return fmt.Errorf("failed to create receiver: %w", err)
}
defer receiver.Close(context.Background())

// Set up signal handling for graceful shutdown
termChan := make(chan os.Signal, 1)
signal.Notify(termChan, syscall.SIGINT, syscall.SIGTERM)

// Start consuming messages
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

go func() {
for {
messages, err := receiver.ReceiveMessages(ctx, 1, nil)
if err != nil {
if ctx.Err() != nil {
// Context cancelled, exit gracefully
return
}
log.Printf("[x] Error receiving messages: %v", err)
continue
}

for _, msg := range messages {
// Log message details
log.Printf("[x] Received message:")
log.Printf(" Message ID: %s", msg.MessageID)
log.Printf(" Sequence Number: %d", *msg.SequenceNumber)

// Log application properties (metadata)
if len(msg.ApplicationProperties) > 0 {
log.Printf(" Metadata:")
for k, v := range msg.ApplicationProperties {
log.Printf(" %s: %v", k, v)
}
}

// Log message body
var data interface{}
if err := json.Unmarshal(msg.Body, &data); err == nil {
// Pretty print JSON
pretty, _ := json.MarshalIndent(data, " ", " ")
log.Printf(" Body (JSON):\n %s", string(pretty))
} else {
// Raw body
log.Printf(" Body (Raw): %s", string(msg.Body))
}

// Complete the message
if err := receiver.CompleteMessage(ctx, msg, nil); err != nil {
log.Printf("[x] Error completing message: %v", err)
}
}
}
}()

// Log configuration
log.Printf("[*] Azure Service Bus Consumer")
log.Printf("[*] Topic: %s", TOPIC_NAME)
log.Printf("[*] Subscription: %s", SUBSCRIPTION_NAME)
log.Printf("[*] Namespace: %s", extractNamespace(CONNECTION_STRING))
log.Printf("[*] Ready to receive messages. Press Ctrl+C to exit.")

// Wait for termination signal
<-termChan
log.Printf("[*] Shutting down...")
cancel()

return nil
}

func extractNamespace(connStr string) string {
// Simple extraction for display purposes
start := len("Endpoint=sb://")
if len(connStr) > start {
end := start
for end < len(connStr) && connStr[end] != '.' {
end++
}
if end > start {
return connStr[start:end]
}
}
return "unknown"
}
169 changes: 169 additions & 0 deletions contributing/destinations/azure_servicebus/configuration.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
# AzureServiceBus Destination configuration
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This document explains my thought process behind the destination config / credentials and also some further configuration we can support.


Here's a rough document explaining how AzureServiceBus works and how the destination is implemented with Outpost.

# PubSub vs Queue

Azure ServiceBus supports both PubSub (Topic & Subscription) and Queue. From the Publisher (Azure's term is Sender) perspective, it doesn't really care whether it's publishing to a Topic or to a Queue. So, from the destination config, all we need is a single "name" field.

## Message

Whether it's publishing to Topic or Queue, the Publisher needs to send an Azure's Message. Here's the full Golang SDK Message struct:

```golang
// Message is a message with a body and commonly used properties.
// Properties that are pointers are optional.
type Message struct {
// ApplicationProperties can be used to store custom metadata for a message.
ApplicationProperties map[string]any

// Body corresponds to the first []byte array in the Data section of an AMQP message.
Body []byte

// ContentType describes the payload of the message, with a descriptor following
// the format of Content-Type, specified by RFC2045 (ex: "application/json").
ContentType *string

// CorrelationID allows an application to specify a context for the message for the purposes of
// correlation, for example reflecting the MessageID of a message that is being
// replied to.
CorrelationID *string

// MessageID is an application-defined value that uniquely identifies
// the message and its payload. The identifier is a free-form string.
//
// If enabled, the duplicate detection feature identifies and removes further submissions
// of messages with the same MessageId.
MessageID *string

// PartitionKey is used with a partitioned entity and enables assigning related messages
// to the same internal partition. This ensures that the submission sequence order is correctly
// recorded. The partition is chosen by a hash function in Service Bus and cannot be chosen
// directly.
//
// For session-aware entities, the ReceivedMessage.SessionID overrides this value.
PartitionKey *string

// ReplyTo is an application-defined value specify a reply path to the receiver of the message. When
// a sender expects a reply, it sets the value to the absolute or relative path of the queue or topic
// it expects the reply to be sent to.
ReplyTo *string

// ReplyToSessionID augments the ReplyTo information and specifies which SessionId should
// be set for the reply when sent to the reply entity.
ReplyToSessionID *string

// ScheduledEnqueueTime specifies a time when a message will be enqueued. The message is transferred
// to the broker but will not available until the scheduled time.
ScheduledEnqueueTime *time.Time

// SessionID is used with session-aware entities and associates a message with an application-defined
// session ID. Note that an empty string is a valid session identifier.
// Messages with the same session identifier are subject to summary locking and enable
// exact in-order processing and demultiplexing. For session-unaware entities, this value is ignored.
SessionID *string

// Subject enables an application to indicate the purpose of the message, similar to an email subject line.
Subject *string

// TimeToLive is the duration after which the message expires, starting from the instant the
// message has been accepted and stored by the broker, found in the ReceivedMessage.EnqueuedTime
// property.
//
// When not set explicitly, the assumed value is the DefaultTimeToLive for the queue or topic.
// A message's TimeToLive cannot be longer than the entity's DefaultTimeToLive is silently
// adjusted if it does.
TimeToLive *time.Duration

// To is reserved for future use in routing scenarios but is not currently used by Service Bus.
// Applications can use this value to indicate the logical destination of the message.
To *string
}
```

Here are a few notable configuration, especially on the destination level that we may want to support:

- MessageID --> MessageIDTemplate, similar to AWS Kinesis Parition Key approach
- CorrelationID --> CorrelationIDTemplate, similar to AWS Kinesis Parition Key approach
- PartitionKey --> PartitionKeyTemplate, similar to AWS Kinesis Parition Key approach

- ScheduledEnqueueTime
- TimeToLive

The current implementation doesn't support any of these. So when create destination, it's super straightforward:

```golang
type Config struct {
Name string
}
```

If we want to support these, we can either add them to Config, such as `Config.TTL`, or we can also add a suffix like `Config.MessageTTL` to specify that these config would apply to the Message.
Comment on lines +84 to +101
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@alexluong let's be consistent across destinations.

Let's also be explicit in our naming so it's clear where the configuration is applied.


## Authentication

For authentication, we currently support "connection_string" which by default have access to the full Namespace.

## Creating Topic/Queue-Specific Access Policy

### For a Topic (Send-only access):

Create a Send-only policy for a specific topic

az servicebus topic authorization-rule create \
--resource-group outpost-demo-rg \
--namespace-name outpost-demo-sb-${RANDOM_SUFFIX} \
--topic-name events \
--name SendOnlyPolicy \
--rights Send

Get the Topic-Specific Connection String:

az servicebus topic authorization-rule keys list \
--resource-group outpost-demo-rg \
--namespace-name outpost-demo-sb-${RANDOM_SUFFIX} \
--topic-name events \
--name SendOnlyPolicy \
--query primaryConnectionString \
--output tsv

This returns a connection string that can only send to the events topic:
Endpoint=sb://outpost-demo-sb-a3f2b1.servicebus.windows.net/;SharedAccessKeyName=Send
OnlyPolicy;SharedAccessKey=xyz789...;EntityPath=events

### For Queues (similar approach):

Create a Send-only policy for a specific queue
az servicebus queue authorization-rule create \
--resource-group outpost-demo-rg \
--namespace-name outpost-demo-sb-${RANDOM_SUFFIX} \
--queue-name myqueue \
--name SendOnlyPolicy \
--rights Send

Available Permission Rights:

- Send - Can only send messages
- Listen - Can only receive messages
- Manage - Full control (send, receive, manage)

You can combine multiple rights:
--rights Send Listen # Can both send and receive

Benefits of Entity-Level Access:

1. Security: Limits blast radius if credentials are compromised
2. Principle of Least Privilege: Outpost only needs Send permission
3. Audit Trail: Can track which policy is being used
4. Rotation: Can rotate entity-specific keys without affecting other services

Important Notes:

- Entity-level connection strings include EntityPath parameter
- These policies are scoped to a single topic/queue
- Perfect for production where you want to limit Outpost to only sending to specific
topics
- The connection string format is the same, just with limited scope

This is the recommended approach for production use - give Outpost only the minimum
permissions it needs (Send) and only to the specific topic/queue it should access.
68 changes: 68 additions & 0 deletions contributing/destinations/azure_servicebus/test-destination.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Test AzureServiceBus Destination

Assuming you have an Azure account & an authenticated Azure CLI, here are the steps you can do to set up a test ServiceBus Topic & Subscription for testing.

Here are the resources you'll set up:

1. Resource Group - A container that holds related Azure resources
- Name: outpost-demo-rg
2. Service Bus Namespace - The messaging service container (must be globally unique)
- Name: outpost-demo-sb-[RANDOM] (e.g., outpost-demo-sb-a3f2b1)
3. Topic - A message distribution mechanism (pub/sub pattern)
- Name: destination-test
4. Subscription - A receiver for messages sent to the topic
- Name: destination-test-sub

Step 1: Create a Resource Group

az group create \
--name outpost-demo-rg \
--location eastus

Step 2: Create a Service Bus Namespace

# Generate a random suffix for uniqueness
RANDOM_SUFFIX=$(openssl rand -hex 3)

# Create the namespace
az servicebus namespace create \
--resource-group outpost-demo-rg \
--name outpost-demo-sb-${RANDOM_SUFFIX} \
--location eastus \
--sku Standard

Note: The namespace name must be globally unique. The Standard SKU is required for topics (Basic only
supports queues).

Step 3: Create a Topic

az servicebus topic create \
--resource-group outpost-demo-rg \
--namespace-name outpost-demo-sb-${RANDOM_SUFFIX} \
--name destination-test

Step 4: Create a Subscription

az servicebus topic subscription create \
--resource-group outpost-demo-rg \
--namespace-name outpost-demo-sb-${RANDOM_SUFFIX} \
--topic-name destination-test \
--name destination-test-sub

Step 5: Get the Connection String

az servicebus namespace authorization-rule keys list \
--resource-group outpost-demo-rg \
--namespace-name outpost-demo-sb-${RANDOM_SUFFIX} \
--name RootManageSharedAccessKey \
--query primaryConnectionString \
--output tsv

Example Output:
Endpoint=sb://outpost-demo-sb-a3f2b1.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;Sh
aredAccessKey=abcd1234...

You can then create an Azure destination with:
- name: "destination-test"
- connection string: "Endpoint=sb://outpost-demo-sb-a3f2b1.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;Sh
aredAccessKey=abcd1234..."
Loading