Commit f708929

Complete demo storyline restructuring with numbered steps v2

1 parent 5206326 commit f708929

File tree

2 files changed: +97 −157 lines


product_demos/cdc-pipeline/01-CDC-CDF-simple-pipeline.py

Lines changed: 65 additions & 87 deletions
@@ -11,20 +11,8 @@
 # MAGIC 2. **🥈 Step 2**: Build Bronze layer with Auto Loader
 # MAGIC 3. **🥇 Step 3**: Create Silver layer with MERGE operations
 # MAGIC 4. **🚀 Step 4**: Implement Gold layer with Change Data Feed (CDF)
-# MAGIC 5. **📊 Step 5**: Monitor and optimize with serverless compute
-# MAGIC 6. **📊 Step 6**: Data sharing and Datamesh organization
-# MAGIC 7. **📊 Step 7**: Data ready for BI & ML use cases
-# MAGIC 8. **📊 Step 8**: Next steps and production deployment
-# MAGIC
-# MAGIC ### Progress Tracking:
-# MAGIC - ✅ **Step 1**: CDC data simulation setup
-# MAGIC - ⏳ **Step 2**: Bronze layer implementation
-# MAGIC - ⏳ **Step 3**: Silver layer implementation
-# MAGIC - ⏳ **Step 4**: Gold layer implementation
-# MAGIC - ⏳ **Step 5**: Monitoring and optimization
-# MAGIC - ⏳ **Step 6**: Data sharing and Datamesh
-# MAGIC - ⏳ **Step 7**: BI & ML readiness
-# MAGIC - ⏳ **Step 8**: Next steps and deployment
+# MAGIC 5. **📊 Step 5**: Continuous CDC Data
+# MAGIC
 # MAGIC
 # MAGIC ### Key Benefits of Serverless CDC:
 # MAGIC - 💰 **Cost-effective**: Pay only for compute time used
@@ -105,22 +93,30 @@
 
 # COMMAND ----------
 
-# DBTITLE 1,📊 Step 1.1: Explore Incoming CDC Data
+# MAGIC %md
+# MAGIC ## Step 1.1: Explore Incoming CDC Data
+
+# COMMAND ----------
+
 print("🔍 Exploring our incoming CDC data structure...")
 cdc_raw_data = spark.read.option('header', "true").csv(raw_data_location+'/user_csv')
 display(cdc_raw_data)
 
 # COMMAND ----------
 
-# DBTITLE 1,📊 Step 1.2: Understand CDC Operation Types
+# MAGIC %md
+# MAGIC ## Step 1.2: Understand CDC Operation Types
+
+# COMMAND ----------
+
 print("🔍 Understanding CDC operation types...")
 print("Our CDC system sends 3 types of operations:")
 display(cdc_raw_data.dropDuplicates(['operation']))
 
 # COMMAND ----------
 
 # MAGIC %md
-# MAGIC ## 🎯 Step 1.3: Set Up Continuous CDC Data Simulation
+# MAGIC ## Step 1.3: Set Up Continuous CDC Data Simulation
 # MAGIC
 # MAGIC To demonstrate serverless compute capabilities, we'll create a data generator that simulates incoming CDC events every 60 seconds.
 # MAGIC
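For context, the exploration cells above deduplicate on `operation` to list the distinct CDC event types; a quick way to also quantify the mix is a groupBy count. A minimal sketch, assuming the same `spark` session and `raw_data_location` variable defined in the notebook's setup cells:

```python
# Count how many raw CDC events of each type have arrived so far.
from pyspark.sql import functions as F

cdc_raw_data = (spark.read
    .option("header", "true")
    .csv(raw_data_location + "/user_csv"))

(cdc_raw_data
    .groupBy("operation")                      # one row per CDC operation type
    .agg(F.count("*").alias("events"))
    .orderBy(F.desc("events"))
    .show())
```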
@@ -132,7 +128,11 @@
 
 # COMMAND ----------
 
-# DBTITLE 1,🎯 Step 1.3: CDC Data Generator Implementation
+# MAGIC %md
+# MAGIC ## Step 1.4: CDC Data Generator Implementation
+
+# COMMAND ----------
+
 import threading
 import time
 import random
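The generator body sits outside this hunk. As a rough idea of the pattern (not the demo's actual implementation), a background thread can drop a new CSV batch into the watched folder every 60 seconds; column names and operation values below are assumptions, and `dbutils` is the standard Databricks notebook utility:

```python
# A minimal sketch of a periodic CDC file generator (illustrative only).
import csv, io, random, threading, time, uuid

stop_event = threading.Event()

def emit_cdc_batches(target_dir):
    while not stop_event.is_set():
        rows = [[random.randint(1, 1000), f"user_{uuid.uuid4().hex[:6]}",
                 random.choice(["APPEND", "UPDATE", "DELETE"])] for _ in range(10)]
        buf = io.StringIO()
        writer = csv.writer(buf)
        writer.writerow(["id", "name", "operation"])   # assumed schema subset
        writer.writerows(rows)
        dbutils.fs.put(f"{target_dir}/batch_{int(time.time())}.csv",
                       buf.getvalue(), overwrite=True)
        stop_event.wait(60)                            # one file per minute

threading.Thread(target=emit_cdc_batches,
                 args=(raw_data_location + "/user_csv",),
                 daemon=True).start()
```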
@@ -239,7 +239,12 @@ def stop_cdc_generator():
 
 # COMMAND ----------
 
-# DBTITLE 1,🥉 Step 1.4: Create Bronze Delta Table
+# MAGIC %md
+# MAGIC ## 🥈 Step 2: Create Bronze Delta Table With Auto Loader
+# MAGIC
+
+# COMMAND ----------
+
 # Drop existing table if it exists to avoid schema conflicts
 try:
     spark.sql("DROP TABLE IF EXISTS clients_cdc")
@@ -265,7 +270,7 @@ def stop_cdc_generator():
     .trigger(availableNow=True)  # Serverless trigger for cost-effective processing
     .table("clients_cdc"))
 
-time.sleep(20)
+time.sleep(10)
 
 # COMMAND ----------
 
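The `.trigger(availableNow=True)` / `.table("clients_cdc")` tail shown above belongs to an Auto Loader stream defined earlier in the cell. Its full shape is roughly the following; the option values and checkpoint path are assumptions, while the `cloudFiles` APIs themselves are standard:

```python
# Sketch of the bronze ingestion stream: Auto Loader picks up new CSV files
# and appends them to the clients_cdc Delta table, then stops.
bronze_stream = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", raw_data_location + "/cdc_schema")
    .option("cloudFiles.inferColumnTypes", "true")
    .load(raw_data_location + "/user_csv"))

(bronze_stream.writeStream
    .option("checkpointLocation", raw_data_location + "/cdc_checkpoint")
    .trigger(availableNow=True)   # drain all pending files, then stop
    .table("clients_cdc"))
```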
@@ -283,7 +288,7 @@ def stop_cdc_generator():
 # COMMAND ----------
 
 # MAGIC %md
-# MAGIC ## 🥈 Step 2: Silver Layer - Data Cleaning and Deduplication
+# MAGIC ## 🥈 Step 3: Silver Layer - Data Cleaning and Deduplication
 # MAGIC
 # MAGIC <img src="https://github.com/databricks-demos/dbdemos-resources/raw/main/images/product/Delta-Lake-CDC-CDF/cdc-flow-2.png" alt='Silver Layer' style='float: right' width='600'/>
 # MAGIC
@@ -302,8 +307,12 @@ def stop_cdc_generator():
 
 # COMMAND ----------
 
-# DBTITLE 1,🥈 Step 2.1: Create Silver Table with CDF Enabled
-# MAGIC %sql
+# MAGIC %md
+# MAGIC ## Step 3.1: Create Silver Table With Change Data Feed Enabled
+
+# COMMAND ----------
+
+# MAGIC %sql
 # MAGIC -- Create silver table with optimized settings for serverless and CDC
 # MAGIC CREATE TABLE IF NOT EXISTS retail_client_silver (id BIGINT NOT NULL, name STRING, address STRING, email STRING, operation STRING)
 # MAGIC TBLPROPERTIES (
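The `TBLPROPERTIES (` block is truncated at the hunk boundary. Given that Step 4 reads this table's change feed, the statement plausibly continues along these lines; the property names are real Delta table properties, but the exact set the demo uses is an assumption:

```python
# Hypothetical completion of the truncated CREATE TABLE, shown via spark.sql.
spark.sql("""
  CREATE TABLE IF NOT EXISTS retail_client_silver (
    id BIGINT NOT NULL, name STRING, address STRING, email STRING, operation STRING)
  TBLPROPERTIES (
    delta.enableChangeDataFeed = true,           -- needed for Step 4's CDF reads
    delta.autoOptimize.optimizeWrite = true,     -- assumed optimization settings
    delta.autoOptimize.autoCompact = true
  )
""")
```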
@@ -316,7 +325,11 @@ def stop_cdc_generator():
 
 # COMMAND ----------
 
-# DBTITLE 1,🥈 Step 2.2: Implement MERGE Operations
+# MAGIC %md
+# MAGIC ## Step 3.2: Implement MERGE Operations
+
+# COMMAND ----------
+
 # For each batch / incremental update from the raw CDC table, we'll run a MERGE on the silver table
 def merge_stream(df, i):
     df.createOrReplaceTempView("clients_cdc_microbatch")
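The body of `merge_stream` continues beyond this hunk. The usual shape in this demo family is to deduplicate the micro-batch and MERGE it into the silver table; a sketch, where the dedup logic and column list are assumptions:

```python
# Upsert one micro-batch of CDC rows into the silver table.
def merge_stream(df, i):
    df.createOrReplaceTempView("clients_cdc_microbatch")
    df.sparkSession.sql("""
      MERGE INTO retail_client_silver t
      USING (
        -- keep only the most recent event per id within the batch
        SELECT id, name, address, email, operation
        FROM (SELECT *, ROW_NUMBER() OVER (PARTITION BY id
                                           ORDER BY operation_date DESC) AS rn
              FROM clients_cdc_microbatch)
        WHERE rn = 1) s
      ON t.id = s.id
      WHEN MATCHED AND s.operation = 'DELETE' THEN DELETE
      WHEN MATCHED THEN UPDATE SET *
      WHEN NOT MATCHED AND s.operation != 'DELETE' THEN INSERT *
    """)
```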
@@ -352,12 +365,11 @@ def merge_stream(df, i):
 # COMMAND ----------
 
 # MAGIC %md
-# MAGIC ### 🥈 Step 2.3: Test CDC Layer
+# MAGIC ### Step 3.3: Test Merge Operations in the Silver Layer
 # MAGIC Let's send a new CDC entry to simulate an UPDATE and a DELETE for IDs 1 and 2
 
 # COMMAND ----------
 
-# DBTITLE 1,🥈 Step 2.4: Simulate CDC Operations
 # MAGIC %sql
 # MAGIC insert into clients_cdc (id, name, address, email, operation_date, operation, _rescued_data, file_name) values
 # MAGIC (1000, "Quentin", "Paris 75020", "quentin.ambard@databricks.com", now(), "UPDATE", null, null),
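After those rows flow through the bronze-to-silver stream, a quick follow-up check (not part of the diff) confirms the UPDATE landed and the DELETE removed its row:

```python
# Expect id 1000 with the updated values, and the DELETE'd id absent.
display(spark.sql("SELECT * FROM retail_client_silver WHERE id IN (1, 2, 1000)"))
```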
@@ -379,7 +391,7 @@ def merge_stream(df, i):
 # COMMAND ----------
 
 # MAGIC %md
-# MAGIC ## 🥇 Step 3: Gold Layer - Business-Ready Data with Change Data Feed
+# MAGIC ## 🚀 Step 4: Gold Layer - Business-Ready Data with Change Data Feed
 # MAGIC
 # MAGIC <img src="https://github.com/databricks-demos/dbdemos-resources/raw/main/images/product/Delta-Lake-CDC-CDF/cdc-flow-3.png" alt='Gold Layer' style='float: right' width='600'/>
 # MAGIC
@@ -403,7 +415,7 @@ def merge_stream(df, i):
 # COMMAND ----------
 
 # MAGIC %md
-# MAGIC ### 🥇 Step 3.1: Working with Delta Lake CDF
+# MAGIC ### Step 4.1: Working with Delta Lake CDF
 
 # COMMAND ----------
 
@@ -417,7 +429,6 @@ def merge_stream(df, i):
 # COMMAND ----------
 
 # MAGIC %md
-# MAGIC #### 🥇 Step 3.2: Delta CDF table_changes Output
 # MAGIC The table_changes function returns 4 CDC types in the "_change_type" column:
 # MAGIC
 # MAGIC | CDC Type | Description |
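For reference, those `_change_type` values come back from Databricks' `table_changes` SQL function; a typical query looks like this (the starting version 2 is arbitrary for illustration):

```python
# Read the silver table's change feed from version 2 onward.
display(spark.sql("""
  SELECT id, name, _change_type, _commit_version, _commit_timestamp
  FROM table_changes('retail_client_silver', 2)
  ORDER BY _commit_version DESC
"""))
```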
@@ -431,7 +442,11 @@ def merge_stream(df, i):
 
 # COMMAND ----------
 
-# DBTITLE 1,🥇 Step 3.3: Get Modifications with Python API
+# MAGIC %md
+# MAGIC ## Step 4.2: Get the Latest Record Updates with the Python API
+
+# COMMAND ----------
+
 from delta.tables import *
 
 # Let's get the last table version to only see the latest update modifications
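The cell continues past this hunk; the standard way to realize that comment is to look up the latest commit via the Delta history API and read the change feed from that version. A sketch using only documented Delta Lake APIs:

```python
# Read only the most recent commit's changes from the change feed.
from delta.tables import DeltaTable

last_version = (DeltaTable.forName(spark, "retail_client_silver")
                .history(1)                     # latest commit only
                .select("version").collect()[0][0])

changes = (spark.read.format("delta")
    .option("readChangeFeed", "true")
    .option("startingVersion", last_version)
    .table("retail_client_silver"))
display(changes)
```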
@@ -447,7 +462,7 @@ def merge_stream(df, i):
 # COMMAND ----------
 
 # MAGIC %md
-# MAGIC ### 🥇 Step 3.1: Synchronize Gold Table with Silver Changes
+# MAGIC ### Step 4.3: Synchronize Gold Table with Silver Changes
 # MAGIC
 # MAGIC Let's now say that we want to perform another table enhancement and propagate these changes downstream.
 # MAGIC
@@ -459,7 +474,7 @@ def merge_stream(df, i):
 
 # COMMAND ----------
 
-# DBTITLE 1,🥇 Step 3.4: Create Gold Table
+# DBTITLE 1,Create Gold Table
 # MAGIC %sql
 # MAGIC CREATE TABLE IF NOT EXISTS retail_client_gold (id BIGINT NOT NULL, name STRING, address STRING, email STRING, gold_data STRING)
 # MAGIC TBLPROPERTIES (
@@ -512,13 +527,10 @@ def upsertToDelta(data, batchId):
 
 # MAGIC %sql SELECT * FROM retail_client_gold
 
-# COMMAND ----------
-
-
 # COMMAND ----------
 
 # MAGIC %md
-# MAGIC ## Continuous Serverless CDC Processing with Incremental Data Processing
+# MAGIC ## 📊 Step 5: Continuous Serverless Incremental Processing
 # MAGIC
 # MAGIC With the data generator running, you can now demonstrate continuous serverless CDC processing. The pipeline is designed to process **only newly arrived data** using checkpoints and streaming offsets.
 # MAGIC
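The `upsertToDelta` function named in this hunk's header implements the silver-to-gold propagation: stream the silver table's change feed and merge each micro-batch into gold. Its likely shape is roughly the following; the `gold_data` enrichment and filter details are assumptions:

```python
# Propagate silver changes into the gold table via the change feed.
from pyspark.sql import functions as F

def upsertToDelta(data, batchId):
    latest = (data.filter("_change_type != 'update_preimage'")
                  .withColumn("gold_data", F.sha1("email")))   # assumed enrichment
    latest.createOrReplaceTempView("gold_updates")
    data.sparkSession.sql("""
      MERGE INTO retail_client_gold t
      USING gold_updates s ON t.id = s.id
      WHEN MATCHED AND s._change_type = 'delete' THEN DELETE
      WHEN MATCHED THEN UPDATE SET t.name = s.name, t.address = s.address,
                                   t.email = s.email, t.gold_data = s.gold_data
      WHEN NOT MATCHED AND s._change_type != 'delete'
        THEN INSERT (id, name, address, email, gold_data)
             VALUES (s.id, s.name, s.address, s.email, s.gold_data)
    """)

(spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .table("retail_client_silver")
    .writeStream
    .foreachBatch(upsertToDelta)
    .option("checkpointLocation", raw_data_location + "/gold_checkpoint")
    .trigger(availableNow=True)
    .start())
```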
@@ -531,7 +543,6 @@ def upsertToDelta(data, batchId):
 
 # COMMAND ----------
 
-# DBTITLE 1,🚀 Step 4.1: Serverless Pipeline Trigger Function
 def trigger_cdc_pipeline():
     """
     Trigger all CDC streams to process new data with serverless compute.
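The function body continues beyond the hunk. Since every stream here uses an `availableNow` trigger, one grounded way to implement such a function is simply to wait for the active queries to drain (`spark.streams.active` and `awaitTermination` are standard Structured Streaming APIs); a sketch, not the notebook's actual implementation:

```python
# Block until all availableNow streams have processed their backlog.
import time

def trigger_cdc_pipeline():
    """Process all newly arrived CDC data; availableNow streams stop on their own."""
    started = time.time()
    for query in spark.streams.active:
        query.awaitTermination()
    print(f"✅ CDC pipeline drained in {time.time() - started:.1f}s")
```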
@@ -600,33 +611,6 @@ def trigger_cdc_pipeline():
 
 # COMMAND ----------
 
-# MAGIC %md
-# MAGIC ### Production Deployment Options
-# MAGIC
-# MAGIC **Option 1: Scheduled Databricks Job**
-# MAGIC ```python
-# MAGIC # Schedule this notebook to run every 5 minutes using Databricks Jobs
-# MAGIC # The data generator creates new files every 60 seconds
-# MAGIC # Serverless compute will auto-scale and process all available data
-# MAGIC trigger_cdc_pipeline()
-# MAGIC ```
-# MAGIC
-# MAGIC **Option 2: Continuous Loop (for demo purposes)**
-# MAGIC ```python
-# MAGIC # Run continuous processing loop
-# MAGIC while generator_running:
-# MAGIC     trigger_cdc_pipeline()
-# MAGIC     time.sleep(60)  # Process every minute
-# MAGIC ```
-# MAGIC
-# MAGIC **Option 3: Event-Driven Processing**
-# MAGIC - Use cloud storage notifications
-# MAGIC - Trigger via REST API
-# MAGIC - Integrate with orchestration tools (Airflow, etc.)
-
-# COMMAND ----------
-
-# DBTITLE 1,🚀 Step 4: Complete CDC Pipeline Demo
 print("🎯 Running one iteration of serverless CDC processing...")
 print("💡 In production, schedule this via Databricks Jobs every few minutes")
 
@@ -727,7 +711,7 @@ def get_table_sizes():
 
 # COMMAND ----------
 
-# DBTITLE 1,📊 Step 5.1: Cleanup and Stop Data Generator
+# DBTITLE 1,Cleanup and Stop Data Generator
 stop_cdc_generator()
 DBDemos.stop_all_streams()
 
@@ -740,20 +724,7 @@ def get_table_sizes():
 # COMMAND ----------
 
 # MAGIC %md
-# MAGIC ## 📊 Step 6: Data Sharing and Datamesh Organization
-# MAGIC
-# MAGIC <img src="https://github.com/databricks-demos/dbdemos-resources/raw/main/images/product/Delta-Lake-CDC-CDF/delta-cdf-datamesh.png" style="float:right; margin-right: 50px" width="300px" />
-# MAGIC
-# MAGIC ### Key Benefits:
-# MAGIC - 🔄 **Change Tracking**: Track all INSERT/UPDATE/DELETE operations from any Delta table
-# MAGIC - 📡 **Incremental Processing**: Subscribe to table modifications as incremental processes
-# MAGIC - 🏗️ **Data Mesh Ready**: Each mesh can publish tables, others can subscribe to changes
-# MAGIC - 🛡️ **GDPR Compliance**: Propagate changes (e.g., GDPR DELETE) across data meshes
-
-# COMMAND ----------
-
-# MAGIC %md
-# MAGIC ## 📊 Step 7: Data Ready for BI & ML Use Cases
+# MAGIC ## Data Ready for BI & ML Use Cases
 # MAGIC
 # MAGIC <img src="https://github.com/databricks-demos/dbdemos-resources/raw/main/images/product/Delta-Lake-CDC-CDF/cdc-flow-4.png" alt='BI and ML Ready' style='float: right' width='600'/>
 # MAGIC
@@ -766,16 +737,23 @@ def get_table_sizes():
 # COMMAND ----------
 
 # MAGIC %md
-# MAGIC ## 📊 Step 8: Next Steps
+# MAGIC ## Data Sharing and Datamesh Organization
+# MAGIC
+# MAGIC <img src="https://github.com/databricks-demos/dbdemos-resources/raw/main/images/product/Delta-Lake-CDC-CDF/delta-cdf-datamesh.png" style="float:right; margin-right: 50px" width="300px" />
+# MAGIC
+# MAGIC ### Key Benefits:
+# MAGIC - 🔄 **Change Tracking**: Track all INSERT/UPDATE/DELETE operations from any Delta table
+# MAGIC - 📡 **Incremental Processing**: Subscribe to table modifications as incremental processes
+# MAGIC - 🏗️ **Data Mesh Ready**: Each mesh can publish tables, others can subscribe to changes
+# MAGIC - 🛡️ **GDPR Compliance**: Propagate changes (e.g., GDPR DELETE) across data meshes
+
+# COMMAND ----------
+
+# MAGIC %md
+# MAGIC ## Next Steps
 # MAGIC
 # MAGIC ### Continue Your CDC Journey:
 # MAGIC - 🔗 **[Multi-Table CDC Pipeline]($./02-CDC-CDF-full-multi-tables)**: Scale to multiple tables
 # MAGIC - 🏗️ **[Delta Live Tables]($./dlt-cdc)**: Simplified CDC with `APPLY CHANGES`
 # MAGIC - 📚 **[Delta Lake Demo]($./delta-lake)**: Deep dive into Delta Lake features
-# MAGIC - 🚀 **[Auto Loader Demo]($./auto-loader)**: Advanced file ingestion patterns
-# MAGIC
-# MAGIC ### Production Deployment:
-# MAGIC - 📅 **Schedule Jobs**: Use Databricks Jobs for automated processing
-# MAGIC - 📊 **Monitor Performance**: Set up alerts and dashboards
-# MAGIC - 🔒 **Security**: Implement proper access controls and data governance
-# MAGIC - 💰 **Cost Optimization**: Monitor and optimize serverless compute usage
+# MAGIC - 🚀 **[Auto Loader Demo]($./auto-loader)**: Advanced file ingestion patterns
