diff --git a/.gitignore b/.gitignore index f619cbd..57198b5 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ **/venv *.db +.env \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..07d1418 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,17 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python Debugger: Current File", + "type": "debugpy", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal", + "cwd": "${fileDirname}", + "justMyCode": false + } + ] +} \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..a3d73fc --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "debug.inlineValues": "on" +} \ No newline at end of file diff --git a/sqlmesh/.gitignore b/sqlmesh/.gitignore index 9225948..26feb90 100644 --- a/sqlmesh/.gitignore +++ b/sqlmesh/.gitignore @@ -164,5 +164,4 @@ service-account.json sqlmesh/logs/** -**/venv -config.yaml \ No newline at end of file +**/venv \ No newline at end of file diff --git a/sqlmesh/config.yaml b/sqlmesh/config.yaml new file mode 100644 index 0000000..8f5704c --- /dev/null +++ b/sqlmesh/config.yaml @@ -0,0 +1,28 @@ +gateways: + duckdb: + connection: + type: duckdb + database: sqlmesh_multi_engine_demo.db + snowflake: + connection: + type: snowflake + account: {{ env_var('SNOWFLAKE_ACCOUNT') }} # TODO: export your env_var + user: {{ env_var('SNOWFLAKE_USER') }} # TODO: export your env_var + password: {{ env_var('SNOWFLAKE_PASSWORD') }} # TODO: export your env_var + database: {{ env_var('SNOWFLAKE_DATABASE') }} # TODO: export your env + role: {{ env_var('SNOWFLAKE_ROLE') }} # TODO: export your env + warehouse: {{ env_var('SNOWFLAKE_WAREHOUSE') }} # TODO: export your env + state_connection: + type: duckdb + database: snowflake_state.db +model_defaults: + dialect: {{ env_var('DEFAULT_DIALECT') }} + +default_gateway: {{ env_var('DEFAULT_GATEWAY') }} + +plan: + enable_preview: true + +ui: + format_on_save: false + diff --git a/sqlmesh/config.yaml.local b/sqlmesh/config.yaml.local deleted file mode 100644 index ddee68c..0000000 --- a/sqlmesh/config.yaml.local +++ /dev/null @@ -1,23 +0,0 @@ -gateways: - duckdb: - connection: - type: duckdb - database: sqlmesh-multi_engine_demo.db - snowflake: - connection: - type: snowflake - account: - user: - password: - database: - role: - -model_defaults: - dialect: snowflake - -plan: - enable_preview: true - -ui: - format_on_save: false - diff --git a/sqlmesh/models/duckdb_only/incremental_example.sql b/sqlmesh/models/duckdb_only/incremental_example.sql new file mode 100644 index 0000000..95eedbd --- /dev/null +++ b/sqlmesh/models/duckdb_only/incremental_example.sql @@ -0,0 +1,20 @@ +MODEL ( + name reviews.incremental_example, + kind INCREMENTAL_BY_TIME_RANGE ( + time_column event_date + ), + start '2020-01-01', + cron '@daily', + grain (id, event_date), + allow_partials true, + enabled @IF(@gateway='duckdb', True, False) +); + +SELECT + id, + item_id, + event_date +FROM + reviews.raw_data_example +WHERE + event_date BETWEEN @start_date AND @end_date; diff --git a/sqlmesh/models/duckdb_only/raw_data_example.sql b/sqlmesh/models/duckdb_only/raw_data_example.sql new file mode 100644 index 0000000..be0271d --- /dev/null +++ b/sqlmesh/models/duckdb_only/raw_data_example.sql @@ -0,0 +1,24 @@ +MODEL ( + name reviews.raw_data_example, + kind VIEW, + enabled @IF(@gateway='duckdb', True, False) +); + +SELECT + CAST(id AS INTEGER), + CAST(item_id AS VARCHAR), + CAST(event_date AS DATE) +FROM ( + VALUES + (1, 'item_1', '2020-01-01'), + (2, 'item_2', '2020-01-02'), + (3, 'item_3', '2020-01-03'), + (4, 'item_4', '2020-01-04'), + (5, 'item_5', '2020-01-05'), + (6, 'item_6', '2020-01-06'), + (7, 'item_7', '2020-01-07'), + (8, 'item_8', '2020-01-08'), + (9, 'item_9', '2020-01-09'), + (10, 'item_10', '2020-01-10') +) AS t(id, item_id, event_date); + diff --git a/sqlmesh/models/staging_reviews.py b/sqlmesh/models/duckdb_only/staging_reviews.py similarity index 55% rename from sqlmesh/models/staging_reviews.py rename to sqlmesh/models/duckdb_only/staging_reviews.py index 41b81ab..1e38a7e 100644 --- a/sqlmesh/models/staging_reviews.py +++ b/sqlmesh/models/duckdb_only/staging_reviews.py @@ -2,18 +2,28 @@ from datetime import datetime from sqlmesh import ExecutionContext, model +from sqlglot.expressions import to_column from pyiceberg.catalog import load_catalog +import os + @model( "reviews.staging_reviews", kind="FULL", + cron="*/5 * * * *", columns={ "reviewid": "string", "username": "string", "review": "string", "ingestion_timestamp": "timestamp", + "source_s3_key": "string", }, - enabled=False #"@IF(@gateway='duckdb', False, True)" + audits=[ + ("unique_values", {"columns": [to_column("reviewid")]}), + ("not_null", {"columns": [to_column("reviewid")]}), + ], + start="2024-07-01", + enabled=os.environ.get("DUCKDB_ENABLED"), ) def execute( context: ExecutionContext, @@ -24,34 +34,46 @@ def execute( ) -> None: print(start, end) - catalog = load_catalog("glue", **{"type": "glue", - "region_name":"eu-central-1", - "s3.region":"eu-central-1", - }) + # aws glue catalog for iceberg is read only + catalog = load_catalog( + "glue", + **{ + "type": "glue", + "s3.region": "eu-central-1", + "s3.access-key-id": os.environ.get("AWS_ACCESS_KEY_ID"), + "s3.secret-access-key": os.environ.get("AWS_SECRET_ACCESS_KEY"), + }, + ) # load landing data in duckdb - con = catalog.load_table("multiengine.landing_reviews").scan().to_duckdb(table_name="landing_reviews") + con = ( + catalog.load_table("multiengine.landing_reviews") + .scan() + .to_duckdb(table_name="landing_reviews") + ) - # compute ouput + # compute output output = con.execute(""" SELECT reviewid, username, review, - epoch_ms(ingestion_date) as ingestion_timestamp + epoch_ms(ingestion_date) as ingestion_timestamp, + source_s3_key FROM landing_reviews QUALIFY row_number() OVER (PARTITION BY username, review order by ingestion_timestamp) =1; """).arrow() - - # create Iceberg if not exists + + # create Iceberg table if not exists tables = catalog.list_tables("multiengine") if ("multiengine", "staging_reviews") not in tables: catalog.create_table( "multiengine.staging_reviews", output.schema, - location="s3://sumeo-parquet-data-lake/staging/reviews") - + location="s3://sumeo-parquet-data-lake/staging/reviews", + ) + # overwrite target Iceberg table catalog.load_table("multiengine.staging_reviews").overwrite(output) diff --git a/sqlmesh/models/snowflake_only/staging_prediction.py b/sqlmesh/models/snowflake_only/staging_prediction.py new file mode 100644 index 0000000..062375d --- /dev/null +++ b/sqlmesh/models/snowflake_only/staging_prediction.py @@ -0,0 +1,106 @@ +import typing as t +from datetime import datetime +import pyarrow as pa +import pandas as pd +from snowflake.snowpark.dataframe import DataFrame +from snowflake.cortex import Sentiment, Complete +from snowflake.snowpark.functions import col, concat, lit, current_timestamp, parse_json +from sqlmesh.core.model.kind import ModelKindName +from sqlmesh import ExecutionContext, model +from pyiceberg.catalog import load_catalog +from sqlglot.expressions import to_column +import os + + +@model( + "reviews.prediction", + kind=dict( + name=ModelKindName.INCREMENTAL_BY_TIME_RANGE, time_column="ingestion_timestamp" + ), + cron="*/5 * * * *", + columns={ + "reviewid": "string", + "sentiment": "float", + "classification": "string", + "ingestion_timestamp": "timestamp", + }, + audits=[ + ("unique_values", {"columns": [to_column("reviewid")]}), + ("not_null", {"columns": [to_column("reviewid")]}), + ], + start="2024-07-01", + depends_on=["reviews.staging_reviews"], + enabled=os.environ.get("SNOWFLAKE_ENABLED"), +) +def execute( + context: ExecutionContext, + start: datetime, + end: datetime, + execution_time: datetime, + **kwargs: t.Any, +) -> DataFrame: + print(start, end) + + context.snowpark.sql("ALTER ICEBERG TABLE REVIEWS.STAGING_REVIEWS REFRESH") + + df = context.snowpark.table("MULTIENGINE_DB.REVIEWS.STAGING_REVIEWS") + + df = df.filter( + f"(ingestion_timestamp >= '{start}') and (ingestion_timestamp <= '{end}')" + ).select("reviewid", "review", "ingestion_timestamp") + + df = df.withColumn("sentiment", Sentiment(col("review"))) + + df = df.withColumn( + "classification", + Complete( + "llama3-8b", + concat( + lit("Extract author, book and character of the following "), + col("review"), + lit(""". Return only a json with the following format {author: , + book: , character: }. Return only JSON, no verbose text."""), + ), + ), + ) + + # df = df.withColumn("prediction_timestamp", current_timestamp()) + + df = df.select("reviewid", "sentiment", "classification", "ingestion_timestamp") + + schema = pa.schema( + [ + pa.field("REVIEWID", pa.string(), nullable=False), + pa.field("SENTIMENT", pa.float64(), nullable=True), + pa.field("CLASSIFICATION", pa.string(), nullable=True), + pa.field("INGESTION_TIMESTAMP", pa.timestamp("us"), nullable=False), + ] + ) + + catalog = load_catalog( + "glue", + **{ + "type": "glue", + "s3.region": "eu-central-1", + "s3.access-key-id": os.environ.get("AWS_ACCESS_KEY_ID"), + "s3.secret-access-key": os.environ.get("AWS_SECRET_ACCESS_KEY"), + }, + ) + + # create Iceberg if not exists + tables = catalog.list_tables("multiengine") + if ("multiengine", "predictions") not in tables: + catalog.create_table( + "multiengine.predictions", + schema, + location="s3://sumeo-parquet-data-lake/staging/predictions", + ) + + # append partition to Iceberg table + catalog.load_table("multiengine.predictions").append( + pa.Table.from_pandas(df.to_pandas(), schema=schema) + ) + + context.snowpark.sql("ALTER ICEBERG TABLE REVIEWS.PREDICTION REFRESH") + + return df diff --git a/sqlmesh/models/staging_prediction.py b/sqlmesh/models/staging_prediction.py deleted file mode 100644 index f02410b..0000000 --- a/sqlmesh/models/staging_prediction.py +++ /dev/null @@ -1,91 +0,0 @@ -import typing as t -from datetime import datetime -import pyarrow as pa -import pandas as pd -from snowflake.snowpark.dataframe import DataFrame -from snowflake.cortex import Sentiment, Complete -from snowflake.snowpark.functions import col, concat, lit, current_timestamp, parse_json -from sqlmesh.core.model.kind import ModelKindName -from sqlmesh import ExecutionContext, model -from pyiceberg.catalog import load_catalog - -@model( - "reviews.prediction", - kind=dict( - name=ModelKindName.INCREMENTAL_BY_TIME_RANGE, - time_column="ingestion_timestamp" - ), - # cron="@hourly", - columns={ - "reviewid": "string", - "sentiment": "float", - "classification": "variant", - "ingestion_timestamp": "timestamp", - # "prediction_timestamp": "timestamp" - }, - enabled= True, - depends_on=["reviews.staging_reviews"] -) -def execute( - context: ExecutionContext, - start: datetime, - end: datetime, - execution_time: datetime, - **kwargs: t.Any, -) -> DataFrame: - - print(start, end) - - df = context.snowpark.table("REVIEWS.STAGING_REVIEWS").limit(2) - - df = df.filter(f"(ingestion_timestamp >= '{start}') and (ingestion_timestamp <= '{end}')").select("reviewid", "review", "ingestion_timestamp") - - df = df.withColumn( - "sentiment", - Sentiment(col("review")) - ) - - df = df.withColumn( - "classification", - parse_json(Complete( - "llama3-8b", - concat( - lit("Extract author, book and character of the following "), - col("review"), - lit(". Return only a json with the following format {author: , book: , character: }. Return only JSON, no verbose text.") - ) - )) - ) - # df = df.withColumn("prediction_timestamp", current_timestamp()) - - df = df.select( - "reviewid", - "sentiment", - "classification", - "ingestion_timestamp", - # "prediction_timestamp" - ) - - output= pa.Table.from_pandas(df.to_pandas()) - - catalog = load_catalog("glue", **{"type": "glue", - "region_name":"eu-central-1", - "s3.region":"eu-central-1", - }) - - # create Iceberg if not exists - tables = catalog.list_tables("multiengine") - if ("multiengine", "predictions") not in tables: - catalog.create_table( - "multiengine.predictions", - output.schema, - location="s3://sumeo-parquet-data-lake/staging/predictions") - - # append partition to Iceberg table - catalog.load_table("multiengine.predictions").append(output) - - - - context.snowpark.sql("ALTER ICEBERG TABLE REVIEWS.PREDICTION REFRESH") - - return df diff --git a/sqlmesh/requirements.txt b/sqlmesh/requirements.txt index c47535f..4e40a29 100644 --- a/sqlmesh/requirements.txt +++ b/sqlmesh/requirements.txt @@ -1,5 +1,5 @@ -sqlmesh[web,github,bigquery,snowflake]==0.105.1 +sqlmesh[web,github,bigquery,snowflake]==0.110.1 pytest==8.1.1 -pyiceberg[glue] -snowflake-snowpark-python -snowflake-ml-python \ No newline at end of file +pyiceberg[glue]==0.6.1 +snowflake-snowpark-python==1.19.0 +snowflake-ml-python==1.5.3 \ No newline at end of file