1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,5 +1,6 @@
0.3.0
-----
* Implementation of CDCPublisher (CASSSIDECAR-243)
* Sidecar endpoint for moving a node to a new token (CASSSIDECAR-344)
* Returning JSON responses for live migration status endpoints in case of errors (CASSSIDECAR-395)
* Upgrade vertx to 4.5.23 (CASSSIDECAR-391)
202 changes: 202 additions & 0 deletions README.md
@@ -84,6 +84,208 @@ Configuring Cassandra Instance

While setting up the Cassandra instance, make sure the Cassandra data directories match the paths configured in the sidecar.yaml file; otherwise, update the data directory paths to point to the correct directories so that the stream APIs work.

Change Data Capture (CDC) Configuration
---------------------------------------

Apache Cassandra Sidecar supports Change Data Capture (CDC) to stream table mutations to Apache Kafka. This section describes how to configure and run Sidecar with CDC enabled.

### Prerequisites

1. Apache Cassandra 4.0+ with CDC support
2. Apache Kafka cluster
3. Sidecar configured with schema management enabled

### Configuration Steps

#### 1. Enable CDC in Cassandra

Edit your `cassandra.yaml` configuration file and enable CDC:

```yaml
cdc_enabled: true
```

Restart your Cassandra instance for this change to take effect.
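
Optionally, related `cassandra.yaml` settings control where raw CDC commit log segments are retained and how much disk space they may consume before writes to CDC-enabled tables are rejected. A sketch using the Cassandra 4.0 option names (the path and size below are illustrative, not recommendations):

```yaml
cdc_enabled: true
# Directory where commit log segments are retained until consumed (illustrative path)
cdc_raw_directory: /var/lib/cassandra/cdc_raw
# Upper bound on disk used by cdc_raw; when full, writes to CDC tables fail (illustrative size)
cdc_total_space_in_mb: 4096
```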

#### 2. Configure Sidecar for CDC

Edit your `sidecar.yaml` configuration file with the following settings:

```yaml
sidecar:
  # Enable schema management (required for CDC)
  schema:
    is_enabled: true
    keyspace: sidecar_internal
    replication_strategy: SimpleStrategy
    replication_factor: 3

  # Enable CDC feature
  cdc:
    enabled: true
    config_refresh_time: 10s
    table_schema_refresh_time: 60s
    segment_hardlink_cache_expiry: 1m
```

**Configuration Parameters:**
- `schema.is_enabled`: **Must be `true`** for CDC to function. Creates the `sidecar_internal` keyspace for CDC state management (see the sketch after this list).
- `cdc.enabled`: Enables the CDC feature in Sidecar.
- `cdc.config_refresh_time`: How frequently CDC configuration is refreshed from the database.
- `cdc.table_schema_refresh_time`: How frequently table schemas are refreshed for CDC-enabled tables.
- `cdc.segment_hardlink_cache_expiry`: Cache expiration time for CDC segment hard links.
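
Sidecar creates its internal keyspace automatically; the following CQL is only an illustration of what the replication settings above amount to, not something you need to run yourself:

```cql
CREATE KEYSPACE IF NOT EXISTS sidecar_internal
  WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3};
```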

#### 3. Enable CDC on Tables

For each table you want to capture changes from, enable the CDC property using CQL:

```cql
-- For a new table
CREATE TABLE my_keyspace.my_table (
    id text PRIMARY KEY,
    name text,
    value int
) WITH cdc = true;

-- For an existing table
ALTER TABLE my_keyspace.my_table WITH cdc = true;
```
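
You can confirm which tables have CDC enabled by querying the `cdc` column of `system_schema.tables`:

```cql
SELECT keyspace_name, table_name, cdc
FROM system_schema.tables
WHERE keyspace_name = 'my_keyspace';
```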

#### 4. Configure CDC Service

Use the CDC configuration API endpoint to set up CDC parameters:

```bash
curl --request PUT \
  --url http://localhost:9043/api/v1/services/cdc/config \
  --header 'content-type: application/json' \
  --data '{
    "config": {
      "datacenter": "datacenter1",
      "env": "production",
      "topic_format_type": "STATIC",
      "topic": "cdc-events"
    }
  }'
```

**CDC Configuration Parameters:**
- `datacenter`: The datacenter name for this Sidecar instance.
- `env`: Environment identifier (e.g., `production`, `staging`, `dev`).
- `topic_format_type`: Determines how Kafka topic names are generated (see the examples after this list). Options:
- `STATIC`: Use a single fixed topic name specified in `topic` field
- `KEYSPACE`: Format as `{topic}-{keyspace}`
- `KEYSPACETABLE`: Format as `{topic}-{keyspace}-{table}`
- `TABLE`: Format as `{topic}-{table}`
- `MAP`: Use custom topic mapping (advanced)
- `topic`: Base Kafka topic name for CDC events.
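
As a concrete example of the formats above, with `topic` set to `cdc-events` and events from `my_keyspace.my_table`, the generated topic names would be:

```
STATIC        -> cdc-events
KEYSPACE      -> cdc-events-my_keyspace
KEYSPACETABLE -> cdc-events-my_keyspace-my_table
TABLE         -> cdc-events-my_table
```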

#### 5. Configure Kafka Producer

Configure the Kafka producer settings using the Kafka configuration API endpoint:

```bash
curl --request PUT \
  --url http://localhost:9043/api/v1/services/kafka/config \
  --header 'content-type: application/json' \
  --data '{
    "config": {
      "bootstrap.servers": "localhost:9092",
      "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
      "value.serializer": "org.apache.kafka.common.serialization.ByteArraySerializer",
      "acks": "all",
      "retries": "3",
      "retry.backoff.ms": "200",
      "enable.idempotence": "true",
      "batch.size": "16384",
      "linger.ms": "5",
      "buffer.memory": "33554432",
      "compression.type": "snappy",
      "request.timeout.ms": "30000",
      "delivery.timeout.ms": "120000",
      "max.in.flight.requests.per.connection": "5",
      "client.id": "cdc-producer"
    }
  }'
```

**Key Kafka Producer Parameters:**
- `bootstrap.servers`: Comma-separated list of Kafka broker addresses.
- `key.serializer`: Serializer for the message key (use `StringSerializer`).
- `value.serializer`: Serializer for the message value (use `ByteArraySerializer` for Avro).
- `acks`: Number of acknowledgments the producer requires (`all` for maximum durability).
- `enable.idempotence`: Ensures exactly-once semantics when set to `true`.
- `compression.type`: Compression algorithm (`snappy`, `gzip`, `lz4`, `zstd`, or `none`).

For a complete list of Kafka producer configurations, see the [Apache Kafka Producer Configuration Documentation](https://kafka.apache.org/documentation/#producerconfigs).

### Data Format and Serialization

CDC events are serialized in **Apache Avro** format. Sidecar includes a built-in schema store (`CachingSchemaStore`) that:
- Automatically tracks CDC-enabled table schemas
- Converts CQL schemas to Avro schemas
- Refreshes schemas based on `table_schema_refresh_time` configuration
- Caches Avro schemas for performance

Each CDC event published to Kafka contains:
- **Key**: Table identifier (keyspace + table name)
- **Value**: Avro-serialized mutation data containing:
  - Partition key
  - Clustering key (if applicable)
  - Mutation type (INSERT, UPDATE, DELETE)
  - Column values
  - Timestamp
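
For intuition, the Avro schema generated for the `my_table` example above might look roughly like the following. This is a hand-written sketch of how the table's columns could map to an Avro record; the actual schema produced by Sidecar's CQL-to-Avro conversion also carries the mutation metadata listed above (operation type, timestamp) and may differ in layout:

```json
{
  "type": "record",
  "name": "my_table",
  "namespace": "my_keyspace",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "name", "type": ["null", "string"], "default": null},
    {"name": "value", "type": ["null", "int"], "default": null}
  ]
}
```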

### Verification

After completing the configuration:

1. **Check Sidecar Logs**: Verify CDC is enabled and connected to Kafka:
   ```bash
   grep -i "cdc" /path/to/sidecar.log
   ```

2. **Verify Configuration**: Retrieve current CDC and Kafka configurations:
   ```bash
   # Get CDC configuration
   curl http://localhost:9043/api/v1/services/cdc/config

   # Get Kafka configuration
   curl http://localhost:9043/api/v1/services/kafka/config

   # Get all service configurations
   curl http://localhost:9043/api/v1/services
   ```
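
3. **Consume from Kafka**: As an end-to-end check, read events from the configured topic with Kafka's console consumer (this assumes the `STATIC` topic `cdc-events` from the example above; values are Avro-encoded bytes, so expect binary output):
   ```bash
   bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
     --topic cdc-events --from-beginning
   ```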

### Advanced Configuration

#### Custom Schema Registry Integration

While Sidecar includes a built-in schema store, you can integrate with external schema registries by:
1. Implementing a custom `SchemaStore` interface
2. Registering your implementation via Guice dependency injection
3. Configuring your schema registry connection details in the Kafka producer configuration
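
A minimal sketch of step 2, assuming a hypothetical `MySchemaRegistryStore` implementation; the module and implementation class names here are illustrative, and the methods to implement are defined by the `SchemaStore` interface in the Sidecar source:

```java
import com.google.inject.AbstractModule;
import com.google.inject.Singleton;

// Illustrative Guice module: binds the custom implementation so it is
// injected wherever Sidecar requests a SchemaStore.
// (The import for SchemaStore is omitted; it lives in the Sidecar codebase.)
public class SchemaRegistryModule extends AbstractModule
{
    @Override
    protected void configure()
    {
        bind(SchemaStore.class).to(MySchemaRegistryStore.class).in(Singleton.class);
    }
}
```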


### Troubleshooting

**CDC not starting:**
- Verify `schema.is_enabled: true` in `sidecar.yaml`
- Check Cassandra has `cdc_enabled: true`
- Ensure `sidecar_internal` keyspace exists and is accessible
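
For the last point, a quick check from `cqlsh`:

```cql
DESCRIBE KEYSPACE sidecar_internal;
```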

**No messages in Kafka:**
- Verify tables have `cdc = true` property
- Check Kafka connectivity and broker availability (see the check after this list)
- Review Sidecar logs for errors: `grep -i "kafka\|cdc" /path/to/sidecar.log`
- Verify CDC and Kafka configurations are set via API endpoints
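
For the connectivity check above, listing topics with Kafka's CLI confirms the broker is reachable and the expected topic exists:

```bash
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
```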

**Schema errors:**
- Ensure table schemas are stable (avoid frequent schema changes during CDC)
- Check `table_schema_refresh_time` is appropriate for your use case
- Review Sidecar logs for schema conversion errors


Testing
-------

3 changes: 3 additions & 0 deletions build.gradle
@@ -194,13 +194,16 @@ startScripts {
}
}

apply from: "${project.rootDir}/gradle/common/java11Options.gradle"

run {
    def confFile = System.getProperty("sidecar.config", "file:" + File.separator + File.separator + "$projectDir/conf/sidecar.yaml")
    println "Sidecar configuration file $confFile"
    jvmArgs = ["-Dsidecar.logdir=./logs",
               "-Dsidecar.config=" + confFile,
               "-Dlogback.configurationFile=./conf/logback.xml",
               "-Dvertx.logger-delegate-factory-class-name=io.vertx.core.logging.SLF4JLogDelegateFactory"]
    jvmArgs += project.ext.JDK11_OPTIONS
}

distributions {
3 changes: 2 additions & 1 deletion gradle.properties
@@ -43,6 +43,7 @@ jakartaWsRsVersion=3.1.0
swaggerVersion=2.2.21
# Cdc dependencies
kryoVersion=4.0.2
-analyticsVersion=0.1.0
# OSHI dependencies
oshiVersion=6.9.0
+analyticsVersion=0.2.0
+kafkaClientVersion=3.7.0
15 changes: 14 additions & 1 deletion server/build.gradle
@@ -80,6 +80,8 @@ configurations {
    containerTestImplementation.extendsFrom testImplementation
}

apply from: "${project.rootDir}/gradle/common/java11Options.gradle"

dependencies {
    compileOnly('org.jetbrains:annotations:23.0.0')
    testCompileOnly('org.jetbrains:annotations:23.0.0')
@@ -144,7 +146,13 @@ dependencies {
implementation("jakarta.ws.rs:jakarta.ws.rs-api:${project.jakartaWsRsVersion}")

implementation(group: "org.apache.cassandra", name: "cassandra-analytics-common", version: "${[project.analyticsVersion]}")
implementation(group: "org.apache.cassandra", name: "cassandra-analytics-core_spark3_2.12", version: "${[project.analyticsVersion]}")
implementation(group: "org.apache.cassandra", name: "cassandra-analytics-cdc-codec_spark3_2.12", version: "${[project.analyticsVersion]}")
implementation(group: "org.apache.cassandra", name: "cassandra-avro-converter_spark3_2.12", version: "${[project.analyticsVersion]}")
implementation(group: "org.apache.cassandra", name: "cassandra-analytics-cdc_spark3_2.12", version: "${[project.analyticsVersion]}")
implementation(group: "org.apache.cassandra", name: "cassandra-analytics-cdc-sidecar_spark3_2.12", version: "${[project.analyticsVersion]}")

implementation "org.apache.kafka:kafka-clients:${project.kafkaClientVersion}"
implementation "com.esotericsoftware:kryo-shaded:${kryoVersion}"

    // OSHI core library for fetching system information
@@ -197,10 +205,16 @@ dependencies {
}

test {
    if (JavaVersion.current().isJava11Compatible()) {
        jvmArgs(project.ext.JDK11_OPTIONS)
        println("JVM arguments for $project.name are $allJvmArgs")
    }

    systemProperty "vertxweb.environment", "dev"
    systemProperty "vertx.logger-delegate-factory-class-name", "io.vertx.core.logging.SLF4JLogDelegateFactory"
    // There is no native lib (JNR) for getting time for testing
    systemProperty "com.datastax.driver.USE_NATIVE_CLOCK", "false"

    // ordinarily we don't need integration tests
    // see the integrationTest task
    useJUnitPlatform()
@@ -219,7 +233,6 @@
}

apply from: "${project.rootDir}/gradle/common/integrationTestTask.gradle"
-apply from: "${project.rootDir}/gradle/common/java11Options.gradle"

tasks.register("containerTest", Test) {
    if (JavaVersion.current().isJava11Compatible()) {