---
title: Stream Data from Postgres with Debezium
description: Learn how to adapt the Debezium CDC template to stream data from your PostgreSQL database to ClickHouse.
---

# Stream Data from Postgres with Debezium

This guide shows you how to use the **Debezium CDC Template** with your own application. You will learn how to connect the pipeline to your PostgreSQL database and send your tables to ClickHouse for real-time analytics.

## Architecture Overview

At a high level, the pipeline works like this:
```txt
PostgreSQL -> Kafka -> ClickHouse
```

* **Debezium** acts as the bridge between PostgreSQL and Kafka. It watches for changes in your database and publishes them to Kafka topics.
* **MooseStack** acts as the bridge between Kafka and ClickHouse. It serves as your "pipeline-as-code" layer where you define your ClickHouse tables, your Kafka streams, and the transformation logic that connects them.

This template uses two Kafka topics for each table: one for the raw change events and one for the clean, processed rows. The data flow, illustrated with an example event below, is as follows:

1. A change happens in PostgreSQL
2. Debezium publishes the change to Kafka (auto-creating a topic for each table)
3. Raw events are consumed from each Debezium-managed topic and transformed into a format that can be stored in ClickHouse
4. The transformed data is published to a second Moose Stream (the sink stream)
5. Data from the sink stream is synced into your ClickHouse table
6. Rows are deduplicated and versioned in the background by ClickHouse
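
For instance, an `UPDATE` to a row in `public.customer_addresses` arrives as a nested Debezium change event and leaves the transform as a flat row ready for ClickHouse. The sketch below is purely illustrative — the field names follow Debezium's standard envelope and the template's CDC metadata columns, and the values are made up:

```typescript
// Illustrative only: a simplified Debezium change event for an UPDATE (op: "u").
const rawEvent = {
  before: { id: 42, country: "US", state: "CA" }, // row before the change
  after: { id: 42, country: "US", state: "WA" },  // row after the change
  source: { lsn: 123456789, ts_ms: 1718000000000, table: "customer_addresses" },
  op: "u",
  ts_ms: 1718000000123,
};

// What the transform publishes to the sink stream (and ClickHouse stores):
const flattenedRow = {
  id: 42,
  country: "US",
  state: "WA",
  _is_deleted: 0,       // set to 1 for delete events
  lsn: 123456789,       // used for versioning/deduplication
  ts_ms: 1718000000000, // change timestamp from the source metadata
};
```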

## Project Structure

Here are the key files in the template you should know about:

```
cdc-pipeline/
├── 1-sources/                        # Defines Kafka topics from Debezium
├── 2-transforms/                     # Sanitizes CDC events & maps to destination
├── 3-destinations/                   # Defines ClickHouse tables & streams
docker-compose.dev.override.yaml      # Infrastructure (Kafka Connect, Redpanda)
setup-cdc.ts                          # Script that registers the connector
moose.config.toml                     # Project config (enables streaming)
```

## Step 0: Clone the Template

Make sure you clone the [Debezium CDC Template](https://github.com/514labs/debezium-cdc) and install the dependencies:

```bash
git clone https://github.com/514labs/debezium-cdc.git
cd debezium-cdc
pnpm install
```

## Step 1: Configure Your Environment

The template uses environment variables for database passwords and connector settings.

1. Copy the `.env.example` file:

   ```bash
   cp .env.example .env.dev
   ```

2. Open `.env.dev` and customize the values for your environment.

   **Database Connection:**
   Set these to point to your source PostgreSQL database.
   ```properties
   DB_HOST=your_postgres_host
   DB_PORT=your_postgres_port
   DB_NAME=your_postgres_db
   DB_USER=your_postgres_user
   DB_PASSWORD=your_postgres_password
   ```

   **CDC Configuration:**
   Choose which tables you want to capture.
   ```properties
   # List of tables to capture (schema.table), separated by commas
   CDC_TABLE_INCLUDE_LIST=public.*

   # Prefix for the Kafka topics (default: pg-cdc)
   CDC_TOPIC_PREFIX=pg-cdc
   ```

## Step 2: Prepare Your Database

Debezium relies on PostgreSQL's logical replication.

1. **Check `wal_level`**:
   Run this SQL command on your source database:
   ```sql
   SHOW wal_level;
   ```
   It must return `logical`. If it doesn't, set `wal_level = logical` in your `postgresql.conf` and restart Postgres.

2. **Create a Replication User**:
   It is best to use a dedicated user for CDC. Run these commands:
   ```sql
   CREATE USER cdc_user WITH PASSWORD 'secure_password';
   ALTER USER cdc_user WITH REPLICATION;
   GRANT USAGE ON SCHEMA public TO cdc_user;
   GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;
   ```
   Then update your `.env.dev` file with this user's credentials.

## Step 3: Start the Pipeline

Start the development environment. The Moose CLI will start the infrastructure and run a script to register the Debezium connector.

```bash
moose dev
```

Check the logs for these messages:
- Infrastructure starting (Redpanda, Kafka Connect, ClickHouse).
- `setup-cdc.ts` running.
- `✅ Connector registered!`

Note: Moose does not manage Debezium or Kafka Connect by default. However, this template uses `docker-compose.dev.override.yaml` to add them. The example file starts Kafka Connect and includes a test database. If you want to use your own database, comment out the test database in that file and update `.env.dev`. See [Local Development](/moose/local-dev) for more details.

## Step 4: Customize the Pipelines for Your Application

The template comes set up for the provided test database. Follow these steps to change it for your own tables.

> **Note:** These examples use the `customer_addresses` table from the template. Replace `CustomerAddress` with the names of your own tables (like `Users` or `Orders`).

### 1. Import the Topics
When the connector runs and a change happens, Debezium automatically creates a topic in Redpanda if it hasn't seen an event for that table before. Since Debezium manages these topics, you need to import their definitions into your project:

```bash
# Pulls topic definitions into cdc-pipeline/1-sources/externalTopics.ts
moose-cli kafka pull localhost:19092 --path cdc-pipeline/1-sources
```
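
The exact contents of the generated `externalTopics.ts` depend on your topics and your CLI version, but conceptually it exports one raw stream per Debezium topic, named after Debezium's `<topic prefix>.<schema>.<table>` pattern. A rough sketch of the idea (not the literal generated code):

```typescript
// Hypothetical sketch of what `moose-cli kafka pull` produces for one table.
import { Stream } from "@514labs/moose-lib";

// Raw CDC topic for public.customer_addresses, created and managed by Debezium
export const PgCdcPublicCustomerAddressesStream = new Stream<Record<string, unknown>>(
  "pg-cdc.public.customer_addresses",
  { /* generated settings, e.g. marking the topic as externally managed */ }
);
```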

### 2. Define Source Schemas
Moose imports the raw data streams as generic, untyped objects. You need to define what your data looks like so that you have complete type safety when you transform the raw events.

#### Option A: Using your ORM Models (Recommended)
If you already use an ORM like Drizzle, you can reuse your existing models.

The template uses Drizzle, and the models are in `postgres/src/schema.ts`. You can export the inferred type in `cdc-pipeline/oltp/schema.ts`:

```typescript
import { customerAddresses } from "../../postgres/src/schema";

// Automatically infers: { id: number, first_name: string, ... }
export type CustomerAddress = typeof customerAddresses.$inferSelect;
```

Then, in your pipeline code, import the type and apply it to your stream:
```typescript
import { CustomerAddress } from "../../oltp/schema";

export const cdcCustomerAddresses = PgCdcPublicCustomerAddressesStream as Stream<
  GenericCDCEvent<CustomerAddress>
>;
```

#### Option B: Using Generation Tools
If you don't use an ORM, tools like [kanel](https://github.com/kristiandupont/kanel) or `pg-to-ts` can generate TypeScript interfaces from your database for you.

```bash
# Example with kanel
npx kanel --connectionString $DATABASE_URL --output ./cdc-pipeline/generated-models
```
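
Whichever route you take, the goal is the same: one plain TypeScript type per table whose fields mirror your Postgres columns. A hand-written equivalent for the example table might look like this (the column list is illustrative, not the template's actual schema):

```typescript
// Illustrative only — match these fields to your real Postgres columns.
export type CustomerAddress = {
  id: number;
  customer_id: number;
  street: string;
  city: string;
  state: string;
  country: string;
  work_address: string | null;
  created_at: string; // adjust the type to however your pipeline serializes timestamps
};
```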

### 3. Model the Incoming Data (Create a Typed Topic)
This step models the raw data coming from Debezium. These events are complex objects that contain metadata, the "before" state of the row, and the "after" state.

`GenericCDCEvent<T>` (in `cdc-pipeline/models.ts`) matches this structure. By wrapping the raw topic with this type, your code knows exactly what the data looks like.

```typescript
export type GenericCDCEvent<T> = {
  before: T | null;          // The row before the change (null for inserts)
  after: T | null;           // The row after the change (null for deletes)
  source: {                  // Debezium metadata
    lsn: number;             // Log Sequence Number (for ordering)
    ts_ms: number;           // Timestamp of the change
    table: string;
  };
  op: "c" | "u" | "d" | "r"; // Create, Update, Delete, Read
  ts_ms: number;
};
```

Update `cdc-pipeline/1-sources/typed-topics.ts` to export the typed stream.

**Example:**

```typescript
import { Stream } from "@514labs/moose-lib";
import { PgCdcPublicCustomerAddressesStream } from "./externalTopics"; // Generated by kafka pull
import { GenericCDCEvent } from "../models";
import { CustomerAddress } from "../../oltp/schema";

export const cdcCustomerAddresses = PgCdcPublicCustomerAddressesStream as Stream<
  GenericCDCEvent<CustomerAddress>
>;
```

<details>
<summary>✨ **Suggested Copilot Prompt**</summary>

You can use this prompt to tell your AI assistant to generate the typed topics for all your tables at once. Open `cdc-pipeline/1-sources/typed-topics.ts` and ask:

> "Import all the raw stream classes from `./externalTopics.ts` and all the OLTP types from `../../oltp/schema.ts`. For each table, export a new const named `cdc<TableName>` that casts the raw stream to `Stream<GenericCDCEvent<TableName>>`. Follow the pattern of the existing exports."

</details>

### 4. Model the Destination Data (Flatten the Payload)
This step models the clean data that goes into ClickHouse.

While the incoming data is nested (Step 3), the destination table should look just like your Postgres table. You need to "flatten" the structure so that `after.id` becomes just `id` in ClickHouse.

You also need to add a few fields (`_is_deleted`, `lsn`, `ts_ms`) to handle updates and deletes correctly.

Update `cdc-pipeline/3-destinations/olap-tables.ts`:

```typescript
import { OlapTable, ClickHouseEngines, UInt64, UInt8 } from "@514labs/moose-lib";
import { CustomerAddress } from "../../oltp/schema";

// 1. Define the OLAP Schema
// Take the fields from Postgres and add metadata
export type CdcFields = {
  _is_deleted: UInt8;
  ts_ms: UInt64;
  lsn: UInt64;
};

export type OlapCustomerAddress = CustomerAddress & CdcFields;

// 2. Define the ClickHouse Table
export const olapCustomerAddresses = new OlapTable<OlapCustomerAddress>(
  "customer_addresses",
  {
    engine: ClickHouseEngines.ReplacingMergeTree,
    ver: "lsn",
    isDeleted: "_is_deleted",
    orderByFields: ["id"],
  }
);
```

You also need a sink stream. This acts as a buffer between your transformation and the final table.

Update `cdc-pipeline/3-destinations/sink-topics.ts`:

```typescript
import { Stream } from "@514labs/moose-lib";
import { olapCustomerAddresses, OlapCustomerAddress } from "./olap-tables";

// 3. Define the Destination Stream (The "Processed" Topic)
export const processedCustomerAddresses = new Stream<OlapCustomerAddress>(
  "ProcessedCustomerAddresses",
  { destination: olapCustomerAddresses }
);
```

### 5. Create the Transform
Write the function that maps the Source Stream to the Sink Stream. It cleans the data and converts types where needed.

Create `cdc-pipeline/2-transforms/customer-addresses.ts`:

```typescript
import { cdcCustomerAddresses } from "../1-sources/typed-topics";
import { processedCustomerAddresses } from "../3-destinations/sink-topics";
import { handleCDCPayload } from "./payload-handler"; // Helper from the template
import { GenericCDCEvent } from "../models";
import { OlapCustomerAddress } from "../3-destinations/olap-tables";
import { CustomerAddress } from "../../oltp/schema";

// Connect Source Stream -> Destination Stream
cdcCustomerAddresses.addTransform(
  processedCustomerAddresses,
  (message: GenericCDCEvent<CustomerAddress>) => {
    // Use the helper function to clean the payload
    const result = handleCDCPayload<CustomerAddress>(message);

    // Return the clean data
    return result as unknown as OlapCustomerAddress;
  }
);
```

The `handleCDCPayload` function is a helper included in the template. It handles the logic for cleaning the payload and managing deletes: you pass it the type of your source row, and it takes care of the rest.
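
The template ships its own implementation, but conceptually the helper flattens the nested event and stamps the CDC metadata columns. A minimal sketch of the idea (not the template's actual code) looks like this:

```typescript
import { GenericCDCEvent } from "../models";

// Conceptual sketch only — the real helper may differ in details such as
// type conversions and column sanitization.
export function handleCDCPayloadSketch<T>(event: GenericCDCEvent<T>) {
  const isDelete = event.op === "d";
  // For deletes the row data lives in `before`; otherwise use `after`.
  const row = (isDelete ? event.before : event.after) ?? ({} as T);

  return {
    ...row,
    _is_deleted: isDelete ? 1 : 0, // lets ReplacingMergeTree discard deleted rows
    lsn: event.source.lsn,         // version column used for deduplication
    ts_ms: event.source.ts_ms,     // change timestamp from the source database
  };
}
```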

## Verification

The pipeline is running! Any change in your Postgres `customer_addresses` table will show up in ClickHouse almost immediately.

Check it by querying ClickHouse with the Moose CLI:

```bash
moose query "SELECT * FROM customer_addresses"
```
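
Because the destination table is a ReplacingMergeTree, older row versions and delete-flagged rows are collapsed asynchronously; adding `FINAL` to a query forces deduplication at read time. If you prefer to verify from code, a small script using the official `@clickhouse/client` package works too — the connection settings below are placeholders, so point them at the ClickHouse instance `moose dev` starts (or your own):

```typescript
import { createClient } from "@clickhouse/client";

// Placeholder connection settings — adjust host and credentials to your environment.
const client = createClient({
  url: "http://localhost:18123",
  username: "default",
  password: "",
});

async function main() {
  // FINAL collapses old versions (and rows flagged by _is_deleted) at query time.
  const result = await client.query({
    query: "SELECT * FROM customer_addresses FINAL LIMIT 10",
    format: "JSONEachRow",
  });
  console.log(await result.json());
  await client.close();
}

main().catch(console.error);
```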

## Advanced: Optimizing for ClickHouse

The setup above uses your Postgres types directly. To make your analytics faster and cheaper, you should optimize your ClickHouse schema.

This involves using special column types like:
* **LowCardinality**: For columns with a limited number of unique values (roughly 10,000 or fewer), such as countries or states.
* **UInt64**: For IDs and timestamps.
* **ClickHouseDefault**: To replace nullable columns with an efficient default value (for example, an empty string).

Here is a preview of what an optimized schema looks like:

```typescript
export type OlapCustomerAddress = Omit<
  CustomerAddress,
  "id" | "country" | "state" | "work_address"
> &
  CdcFields & {
    // Optimized types
    id: UInt64;
    country: string & LowCardinality;
    state: string & LowCardinality;
    work_address: string & ClickHouseDefault<"''">;
  };
```

For a full guide on how to optimize your tables, see [Optimizing ClickHouse Schemas](/guides/clickhouse-optimization).

## Next Steps: Transitioning to Production