52 changes: 52 additions & 0 deletions python/destinations/kafka/README.md
# Kafka Replicator Sink

[This connector](https://github.com/quixio/quix-samples/tree/main/python/destinations/kafka) demonstrates how to consume data from a Quix topic and produce it to an external Kafka cluster.

This sink uses the `KafkaReplicatorSink` to serialize and produce messages to an external Kafka cluster, making it easy to replicate data between Kafka clusters or export data from Quix to other Kafka-based systems.

## How to run

Create a [Quix](https://portal.cloud.quix.io/signup?utm_campaign=github) account or log in, then visit the `Connectors` tab to use this connector.

Clicking `Set up connector` allows you to enter your connection details and runtime parameters.

Then either:
* click `Test connection & deploy` to deploy the pre-built and configured container into Quix.

* or click `Customise connector` to inspect or alter the code before deployment.

## Requirements / Prerequisites

You'll need an external Kafka cluster that is accessible from Quix, either locally or in the cloud.
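
Before deploying, it can help to confirm the broker is reachable from your network. The sketch below is a plain TCP check only; it does not validate the Kafka protocol or your credentials, and the `broker_reachable` helper is illustrative, not part of the connector:

```python
import socket


def broker_reachable(bootstrap_server: str, timeout: float = 5.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    host, _, port = bootstrap_server.rpartition(":")
    try:
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:
        return False


# True if a broker (or anything else) is listening on that port
print(broker_reachable("localhost:9092"))
```

A `True` result only proves network reachability; authentication problems will still surface at connector startup.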

## Environment Variables

The connector uses the following environment variables:

### Required
- **input**: Name of the input topic to listen to.
- **SINK_OUTPUT_TOPIC**: The target Kafka topic name to produce to on the external Kafka cluster.
- **SINK_BOOTSTRAP_SERVERS**: The external Kafka broker address (e.g., localhost:9092 or broker.example.com:9092).

### Optional
- **CONSUMER_GROUP**: Name of the consumer group for consuming from Quix. Default: "kafka_sink"
- **SINK_KEY_SERIALIZER**: Serializer to use for the message key. Options: json, bytes, string, double, integer. Default: "bytes"
- **SINK_VALUE_SERIALIZER**: Serializer to use for the message value. Options: json, bytes, string, double, integer. Default: "json"
- **SINK_AUTO_CREATE_TOPIC**: Whether to attempt to create the sink topic upon startup. Default: "true"
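
As a rough illustration of what the serializer options produce on the wire (the actual encoding is handled internally by quixstreams; the `double`/`integer` byte layouts shown here assume the Java-compatible big-endian conventions used across the Kafka ecosystem):

```python
import json
import struct

value = {"temperature": 21.5}

# "json": the value is serialized to a JSON string and UTF-8 encoded
json_bytes = json.dumps(value).encode("utf-8")

# "string": str payloads are UTF-8 encoded as-is
string_bytes = "21.5".encode("utf-8")

# "double": an 8-byte big-endian IEEE 754 float
double_bytes = struct.pack(">d", 21.5)

# "integer": a 4-byte big-endian signed integer
integer_bytes = struct.pack(">i", 42)

# "bytes": the payload is passed through unchanged
raw_bytes = b"\x00\x01\x02"
```

`bytes` is the safest key default since it forwards whatever the source topic carried without re-encoding.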

### Authentication (Optional)
- **SINK_SECURITY_PROTOCOL**: Protocol used to communicate with brokers. Options: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL
- **SINK_SASL_MECHANISM**: SASL mechanism for authentication. Options: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER, AWS_MSK_IAM
- **SINK_SASL_USERNAME**: SASL username for external Kafka authentication.
- **SINK_SASL_PASSWORD**: SASL password for external Kafka authentication.
- **SINK_SSL_CA_LOCATION**: Path to the SSL CA certificate file for secure connections.
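
For example, a SASL_SSL connection to an external cluster could be configured by setting these variables before the connector starts. The broker address, topic, credentials, and CA path below are all placeholders:

```python
import os

# Placeholder values -- substitute your own cluster details
os.environ.update({
    "SINK_BOOTSTRAP_SERVERS": "broker.example.com:9093",
    "SINK_OUTPUT_TOPIC": "replicated-events",
    "SINK_SECURITY_PROTOCOL": "SASL_SSL",
    "SINK_SASL_MECHANISM": "SCRAM-SHA-256",
    "SINK_SASL_USERNAME": "replicator",
    "SINK_SASL_PASSWORD": "example-password",
    # Optional: pin a CA bundle; otherwise system default CAs are used
    "SINK_SSL_CA_LOCATION": "/certs/ca.pem",
})
```

In a Quix deployment these are set through the connector setup form rather than in code, and `SINK_SASL_PASSWORD` should be stored as a secret.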

## Contribute

Submit forked projects to the Quix [GitHub](https://github.com/quixio/quix-samples) repo. Any new project that we accept will be attributed to you and you'll receive $200 in Quix credit.

## Open source

This project is open source under the Apache 2.0 license and available in our [GitHub](https://github.com/quixio/quix-samples) repo.

Please star us and mention us on social to show your appreciation.
28 changes: 28 additions & 0 deletions python/destinations/kafka/dockerfile
FROM python:3.12.5-slim-bookworm

# Set environment variables for non-interactive setup and unbuffered output
ENV DEBIAN_FRONTEND=noninteractive \
    PYTHONUNBUFFERED=1 \
    PYTHONIOENCODING=UTF-8 \
    PYTHONPATH="/app"

# Build argument for setting the main app path
ARG MAINAPPPATH=.

# Set working directory inside the container
WORKDIR /app

# Copy requirements to leverage Docker cache
COPY "${MAINAPPPATH}/requirements.txt" "${MAINAPPPATH}/requirements.txt"

# Install dependencies without caching
RUN pip install --no-cache-dir -r "${MAINAPPPATH}/requirements.txt"

# Copy entire application into container
COPY . .

# Set working directory to main app path
WORKDIR "/app/${MAINAPPPATH}"

# Define the container's startup command
ENTRYPOINT ["python3", "main.py"]
113 changes: 113 additions & 0 deletions python/destinations/kafka/library.json
{
  "libraryItemId": "kafka-replicator-sink",
  "name": "Kafka Replicator Sink",
  "language": "Python",
  "tags": {
    "Pipeline Stage": ["Destination"],
    "Type": ["Connectors"],
    "Category": ["Data streaming"]
  },
  "shortDescription": "Consume data from a Quix topic and produce it to an external Kafka cluster",
  "DefaultFile": "main.py",
  "EntryPoint": "dockerfile",
  "RunEntryPoint": "main.py",
  "IconFile": "icon.png",
  "Variables": [
    {
      "Name": "input",
      "Type": "EnvironmentVariable",
      "InputType": "InputTopic",
      "Description": "Name of the input topic to listen to.",
      "Required": true
    },
    {
      "Name": "SINK_AUTO_CREATE_TOPIC",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Whether to attempt to create the sink topic upon startup",
      "DefaultValue": "true",
      "Required": false
    },
    {
      "Name": "CONSUMER_GROUP",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Name of the consumer group",
      "DefaultValue": "kafka_sink",
      "Required": false
    },
    {
      "Name": "SINK_OUTPUT_TOPIC",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "The target Kafka topic name to produce to on the external Kafka cluster",
      "Required": true
    },
    {
      "Name": "SINK_BOOTSTRAP_SERVERS",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "The external Kafka broker address (e.g., localhost:9092 or broker.example.com:9092)",
      "Required": true
    },
    {
      "Name": "SINK_KEY_SERIALIZER",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Serializer to use for the message key. Options: json, bytes, string, double, integer",
      "DefaultValue": "bytes",
      "Required": false
    },
    {
      "Name": "SINK_VALUE_SERIALIZER",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Serializer to use for the message value. Options: json, bytes, string, double, integer",
      "DefaultValue": "json",
      "Required": false
    },
    {
      "Name": "SINK_SECURITY_PROTOCOL",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Protocol used to communicate with brokers. Options: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL",
      "Required": false
    },
    {
      "Name": "SINK_SASL_MECHANISM",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "SASL mechanism for authentication. Options: PLAIN, SCRAM-SHA-256, SCRAM-SHA-512, GSSAPI, OAUTHBEARER, AWS_MSK_IAM",
      "Required": false
    },
    {
      "Name": "SINK_SASL_USERNAME",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "SASL username for external Kafka authentication",
      "Required": false
    },
    {
      "Name": "SINK_SASL_PASSWORD",
      "Type": "EnvironmentVariable",
      "InputType": "Secret",
      "Description": "SASL password for external Kafka authentication",
      "Required": false
    },
    {
      "Name": "SINK_SSL_CA_LOCATION",
      "Type": "EnvironmentVariable",
      "InputType": "FreeText",
      "Description": "Path to the SSL CA certificate file for secure connections. If not provided, system default CA certificates will be used",
      "Required": false
    }
  ],
  "DeploySettings": {
    "DeploymentType": "Service",
    "CpuMillicores": 200,
    "MemoryInMb": 200,
    "Replicas": 1,
    "PublicAccess": false,
    "ValidateConnection": true
  }
}
72 changes: 72 additions & 0 deletions python/destinations/kafka/main.py
import os
from typing import Tuple, Type

from pydantic_settings import (
    BaseSettings as PydanticBaseSettings,
    PydanticBaseSettingsSource,
    SettingsConfigDict,
)

from quixstreams import Application
from quixstreams.kafka.configuration import ConnectionConfig

# TODO: update this import once the sink is added to quixstreams
from sink import KafkaReplicatorSink
class SinkConnectionConfig(ConnectionConfig):
    """
    A ConnectionConfig subclass that reads configuration from environment variables
    with a SINK_ prefix.

    This allows users to configure the sink's Kafka connection using environment
    variables like SINK_BOOTSTRAP_SERVERS, SINK_SASL_USERNAME, etc.

    Example:
        export SINK_BOOTSTRAP_SERVERS=kafka:9092
        export SINK_SECURITY_PROTOCOL=SASL_SSL
        export SINK_SASL_MECHANISM=PLAIN
        export SINK_SASL_USERNAME=myuser
        export SINK_SASL_PASSWORD=mypass

        # Then create the config
        config = SinkConnectionConfig()
        sink = KafkaReplicatorSink(broker_address=config, topic_name="output-topic")
    """

    model_config = SettingsConfigDict(
        env_prefix="SINK_",
    )

    @classmethod
    def settings_customise_sources(
        cls,
        settings_cls: Type[PydanticBaseSettings],
        init_settings: PydanticBaseSettingsSource,
        env_settings: PydanticBaseSettingsSource,
        dotenv_settings: PydanticBaseSettingsSource,
        file_secret_settings: PydanticBaseSettingsSource,
    ) -> Tuple[PydanticBaseSettingsSource, ...]:
        """
        Enable reading values from environment variables with the SINK_ prefix.
        """
        return init_settings, env_settings


app = Application(
    consumer_group=os.environ.get("CONSUMER_GROUP", "kafka_sink"),
    auto_offset_reset="earliest",
)
input_topic = app.topic(os.environ["input"])
kafka_sink = KafkaReplicatorSink(
    broker_address=SinkConnectionConfig(),
    topic_name=os.environ["SINK_OUTPUT_TOPIC"],
    key_serializer=os.getenv("SINK_KEY_SERIALIZER", "bytes"),
    value_serializer=os.getenv("SINK_VALUE_SERIALIZER", "json"),
    origin_topic=input_topic,
    auto_create_sink_topic=os.getenv("SINK_AUTO_CREATE_TOPIC", "true").lower() == "true",
)
app.dataframe(input_topic).sink(kafka_sink)


if __name__ == '__main__':
    app.run()
2 changes: 2 additions & 0 deletions python/destinations/kafka/requirements.txt
quixstreams==3.23.1  # TODO: update version once the sink is added to quixstreams
python-dotenv