Connector: FileSink refactor #1004

Merged (5 commits, Aug 18, 2025)
76 changes: 59 additions & 17 deletions docs/connectors/sinks/amazon-s3-sink.md
@@ -19,7 +19,7 @@ pip install quixstreams[s3]

## How It Works

`FileSink` with `S3Destination` is a batching sink that writes data directly to Amazon S3.
`S3FileSink` is a batching sink that writes data directly to Amazon S3.

It batches processed records in memory per topic partition and writes them to S3 objects in a specified bucket and prefix structure. Objects are organized by topic and partition, with each batch being written to a separate object named by its starting offset.

@@ -31,30 +31,36 @@ Batches are written to S3 during the commit phase of processing. This means the

## How To Use

Create an instance of `FileSink` with `S3Destination` and pass it to the `StreamingDataFrame.sink()` method.
Create an instance of `S3FileSink` and pass it to the `StreamingDataFrame.sink()` method.

```python
import os

from quixstreams import Application
from quixstreams.sinks.community.file import FileSink
from quixstreams.sinks.community.file.destinations import S3Destination
from quixstreams.sinks.community.file.s3 import S3FileSink
from quixstreams.sinks.community.file.formats import JSONFormat


# Configure the sink to write JSON files to S3
file_sink = FileSink(
file_sink = S3FileSink(
bucket="my-bucket",
region_name="eu-west-2",

# Optional: defaults to current working directory
directory="data",

# Optional: defaults to "json"
# Available formats: "json", "parquet" or an instance of Format
format=JSONFormat(compress=True),
destination=S3Destination(
bucket="my-bucket",
# Optional: AWS credentials
aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
region_name="eu-west-2",
# Optional: Additional keyword arguments are passed to the boto3 client
endpoint_url="http://localhost:4566", # for LocalStack testing
)

# Optional: AWS credentials
aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],

# Optional: For local testing
endpoint_url="http://localhost:4566",

# Optional: Additional keyword arguments are passed to boto3 client
)

app = Application(broker_address='localhost:9092', auto_offset_reset="earliest")
@@ -73,10 +79,11 @@ if __name__ == "__main__":
export AWS_ACCESS_KEY_ID="your_access_key"
export AWS_SECRET_ACCESS_KEY="your_secret_key"
export AWS_DEFAULT_REGION="eu-west-2"
export AWS_ENDPOINT_URL_S3="http://your.url.here"
```
Then you can create the destination with just the bucket name:
Then you can create the sink with just the bucket name:
```python
s3_sink = S3Destination(bucket="my-bucket")
s3_sink = S3FileSink(bucket="my-bucket")
```

## S3 Object Organization
@@ -105,4 +112,39 @@ Each object is named using the batch's starting offset (padded to 19 digits) and
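
As a rough illustration of this layout (the exact prefix and file extension are assumptions here; they depend on your `directory` and `format` settings), the key for a batch might be built like this:

```python
# Hypothetical sketch of the object-key layout described above:
# <directory>/<topic>/<partition>/<starting offset padded to 19 digits>.<extension>
directory, topic, partition, start_offset = "data", "my-topic", 0, 123
extension = "jsonl.gz"  # assumed; the real extension depends on the configured format
key = f"{directory}/{topic}/{partition}/{start_offset:019d}.{extension}"
print(key)  # -> data/my-topic/0/0000000000000000123.jsonl.gz
```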

## Delivery Guarantees

`FileSink` provides at-least-once guarantees, and the results may contain duplicated data if there were errors during processing.
`S3FileSink` provides at-least-once guarantees, and the results may contain duplicated data if there were errors during processing.


## Testing Locally

Rather than connecting to AWS, you can test your application against a locally
emulated S3 host via Docker (using MinIO):

1. Execute in terminal:

```bash
docker run --rm -d --name minio \
-p 9000-9001:9000-9001 \
-e MINIO_ROOT_USER=admin \
-e MINIO_ROOT_PASSWORD=admin_pw \
-v /data \
quay.io/minio/minio server /data --console-address ":9001"
```

2. In the MinIO console:
- Navigate to the UI at `http://localhost:9001`
- Authenticate with `username=admin`, `password=admin_pw`
- Create a bucket.

3. Connect using the following:
```python
from quixstreams.sinks.community.file.s3 import S3FileSink

S3FileSink(
bucket="<YOUR BUCKET NAME>",
aws_access_key_id='admin',
aws_secret_access_key='admin_pw',
region_name='us-east-1',
endpoint_url='http://localhost:9000',
)
```
19 changes: 10 additions & 9 deletions docs/connectors/sinks/local-file-sink.md
@@ -11,31 +11,32 @@ By default, the data will include the kafka message key, value, and timestamp.

## How It Works

`FileSink` with `LocalDestination` is a batching sink that writes data to your local filesystem.
`LocalFileSink` is a batching sink that writes data to your local filesystem.

It batches processed records in memory per topic partition and writes them to files in a specified directory structure. Files are organized by topic and partition. When append mode is disabled (default), each batch is written to a separate file named by its starting offset. When append mode is enabled, all records for a partition are written to a single file.

The sink can either create new files for each batch or append to existing files (when using formats that support appending).

## How To Use

Create an instance of `FileSink` and pass it to the `StreamingDataFrame.sink()` method.
Create an instance of `LocalFileSink` and pass it to the `StreamingDataFrame.sink()` method.

```python
from quixstreams import Application
from quixstreams.sinks.community.file import FileSink
from quixstreams.sinks.community.file.destinations import LocalDestination
from quixstreams.sinks.community.file.local import LocalFileSink
from quixstreams.sinks.community.file.formats import JSONFormat

# Configure the sink to write JSON files
file_sink = FileSink(
file_sink = LocalFileSink(
# Append can be set to True to dump to a single file per partition
append=False,

# Optional: defaults to current working directory
directory="data",

# Optional: defaults to "json"
# Available formats: "json", "parquet" or an instance of Format
format=JSONFormat(compress=True),
# Optional: defaults to LocalDestination(append=False)
destination=LocalDestination(append=True),
format=JSONFormat(compress=True)
)

app = Application(broker_address='localhost:9092', auto_offset_reset="earliest")
@@ -73,4 +74,4 @@ Each file is named using the batch's starting offset (padded to 19 digits) and t
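
For orientation, a sketch of what the on-disk layout might look like (paths and extensions are assumptions; they depend on the configured `directory` and `format`):

```python
from pathlib import Path

# Assumed layout: <directory>/<topic>/<partition>/<offset padded to 19 digits>.<ext>
# With append=False each batch gets its own file named by its starting offset;
# with append=True all records for a partition go to a single file.
for path in sorted(Path("data/my-topic/0").glob("*")):  # hypothetical output directory
    print(path.name)
# e.g. 0000000000000000000.jsonl.gz
#      0000000000000000123.jsonl.gz
```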

## Delivery Guarantees

`FileSink` provides at-least-once guarantees, and the results may contain duplicated data if there were errors during processing.
`LocalFileSink` provides at-least-once guarantees, and the results may contain duplicated data if there were errors during processing.
38 changes: 29 additions & 9 deletions docs/connectors/sinks/microsoft-azure-file-sink.md
@@ -19,7 +19,7 @@ pip install quixstreams[azure-file]

## How It Works

`FileSink` with `AzureFileDestination` is a batching sink that writes data directly to Microsoft Azure.
`AzureFileSink` is a batching sink that writes data directly to Microsoft Azure.

It batches processed records in memory per topic partition and writes them to Azure objects in a specified container and prefix structure. Objects are organized by topic and partition, with each batch being written to a separate object named by its starting offset.

@@ -31,25 +31,25 @@ Batches are written to Azure during the commit phase of processing. This means t

## How To Use

Create an instance of `FileSink` with `AzureFileDestination` and pass it to the `StreamingDataFrame.sink()` method.
Create an instance of `AzureFileSink` and pass it to the `StreamingDataFrame.sink()` method.

```python
from quixstreams import Application
from quixstreams.sinks.community.file import FileSink
from quixstreams.sinks.community.file.azure import AzureFileSink
from quixstreams.sinks.community.file.formats import JSONFormat
from quixstreams.sinks.community.file.destinations import AzureFileDestination


# Configure the sink to write JSON files to Azure
file_sink = FileSink(
file_sink = AzureFileSink(
azure_container="<YOUR AZURE CONTAINER NAME>",
azure_connection_string="<YOUR AZURE CONNECTION STRING>",

# Optional: defaults to current working directory
directory="data",

# Optional: defaults to "json"
# Available formats: "json", "parquet" or an instance of Format
format=JSONFormat(compress=True),
destination=AzureFileDestination(
container="<YOUR CONTAINER NAME>",
connection_string="<YOUR CONNECTION STRING>",
)
)

app = Application(broker_address='localhost:9092', auto_offset_reset="earliest")
@@ -88,4 +88,24 @@

## Delivery Guarantees

`FileSink` provides at-least-once guarantees, and the results may contain duplicated data if there were errors during processing.
`AzureFileSink` provides at-least-once guarantees, and the results may contain duplicated data if there were errors during processing.


## Testing Locally

Rather than connecting to Azure, you can test your application against a locally
emulated Azure host (Azurite) via Docker:

1. Execute in terminal:

```bash
docker run --rm -d --name azurite \
-p 10000:10000 \
mcr.microsoft.com/azure-storage/azurite:latest
```

2. Set `azure_connection_string` for `AzureFileSink` to:

```python
"DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"
```
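
3. Connect the sink to the emulator. A minimal sketch, mirroring the S3 example above (parameter names follow the `AzureFileSink` constructor in this PR; the container is assumed to already exist in Azurite):
```python
from quixstreams.sinks.community.file.azure import AzureFileSink

# Azurite's well-known development-account connection string (from step 2).
azurite_connection_string = "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://127.0.0.1:10000/devstoreaccount1;"

file_sink = AzureFileSink(
    azure_container="<YOUR CONTAINER NAME>",  # assumed to already exist
    azure_connection_string=azurite_connection_string,
)
```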
39 changes: 24 additions & 15 deletions docs/connectors/sources/amazon-s3-source.md
@@ -266,24 +266,33 @@ within the provided topic's folder structure.
The default topic name the Application dumps to is based on the last folder name of
the `FileSource` `directory` as: `source__<last folder name>`.
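
For instance, a small sketch of the naming rule (example values assumed):

```python
from pathlib import Path

# Assumed illustration: the default topic is the last folder name of the
# FileSource directory, prefixed with "source__".
directory = Path("data/sensor_readings")  # hypothetical source directory
default_topic = f"source__{directory.name}"
print(default_topic)  # -> source__sensor_readings
```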

## Testing Locally

Rather than connecting to AWS, you can test your application against a locally
emulated S3 host via Docker:
emulated S3 host via Docker (using MinIO):

1. Execute in terminal:

```bash
docker run --rm -d --name s3 \
-p 4566:4566 \
-e SERVICES=s3 \
-e EDGE_PORT=4566 \
-e DEBUG=1 \
localstack/localstack:latest
docker run --rm -d --name minio \
-p 9000-9001:9000-9001 \
-e MINIO_ROOT_USER=admin \
-e MINIO_ROOT_PASSWORD=admin_pw \
-v /data \
quay.io/minio/minio server /data --console-address ":9001"
```

2. Set `endpoint_url` for `S3Origin` _OR_ the `AWS_ENDPOINT_URL_S3`
environment variable to `http://localhost:4566`

3. Set all other `aws_` parameters for `S3Origin` to _any_ string.
They will not be used, but they must still be populated!
2. In the MinIO console:
- Navigate to the UI at `http://localhost:9001`
- Authenticate with `username=admin`, `password=admin_pw`
- Create a bucket.

3. Connect using the following:
```python
from quixstreams.sources.community.file.s3 import S3FileSource

S3FileSource(
bucket="<YOUR BUCKET NAME>",
aws_access_key_id='admin',
aws_secret_access_key='admin_pw',
region_name='us-east-1',
endpoint_url='http://localhost:9000',
)
```
4 changes: 0 additions & 4 deletions quixstreams/sinks/community/file/__init__.py
@@ -1,4 +0,0 @@
# ruff: noqa: F403
from .destinations import *
from .formats import *
from .sink import *
@@ -1,8 +1,5 @@
import logging
from typing import Optional

from quixstreams.sinks import SinkBatch
from quixstreams.sinks.community.file.destinations.base import Destination
from typing import Optional, Union

try:
from azure.core.exceptions import HttpResponseError
@@ -14,8 +11,16 @@
'run "pip install quixstreams[azure-file]" to use AzureFileDestination'
) from exc

from quixstreams.sinks import (
ClientConnectFailureCallback,
ClientConnectSuccessCallback,
SinkBatch,
)

from .base import FileSink
from .formats import Format, FormatName

__all__ = ("AzureFileDestination",)
__all__ = ("AzureFileSink",)


logger = logging.getLogger(__name__)
@@ -29,7 +34,7 @@ class AzureContainerAccessDeniedError(Exception):
"""Raised when the specified Azure File container access is denied."""


class AzureFileDestination(Destination):
class AzureFileSink(FileSink):
"""
A destination that writes data to Microsoft Azure File.

@@ -39,20 +44,40 @@

def __init__(
self,
connection_string: str,
container: str,
azure_connection_string: str,
azure_container: str,
directory: str = "",
format: Union[FormatName, Format] = "json",
on_client_connect_success: Optional[ClientConnectSuccessCallback] = None,
on_client_connect_failure: Optional[ClientConnectFailureCallback] = None,
) -> None:
"""
Initialize the Azure File destination.

:param connection_string: Azure client authentication string.
:param container: Azure container name.
:param azure_connection_string: Azure client authentication string.
:param azure_container: Azure container name.
:param directory: Base directory path for storing files. Defaults to
current directory.
:param format: Data serialization format, either as a string
("json", "parquet") or a Format instance.
:param on_client_connect_success: An optional callback made after successful
client authentication, primarily for additional logging.
:param on_client_connect_failure: An optional callback made after failed
client authentication (which should raise an Exception).
Callback should accept the raised Exception as an argument.
Callback must resolve (or propagate/re-raise) the Exception.

:raises AzureContainerNotFoundError: If the specified container doesn't exist.
:raises AzureContainerAccessDeniedError: If access to the container is denied.
"""
self._container = container
self._auth = connection_string
super().__init__(
directory=directory,
format=format,
on_client_connect_success=on_client_connect_success,
on_client_connect_failure=on_client_connect_failure,
)
self._container = azure_container
self._auth = azure_connection_string
self._client: Optional[ContainerClient] = None

def _get_client(self) -> ContainerClient:
@@ -91,7 +116,7 @@ def setup(self):
self._client = self._get_client()
self._validate_container()

def write(self, data: bytes, batch: SinkBatch) -> None:
def _write(self, data: bytes, batch: SinkBatch) -> None:
"""
Write data to Azure.
