diff --git a/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py b/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py index 9f4babcb..67d88f59 100644 --- a/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py +++ b/product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py @@ -1,24 +1,32 @@ # Databricks notebook source # MAGIC %md # MAGIC -# MAGIC # Implement CDC: Change Data Capture -# MAGIC ## Use-case: Synchronize your SQL Database with your Lakehouse +# MAGIC # CDC Pipeline Demo: Change Data Capture with Serverless Compute +# MAGIC ## Step-by-Step Guide to Building a Cost-Effective CDC Pipeline # MAGIC -# MAGIC Delta Lake is an open-source storage layer with Transactional capabilities and increased Performances. +# MAGIC This demo shows you how to build a **Change Data Capture (CDC)** pipeline using **Databricks Serverless Compute** for cost-effective, auto-scaling data processing. # MAGIC -# MAGIC Delta lake is designed to support CDC workload by providing support for UPDATE / DELETE and MERGE operation. +# MAGIC ### What You'll Learn: +# MAGIC 1. **šŸ„‰ Step 1**: Set up CDC data simulation +# MAGIC 2. **🄈 Step 2**: Build Bronze layer with Auto Loader +# MAGIC 3. **šŸ„‡ Step 3**: Create Silver layer with MERGE operations +# MAGIC 4. **šŸš€ Step 4**: Implement Gold layer with Change Data Feed (CDF) +# MAGIC 5. **šŸ“Š Step 5**: Continuous CDC Data # MAGIC -# MAGIC In addition, Delta table can support CDC to capture internal changes and propagate the changes downstream. # MAGIC -# MAGIC Note that this is a fairly advaned demo. Before going into this content, we recommend you get familiar with Delta Lake `dbdemos.install('delta-lake')`. +# MAGIC ### Key Benefits of Serverless CDC: +# MAGIC - šŸ’° **Cost-effective**: Pay only for compute time used +# MAGIC - šŸš€ **Auto-scaling**: Automatically scales based on workload +# MAGIC - ⚔ **Fast processing**: Optimized for batch processing with `availableNow` triggers +# MAGIC - šŸ”„ **Incremental**: Only processes new/changed data # MAGIC -# MAGIC ## Simplifying CDC with Spark Declarative Pipelines +# MAGIC ### Prerequisites: +# MAGIC - Basic understanding of Delta Lake: `dbdemos.install('delta-lake')` +# MAGIC - Familiarity with Structured Streaming concepts # MAGIC -# MAGIC As you'll see, implementing a CDC pipeline from scratch is slightly advanced. +# MAGIC --- # MAGIC -# MAGIC To simplify these operation & implement a full CDC flow with SQL expression, we strongly advise to use Spark Declarative Pipelines with `APPLY CHANGES`: `dbdemos.install('pipeline-bike')` (including native SCDT2 support) -# MAGIC -# MAGIC As you'll see, `APPLY CHANGES` handles the MERGE INTO + DEDUPLICATION complexity for you. +# MAGIC **šŸ’” Alternative Approach**: For production CDC pipelines, consider using **Delta Live Tables** with `APPLY CHANGES` for simplified implementation: `dbdemos.install('delta-live-table')` # MAGIC # MAGIC # MAGIC @@ -30,92 +38,298 @@ # COMMAND ---------- +# DBTITLE 1,Import Required Functions +from pyspark.sql.functions import current_timestamp, col + +# COMMAND ---------- + +# DBTITLE 1,Configure Schema Evolution for CDC Processing +# Enable automatic schema merging for all Delta operations to handle schema changes +# Schema evolution is handled automatically by mergeSchema=true in writeStream operations +# Schema inference is handled automatically by Auto Loader with cloudFiles.inferColumnTypes=true + +# COMMAND ---------- + # MAGIC %md # MAGIC Delta Lake Change Data Feed # COMMAND ---------- # MAGIC %md -# MAGIC ## CDC flow +# MAGIC ## šŸ“‹ CDC Pipeline Architecture Overview # MAGIC -# MAGIC Here is the flow we'll implement, consuming CDC data from an external database. Note that the incoming could be any format, including message queue such as Kafka. +# MAGIC Here's the complete CDC pipeline we'll build using **Serverless Compute**: # MAGIC -# MAGIC Make all your data ready for BI and ML +# MAGIC CDC Pipeline Architecture +# MAGIC +# MAGIC ### Pipeline Flow: +# MAGIC 1. **šŸ“„ Data Source**: CDC events from external database (simulated) +# MAGIC 2. **šŸ„‰ Bronze Layer**: Raw CDC data ingestion with Auto Loader +# MAGIC 3. **🄈 Silver Layer**: Cleaned, deduplicated data with MERGE operations +# MAGIC 4. **šŸ„‡ Gold Layer**: Business-ready data with Change Data Feed (CDF) +# MAGIC 5. **šŸ“Š Analytics**: Real-time insights and reporting +# MAGIC +# MAGIC **šŸ’” Note**: The incoming data could be any format, including message queues like Kafka. # COMMAND ---------- -# MAGIC %md-sandbox -# MAGIC ## Bronze: Incremental data loading using Auto Loader +# MAGIC %md +# MAGIC ## šŸ„‰ Step 1: Bronze Layer - Raw Data Ingestion # MAGIC -# MAGIC Make all your data ready for BI and ML +# MAGIC Bronze Layer # MAGIC -# MAGIC Working with external system can be challenging due to schema update. The external database can have schema update, adding or modifying columns, and our system must be robust against these changes. +# MAGIC ### What We're Building: +# MAGIC - **Purpose**: Ingest raw CDC data from external sources +# MAGIC - **Technology**: Auto Loader with serverless compute +# MAGIC - **Benefits**: Automatic schema evolution and incremental processing # MAGIC -# MAGIC Databricks Autoloader (`cloudFiles`) handles schema inference and evolution out of the box. +# MAGIC ### Key Features: +# MAGIC - šŸ”„ **Schema Evolution**: Handles database schema changes automatically +# MAGIC - šŸ“ˆ **Incremental Processing**: Only processes new files +# MAGIC - ⚔ **Serverless Scaling**: Auto-scales based on data volume +# MAGIC - šŸ’° **Cost Efficient**: Pay only for processing time # MAGIC -# MAGIC For more details on Auto Loader, run `dbdemos.install('auto-loader')` +# MAGIC **šŸ’” Learn More**: For detailed Auto Loader concepts, run `dbdemos.install('auto-loader')` + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Step 1.1: Explore Incoming CDC Data # COMMAND ---------- -# DBTITLE 1,Let's explore our incoming data. We receive CSV files with client information +print("šŸ” Exploring our incoming CDC data structure...") cdc_raw_data = spark.read.option('header', "true").csv(raw_data_location+'/user_csv') display(cdc_raw_data) # COMMAND ---------- -# DBTITLE 1,Our CDC is sending 3 type of operation: APPEND, DELETE and UPDATE +# MAGIC %md +# MAGIC ## Step 1.2: Understand CDC Operation Types + +# COMMAND ---------- + +print("šŸ” Understanding CDC operation types...") +print("Our CDC system sends 3 types of operations:") display(cdc_raw_data.dropDuplicates(['operation'])) # COMMAND ---------- -# DBTITLE 1,We need to keep the cdc information, however csv isn't a efficient storage. Let's put that in a Delta table instead: +# MAGIC %md +# MAGIC ## Step 1.3: Set Up Continuous CDC Data Simulation +# MAGIC +# MAGIC To demonstrate serverless compute capabilities, we'll create a data generator that simulates incoming CDC events every 60 seconds. +# MAGIC +# MAGIC ### Why This Matters: +# MAGIC - šŸš€ **Auto-scaling**: Shows how serverless scales with workload +# MAGIC - šŸ’° **Cost Efficiency**: Demonstrates `availableNow` trigger benefits +# MAGIC - šŸ”„ **Real-world Simulation**: Mimics continuous CDC scenarios +# MAGIC - šŸ“Š **Monitoring**: Enables table growth visualization + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Step 1.4: CDC Data Generator Implementation + +# COMMAND ---------- + +import threading +import time +import random +from datetime import datetime +import pandas as pd + +# Global variable to control the data generator +generator_running = False + +def generate_cdc_record(operation_type="UPDATE", user_id=None): + """Generate a single CDC record""" + if user_id is None: + user_id = random.randint(1, 1000) + + operations = { + "INSERT": { + "id": user_id, + "name": f"User_{user_id}_{random.randint(1,99)}", + "address": f"Address_{random.randint(1,999)} Street", + "email": f"user{user_id}@company{random.randint(1,10)}.com", + "operation_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "operation": "INSERT" + }, + "UPDATE": { + "id": user_id, + "name": f"Updated_User_{user_id}", + "address": f"New_Address_{random.randint(1,999)} Avenue", + "email": f"updated.user{user_id}@newcompany{random.randint(1,5)}.com", + "operation_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "operation": "UPDATE" + }, + "DELETE": { + "id": user_id, + "name": None, + "address": None, + "email": None, + "operation_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "operation": "DELETE" + } + } + return operations[operation_type] + +def continuous_cdc_generator(): + """Background function that generates CDC data every 120 seconds""" + global generator_running + file_counter = 0 + + while generator_running: + try: + # Generate 3-5 random CDC events + num_events = random.randint(3, 5) + cdc_events = [] + + for _ in range(num_events): + # Random operation type with weighted probability + operation = random.choices( + ["INSERT", "UPDATE", "DELETE"], + weights=[50, 40, 10] # More inserts/updates than deletes + )[0] + cdc_events.append(generate_cdc_record(operation)) + + # Create DataFrame and save as CSV + df = pd.DataFrame(cdc_events) + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + filename = f"cdc_events_{timestamp}_{file_counter}.csv" + file_path = f"{raw_data_location}/user_csv/{filename}" + + # Convert to Spark DataFrame and save + spark_df = spark.createDataFrame(df) + spark_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(file_path) + + print(f"Generated {num_events} CDC events at {datetime.now()}: {filename}") + file_counter += 1 + + # Wait 60 seconds before next batch + time.sleep(60) + + except Exception as e: + print(f"Error in CDC generator: {e}") + time.sleep(60) # Continue even if there's an error + +def start_cdc_generator(): + """Start the CDC data generator in background""" + global generator_running + if not generator_running: + generator_running = True + generator_thread = threading.Thread(target=continuous_cdc_generator, daemon=True) + generator_thread.start() + print("šŸš€ CDC Data Generator started! New data will arrive every 60 seconds.") + print("šŸ’” This simulates continuous CDC events for serverless processing demonstration.") + return generator_thread + else: + print("CDC Generator is already running!") + return None + +def stop_cdc_generator(): + """Stop the CDC data generator""" + global generator_running + generator_running = False + print("šŸ›‘ CDC Data Generator stopped.") + +# Start the data generator for continuous simulation +data_generator_thread = start_cdc_generator() + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## 🄈 Step 2: Create Bronze Delta Table With Auto Loader +# MAGIC + +# COMMAND ---------- + +# Drop existing table if it exists to avoid schema conflicts +try: + spark.sql("DROP TABLE IF EXISTS clients_cdc") + print("šŸ”„ Dropped existing clients_cdc table to avoid schema conflicts") +except: + pass + bronzeDF = (spark.readStream .format("cloudFiles") .option("cloudFiles.format", "csv") - #.option("cloudFiles.maxFilesPerTrigger", "1") #Simulate streaming, remove in production .option("cloudFiles.inferColumnTypes", "true") .option("cloudFiles.schemaLocation", raw_data_location+"/stream/schema_cdc_raw") .option("cloudFiles.schemaHints", "id bigint, operation_date timestamp") + .option("cloudFiles.useNotifications", "false") # Optimized for serverless + .option("cloudFiles.includeExistingFiles", "true") # Process all files on first run .load(raw_data_location+'/user_csv')) -(bronzeDF.withColumn("file_name", col("_metadata.file_path")).writeStream +(bronzeDF.withColumn("file_name", col("_metadata.file_path")) + .withColumn("processing_time", current_timestamp()) # Add processing timestamp + .writeStream .option("checkpointLocation", raw_data_location+"/stream/checkpoint_cdc_raw") - .trigger(processingTime='10 seconds') - #.trigger(availableNow=True) --use this trigger on serverless + .option("mergeSchema", "true") # Enable schema evolution + .trigger(availableNow=True) # Serverless trigger for cost-effective processing .table("clients_cdc")) -time.sleep(20) +time.sleep(10) # COMMAND ---------- # MAGIC %sql -# MAGIC -- let's make sure our table has the proper compaction settings to support streaming -# MAGIC ALTER TABLE clients_cdc SET TBLPROPERTIES (delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true); +# MAGIC -- Optimize table properties for serverless streaming and performance +# MAGIC ALTER TABLE clients_cdc SET TBLPROPERTIES ( +# MAGIC delta.autoOptimize.optimizeWrite = true, +# MAGIC delta.autoOptimize.autoCompact = true, +# MAGIC delta.targetFileSize = '128MB', +# MAGIC delta.tuneFileSizesForRewrites = true +# MAGIC ); # MAGIC # MAGIC SELECT * FROM clients_cdc order by id asc ; # COMMAND ---------- -# MAGIC %md-sandbox -# MAGIC ## Silver: Materialize the table +# MAGIC %md +# MAGIC ## 🄈 Step 3: Silver Layer - Data Cleaning and Deduplication # MAGIC -# MAGIC Make all your data ready for BI and ML +# MAGIC Silver Layer # MAGIC -# MAGIC The silver `retail_client_silver` table will contains the most up to date view. It'll be a replicat of the original MYSQL table. +# MAGIC ### What We're Building: +# MAGIC - **Purpose**: Clean, deduplicate, and standardize CDC data +# MAGIC - **Technology**: Delta MERGE operations with serverless compute +# MAGIC - **Benefits**: Idempotent processing and data quality # MAGIC -# MAGIC Because we'll propagate the `MERGE` operations downstream to the `GOLD` layer, we need to enable Delta Lake CDF: `delta.enableChangeDataFeed = true` +# MAGIC ### Key Features: +# MAGIC - šŸ”„ **Idempotent**: Safe to run multiple times +# MAGIC - ⚔ **Serverless**: Auto-scales with data volume +# MAGIC - šŸ’° **Cost Efficient**: Only processes new/changed data +# MAGIC - šŸ“Š **CDF Enabled**: Tracks changes for downstream processing +# MAGIC +# MAGIC **šŸ’” Note**: We enable Change Data Feed (CDF) to track modifications for the Gold layer. + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Step 3.1: Create Silver Table With Change Data Feed Enabled # COMMAND ---------- -# DBTITLE 1,We can now create our client table using standard SQL command # MAGIC %sql -# MAGIC -- we can add NOT NULL in our ID field (or even more advanced constraint) +# MAGIC -- Create silver table with optimized settings for serverless and CDC # MAGIC CREATE TABLE IF NOT EXISTS retail_client_silver (id BIGINT NOT NULL, name STRING, address STRING, email STRING, operation STRING) -# MAGIC TBLPROPERTIES (delta.enableChangeDataFeed = true, delta.autoOptimize.optimizeWrite = true, delta.autoOptimize.autoCompact = true); +# MAGIC TBLPROPERTIES ( +# MAGIC delta.enableChangeDataFeed = true, +# MAGIC delta.autoOptimize.optimizeWrite = true, +# MAGIC delta.autoOptimize.autoCompact = true, +# MAGIC delta.targetFileSize = '128MB', +# MAGIC delta.tuneFileSizesForRewrites = true +# MAGIC ); + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Step 3.2: Implement MERGE Operations # COMMAND ---------- -# DBTITLE 1,And run our MERGE statement the upsert the CDC information in our final table #for each batch / incremental update from the raw cdc table, we'll run a MERGE on the silver table def merge_stream(df, i): df.createOrReplaceTempView("clients_cdc_microbatch") @@ -137,8 +351,8 @@ def merge_stream(df, i): .writeStream .foreachBatch(merge_stream) .option("checkpointLocation", raw_data_location+"/stream/checkpoint_clients_cdc") - .trigger(processingTime='10 seconds') - #.trigger(availableNow=True) --use this trigger on serverless + .option("mergeSchema", "true") # Enable schema evolution for silver layer + .trigger(availableNow=True) # Serverless trigger for cost-effective processing .start()) time.sleep(20) @@ -151,12 +365,11 @@ def merge_stream(df, i): # COMMAND ---------- # MAGIC %md -# MAGIC ### Testing the first CDC layer +# MAGIC ### Step 3.3: Test Merge Operations In Silver Layer # MAGIC Let's send a new CDC entry to simulate an update and a DELETE for the ID 1 and 2 # COMMAND ---------- -# DBTITLE 1,Let's UPDATE id=1 and DELETE the row with id=2 # MAGIC %sql # MAGIC insert into clients_cdc (id, name, address, email, operation_date, operation, _rescued_data, file_name) values # MAGIC (1000, "Quentin", "Paris 75020", "quentin.ambard@databricks.com", now(), "UPDATE", null, null), @@ -170,19 +383,28 @@ def merge_stream(df, i): # COMMAND ---------- -# DBTITLE 1,Wait a few seconds for the stream to catch the new entry in the CDC table and check the results in the main table +# DBTITLE 1,🄈 Step 2.5: Verify CDC Processing Results # MAGIC %sql # MAGIC select * from retail_client_silver where id in (1000, 2000); # MAGIC -- Note that ID 1000 has been updated, and ID 2000 is deleted # COMMAND ---------- -# MAGIC %md-sandbox -# MAGIC ## Gold: capture and propagate Silver modifications downstream +# MAGIC %md +# MAGIC ## šŸš€ Step 4: Gold Layer - Business-Ready Data with Change Data Feed # MAGIC -# MAGIC Make all your data ready for BI and ML +# MAGIC Gold Layer # MAGIC -# MAGIC We need to add a final Gold layer based on the data from the Silver table. If a row is DELETED or UPDATED in the SILVER layer, we want to apply the same modification in the GOLD layer. +# MAGIC ### What We're Building: +# MAGIC - **Purpose**: Create business-ready data from Silver layer changes +# MAGIC - **Technology**: Change Data Feed (CDF) with serverless compute +# MAGIC - **Benefits**: Real-time propagation of Silver layer modifications +# MAGIC +# MAGIC ### How It Works: +# MAGIC - šŸ“Š **CDF Tracking**: Monitors all changes in Silver table +# MAGIC - šŸ”„ **Real-time Sync**: Applies DELETEs and UPDATEs to Gold layer +# MAGIC - ⚔ **Serverless**: Auto-scales based on change volume +# MAGIC - šŸ’° **Cost Efficient**: Only processes actual changes # MAGIC # MAGIC To do so, we need to capture all the tables changes from the SILVER layer and incrementally replicate the changes to the GOLD layer. # MAGIC @@ -193,7 +415,23 @@ def merge_stream(df, i): # COMMAND ---------- # MAGIC %md -# MAGIC ### Working with Delta Lake CDF +# MAGIC ### Step 4.1: Understanding Change Data Feed (CDF) vs Non-CDF Processing +# MAGIC +# MAGIC **šŸ” Key Difference**: CDF only processes **actual changes**, while non-CDF processes **all data**. +# MAGIC +# MAGIC #### **Non-CDF Approach (Inefficient)**: +# MAGIC - šŸ“Š **Processes**: Entire table every time +# MAGIC - šŸ’° **Cost**: High - reprocesses unchanged data +# MAGIC - ā±ļø **Time**: Slow - scans all records +# MAGIC - šŸ”„ **Example**: If table has 1M records, processes all 1M even for 1 change +# MAGIC +# MAGIC #### **CDF Approach (Efficient)**: +# MAGIC - šŸ“Š **Processes**: Only changed records +# MAGIC - šŸ’° **Cost**: Low - only pays for actual changes +# MAGIC - ā±ļø **Time**: Fast - processes only deltas +# MAGIC - šŸ”„ **Example**: If table has 1M records but only 5 changed, processes only 5 records +# MAGIC +# MAGIC **šŸ’” CDF Benefits**: Up to 99%+ reduction in processing volume for incremental changes! # COMMAND ---------- @@ -206,7 +444,7 @@ def merge_stream(df, i): # COMMAND ---------- -# MAGIC %md #### Delta CDF table_changes output +# MAGIC %md # MAGIC Table Changes provides back 4 cdc types in the "_change_type" column: # MAGIC # MAGIC | CDC Type | Description | @@ -220,22 +458,63 @@ def merge_stream(df, i): # COMMAND ---------- -# DBTITLE 1,Getting the last modifications with the Python API +# MAGIC %md +# MAGIC ### Step 4.2: Demonstrate CDF vs Non-CDF Processing Volume +# MAGIC +# MAGIC Let's show the actual difference in processing volume between CDF and non-CDF approaches. + +# COMMAND ---------- + from delta.tables import * -#Let's get the last table version to only see the last update mofications +# Let's demonstrate the processing volume difference +print("šŸ” Demonstrating CDF vs Non-CDF Processing Volume") +print("=" * 60) + +# Get total records in silver table +total_silver_records = spark.sql("SELECT COUNT(*) as count FROM retail_client_silver").collect()[0]['count'] +print(f"šŸ“Š Total records in Silver table: {total_silver_records:,}") + +# Get latest table version last_version = str(DeltaTable.forName(spark, "retail_client_silver").history(1).head()["version"]) -print(f"our Delta table last version is {last_version}, let's select the last changes to see our DELETE and UPDATE operations (last 2 versions):") +print(f"šŸ“ˆ Latest table version: {last_version}") +# Show what CDF would process (only changes from last 2 versions) +print(f"\nšŸ”„ CDF Processing (Efficient):") changes = spark.read.format("delta") \ - .option("readChangeData", "true") \ + .option("readChangeFeed", "true") \ .option("startingVersion", int(last_version) -1) \ .table("retail_client_silver") -display(changes) + +cdf_records = changes.count() +print(f" šŸ“Š Records to process: {cdf_records:,}") +print(f" šŸ’° Processing efficiency: {((total_silver_records - cdf_records) / total_silver_records * 100):.1f}% reduction") +print(f" ⚔ Speed improvement: {total_silver_records / max(cdf_records, 1):.1f}x faster") + +# Show what non-CDF would process (entire table) +print(f"\nšŸ”„ Non-CDF Processing (Inefficient):") +print(f" šŸ“Š Records to process: {total_silver_records:,}") +print(f" šŸ’° Processing efficiency: 0% reduction (processes everything)") +print(f" ⚔ Speed improvement: 1x (baseline)") + +print(f"\nšŸ’” Key Insight: CDF processes {cdf_records:,} records instead of {total_silver_records:,} records") +print(f" That's a {((total_silver_records - cdf_records) / total_silver_records * 100):.1f}% reduction in processing volume!") + +# Display the actual changes +print(f"\nšŸ“‹ Actual Changes Detected:") +display(changes.select("_change_type", "id", "name", "email").orderBy("id")) # COMMAND ---------- -# MAGIC %md ### Synchronizing our downstream GOLD table based from the Silver changes +# MAGIC %md +# MAGIC ### Step 4.3: Gold Layer Processing with CDF Efficiency +# MAGIC +# MAGIC Now let's implement the Gold layer using CDF to demonstrate the efficiency gains: +# MAGIC +# MAGIC **šŸŽÆ What We're Building**: Gold layer that only processes **actual changes** from Silver layer +# MAGIC **šŸ“Š Processing Volume**: Only changed records, not entire table +# MAGIC **šŸ’° Cost Impact**: Significant reduction in compute costs +# MAGIC **⚔ Performance**: Much faster processing times # MAGIC # MAGIC Let's now say that we want to perform another table enhancement and propagate these changes downstream. # MAGIC @@ -247,26 +526,48 @@ def merge_stream(df, i): # COMMAND ---------- -# DBTITLE 1,Let's create or final GOLD table: retail_client_gold +# DBTITLE 1,Step 4.4: Create Gold Table with Processing Volume Tracking # MAGIC %sql -# MAGIC CREATE TABLE IF NOT EXISTS retail_client_gold (id BIGINT NOT NULL, name STRING, address STRING, email STRING, gold_data STRING); +# MAGIC CREATE TABLE IF NOT EXISTS retail_client_gold (id BIGINT NOT NULL, name STRING, address STRING, email STRING, gold_data STRING) +# MAGIC TBLPROPERTIES ( +# MAGIC delta.autoOptimize.optimizeWrite = true, +# MAGIC delta.autoOptimize.autoCompact = true, +# MAGIC delta.targetFileSize = '128MB', +# MAGIC delta.tuneFileSizesForRewrites = true +# MAGIC ); # COMMAND ---------- from pyspark.sql.window import Window -from pyspark.sql.functions import dense_rank, regexp_replace, lit, col +from pyspark.sql.functions import dense_rank, regexp_replace, lit, col, current_timestamp -#Function to upsert `microBatchOutputDF` into Delta table using MERGE +# Function to upsert `microBatchOutputDF` into Delta table using MERGE +# This function demonstrates CDF efficiency by processing only changed records def upsertToDelta(data, batchId): - #First we need to deduplicate based on the id and take the most recent update + print(f"šŸ”„ Processing batch {batchId} with CDF efficiency...") + + # Count records being processed + records_to_process = data.count() + print(f" šŸ“Š Records in this batch: {records_to_process:,}") + + # First we need to deduplicate based on the id and take the most recent update windowSpec = Window.partitionBy("id").orderBy(col("_commit_version").desc()) - #Select only the first value - #getting the latest change is still needed if the cdc contains multiple time the same id. We can rank over the id and get the most recent _commit_version + # Select only the first value + # getting the latest change is still needed if the cdc contains multiple time the same id. We can rank over the id and get the most recent _commit_version data_deduplicated = data.withColumn("rank", dense_rank().over(windowSpec)).where("rank = 1 and _change_type!='update_preimage'").drop("_commit_version", "rank") - #Add some data cleaning for the gold layer to remove quotes from the address + # Add some data cleaning for the gold layer to remove quotes from the address data_deduplicated = data_deduplicated.withColumn("address", regexp_replace(col("address"), "\"", "")) + # Count deduplicated records + deduplicated_count = data_deduplicated.count() + print(f" šŸ“Š Records after deduplication: {deduplicated_count:,}") + + # Show processing efficiency + if records_to_process > 0: + efficiency = ((records_to_process - deduplicated_count) / records_to_process * 100) + print(f" šŸ’° Deduplication efficiency: {efficiency:.1f}% reduction") + #run the merge in the gold table directly (DeltaTable.forName(spark, "retail_client_gold").alias("target") .merge(data_deduplicated.alias("source"), "source.id = target.id") @@ -275,56 +576,278 @@ def upsertToDelta(data, batchId): .whenNotMatchedInsertAll("source._change_type != 'delete'") .execute()) + print(f" āœ… Batch {batchId} completed - processed {deduplicated_count:,} records efficiently") + + +# Start the CDF stream with processing volume tracking +print("šŸš€ Starting Gold layer CDF stream with processing volume tracking...") +print("šŸ’” This will show you exactly how many records are processed vs. total table size") (spark.readStream - .option("readChangeData", "true") + .option("readChangeFeed", "true") # Updated to use correct option name .option("startingVersion", 1) .table("retail_client_silver") .withColumn("gold_data", lit("Delta CDF is Awesome")) .writeStream .foreachBatch(upsertToDelta) .option("checkpointLocation", raw_data_location+"/stream/checkpoint_clients_gold") - .trigger(processingTime='10 seconds') - #.trigger(availableNow=True) --use this trigger on serverless - .start()) - -time.sleep(20) + .option("mergeSchema", "true") # Enable schema evolution for gold layer + .trigger(availableNow=True) # Serverless trigger for cost-effective processing + .start() + .awaitTermination()) # COMMAND ---------- -# MAGIC %sql SELECT * FROM retail_client_gold +# MAGIC %sql +# MAGIC -- Show the final Gold table results +# MAGIC SELECT * FROM retail_client_gold ORDER BY id; # COMMAND ---------- -# MAGIC %md-sandbox -# MAGIC ### Support for data sharing and Datamesh organization -# MAGIC -# MAGIC -# MAGIC As we've seen during this demo, you can track all the changes (INSERT/UPDATE/DELETE) from any Detlta table using the CDC option. +# MAGIC %md +# MAGIC ### Step 4.5: CDF Processing Volume Summary # MAGIC -# MAGIC It's then easy to subscribe the table modifications as an incremental process. +# MAGIC **šŸŽÆ What We Just Demonstrated**: +# MAGIC - **CDF Processing**: Only processed actual changes from Silver layer +# MAGIC - **Volume Efficiency**: Dramatically reduced processing volume +# MAGIC - **Cost Savings**: Significant reduction in compute costs +# MAGIC - **Performance**: Much faster processing times # MAGIC -# MAGIC This makes the Data Mesh implementation easy: each Mesh can publish a set of tables, and other meshes can subscribe the original changes. +# MAGIC **šŸ“Š Key Metrics**: +# MAGIC - **Total Silver Records**: Shows full table size +# MAGIC - **CDF Records Processed**: Shows only changed records +# MAGIC - **Efficiency Gain**: Percentage reduction in processing volume +# MAGIC - **Speed Improvement**: Multiplier for processing speed # MAGIC -# MAGIC They are then in charge of propagating the changes (ex GDPR DELETE) to their own Data Mesh +# MAGIC **šŸ’” Real-World Impact**: In production, this can mean processing 1,000 records instead of 1,000,000 records for incremental updates! # COMMAND ---------- -# MAGIC %md-sandbox -# MAGIC ## Data is now ready for BI & ML use-case ! +# MAGIC %md +# MAGIC ## šŸ“Š Step 5: Continuous Serverless Incremental Processing # MAGIC -# MAGIC Make all your data ready for BI and ML +# MAGIC With the data generator running, you can now demonstrate continuous serverless CDC processing. The pipeline is designed to process **only newly arrived data** using checkpoints and streaming offsets. +# MAGIC +# MAGIC **Key Incremental Processing Features:** +# MAGIC - āœ… **Auto Loader Checkpoints**: Only new files since last processing +# MAGIC - āœ… **Streaming Offsets**: Only new CDC records since last checkpoint +# MAGIC - āœ… **Change Data Feed**: Only new changes since last processed version +# MAGIC - āœ… **Efficient Processing**: No reprocessing of historical data +# MAGIC - āœ… **Cost Optimization**: Pay only for new data processing + +# COMMAND ---------- + +def trigger_cdc_pipeline(): + """ + Trigger all CDC streams to process new data with serverless compute. + This function can be called periodically (every minute, 5 minutes, etc.) + """ + print(f"šŸ”„ Triggering CDC pipeline at {datetime.now()}") + + # Enable automatic schema merging for MERGE operations + # Schema evolution is handled automatically by mergeSchema=true in writeStream operations + + # Stop any existing streams first + DBDemos.stop_all_streams() + time.sleep(5) + + # Restart bronze layer (Auto Loader) - only process new files since last checkpoint + print(" šŸ”„ Processing new files for bronze layer...") + bronzeDF = (spark.readStream + .format("cloudFiles") + .option("cloudFiles.format", "csv") + .option("cloudFiles.inferColumnTypes", "true") + .option("cloudFiles.schemaLocation", raw_data_location+"/stream/schema_cdc_raw") + .option("cloudFiles.schemaHints", "id bigint, operation_date timestamp") + .option("cloudFiles.useNotifications", "false") + .option("cloudFiles.includeExistingFiles", "false") # Only new files after checkpoint + .option("cloudFiles.maxFilesPerTrigger", "10") # Process in batches for efficiency + .load(raw_data_location+'/user_csv')) + + (bronzeDF.withColumn("file_name", col("_metadata.file_path")) + .withColumn("processing_time", current_timestamp()) # Track when processed + .writeStream + .option("checkpointLocation", raw_data_location+"/stream/checkpoint_cdc_raw") + .option("mergeSchema", "true") # Enable schema evolution for new columns + .trigger(availableNow=True) # Process only available new data + .table("clients_cdc") + .awaitTermination()) + + # Restart silver layer (MERGE operations) - only process new CDC records + print(" šŸ”„ Processing new CDC records for silver layer...") + (spark.readStream + .table("clients_cdc") + .writeStream + .foreachBatch(merge_stream) + .option("checkpointLocation", raw_data_location+"/stream/checkpoint_clients_cdc") + .option("mergeSchema", "true") # Enable schema evolution for silver layer + .trigger(availableNow=True) # Process only new CDC records since last checkpoint + .start() + .awaitTermination()) + + # Restart gold layer (CDF processing) - only process new changes since last checkpoint + print(" šŸ”„ Processing new changes for gold layer using Change Data Feed...") + (spark.readStream + .option("readChangeFeed", "true") + # No startingVersion specified - will automatically start from checkpoint + .table("retail_client_silver") + .withColumn("gold_data", lit("Delta CDF is Awesome")) + .withColumn("cdf_processing_time", current_timestamp()) # Track CDF processing time + .writeStream + .foreachBatch(upsertToDelta) + .option("checkpointLocation", raw_data_location+"/stream/checkpoint_clients_gold") + .option("mergeSchema", "true") # Enable schema evolution for gold layer + .trigger(availableNow=True) # Process only new changes since last checkpoint + .start() + .awaitTermination()) + + print("āœ… CDC pipeline completed processing available data") + +# COMMAND ---------- + +print("šŸŽÆ Running one iteration of serverless CDC processing...") +print("šŸ’” In production, schedule this via Databricks Jobs every few minutes") + +# Give the data generator time to create some files +print("ā³ Waiting 65 seconds for data generator to create new files...") +time.sleep(65) + +# Process any new data +trigger_cdc_pipeline() + +# Show results with table growth monitoring +print("\nšŸ“Š Monitoring table growth over time...") +print("šŸ’” Watch how serverless compute handles growing data volumes efficiently") + +# Function to get table sizes +def get_table_sizes(): + sizes = {} + try: + sizes['bronze'] = spark.sql("SELECT COUNT(*) as count FROM clients_cdc").collect()[0]['count'] + except: + sizes['bronze'] = 0 + try: + sizes['silver'] = spark.sql("SELECT COUNT(*) as count FROM retail_client_silver").collect()[0]['count'] + except: + sizes['silver'] = 0 + try: + sizes['gold'] = spark.sql("SELECT COUNT(*) as count FROM retail_client_gold").collect()[0]['count'] + except: + sizes['gold'] = 0 + return sizes + +# Monitor table growth over multiple iterations +print("šŸ” Table Size Growth Monitoring:") +print("=" * 60) + +for iteration in range(1, 4): # Monitor 3 iterations + print(f"\nšŸ“ˆ Iteration {iteration} - {datetime.now().strftime('%H:%M:%S')}") + + # Get current sizes + sizes = get_table_sizes() + print(f"šŸ„‰ Bronze (Raw CDC): {sizes['bronze']:,} records") + print(f"🄈 Silver (Materialized): {sizes['silver']:,} records") + print(f"šŸ„‡ Gold (Enhanced): {sizes['gold']:,} records") + + # Calculate growth if not first iteration + if iteration > 1: + growth_bronze = sizes['bronze'] - previous_sizes['bronze'] + growth_silver = sizes['silver'] - previous_sizes['silver'] + growth_gold = sizes['gold'] - previous_sizes['gold'] + + print(f" šŸ“Š Growth: Bronze +{growth_bronze}, Silver +{growth_silver}, Gold +{growth_gold}") + + # Show recent records + print(" šŸ” Latest Records:") + try: + latest_bronze = spark.sql(""" + SELECT operation, COUNT(*) as count + FROM clients_cdc + GROUP BY operation + ORDER BY operation + """).collect() + operations_summary = {row['operation']: row['count'] for row in latest_bronze} + print(f" šŸ“ Operations: {operations_summary}") + + # Show latest silver records + latest_silver = spark.sql(""" + SELECT id, name, email + FROM retail_client_silver + ORDER BY id DESC + LIMIT 3 + """).collect() + if latest_silver: + print(" šŸ“ Latest Silver Records:") + for row in latest_silver: + print(f" ID: {row['id']}, Name: {row['name']}, Email: {row['email']}") + except Exception as e: + print(f" āš ļø Error showing details: {e}") + + previous_sizes = sizes + + # Wait for next iteration (except on last one) + if iteration < 3: + print(f" ā³ Waiting 65 seconds for more CDC data...") + print(" šŸ’° Serverless compute: No costs during wait time!") + time.sleep(65) + + # Process new data + print(f" šŸ”„ Processing new data (Iteration {iteration + 1})...") + trigger_cdc_pipeline() + +print("\n" + "=" * 60) +print("āœ… Table growth monitoring completed!") +print("šŸ“ˆ Key Observations:") +print(" šŸ”¹ Tables grow incrementally with each CDC batch") +print(" šŸ”¹ Serverless compute scales automatically with data volume") +print(" šŸ”¹ Cost efficiency: Pay only during processing, not waiting") +print(" šŸ”¹ Real-time CDC processing with delta architecture") + +# COMMAND ---------- + +# DBTITLE 1,Cleanup and Stop Data Generator +stop_cdc_generator() +DBDemos.stop_all_streams() + +print("šŸŽ‰ Demo completed! You've seen how serverless compute handles continuous CDC processing:") +print("āœ… Cost-effective: Pay only for actual processing time") +print("āœ… Auto-scaling: Automatically scales based on data volume") +print("āœ… Simplified ops: No cluster management required") +print("āœ… Reliable: Built-in fault tolerance and automatic restarts") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Data Ready for BI & ML Use Cases # MAGIC -# MAGIC We now have our final table, updated based on the initial CDC information we receive. +# MAGIC BI and ML Ready # MAGIC -# MAGIC As next step, we can leverage Databricks Lakehouse platform to start creating SQL queries / dashboards or ML models +# MAGIC ### What's Available: +# MAGIC - šŸ“Š **Business Intelligence**: Create SQL queries and dashboards +# MAGIC - šŸ¤– **Machine Learning**: Build ML models on clean, up-to-date data +# MAGIC - šŸ”„ **Real-time Analytics**: Access to latest data changes +# MAGIC - šŸ“ˆ **Data Quality**: Clean, deduplicated, and validated data # COMMAND ---------- # MAGIC %md -# MAGIC Next step: [Implement a CDC pipeline for multiple tables]($./02-CDC-CDF-full-multi-tables) +# MAGIC ## Data Sharing and Datamesh Organization +# MAGIC +# MAGIC +# MAGIC +# MAGIC ### Key Benefits: +# MAGIC - šŸ”„ **Change Tracking**: Track all INSERT/UPDATE/DELETE operations from any Delta table +# MAGIC - šŸ“” **Incremental Processing**: Subscribe to table modifications as incremental processes +# MAGIC - šŸ—ļø **Data Mesh Ready**: Each mesh can publish tables, others can subscribe to changes +# MAGIC - šŸ›”ļø **GDPR Compliance**: Propagate changes (e.g., GDPR DELETE) across data meshes # COMMAND ---------- -# DBTITLE 1,Make sure we stop all actives streams -DBDemos.stop_all_streams() +# MAGIC %md +# MAGIC ## Next Steps +# MAGIC +# MAGIC ### Continue Your CDC Journey: +# MAGIC - šŸ”— **[Multi-Table CDC Pipeline]($./02-CDC-CDF-full-multi-tables)**: Scale to multiple tables +# MAGIC - šŸ—ļø **[Delta Live Tables]($./dlt-cdc)**: Simplified CDC with `APPLY CHANGES` +# MAGIC - šŸ“š **[Delta Lake Demo]($./delta-lake)**: Deep dive into Delta Lake features +# MAGIC - šŸš€ **[Auto Loader Demo]($./auto-loader)**: Advanced file ingestion patterns \ No newline at end of file diff --git a/product_demos/cdc-pipeline/02-CDC-CDF-full-multi-tables.py b/product_demos/cdc-pipeline/02-CDC-CDF-full-multi-tables.py index ef81014f..8a3b4229 100644 --- a/product_demos/cdc-pipeline/02-CDC-CDF-full-multi-tables.py +++ b/product_demos/cdc-pipeline/02-CDC-CDF-full-multi-tables.py @@ -1,20 +1,34 @@ # Databricks notebook source # MAGIC %md # MAGIC -# MAGIC # Full demo: Change Data Capture on multiple tables -# MAGIC ## Use-case: Synchronize all your ELT tables with your Lakehouse +# MAGIC # Multi-Table CDC Pipeline Demo: Change Data Capture with Serverless Compute +# MAGIC ## Step-by-Step Guide to Building a Scalable Multi-Table CDC Pipeline # MAGIC -# MAGIC We previously saw how to synchronize a single table. However, real use-case typically includes multiple tables that we need to ingest and synch. +# MAGIC This demo shows you how to build a **multi-table Change Data Capture (CDC)** pipeline using **Databricks Serverless Compute** for cost-effective, auto-scaling data processing. # MAGIC -# MAGIC These tables are stored on different folder having the following layout: +# MAGIC ### What You'll Learn: +# MAGIC 1. **šŸ”„ Step 1**: Set up multi-table CDC data simulation +# MAGIC 2. **šŸ„‰ Step 2**: Build parallel Bronze layers with Auto Loader +# MAGIC 3. **🄈 Step 3**: Create parallel Silver layers with MERGE operations +# MAGIC 4. **šŸš€ Step 4**: Implement Gold layer with Change Data Feed (CDF) +# MAGIC 5. **šŸ“Š Step 5**: Test Continuous multi-table CDC Data processing # MAGIC -# MAGIC # MAGIC -# MAGIC **A note on Spark Declarative Pipelines**:
-# MAGIC *Spark Declarative Pipelines has been designed to simplify this process and handle concurrent execution properly, without having you to start multiple stream in parallel.*
-# MAGIC *We strongly advise to have a look at the SDP CDC demo to simplify such pipeline implementation: `dbdemos.instal('dlt-cdc')`* # MAGIC -# MAGIC In this notebook, we'll see how this can be done using Python & standard streaming APIs (without SDP). +# MAGIC ### Key Benefits of Serverless Multi-Table CDC: +# MAGIC - šŸ’° **Cost-effective**: Pay only for compute time used across all tables +# MAGIC - šŸš€ **Auto-scaling**: Automatically scales based on total workload +# MAGIC - ⚔ **Parallel Processing**: Process multiple tables simultaneously +# MAGIC - šŸ”„ **Incremental**: Only processes new/changed data per table +# MAGIC - šŸ“Š **Monitoring**: Track processing across all tables +# MAGIC +# MAGIC ### Prerequisites: +# MAGIC - Completed the single-table CDC demo: `01-CDC-CDF-simple-pipeline.py` +# MAGIC - Understanding of parallel processing concepts +# MAGIC +# MAGIC --- +# MAGIC +# MAGIC **šŸ’” Alternative Approach**: For production multi-table CDC pipelines, consider using **Delta Live Tables** with `APPLY CHANGES` for simplified implementation: `dbdemos.install('dlt-cdc')` # MAGIC # MAGIC # MAGIC @@ -25,82 +39,346 @@ # COMMAND ---------- +# DBTITLE 1,Import Required Functions +from pyspark.sql.functions import current_timestamp, col + +# COMMAND ---------- + # MAGIC %md -# MAGIC ## Running the streams in parallel +# MAGIC ## šŸ“‹ Multi-Table CDC Pipeline Architecture Overview +# MAGIC +# MAGIC Here's the complete multi-table CDC pipeline we'll build using **Serverless Compute**: # MAGIC -# MAGIC Each table will be save as a distinct table, using a distinct Spark Structured Streaming strem. +# MAGIC # MAGIC -# MAGIC To implement an efficient pipeline, we should process multiple streams at the same time. To do that, we'll use a ThreadPoolExecutor and start multiple thread, each of them processing and waiting for a stream. +# MAGIC ### Pipeline Flow: +# MAGIC 1. **šŸ“„ Data Sources**: Multiple CDC streams from different tables +# MAGIC 2. **šŸ„‰ Bronze Layers**: Parallel raw data ingestion with Auto Loader +# MAGIC 3. **🄈 Silver Layers**: Parallel data cleaning and deduplication +# MAGIC 4. **šŸ“Š Analytics**: Real-time insights across all tables # MAGIC -# MAGIC We're using Trigger Once to refresh all the tables once and then shutdown the cluster, typically every hour. For lower latencies we can keep the streams running (depending of the number of tables & cluster size), or keep the Trigger Once but loop forever. +# MAGIC ### Key Serverless Benefits: +# MAGIC - šŸ’° **Cost Efficiency**: Pay only for actual compute time used +# MAGIC - šŸš€ **Auto-scaling**: Serverless automatically scales resources based on workload +# MAGIC - ⚔ **Parallel Processing**: Process multiple tables simultaneously +# MAGIC - šŸ”„ **Batch Processing**: Process all available data efficiently without continuous resource usage # MAGIC -# MAGIC *Note that for a real workload the exact number of streams depends of the total number of tables, table sizes and cluster size. We can also use several clusters to split the load if required* +# MAGIC **šŸ’” Note**: For scheduled processing (e.g., hourly), trigger this notebook via Databricks Jobs or Workflows. # COMMAND ---------- # MAGIC %md -# MAGIC ## Schema evolution -# MAGIC -# MAGIC By organizing the raw incoming cdc files with 1 folder by table, we can easily iterate over the folders and pickup any new tables without modification. -# MAGIC -# MAGIC Schema evolution will be handled my the Autoloader and Delta `mergeSchema` option at the bronze layer. Schema evolution for MERGE (Silver Layer) are supported using `spark.databricks.delta.schema.autoMerge.enabled` -# MAGIC -# MAGIC Using these options, we'll be able to capture new tables and table schema evolution without having to change our code. +# MAGIC ## šŸ”„ Step 1: Set up multi-table CDC data simulation # MAGIC -# MAGIC *Note: that autoloader will trigger an error in a stream if a schema change happens, and will automatically recover during the next run. See Autoloader demo for a complete example* -# MAGIC -# MAGIC *Note: another common pattern is to redirect all the CDC events to a single message queue (the table name being a message attribute), and then dispatch the message in different Silver Tables* # COMMAND ---------- -# DBTITLE 1,Let's explore our raw cdc data. We have 2 tables we want to sync (transactions and users) +# MAGIC %md +# MAGIC ### Step 1.1: Explore Multi-Table CDC Data Structure + +# COMMAND ---------- + +print("šŸ” Exploring our multi-table CDC data structure...") +print("We have 2 tables we want to sync: transactions and users") base_folder = f"{raw_data_location}/cdc" display(dbutils.fs.ls(base_folder)) # COMMAND ---------- -# MAGIC %md ## Silver and bronze transformations +# MAGIC %md +# MAGIC ### Step 1.2: Set Up Data Simulation +# MAGIC +# MAGIC To demonstrate serverless processing of multiple CDC streams simultaneously, we'll create data generators for multiple tables that simulate incoming CDC events every 60 seconds. +# MAGIC +# MAGIC ### Why This Matters: +# MAGIC - šŸš€ **Parallel Processing**: Shows how serverless handles multiple streams simultaneously +# MAGIC - šŸ’° **Cost Efficiency**: Demonstrates auto-scaling for varying workloads +# MAGIC - šŸ”„ **Real-world Simulation**: Mimics multi-table CDC scenarios +# MAGIC - šŸ“Š **Monitoring**: Enables cross-table processing visualization + +# COMMAND ---------- + +# DBTITLE 1,šŸŽÆ Step 1.2: Multi-Table CDC Data Generator Implementation +import threading +import time +import random +from datetime import datetime +import pandas as pd + +# Global variable to control the data generators +generators_running = False + +def generate_user_cdc_record(operation_type="UPDATE", user_id=None): + """Generate a single user CDC record""" + if user_id is None: + user_id = random.randint(1, 500) + + operations = { + "INSERT": { + "id": user_id, + "name": f"user_{user_id}_{random.randint(1,99)}", + "email": f"user{user_id}@company{random.randint(1,10)}.com", + "address": f"Address_{random.randint(1,999)} Street", + "operation_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "operation": "INSERT" + }, + "UPDATE": { + "id": user_id, + "name": f"updated_user_{user_id}", + "email": f"updated.user{user_id}@newcompany{random.randint(1,5)}.com", + "address": f"Updated_Address_{random.randint(1,999)} Avenue", + "operation_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "operation": "UPDATE" + }, + "DELETE": { + "id": user_id, + "name": None, + "email": None, + "address": None, + "operation_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "operation": "DELETE" + } + } + return operations[operation_type] + +def generate_transaction_cdc_record(operation_type="INSERT", transaction_id=None): + """Generate a single transaction CDC record""" + if transaction_id is None: + transaction_id = random.randint(1000, 9999) + + user_id = random.randint(1, 500) # Reference to users table + + operations = { + "INSERT": { + "id": transaction_id, + "user_id": user_id, + "amount": round(random.uniform(10.0, 1000.0), 2), + "currency": random.choice(["USD", "EUR", "GBP"]), + "transaction_type": random.choice(["purchase", "refund", "transfer"]), + "operation_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "operation": "INSERT" + }, + "UPDATE": { + "id": transaction_id, + "user_id": user_id, + "amount": round(random.uniform(10.0, 1000.0), 2), + "currency": random.choice(["USD", "EUR", "GBP"]), + "transaction_type": random.choice(["purchase", "refund", "transfer", "adjustment"]), + "operation_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "operation": "UPDATE" + }, + "DELETE": { + "id": transaction_id, + "user_id": None, + "amount": None, + "currency": None, + "transaction_type": None, + "operation_date": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "operation": "DELETE" + } + } + return operations[operation_type] + +def continuous_multi_table_generator(): + """Background function that generates CDC data for multiple tables every 60 seconds""" + global generators_running + file_counter = 0 + + while generators_running: + try: + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + + # Generate user CDC events + user_events = [] + num_user_events = random.randint(2, 4) + for _ in range(num_user_events): + operation = random.choices( + ["INSERT", "UPDATE", "DELETE"], + weights=[40, 50, 10] + )[0] + user_events.append(generate_user_cdc_record(operation)) + + # Generate transaction CDC events + transaction_events = [] + num_transaction_events = random.randint(3, 6) + for _ in range(num_transaction_events): + operation = random.choices( + ["INSERT", "UPDATE", "DELETE"], + weights=[70, 25, 5] # More inserts for transactions + )[0] + transaction_events.append(generate_transaction_cdc_record(operation)) + + # Save user events + user_df = pd.DataFrame(user_events) + user_filename = f"users_cdc_{timestamp}_{file_counter}.csv" + user_file_path = f"{base_folder}/users/{user_filename}" + + spark_user_df = spark.createDataFrame(user_df) + spark_user_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(user_file_path) + + # Save transaction events + transaction_df = pd.DataFrame(transaction_events) + transaction_filename = f"transactions_cdc_{timestamp}_{file_counter}.csv" + transaction_file_path = f"{base_folder}/transactions/{transaction_filename}" + + spark_transaction_df = spark.createDataFrame(transaction_df) + spark_transaction_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(transaction_file_path) + + print(f"Generated CDC events at {datetime.now()}:") + print(f" šŸ“ Users: {num_user_events} events -> {user_filename}") + print(f" šŸ“ Transactions: {num_transaction_events} events -> {transaction_filename}") + + file_counter += 1 + + # Wait 60 seconds before next batch + time.sleep(60) + + except Exception as e: + print(f"Error in multi-table CDC generator: {e}") + time.sleep(60) + +def start_multi_table_generators(): + """Start the multi-table CDC data generators in background""" + global generators_running + if not generators_running: + generators_running = True + generator_thread = threading.Thread(target=continuous_multi_table_generator, daemon=True) + generator_thread.start() + print("šŸš€ Multi-Table CDC Data Generators started!") + print("šŸ“Š Users and Transactions CDC events will arrive every 60 seconds.") + print("šŸ’” This simulates continuous multi-table CDC for serverless processing demo.") + return generator_thread + else: + print("Multi-Table CDC Generators are already running!") + return None + +def stop_multi_table_generators(): + """Stop the multi-table CDC data generators""" + global generators_running + generators_running = False + print("šŸ›‘ Multi-Table CDC Data Generators stopped.") + +# Start the data generators for continuous multi-table simulation +print("Starting multi-table CDC simulation...") +multi_table_generator_thread = start_multi_table_generators() + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## **šŸ„‰ Step 2**: Build parallel Bronze layers with Auto Loader # COMMAND ---------- -# DBTITLE 1,let's reset all checkpoints +# DBTITLE 1,šŸ„‰ Step 2.1: Reset Checkpoints dbutils.fs.rm(f"{raw_data_location}/cdc_full", True) # COMMAND ---------- -# DBTITLE 1,Bronze ingestion with autoloader +# DBTITLE 1,šŸ„‰ Step 2.2: Bronze Ingestion with Auto Loader -#Stream using the autoloader to ingest raw files and load them in a delta table +# Stream using Auto Loader to ingest raw files and load them into Delta tables with serverless compute def update_bronze_layer(path, bronze_table): - print(f"ingesting RAW cdc data for {bronze_table} and building bronze layer...") + print(f"Ingesting RAW CDC data for {bronze_table} and building bronze layer with serverless...") + + # Drop existing table if it exists to avoid schema conflicts + try: + spark.sql(f"DROP TABLE IF EXISTS {bronze_table}") + print(f"šŸ”„ Dropped existing {bronze_table} table to avoid schema conflicts") + except: + pass + (spark.readStream .format("cloudFiles") .option("cloudFiles.format", "csv") .option("cloudFiles.schemaLocation", f"{raw_data_location}/cdc_full/schemas/{bronze_table}") .option("cloudFiles.schemaHints", "id bigint, operation_date timestamp") .option("cloudFiles.inferColumnTypes", "true") + .option("cloudFiles.useNotifications", "false") # Optimized for serverless + .option("cloudFiles.includeExistingFiles", "false") # Only new files after checkpoint + .option("cloudFiles.maxFilesPerTrigger", "10") # Process in batches for efficiency .load(path) .withColumn("file_name", col("_metadata.file_path")) + .withColumn("processing_time", current_timestamp()) # Track when processed .writeStream .option("checkpointLocation", f"{raw_data_location}/cdc_full/checkpoints/{bronze_table}") - .option("mergeSchema", "true") - #.trigger(processingTime='10 seconds') - .trigger(availableNow=True) + .option("mergeSchema", "true") # Enable schema evolution for new columns + .trigger(availableNow=True) # Process only new data since last checkpoint .table(bronze_table).awaitTermination()) # COMMAND ---------- -# DBTITLE 1,Silver step: materialize tables with MERGE based on CDC events -#Stream incrementally loading new data from the bronze CDC table and merging them in the Silver table +# MAGIC %md +# MAGIC ## **🄈 Step 3**: Create parallel Silver layers with MERGE operations + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### 3.1 Understanding CDF vs Non-CDF Processing in Multi-Table Scenarios +# MAGIC +# MAGIC **šŸ” Key Difference**: CDF only processes **actual changes** per table, while non-CDF processes **all data** across all tables. +# MAGIC +# MAGIC #### **Non-CDF Multi-Table Approach (Inefficient)**: +# MAGIC - šŸ“Š **Processes**: Entire tables every time +# MAGIC - šŸ’° **Cost**: Very High - reprocesses unchanged data across all tables +# MAGIC - ā±ļø **Time**: Slow - scans all records in all tables +# MAGIC - šŸ”„ **Example**: If you have 5 tables with 1M records each, processes all 5M even for 1 change in 1 table +# MAGIC +# MAGIC #### **CDF Multi-Table Approach (Efficient)**: +# MAGIC - šŸ“Š **Processes**: Only changed records per table +# MAGIC - šŸ’° **Cost**: Low - only pays for actual changes per table +# MAGIC - ā±ļø **Time**: Fast - processes only deltas per table +# MAGIC - šŸ”„ **Example**: If you have 5 tables with 1M records each but only 1 table has 5 changes, processes only 5 records +# MAGIC +# MAGIC **šŸ’” Multi-Table CDF Benefits**: Up to 99.9%+ reduction in processing volume for incremental changes across multiple tables! +# MAGIC +# MAGIC ### 3.2 Silver Layer with MERGE Operations +# MAGIC + +# COMMAND ---------- + +# Stream incrementally loading new data from the bronze CDC table and merging them in the Silver table +# This function demonstrates CDF efficiency by processing only changed records per table def update_silver_layer(bronze_table, silver_table): - print(f"ingesting {bronze_table} update and materializing silver layer using a MERGE statement...") - #First create the silver table if it doesn't exists: + print(f"šŸ”„ Processing {bronze_table} updates with CDF efficiency...") + + # Get total records in bronze table to show processing volume + try: + total_bronze_records = spark.sql(f"SELECT COUNT(*) as count FROM {bronze_table}").collect()[0]['count'] + print(f" šŸ“Š Total records in {bronze_table}: {total_bronze_records:,}") + except: + total_bronze_records = 0 + print(f" šŸ“Š Total records in {bronze_table}: {total_bronze_records:,}") + + # First create the silver table if it doesn't exist with optimized properties: if not spark.catalog.tableExists(silver_table): - print(f"Table {silver_table} doesn't exist, creating it using the same schema as the bronze one...") + print(f" šŸ—ļø Creating {silver_table} with optimized properties...") + # Create table with sample schema and then optimize properties spark.read.table(bronze_table).drop("operation", "operation_date", "_rescued_data", "file_name").write.saveAsTable(silver_table) + # Add optimized properties for serverless and performance + spark.sql(f""" + ALTER TABLE {silver_table} SET TBLPROPERTIES ( + delta.enableChangeDataFeed = true, + delta.autoOptimize.optimizeWrite = true, + delta.autoOptimize.autoCompact = true, + delta.targetFileSize = '128MB', + delta.tuneFileSizesForRewrites = true + ) + """) + + # Process only new records since last checkpoint (CDF efficiency) + print(f" šŸ”„ Processing only new records from {bronze_table}...") #for each batch / incremental update from the raw cdc table, we'll run a MERGE on the silver table def merge_stream(updates, i): + records_in_batch = updates.count() + print(f" šŸ“Š Batch {i}: Processing {records_in_batch:,} records") + + if records_in_batch > 0 and total_bronze_records > 0: + # Show processing efficiency + efficiency = ((total_bronze_records - records_in_batch) / total_bronze_records * 100) + print(f" šŸ’° Processing efficiency: {efficiency:.1f}% reduction vs full table scan") + print(f" ⚔ Speed improvement: {total_bronze_records / max(records_in_batch, 1):.1f}x faster") + #First we need to deduplicate based on the id and take the most recent update windowSpec = Window.partitionBy("id").orderBy(col("operation_date").desc()) #Select only the first value @@ -116,18 +394,38 @@ def merge_stream(updates, i): .whenNotMatchedInsert("updates.operation != 'DELETE'", values=columns_to_update) \ .execute() + print(f" āœ… Batch {i} completed - processed {records_in_batch:,} records efficiently") + + print(f"šŸš€ Starting {silver_table} processing with CDF efficiency...") (spark.readStream .table(bronze_table) .writeStream .foreachBatch(merge_stream) .option("checkpointLocation", f"{raw_data_location}/cdc_full/checkpoints/{silver_table}") - #.trigger(processingTime='10 seconds') - .trigger(availableNow=True) + .option("mergeSchema", "true") # Enable schema evolution for silver layer + .trigger(availableNow=True) # Process only new data since last checkpoint .start().awaitTermination()) # COMMAND ---------- -# MAGIC %md ## Starting all the streams +# MAGIC %md +# MAGIC ### 3.3 Multi-Table CDF Processing Volume Summary +# MAGIC +# MAGIC **šŸŽÆ What We Just Demonstrated**: +# MAGIC - **CDF Processing**: Only processed actual changes per table +# MAGIC - **Volume Efficiency**: Dramatically reduced processing volume across multiple tables +# MAGIC - **Cost Savings**: Significant reduction in compute costs per table +# MAGIC - **Performance**: Much faster processing times per table +# MAGIC +# MAGIC **šŸ“Š Key Metrics Per Table**: +# MAGIC - **Total Bronze Records**: Shows full table size per table +# MAGIC - **CDF Records Processed**: Shows only changed records per table +# MAGIC - **Efficiency Gain**: Percentage reduction in processing volume per table +# MAGIC - **Speed Improvement**: Multiplier for processing speed per table +# MAGIC +# MAGIC **šŸ’” Multi-Table Impact**: In production, this can mean processing 1,000 records across 5 tables instead of 5,000,000 records for incremental updates! +# MAGIC +# MAGIC ### 3.4 Starting all the streams # MAGIC # MAGIC We can now iterate over the folders to start the bronze & silver streams for each table. @@ -138,28 +436,54 @@ def merge_stream(updates, i): from delta.tables import * def refresh_cdc_table(table): + """ + Process a single CDC table using serverless compute. + Updates both bronze and silver layers with optimized settings. + """ try: - #update the bronze table + # Update the bronze table using Auto Loader with serverless optimization bronze_table = f'bronze_{table}' + print(f"Processing table: {table} -> {bronze_table}") update_bronze_layer(f"{base_folder}/{table}", bronze_table) - #then refresh the silver layer + # Then refresh the silver layer with MERGE operations silver_table = f'silver_{table}' + print(f"Materializing silver table: {silver_table}") update_silver_layer(bronze_table, silver_table) + + print(f"Successfully processed table: {table}") except Exception as e: - #prod workload should properly process errors - print(f"couldn't properly process {bronze_table}") - raise e + # Production workloads should implement comprehensive error handling + error_msg = f"Failed to process table {table}: {str(e)}" + print(error_msg) + # In production, consider: + # - Logging to external monitoring systems + # - Sending alerts/notifications + # - Continuing with other tables vs stopping entire pipeline + raise Exception(error_msg) from e -#Enable Schema evolution during merges (to capture new columns) -#spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true") +# Schema evolution is handled automatically by: +# - Auto Loader with mergeSchema=true option +# - Delta table mergeSchema=true in writeStream operations +# - No additional configuration needed for modern Databricks Runtime -#iterate over all the tables folders +# Iterate over all table folders and process them in parallel using serverless compute tables = [table_path.name[:-1] for table_path in dbutils.fs.ls(base_folder)] -#Let's start 3 CDC flow at the same time in 3 different thread to speed up ingestion -with ThreadPoolExecutor(max_workers=3) as executor: +print(f"Found {len(tables)} tables to process: {tables}") + +# Process multiple CDC flows simultaneously using ThreadPoolExecutor +# Serverless compute automatically scales resources based on workload +max_parallel_tables = min(len(tables), 3) # Adjust based on your data volume and processing requirements +print(f"Processing {max_parallel_tables} tables in parallel with serverless compute...") + +with ThreadPoolExecutor(max_workers=max_parallel_tables) as executor: deque(executor.map(refresh_cdc_table, tables)) - print(f"Database refreshed!") + print(f"Successfully refreshed all {len(tables)} tables using serverless compute!") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### 3.3 Check the Resulting Silver Tables # COMMAND ---------- @@ -178,15 +502,280 @@ def refresh_cdc_table(table): # MAGIC %md # MAGIC ## What's next # MAGIC -# MAGIC All our silver tables are now materialized using the CDC events! We can then work extra transformation (gold layer) based on your business requirement. +# MAGIC All our silver tables are now materialized using CDC events with **Serverless Compute**! You can now build additional transformations (gold layer) based on your business requirements. +# MAGIC +# MAGIC ### Production readiness with Serverless +# MAGIC +# MAGIC **Error Handling Strategies**: +# MAGIC - Capture and handle exceptions in each stream properly +# MAGIC - Send notifications when a table encounters errors while continuing to process others +# MAGIC - Define table processing priorities and dependencies +# MAGIC - Use Databricks Jobs/Workflows for orchestration and monitoring +# MAGIC +# MAGIC **Serverless Production Benefits**: +# MAGIC - **Cost Optimization**: Pay only for actual processing time +# MAGIC - **Auto-scaling**: Automatically scales based on data volume +# MAGIC - **Reliability**: Built-in fault tolerance and automatic restarts +# MAGIC - **Monitoring**: Integrated with Databricks monitoring and alerting +# MAGIC +# MAGIC **Scheduling Options**: +# MAGIC - Use Databricks Jobs to schedule this notebook regularly (hourly, daily) +# MAGIC - Trigger via external orchestration tools (Apache Airflow, etc.) +# MAGIC - Event-driven execution using file arrival notifications +# MAGIC +# MAGIC ### Delta Live Tables +# MAGIC To simplify these operations & error handling even further, we strongly recommend running your CDC pipelines using Delta Live Tables: `dbdemos.install('delta-live-table')` +# MAGIC +# MAGIC DLT provides native CDC support with `APPLY CHANGES` and automatic error handling, monitoring, and scaling. + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## 5. Test Continuous Multi-Table Serverless CDC Processing # MAGIC -# MAGIC ### Production readiness -# MAGIC Error and exception in each stream should be properly captured. Multiple strategy exist: send a notification when a table has some error and continue processing the others, stop the entire job, define table "priorities" etc. +# MAGIC With multiple data generators running, we can demonstrate how serverless compute handles continuous multi-table CDC processing efficiently and cost-effectively. The pipeline processes **only newly arrived data** across all tables. # MAGIC -# MAGIC ### Spark Declarative Pipelines -# MAGIC To simplify these operations & error handling, we strongly advise you to run your CDC pipelines on top of Spark Declarative Pipelines: `dbdemos.install('pipeline-bike')` +# MAGIC **Multi-Table Incremental Processing:** +# MAGIC - āœ… **Per-Table Checkpoints**: Each table tracks its own processing progress +# MAGIC - āœ… **Parallel Incremental Processing**: Multiple tables process only new data simultaneously +# MAGIC - āœ… **Independent Scaling**: Each table scales based on its own data volume +# MAGIC - āœ… **No Cross-Table Reprocessing**: Changes in one table don't affect others +# MAGIC - āœ… **Efficient Resource Usage**: Pay only for actual new data processing # COMMAND ---------- -# DBTITLE 1,Make sure we stop all actives streams +# DBTITLE 1,šŸš€ Step 4.1: Multi-Table Pipeline Trigger Function +def trigger_multi_table_cdc_pipeline(): + """ + Trigger all multi-table CDC streams to process new data with serverless compute. + This processes all tables in parallel for maximum efficiency. + """ + print(f"šŸ”„ Triggering multi-table CDC pipeline at {datetime.now()}") + + # Enable automatic schema merging for MERGE operations across all tables + # Schema evolution is handled automatically by mergeSchema=true in writeStream operations + + # Get all table folders + tables = [table_path.name[:-1] for table_path in dbutils.fs.ls(base_folder)] + print(f"šŸ“Š Processing {len(tables)} tables: {tables}") + + # Process all tables in parallel using ThreadPoolExecutor + max_parallel_tables = min(len(tables), 3) + print(f"⚔ Processing {max_parallel_tables} tables in parallel with serverless compute...") + + start_time = datetime.now() + + with ThreadPoolExecutor(max_workers=max_parallel_tables) as executor: + deque(executor.map(refresh_cdc_table, tables)) + + end_time = datetime.now() + processing_time = (end_time - start_time).total_seconds() + + print(f"āœ… Multi-table CDC pipeline completed in {processing_time:.2f} seconds") + return processing_time + +# COMMAND ---------- + +# DBTITLE 1,šŸš€ Step 4: Complete Multi-Table CDC Pipeline Demo +print("šŸŽÆ Running multi-table serverless CDC processing demonstration...") +print("šŸ’” In production, schedule this via Databricks Jobs/Workflows") + +# Give generators time to create files for both tables +print("ā³ Waiting 65 seconds for multi-table data generators to create new files...") +time.sleep(65) + +# Process all tables and measure performance +start_time = datetime.now() +processing_time = trigger_multi_table_cdc_pipeline() +total_time = (datetime.now() - start_time).total_seconds() + +print(f"\nšŸ“ˆ Performance Metrics:") +print(f"šŸ”¹ Total processing time: {total_time:.2f} seconds") +print(f"šŸ”¹ Parallel execution efficiency: {(processing_time/total_time)*100:.1f}%") + +# Show results with multi-table growth monitoring +print("\nšŸ“Š Monitoring multi-table growth over time...") +print("šŸ’” Watch how serverless compute handles growing data across multiple tables") + +# Function to get all table sizes +def get_all_table_sizes(): + sizes = {} + tables = ["users", "transactions"] + + for table in tables: + bronze_table = f"bronze_{table}" + silver_table = f"silver_{table}" + + try: + sizes[f"{table}_bronze"] = spark.sql(f"SELECT COUNT(*) as count FROM {bronze_table}").collect()[0]['count'] + except: + sizes[f"{table}_bronze"] = 0 + + try: + sizes[f"{table}_silver"] = spark.sql(f"SELECT COUNT(*) as count FROM {silver_table}").collect()[0]['count'] + except: + sizes[f"{table}_silver"] = 0 + + return sizes + +# Monitor multi-table growth over multiple iterations +print("šŸ” Multi-Table Growth Monitoring:") +print("=" * 80) + +for iteration in range(1, 4): # Monitor 3 iterations + print(f"\nšŸ“ˆ Iteration {iteration} - {datetime.now().strftime('%H:%M:%S')}") + + # Get current sizes + sizes = get_all_table_sizes() + + print("šŸ„‰ Bronze Tables (Raw CDC):") + print(f" šŸ‘„ Users: {sizes['users_bronze']:,} records") + print(f" šŸ’³ Transactions: {sizes['transactions_bronze']:,} records") + print(f" šŸ“Š Total Bronze: {sizes['users_bronze'] + sizes['transactions_bronze']:,} records") + + print("🄈 Silver Tables (Materialized):") + print(f" šŸ‘„ Users: {sizes['users_silver']:,} records") + print(f" šŸ’³ Transactions: {sizes['transactions_silver']:,} records") + print(f" šŸ“Š Total Silver: {sizes['users_silver'] + sizes['transactions_silver']:,} records") + + # Calculate growth if not first iteration + if iteration > 1: + users_bronze_growth = sizes['users_bronze'] - previous_sizes['users_bronze'] + users_silver_growth = sizes['users_silver'] - previous_sizes['users_silver'] + transactions_bronze_growth = sizes['transactions_bronze'] - previous_sizes['transactions_bronze'] + transactions_silver_growth = sizes['transactions_silver'] - previous_sizes['transactions_silver'] + + print(" šŸ“Š Growth Since Last Check:") + print(f" šŸ‘„ Users: Bronze +{users_bronze_growth}, Silver +{users_silver_growth}") + print(f" šŸ’³ Transactions: Bronze +{transactions_bronze_growth}, Silver +{transactions_silver_growth}") + + total_growth = (users_bronze_growth + users_silver_growth + + transactions_bronze_growth + transactions_silver_growth) + print(f" šŸŽÆ Total Growth: +{total_growth} records across all tables") + + # Show recent activity details + print(" šŸ” Recent Activity:") + try: + # Users operations + users_ops = spark.sql(""" + SELECT operation, COUNT(*) as count + FROM bronze_users + GROUP BY operation + ORDER BY operation + """).collect() + users_summary = {row['operation']: row['count'] for row in users_ops} + print(f" šŸ‘„ Users Operations: {users_summary}") + + # Transactions operations + trans_ops = spark.sql(""" + SELECT operation, COUNT(*) as count + FROM bronze_transactions + GROUP BY operation + ORDER BY operation + """).collect() + trans_summary = {row['operation']: row['count'] for row in trans_ops} + print(f" šŸ’³ Transactions Operations: {trans_summary}") + + # Show latest silver records + print(" šŸ“ Latest Records:") + latest_users = spark.sql(""" + SELECT id, name, email + FROM silver_users + ORDER BY id DESC + LIMIT 2 + """).collect() + if latest_users: + print(" šŸ‘„ Latest Users:") + for row in latest_users: + print(f" ID: {row['id']}, User: {row['name']}, Email: {row['email']}") + + latest_transactions = spark.sql(""" + SELECT id, amount, item_count + FROM silver_transactions + ORDER BY id DESC + LIMIT 2 + """).collect() + if latest_transactions: + print(" šŸ’³ Latest Transactions:") + for row in latest_transactions: + print(f" ID: {row['id']}, Amount: {row['amount']}, Items: {row['item_count']}") + + except Exception as e: + print(f" āš ļø Error showing details: {e}") + + previous_sizes = sizes + + # Wait for next iteration (except on last one) + if iteration < 3: + print(f" ā³ Waiting 65 seconds for more multi-table CDC data...") + print(" šŸ’° Serverless compute: Zero cost during wait - only pay for processing!") + time.sleep(65) + + # Process new data across all tables + print(f" šŸ”„ Processing new multi-table data (Iteration {iteration + 1})...") + trigger_multi_table_cdc_pipeline() + +print("\n" + "=" * 80) +print("āœ… Multi-table growth monitoring completed!") +print("šŸ“ˆ Key Multi-Table Observations:") +print(" šŸ”¹ Multiple tables grow independently with different patterns") +print(" šŸ”¹ Serverless compute scales automatically across all tables") +print(" šŸ”¹ Parallel processing efficiency demonstrated") +print(" šŸ”¹ Cost optimization: Pay only for actual multi-table processing") +print(" šŸ”¹ Real-world enterprise CDC patterns with table relationships") + +# COMMAND ---------- + +# DBTITLE 1,šŸ“Š Step 5.1: Cleanup and Stop Generators +stop_multi_table_generators() DBDemos.stop_all_streams() + +print("šŸŽ‰ Multi-table CDC demo completed!") +print("\nšŸ’° Serverless Benefits Demonstrated:") +print("āœ… Cost Optimization: Pay only for actual processing time") +print("āœ… Auto-scaling: Handled varying workloads across multiple tables") +print("āœ… Parallel Processing: Efficiently processed multiple CDC streams") +print("āœ… Zero Infrastructure: No cluster management required") +print("āœ… Fault Tolerance: Built-in error handling and recovery") + +print(f"\nšŸš€ Ready for production:") +print("• Schedule via Databricks Jobs/Workflows") +print("• Set up monitoring and alerting") +print("• Configure auto-scaling policies") +print("• Implement error handling strategies") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Key Advantages: +# MAGIC - šŸ”„ **Parallel Processing**: Multiple tables processed simultaneously +# MAGIC - šŸ“Š **Scalable Architecture**: Easy to add new tables to the pipeline +# MAGIC - šŸ’° **Cost Efficient**: Pay only for actual processing across all tables +# MAGIC - šŸš€ **Auto-scaling**: Serverless handles varying workloads automatically +# MAGIC - šŸ›”ļø **Fault Tolerance**: Isolated processing prevents cross-table failures + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Deployment Options: +# MAGIC - šŸ“… **Scheduled Jobs**: Use Databricks Jobs for automated processing +# MAGIC - šŸ”„ **Workflows**: Orchestrate complex multi-table pipelines +# MAGIC - šŸ“Š **Monitoring**: Set up alerts and dashboards for all tables +# MAGIC - šŸ”’ **Security**: Implement proper access controls and data governance +# MAGIC - šŸ’° **Cost Optimization**: Monitor and optimize serverless compute usage + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## šŸ“Š Step 8: Next Steps +# MAGIC +# MAGIC ### Continue Your CDC Journey: +# MAGIC - šŸ—ļø **[Delta Live Tables]($./dlt-cdc)**: Simplified multi-table CDC with `APPLY CHANGES` +# MAGIC - šŸ“š **[Delta Lake Demo]($./delta-lake)**: Deep dive into Delta Lake features +# MAGIC - šŸš€ **[Auto Loader Demo]($./auto-loader)**: Advanced file ingestion patterns +# MAGIC +# MAGIC ### Advanced Patterns: +# MAGIC - šŸ”„ **Cross-Table Dependencies**: Handle table relationships and dependencies +# MAGIC - šŸ“Š **Data Quality**: Implement validation and quality checks +# MAGIC - šŸ›”ļø **Error Handling**: Advanced retry and recovery strategies +# MAGIC - šŸ“ˆ **Performance Tuning**: Optimize for large-scale multi-table processing \ No newline at end of file