From 373622be4d303dba8e0b6a1945e7139aa90986b9 Mon Sep 17 00:00:00 2001 From: Sung Won Chung Date: Sun, 7 Jul 2024 13:07:48 -0700 Subject: [PATCH 01/12] update gitignore --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index f619cbd..57198b5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ **/venv *.db +.env \ No newline at end of file From 8979f253349a3f6fab56d433bd6be3970dd67ddf Mon Sep 17 00:00:00 2001 From: Sung Won Chung Date: Sun, 7 Jul 2024 13:19:54 -0700 Subject: [PATCH 02/12] update configs --- sqlmesh/.gitignore | 3 +-- sqlmesh/config.yaml | 25 +++++++++++++++++++++++++ sqlmesh/config.yaml.local | 23 ----------------------- sqlmesh/requirements.txt | 2 +- 4 files changed, 27 insertions(+), 26 deletions(-) create mode 100644 sqlmesh/config.yaml delete mode 100644 sqlmesh/config.yaml.local diff --git a/sqlmesh/.gitignore b/sqlmesh/.gitignore index 9225948..26feb90 100644 --- a/sqlmesh/.gitignore +++ b/sqlmesh/.gitignore @@ -164,5 +164,4 @@ service-account.json sqlmesh/logs/** -**/venv -config.yaml \ No newline at end of file +**/venv \ No newline at end of file diff --git a/sqlmesh/config.yaml b/sqlmesh/config.yaml new file mode 100644 index 0000000..23c3885 --- /dev/null +++ b/sqlmesh/config.yaml @@ -0,0 +1,25 @@ +gateways: + duckdb: + connection: + type: duckdb + database: sqlmesh_multi_engine_demo.db + snowflake: + connection: + type: snowflake + account: {{ env_var('SNOWFLAKE_ACCOUNT') }} # TODO: export your env_var + user: {{ env_var('SNOWFLAKE_USER') }} # TODO: export your env_var + password: {{ env_var('SNOWFLAKE_PASSWORD') }} # TODO: export your env_var + database: {{ env_var('SNOWFLAKE_DATABASE') }} # TODO: export your env + role: {{ env_var('SNOWFLAKE_ROLE') }} # TODO: export your env + +model_defaults: + dialect: "@IF(@gateway = 'duckdb', 'duckdb', 'snowflake')" + +default_gateway: duckdb + +plan: + enable_preview: true + +ui: + format_on_save: false + diff --git a/sqlmesh/config.yaml.local b/sqlmesh/config.yaml.local deleted file mode 100644 index ddee68c..0000000 --- a/sqlmesh/config.yaml.local +++ /dev/null @@ -1,23 +0,0 @@ -gateways: - duckdb: - connection: - type: duckdb - database: sqlmesh-multi_engine_demo.db - snowflake: - connection: - type: snowflake - account: - user: - password: - database: - role: - -model_defaults: - dialect: snowflake - -plan: - enable_preview: true - -ui: - format_on_save: false - diff --git a/sqlmesh/requirements.txt b/sqlmesh/requirements.txt index c47535f..aad0087 100644 --- a/sqlmesh/requirements.txt +++ b/sqlmesh/requirements.txt @@ -1,5 +1,5 @@ sqlmesh[web,github,bigquery,snowflake]==0.105.1 pytest==8.1.1 -pyiceberg[glue] +pyiceberg[glue] # TODO pin these versions snowflake-snowpark-python snowflake-ml-python \ No newline at end of file From fcbdc5b90c1e24cd5d4093371095bac1e5a76211 Mon Sep 17 00:00:00 2001 From: Sung Won Chung Date: Sun, 7 Jul 2024 13:26:33 -0700 Subject: [PATCH 03/12] pin versions --- sqlmesh/requirements.txt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sqlmesh/requirements.txt b/sqlmesh/requirements.txt index aad0087..a8988ac 100644 --- a/sqlmesh/requirements.txt +++ b/sqlmesh/requirements.txt @@ -1,5 +1,5 @@ sqlmesh[web,github,bigquery,snowflake]==0.105.1 pytest==8.1.1 -pyiceberg[glue] # TODO pin these versions -snowflake-snowpark-python -snowflake-ml-python \ No newline at end of file +pyiceberg[glue]==0.6.1 +snowflake-snowpark-python==1.19.0 +snowflake-ml-python==1.5.3 \ No newline at end of file From 00d52b446610e3c3a402cea4342d45e1be1009b9 Mon Sep 17 00:00:00 2001 From: Sung Won Chung Date: Sun, 7 Jul 2024 13:26:37 -0700 Subject: [PATCH 04/12] reorganize files --- sqlmesh/models/duckdb_only/stg_reviews.sql | 22 +++++++++++++++++++ .../staging_prediction.py | 0 .../{ => snowflake_only}/staging_reviews.py | 0 3 files changed, 22 insertions(+) create mode 100644 sqlmesh/models/duckdb_only/stg_reviews.sql rename sqlmesh/models/{ => snowflake_only}/staging_prediction.py (100%) rename sqlmesh/models/{ => snowflake_only}/staging_reviews.py (100%) diff --git a/sqlmesh/models/duckdb_only/stg_reviews.sql b/sqlmesh/models/duckdb_only/stg_reviews.sql new file mode 100644 index 0000000..4a412f6 --- /dev/null +++ b/sqlmesh/models/duckdb_only/stg_reviews.sql @@ -0,0 +1,22 @@ +MODEL ( + name reviews.stg_reviews, + kind FULL, + columns ( + reviewid string, + username string, + review string, + ingestion_date timestamp + ), + enabled @IF(@gateway='duckdb', True, False) +); + +SELECT + reviewid, + username, + review, + epoch_ms(ingestion_date) as ingestion_date +FROM landing_reviews +QUALIFY + row_number() OVER (PARTITION BY username, review ORDER BY ingestion_date) = 1; + + diff --git a/sqlmesh/models/staging_prediction.py b/sqlmesh/models/snowflake_only/staging_prediction.py similarity index 100% rename from sqlmesh/models/staging_prediction.py rename to sqlmesh/models/snowflake_only/staging_prediction.py diff --git a/sqlmesh/models/staging_reviews.py b/sqlmesh/models/snowflake_only/staging_reviews.py similarity index 100% rename from sqlmesh/models/staging_reviews.py rename to sqlmesh/models/snowflake_only/staging_reviews.py From 661a495a50e706f84cbe58123f87287aa51d0a2a Mon Sep 17 00:00:00 2001 From: Sung Won Chung Date: Sun, 7 Jul 2024 14:33:35 -0700 Subject: [PATCH 05/12] update configs --- sqlmesh/config.yaml | 2 +- sqlmesh/requirements.txt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sqlmesh/config.yaml b/sqlmesh/config.yaml index 23c3885..f9cd6d2 100644 --- a/sqlmesh/config.yaml +++ b/sqlmesh/config.yaml @@ -13,7 +13,7 @@ gateways: role: {{ env_var('SNOWFLAKE_ROLE') }} # TODO: export your env model_defaults: - dialect: "@IF(@gateway = 'duckdb', 'duckdb', 'snowflake')" + dialect: duckdb default_gateway: duckdb diff --git a/sqlmesh/requirements.txt b/sqlmesh/requirements.txt index a8988ac..e9a42e0 100644 --- a/sqlmesh/requirements.txt +++ b/sqlmesh/requirements.txt @@ -1,4 +1,4 @@ -sqlmesh[web,github,bigquery,snowflake]==0.105.1 +sqlmesh[web,github,bigquery,snowflake]==0.109.2 pytest==8.1.1 pyiceberg[glue]==0.6.1 snowflake-snowpark-python==1.19.0 From a5a2c8b95bb4ce3f69aefcc261a3bc0a2dfe22ff Mon Sep 17 00:00:00 2001 From: Sung Won Chung Date: Sun, 7 Jul 2024 14:52:53 -0700 Subject: [PATCH 06/12] enabled flag not working as expected --- sqlmesh/models/snowflake_only/staging_prediction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sqlmesh/models/snowflake_only/staging_prediction.py b/sqlmesh/models/snowflake_only/staging_prediction.py index f02410b..3e3718d 100644 --- a/sqlmesh/models/snowflake_only/staging_prediction.py +++ b/sqlmesh/models/snowflake_only/staging_prediction.py @@ -23,7 +23,7 @@ "ingestion_timestamp": "timestamp", # "prediction_timestamp": "timestamp" }, - enabled= True, + enabled= False, # TODO: fix this depends_on=["reviews.staging_reviews"] ) def execute( From 7b7797d45999911974727ff6051dde90a747268a Mon Sep 17 00:00:00 2001 From: Sung Won Chung Date: Sun, 7 Jul 2024 15:05:59 -0700 Subject: [PATCH 07/12] add in allow_partials example --- .../duckdb_only/incremental_example.sql | 19 +++++++++++++++ .../models/duckdb_only/raw_data_example.sql | 23 +++++++++++++++++++ 2 files changed, 42 insertions(+) create mode 100644 sqlmesh/models/duckdb_only/incremental_example.sql create mode 100644 sqlmesh/models/duckdb_only/raw_data_example.sql diff --git a/sqlmesh/models/duckdb_only/incremental_example.sql b/sqlmesh/models/duckdb_only/incremental_example.sql new file mode 100644 index 0000000..cc5e59d --- /dev/null +++ b/sqlmesh/models/duckdb_only/incremental_example.sql @@ -0,0 +1,19 @@ +MODEL ( + name reviews.incremental_example, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column event_date + ), + start '2020-01-01', + cron '@daily', + grain (id, event_date), + allow_partials true +); + +SELECT + id, + item_id, + event_date +FROM + reviews.raw_data_example +WHERE + event_date BETWEEN @start_date AND @end_date; diff --git a/sqlmesh/models/duckdb_only/raw_data_example.sql b/sqlmesh/models/duckdb_only/raw_data_example.sql new file mode 100644 index 0000000..1715931 --- /dev/null +++ b/sqlmesh/models/duckdb_only/raw_data_example.sql @@ -0,0 +1,23 @@ +MODEL ( + name reviews.raw_data_example, + kind VIEW +); + +SELECT + CAST(id AS INTEGER), + CAST(item_id AS VARCHAR), + CAST(event_date AS DATE) +FROM ( + VALUES + (1, 'item_1', '2020-01-01'), + (2, 'item_2', '2020-01-02'), + (3, 'item_3', '2020-01-03'), + (4, 'item_4', '2020-01-04'), + (5, 'item_5', '2020-01-05'), + (6, 'item_6', '2020-01-06'), + (7, 'item_7', '2020-01-07'), + (8, 'item_8', '2020-01-08'), + (9, 'item_9', '2020-01-09'), + (10, 'item_10', '2020-01-10') +) AS t(id, item_id, event_date); + From 792139f65bc0dab47b83a76fe6915668863f7755 Mon Sep 17 00:00:00 2001 From: Sung Won Chung Date: Thu, 11 Jul 2024 10:12:02 -0600 Subject: [PATCH 08/12] conditional enabled python macro --- sqlmesh/macros/custom_macros.py | 8 +++++++ .../models/duckdb_only/raw_data_example.sql | 3 ++- sqlmesh/models/duckdb_only/stg_reviews.sql | 22 ------------------- .../snowflake_only/staging_prediction.py | 4 +++- .../models/snowflake_only/staging_reviews.py | 4 +++- sqlmesh/requirements.txt | 2 +- 6 files changed, 17 insertions(+), 26 deletions(-) create mode 100644 sqlmesh/macros/custom_macros.py delete mode 100644 sqlmesh/models/duckdb_only/stg_reviews.sql diff --git a/sqlmesh/macros/custom_macros.py b/sqlmesh/macros/custom_macros.py new file mode 100644 index 0000000..cfc6c82 --- /dev/null +++ b/sqlmesh/macros/custom_macros.py @@ -0,0 +1,8 @@ +from sqlmesh import macro + +@macro() +def snowflake_only(evaluator): + if evaluator.gateway == 'snowflake': + return True + else: + return False \ No newline at end of file diff --git a/sqlmesh/models/duckdb_only/raw_data_example.sql b/sqlmesh/models/duckdb_only/raw_data_example.sql index 1715931..be0271d 100644 --- a/sqlmesh/models/duckdb_only/raw_data_example.sql +++ b/sqlmesh/models/duckdb_only/raw_data_example.sql @@ -1,6 +1,7 @@ MODEL ( name reviews.raw_data_example, - kind VIEW + kind VIEW, + enabled @IF(@gateway='duckdb', True, False) ); SELECT diff --git a/sqlmesh/models/duckdb_only/stg_reviews.sql b/sqlmesh/models/duckdb_only/stg_reviews.sql deleted file mode 100644 index 4a412f6..0000000 --- a/sqlmesh/models/duckdb_only/stg_reviews.sql +++ /dev/null @@ -1,22 +0,0 @@ -MODEL ( - name reviews.stg_reviews, - kind FULL, - columns ( - reviewid string, - username string, - review string, - ingestion_date timestamp - ), - enabled @IF(@gateway='duckdb', True, False) -); - -SELECT - reviewid, - username, - review, - epoch_ms(ingestion_date) as ingestion_date -FROM landing_reviews -QUALIFY - row_number() OVER (PARTITION BY username, review ORDER BY ingestion_date) = 1; - - diff --git a/sqlmesh/models/snowflake_only/staging_prediction.py b/sqlmesh/models/snowflake_only/staging_prediction.py index 3e3718d..356fa64 100644 --- a/sqlmesh/models/snowflake_only/staging_prediction.py +++ b/sqlmesh/models/snowflake_only/staging_prediction.py @@ -6,8 +6,10 @@ from snowflake.cortex import Sentiment, Complete from snowflake.snowpark.functions import col, concat, lit, current_timestamp, parse_json from sqlmesh.core.model.kind import ModelKindName +from sqlmesh.core.macros import MacroEvaluator from sqlmesh import ExecutionContext, model from pyiceberg.catalog import load_catalog +from macros.custom_macros import snowflake_only @model( "reviews.prediction", @@ -23,7 +25,7 @@ "ingestion_timestamp": "timestamp", # "prediction_timestamp": "timestamp" }, - enabled= False, # TODO: fix this + enabled=snowflake_only(evaluator=MacroEvaluator), depends_on=["reviews.staging_reviews"] ) def execute( diff --git a/sqlmesh/models/snowflake_only/staging_reviews.py b/sqlmesh/models/snowflake_only/staging_reviews.py index 41b81ab..bee1793 100644 --- a/sqlmesh/models/snowflake_only/staging_reviews.py +++ b/sqlmesh/models/snowflake_only/staging_reviews.py @@ -2,7 +2,9 @@ from datetime import datetime from sqlmesh import ExecutionContext, model +from sqlmesh.core.macros import MacroEvaluator from pyiceberg.catalog import load_catalog +from macros.custom_macros import snowflake_only @model( "reviews.staging_reviews", @@ -13,7 +15,7 @@ "review": "string", "ingestion_timestamp": "timestamp", }, - enabled=False #"@IF(@gateway='duckdb', False, True)" + enabled=snowflake_only(evaluator=MacroEvaluator) ) def execute( context: ExecutionContext, diff --git a/sqlmesh/requirements.txt b/sqlmesh/requirements.txt index e9a42e0..4e40a29 100644 --- a/sqlmesh/requirements.txt +++ b/sqlmesh/requirements.txt @@ -1,4 +1,4 @@ -sqlmesh[web,github,bigquery,snowflake]==0.109.2 +sqlmesh[web,github,bigquery,snowflake]==0.110.1 pytest==8.1.1 pyiceberg[glue]==0.6.1 snowflake-snowpark-python==1.19.0 From ed9f76000a358f12392dc1be66d211178c7f5eb0 Mon Sep 17 00:00:00 2001 From: Sung Won Chung Date: Mon, 15 Jul 2024 16:27:16 -0700 Subject: [PATCH 09/12] working python models --- .vscode/launch.json | 17 ++++++ .vscode/settings.json | 3 + sqlmesh/config.yaml | 9 ++- sqlmesh/macros/custom_macros.py | 13 +++- .../duckdb_only/incremental_example.sql | 3 +- .../snowflake_only/staging_prediction.py | 61 +++++++++++-------- .../models/snowflake_only/staging_reviews.py | 22 ++++--- 7 files changed, 89 insertions(+), 39 deletions(-) create mode 100644 .vscode/launch.json create mode 100644 .vscode/settings.json diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..07d1418 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,17 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python Debugger: Current File", + "type": "debugpy", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal", + "cwd": "${fileDirname}", + "justMyCode": false + } + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..a3d73fc --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "debug.inlineValues": "on" +} \ No newline at end of file diff --git a/sqlmesh/config.yaml b/sqlmesh/config.yaml index f9cd6d2..dcb6854 100644 --- a/sqlmesh/config.yaml +++ b/sqlmesh/config.yaml @@ -11,11 +11,14 @@ gateways: password: {{ env_var('SNOWFLAKE_PASSWORD') }} # TODO: export your env_var database: {{ env_var('SNOWFLAKE_DATABASE') }} # TODO: export your env role: {{ env_var('SNOWFLAKE_ROLE') }} # TODO: export your env - + warehouse: {{ env_var('SNOWFLAKE_WAREHOUSE') }} # TODO: export your env + state_connection: + type: duckdb + database: snowflake_state.db model_defaults: - dialect: duckdb + dialect: snowflake # TODO: make this dynamic by gateway -default_gateway: duckdb +default_gateway: snowflake plan: enable_preview: true diff --git a/sqlmesh/macros/custom_macros.py b/sqlmesh/macros/custom_macros.py index cfc6c82..b86bcc1 100644 --- a/sqlmesh/macros/custom_macros.py +++ b/sqlmesh/macros/custom_macros.py @@ -1,8 +1,15 @@ from sqlmesh import macro - + @macro() -def snowflake_only(evaluator): - if evaluator.gateway == 'snowflake': +def snow_only(evaluator): + if evaluator.gateway == 'snow': + return True + else: + return False + +@macro() +def duckdb_only(evaluator): + if 'duckdb' in str(evaluator.gateway): return True else: return False \ No newline at end of file diff --git a/sqlmesh/models/duckdb_only/incremental_example.sql b/sqlmesh/models/duckdb_only/incremental_example.sql index cc5e59d..95eedbd 100644 --- a/sqlmesh/models/duckdb_only/incremental_example.sql +++ b/sqlmesh/models/duckdb_only/incremental_example.sql @@ -6,7 +6,8 @@ MODEL ( start '2020-01-01', cron '@daily', grain (id, event_date), - allow_partials true + allow_partials true, + enabled @IF(@gateway='duckdb', True, False) ); SELECT diff --git a/sqlmesh/models/snowflake_only/staging_prediction.py b/sqlmesh/models/snowflake_only/staging_prediction.py index 356fa64..d787c5c 100644 --- a/sqlmesh/models/snowflake_only/staging_prediction.py +++ b/sqlmesh/models/snowflake_only/staging_prediction.py @@ -6,10 +6,11 @@ from snowflake.cortex import Sentiment, Complete from snowflake.snowpark.functions import col, concat, lit, current_timestamp, parse_json from sqlmesh.core.model.kind import ModelKindName -from sqlmesh.core.macros import MacroEvaluator from sqlmesh import ExecutionContext, model from pyiceberg.catalog import load_catalog -from macros.custom_macros import snowflake_only +from sqlmesh.core.macros import MacroEvaluator +from macros.custom_macros import snow_only +import os @model( "reviews.prediction", @@ -17,16 +18,16 @@ name=ModelKindName.INCREMENTAL_BY_TIME_RANGE, time_column="ingestion_timestamp" ), - # cron="@hourly", + cron="*/5 * * * *", columns={ "reviewid": "string", "sentiment": "float", - "classification": "variant", - "ingestion_timestamp": "timestamp", - # "prediction_timestamp": "timestamp" + "classification": "string", + "ingestion_timestamp": "timestamp" }, - enabled=snowflake_only(evaluator=MacroEvaluator), - depends_on=["reviews.staging_reviews"] + start='2024-07-01', + depends_on=["reviews.staging_reviews"], + # enabled=snow_only(evaluator=MacroEvaluator) ) def execute( context: ExecutionContext, @@ -38,7 +39,10 @@ def execute( print(start, end) - df = context.snowpark.table("REVIEWS.STAGING_REVIEWS").limit(2) + context.snowpark.sql("ALTER ICEBERG TABLE REVIEWS.STAGING_REVIEWS REFRESH") + + + df = context.snowpark.table("MULTIENGINE_DB.REVIEWS.STAGING_REVIEWS") df = df.filter(f"(ingestion_timestamp >= '{start}') and (ingestion_timestamp <= '{end}')").select("reviewid", "review", "ingestion_timestamp") @@ -49,45 +53,52 @@ def execute( df = df.withColumn( "classification", - parse_json(Complete( + Complete( "llama3-8b", concat( lit("Extract author, book and character of the following "), col("review"), - lit(". Return only a json with the following format {author: , book: , character: }. Return only JSON, no verbose text.") + lit(""". Return only a json with the following format {author: , + book: , character: }. Return only JSON, no verbose text.""") ) - )) + ) ) + # df = df.withColumn("prediction_timestamp", current_timestamp()) df = df.select( "reviewid", "sentiment", - "classification", - "ingestion_timestamp", - # "prediction_timestamp" + "classification", + "ingestion_timestamp" ) - output= pa.Table.from_pandas(df.to_pandas()) - + schema = pa.schema( + [ + pa.field("REVIEWID", pa.string(), nullable=False), + pa.field("SENTIMENT", pa.float64(), nullable=True), + pa.field("CLASSIFICATION", pa.string(), nullable=True), + pa.field("INGESTION_TIMESTAMP", pa.timestamp('us'), nullable=False) + ] + ) + catalog = load_catalog("glue", **{"type": "glue", - "region_name":"eu-central-1", - "s3.region":"eu-central-1", - }) + "s3.region":"eu-central-1", + "s3.access-key-id": os.environ.get("AWS_ACCESS_KEY_ID"), + "s3.secret-access-key": os.environ.get("AWS_SECRET_ACCESS_KEY") + }) # create Iceberg if not exists tables = catalog.list_tables("multiengine") if ("multiengine", "predictions") not in tables: catalog.create_table( "multiengine.predictions", - output.schema, + schema, location="s3://sumeo-parquet-data-lake/staging/predictions") # append partition to Iceberg table - catalog.load_table("multiengine.predictions").append(output) - - + catalog.load_table("multiengine.predictions").append(pa.Table.from_pandas(df.to_pandas(), schema=schema)) context.snowpark.sql("ALTER ICEBERG TABLE REVIEWS.PREDICTION REFRESH") - return df + return df \ No newline at end of file diff --git a/sqlmesh/models/snowflake_only/staging_reviews.py b/sqlmesh/models/snowflake_only/staging_reviews.py index bee1793..fe16410 100644 --- a/sqlmesh/models/snowflake_only/staging_reviews.py +++ b/sqlmesh/models/snowflake_only/staging_reviews.py @@ -2,20 +2,25 @@ from datetime import datetime from sqlmesh import ExecutionContext, model -from sqlmesh.core.macros import MacroEvaluator from pyiceberg.catalog import load_catalog -from macros.custom_macros import snowflake_only +from sqlmesh.core.macros import MacroEvaluator +from macros.custom_macros import duckdb_only +import os + @model( "reviews.staging_reviews", kind="FULL", + cron="*/5 * * * *", columns={ "reviewid": "string", "username": "string", "review": "string", "ingestion_timestamp": "timestamp", + "source_s3_key":"string" }, - enabled=snowflake_only(evaluator=MacroEvaluator) + start='2024-07-01', + enabled=duckdb_only(evaluator=MacroEvaluator) ) def execute( context: ExecutionContext, @@ -26,9 +31,11 @@ def execute( ) -> None: print(start, end) + # aws glue catalog for iceberg is read only catalog = load_catalog("glue", **{"type": "glue", - "region_name":"eu-central-1", "s3.region":"eu-central-1", + "s3.access-key-id": os.environ.get("AWS_ACCESS_KEY_ID"), + "s3.secret-access-key": os.environ.get("AWS_SECRET_ACCESS_KEY") }) # load landing data in duckdb @@ -40,13 +47,14 @@ def execute( reviewid, username, review, - epoch_ms(ingestion_date) as ingestion_timestamp + epoch_ms(ingestion_date) as ingestion_timestamp, + source_s3_key FROM landing_reviews QUALIFY row_number() OVER (PARTITION BY username, review order by ingestion_timestamp) =1; """).arrow() - # create Iceberg if not exists + # create Iceberg table if not exists tables = catalog.list_tables("multiengine") if ("multiengine", "staging_reviews") not in tables: catalog.create_table( @@ -57,4 +65,4 @@ def execute( # overwrite target Iceberg table catalog.load_table("multiengine.staging_reviews").overwrite(output) - return output.to_pandas() + return output.to_pandas() \ No newline at end of file From 2a6c32e61db984efaf93c73302410735d322fb7b Mon Sep 17 00:00:00 2001 From: Sung Won Chung Date: Mon, 15 Jul 2024 17:37:54 -0700 Subject: [PATCH 10/12] fix duckdb and enabled configs --- sqlmesh/config.yaml | 4 ++-- sqlmesh/macros/custom_macros.py | 15 --------------- .../staging_reviews.py | 4 +--- 3 files changed, 3 insertions(+), 20 deletions(-) delete mode 100644 sqlmesh/macros/custom_macros.py rename sqlmesh/models/{snowflake_only => duckdb_only}/staging_reviews.py (93%) diff --git a/sqlmesh/config.yaml b/sqlmesh/config.yaml index dcb6854..8f5704c 100644 --- a/sqlmesh/config.yaml +++ b/sqlmesh/config.yaml @@ -16,9 +16,9 @@ gateways: type: duckdb database: snowflake_state.db model_defaults: - dialect: snowflake # TODO: make this dynamic by gateway + dialect: {{ env_var('DEFAULT_DIALECT') }} -default_gateway: snowflake +default_gateway: {{ env_var('DEFAULT_GATEWAY') }} plan: enable_preview: true diff --git a/sqlmesh/macros/custom_macros.py b/sqlmesh/macros/custom_macros.py deleted file mode 100644 index b86bcc1..0000000 --- a/sqlmesh/macros/custom_macros.py +++ /dev/null @@ -1,15 +0,0 @@ -from sqlmesh import macro - -@macro() -def snow_only(evaluator): - if evaluator.gateway == 'snow': - return True - else: - return False - -@macro() -def duckdb_only(evaluator): - if 'duckdb' in str(evaluator.gateway): - return True - else: - return False \ No newline at end of file diff --git a/sqlmesh/models/snowflake_only/staging_reviews.py b/sqlmesh/models/duckdb_only/staging_reviews.py similarity index 93% rename from sqlmesh/models/snowflake_only/staging_reviews.py rename to sqlmesh/models/duckdb_only/staging_reviews.py index fe16410..a16e603 100644 --- a/sqlmesh/models/snowflake_only/staging_reviews.py +++ b/sqlmesh/models/duckdb_only/staging_reviews.py @@ -3,8 +3,6 @@ from sqlmesh import ExecutionContext, model from pyiceberg.catalog import load_catalog -from sqlmesh.core.macros import MacroEvaluator -from macros.custom_macros import duckdb_only import os @@ -20,7 +18,7 @@ "source_s3_key":"string" }, start='2024-07-01', - enabled=duckdb_only(evaluator=MacroEvaluator) + enabled=os.environ.get("DUCKDB_ENABLED") ) def execute( context: ExecutionContext, From d30bd575ae67b32ec1c1ca21e488f26b14b1bf5b Mon Sep 17 00:00:00 2001 From: Sung Won Chung Date: Mon, 15 Jul 2024 17:52:23 -0700 Subject: [PATCH 11/12] fix --- sqlmesh/models/snowflake_only/staging_prediction.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/sqlmesh/models/snowflake_only/staging_prediction.py b/sqlmesh/models/snowflake_only/staging_prediction.py index d787c5c..fe3b8df 100644 --- a/sqlmesh/models/snowflake_only/staging_prediction.py +++ b/sqlmesh/models/snowflake_only/staging_prediction.py @@ -8,8 +8,6 @@ from sqlmesh.core.model.kind import ModelKindName from sqlmesh import ExecutionContext, model from pyiceberg.catalog import load_catalog -from sqlmesh.core.macros import MacroEvaluator -from macros.custom_macros import snow_only import os @model( @@ -27,7 +25,7 @@ }, start='2024-07-01', depends_on=["reviews.staging_reviews"], - # enabled=snow_only(evaluator=MacroEvaluator) + enabled=os.environ.get("SNOWFLAKE_ENABLED") ) def execute( context: ExecutionContext, From 2a9801d78c369df0691d613639757a1ad12c29ad Mon Sep 17 00:00:00 2001 From: Sung Won Chung Date: Tue, 16 Jul 2024 10:39:10 -0700 Subject: [PATCH 12/12] clean and format --- sqlmesh/models/duckdb_only/staging_reviews.py | 44 +++++---- .../snowflake_only/staging_prediction.py | 90 ++++++++++--------- 2 files changed, 76 insertions(+), 58 deletions(-) diff --git a/sqlmesh/models/duckdb_only/staging_reviews.py b/sqlmesh/models/duckdb_only/staging_reviews.py index a16e603..1e38a7e 100644 --- a/sqlmesh/models/duckdb_only/staging_reviews.py +++ b/sqlmesh/models/duckdb_only/staging_reviews.py @@ -2,8 +2,9 @@ from datetime import datetime from sqlmesh import ExecutionContext, model +from sqlglot.expressions import to_column from pyiceberg.catalog import load_catalog -import os +import os @model( @@ -15,10 +16,14 @@ "username": "string", "review": "string", "ingestion_timestamp": "timestamp", - "source_s3_key":"string" + "source_s3_key": "string", }, - start='2024-07-01', - enabled=os.environ.get("DUCKDB_ENABLED") + audits=[ + ("unique_values", {"columns": [to_column("reviewid")]}), + ("not_null", {"columns": [to_column("reviewid")]}), + ], + start="2024-07-01", + enabled=os.environ.get("DUCKDB_ENABLED"), ) def execute( context: ExecutionContext, @@ -30,16 +35,24 @@ def execute( print(start, end) # aws glue catalog for iceberg is read only - catalog = load_catalog("glue", **{"type": "glue", - "s3.region":"eu-central-1", - "s3.access-key-id": os.environ.get("AWS_ACCESS_KEY_ID"), - "s3.secret-access-key": os.environ.get("AWS_SECRET_ACCESS_KEY") - }) + catalog = load_catalog( + "glue", + **{ + "type": "glue", + "s3.region": "eu-central-1", + "s3.access-key-id": os.environ.get("AWS_ACCESS_KEY_ID"), + "s3.secret-access-key": os.environ.get("AWS_SECRET_ACCESS_KEY"), + }, + ) # load landing data in duckdb - con = catalog.load_table("multiengine.landing_reviews").scan().to_duckdb(table_name="landing_reviews") + con = ( + catalog.load_table("multiengine.landing_reviews") + .scan() + .to_duckdb(table_name="landing_reviews") + ) - # compute ouput + # compute output output = con.execute(""" SELECT reviewid, @@ -51,16 +64,17 @@ def execute( QUALIFY row_number() OVER (PARTITION BY username, review order by ingestion_timestamp) =1; """).arrow() - + # create Iceberg table if not exists tables = catalog.list_tables("multiengine") if ("multiengine", "staging_reviews") not in tables: catalog.create_table( "multiengine.staging_reviews", output.schema, - location="s3://sumeo-parquet-data-lake/staging/reviews") - + location="s3://sumeo-parquet-data-lake/staging/reviews", + ) + # overwrite target Iceberg table catalog.load_table("multiengine.staging_reviews").overwrite(output) - return output.to_pandas() \ No newline at end of file + return output.to_pandas() diff --git a/sqlmesh/models/snowflake_only/staging_prediction.py b/sqlmesh/models/snowflake_only/staging_prediction.py index fe3b8df..062375d 100644 --- a/sqlmesh/models/snowflake_only/staging_prediction.py +++ b/sqlmesh/models/snowflake_only/staging_prediction.py @@ -8,24 +8,29 @@ from sqlmesh.core.model.kind import ModelKindName from sqlmesh import ExecutionContext, model from pyiceberg.catalog import load_catalog -import os +from sqlglot.expressions import to_column +import os + @model( "reviews.prediction", kind=dict( - name=ModelKindName.INCREMENTAL_BY_TIME_RANGE, - time_column="ingestion_timestamp" + name=ModelKindName.INCREMENTAL_BY_TIME_RANGE, time_column="ingestion_timestamp" ), cron="*/5 * * * *", columns={ "reviewid": "string", "sentiment": "float", "classification": "string", - "ingestion_timestamp": "timestamp" + "ingestion_timestamp": "timestamp", }, - start='2024-07-01', + audits=[ + ("unique_values", {"columns": [to_column("reviewid")]}), + ("not_null", {"columns": [to_column("reviewid")]}), + ], + start="2024-07-01", depends_on=["reviews.staging_reviews"], - enabled=os.environ.get("SNOWFLAKE_ENABLED") + enabled=os.environ.get("SNOWFLAKE_ENABLED"), ) def execute( context: ExecutionContext, @@ -34,69 +39,68 @@ def execute( execution_time: datetime, **kwargs: t.Any, ) -> DataFrame: - print(start, end) context.snowpark.sql("ALTER ICEBERG TABLE REVIEWS.STAGING_REVIEWS REFRESH") - df = context.snowpark.table("MULTIENGINE_DB.REVIEWS.STAGING_REVIEWS") - - df = df.filter(f"(ingestion_timestamp >= '{start}') and (ingestion_timestamp <= '{end}')").select("reviewid", "review", "ingestion_timestamp") - - df = df.withColumn( - "sentiment", - Sentiment(col("review")) - ) + + df = df.filter( + f"(ingestion_timestamp >= '{start}') and (ingestion_timestamp <= '{end}')" + ).select("reviewid", "review", "ingestion_timestamp") + + df = df.withColumn("sentiment", Sentiment(col("review"))) df = df.withColumn( "classification", Complete( "llama3-8b", concat( - lit("Extract author, book and character of the following "), - col("review"), - lit(""". Return only a json with the following format {author: , - book: , character: }. Return only JSON, no verbose text.""") - ) - ) + lit("Extract author, book and character of the following "), + col("review"), + lit(""". Return only a json with the following format {author: , + book: , character: }. Return only JSON, no verbose text."""), + ), + ), ) # df = df.withColumn("prediction_timestamp", current_timestamp()) - df = df.select( - "reviewid", - "sentiment", - "classification", - "ingestion_timestamp" - ) + df = df.select("reviewid", "sentiment", "classification", "ingestion_timestamp") schema = pa.schema( - [ - pa.field("REVIEWID", pa.string(), nullable=False), - pa.field("SENTIMENT", pa.float64(), nullable=True), - pa.field("CLASSIFICATION", pa.string(), nullable=True), - pa.field("INGESTION_TIMESTAMP", pa.timestamp('us'), nullable=False) - ] - ) + [ + pa.field("REVIEWID", pa.string(), nullable=False), + pa.field("SENTIMENT", pa.float64(), nullable=True), + pa.field("CLASSIFICATION", pa.string(), nullable=True), + pa.field("INGESTION_TIMESTAMP", pa.timestamp("us"), nullable=False), + ] + ) + + catalog = load_catalog( + "glue", + **{ + "type": "glue", + "s3.region": "eu-central-1", + "s3.access-key-id": os.environ.get("AWS_ACCESS_KEY_ID"), + "s3.secret-access-key": os.environ.get("AWS_SECRET_ACCESS_KEY"), + }, + ) - catalog = load_catalog("glue", **{"type": "glue", - "s3.region":"eu-central-1", - "s3.access-key-id": os.environ.get("AWS_ACCESS_KEY_ID"), - "s3.secret-access-key": os.environ.get("AWS_SECRET_ACCESS_KEY") - }) - # create Iceberg if not exists tables = catalog.list_tables("multiengine") if ("multiengine", "predictions") not in tables: catalog.create_table( "multiengine.predictions", schema, - location="s3://sumeo-parquet-data-lake/staging/predictions") + location="s3://sumeo-parquet-data-lake/staging/predictions", + ) # append partition to Iceberg table - catalog.load_table("multiengine.predictions").append(pa.Table.from_pandas(df.to_pandas(), schema=schema)) + catalog.load_table("multiengine.predictions").append( + pa.Table.from_pandas(df.to_pandas(), schema=schema) + ) context.snowpark.sql("ALTER ICEBERG TABLE REVIEWS.PREDICTION REFRESH") - return df \ No newline at end of file + return df