Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
**/venv
*.db
.env
17 changes: 17 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Current File",
"type": "debugpy",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal",
"cwd": "${fileDirname}",
"justMyCode": false
}
]
}
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"debug.inlineValues": "on"
}
3 changes: 1 addition & 2 deletions sqlmesh/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -164,5 +164,4 @@ service-account.json

sqlmesh/logs/**

**/venv
config.yaml
**/venv
28 changes: 28 additions & 0 deletions sqlmesh/config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
gateways:
duckdb:
connection:
type: duckdb
database: sqlmesh_multi_engine_demo.db
snowflake:
connection:
type: snowflake
account: {{ env_var('SNOWFLAKE_ACCOUNT') }} # TODO: export your env_var
user: {{ env_var('SNOWFLAKE_USER') }} # TODO: export your env_var
password: {{ env_var('SNOWFLAKE_PASSWORD') }} # TODO: export your env_var
database: {{ env_var('SNOWFLAKE_DATABASE') }} # TODO: export your env_var
role: {{ env_var('SNOWFLAKE_ROLE') }} # TODO: export your env_var
warehouse: {{ env_var('SNOWFLAKE_WAREHOUSE') }} # TODO: export your env_var
state_connection:
type: duckdb
database: snowflake_state.db
model_defaults:
dialect: {{ env_var('DEFAULT_DIALECT') }}

default_gateway: {{ env_var('DEFAULT_GATEWAY') }}

plan:
enable_preview: true

ui:
format_on_save: false

23 changes: 0 additions & 23 deletions sqlmesh/config.yaml.local

This file was deleted.

20 changes: 20 additions & 0 deletions sqlmesh/models/duckdb_only/incremental_example.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- Demo incremental model, materialized only when the active gateway is
-- duckdb (see the enabled @IF macro below).
MODEL (
name reviews.incremental_example,
kind INCREMENTAL_BY_TIME_RANGE (
-- event_date is the column SQLMesh filters on for each processed interval
time_column event_date
),
start '2020-01-01',
cron '@daily',
grain (id, event_date),
-- allow runs for intervals that are not yet complete
allow_partials true,
enabled @IF(@gateway='duckdb', True, False)
);

-- @start_date / @end_date are rendered by SQLMesh for the interval being
-- processed; BETWEEN is inclusive on both ends.
SELECT
id,
item_id,
event_date
FROM
reviews.raw_data_example
WHERE
event_date BETWEEN @start_date AND @end_date;
24 changes: 24 additions & 0 deletions sqlmesh/models/duckdb_only/raw_data_example.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
MODEL (
name reviews.raw_data_example,
kind VIEW,
enabled @IF(@gateway='duckdb', True, False)
);

-- Inline seed data for the demo: ten rows spanning 2020-01-01 .. 2020-01-10,
-- cast to stable types for the downstream incremental model.
WITH seed (id, item_id, event_date) AS (
VALUES
(1, 'item_1', '2020-01-01'),
(2, 'item_2', '2020-01-02'),
(3, 'item_3', '2020-01-03'),
(4, 'item_4', '2020-01-04'),
(5, 'item_5', '2020-01-05'),
(6, 'item_6', '2020-01-06'),
(7, 'item_7', '2020-01-07'),
(8, 'item_8', '2020-01-08'),
(9, 'item_9', '2020-01-09'),
(10, 'item_10', '2020-01-10')
)
SELECT
CAST(id AS INTEGER),
CAST(item_id AS VARCHAR),
CAST(event_date AS DATE)
FROM seed;

Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,28 @@
from datetime import datetime

from sqlmesh import ExecutionContext, model
from sqlglot.expressions import to_column
from pyiceberg.catalog import load_catalog
import os


@model(
"reviews.staging_reviews",
kind="FULL",
cron="*/5 * * * *",
columns={
"reviewid": "string",
"username": "string",
"review": "string",
"ingestion_timestamp": "timestamp",
"source_s3_key": "string",
},
enabled=False #"@IF(@gateway='duckdb', False, True)"
audits=[
("unique_values", {"columns": [to_column("reviewid")]}),
("not_null", {"columns": [to_column("reviewid")]}),
],
start="2024-07-01",
enabled=os.environ.get("DUCKDB_ENABLED"),
)
def execute(
context: ExecutionContext,
Expand All @@ -24,34 +34,46 @@ def execute(
) -> None:
print(start, end)

catalog = load_catalog("glue", **{"type": "glue",
"region_name":"eu-central-1",
"s3.region":"eu-central-1",
})
# aws glue catalog for iceberg is read only
catalog = load_catalog(
"glue",
**{
"type": "glue",
"s3.region": "eu-central-1",
"s3.access-key-id": os.environ.get("AWS_ACCESS_KEY_ID"),
"s3.secret-access-key": os.environ.get("AWS_SECRET_ACCESS_KEY"),
},
)

# load landing data in duckdb
con = catalog.load_table("multiengine.landing_reviews").scan().to_duckdb(table_name="landing_reviews")
con = (
catalog.load_table("multiengine.landing_reviews")
.scan()
.to_duckdb(table_name="landing_reviews")
)

# compute ouput
# compute output
output = con.execute("""
SELECT
reviewid,
username,
review,
epoch_ms(ingestion_date) as ingestion_timestamp
epoch_ms(ingestion_date) as ingestion_timestamp,
source_s3_key
FROM landing_reviews
QUALIFY
row_number() OVER (PARTITION BY username, review order by ingestion_timestamp) =1;
""").arrow()
# create Iceberg if not exists

# create Iceberg table if not exists
tables = catalog.list_tables("multiengine")
if ("multiengine", "staging_reviews") not in tables:
catalog.create_table(
"multiengine.staging_reviews",
output.schema,
location="s3://sumeo-parquet-data-lake/staging/reviews")

location="s3://sumeo-parquet-data-lake/staging/reviews",
)

# overwrite target Iceberg table
catalog.load_table("multiengine.staging_reviews").overwrite(output)

Expand Down
106 changes: 106 additions & 0 deletions sqlmesh/models/snowflake_only/staging_prediction.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
import typing as t
from datetime import datetime
import pyarrow as pa
import pandas as pd
from snowflake.snowpark.dataframe import DataFrame
from snowflake.cortex import Sentiment, Complete
from snowflake.snowpark.functions import col, concat, lit, current_timestamp, parse_json
from sqlmesh.core.model.kind import ModelKindName
from sqlmesh import ExecutionContext, model
from pyiceberg.catalog import load_catalog
from sqlglot.expressions import to_column
import os


@model(
    "reviews.prediction",
    kind=dict(
        name=ModelKindName.INCREMENTAL_BY_TIME_RANGE, time_column="ingestion_timestamp"
    ),
    cron="*/5 * * * *",
    columns={
        "reviewid": "string",
        "sentiment": "float",
        "classification": "string",
        "ingestion_timestamp": "timestamp",
    },
    audits=[
        ("unique_values", {"columns": [to_column("reviewid")]}),
        ("not_null", {"columns": [to_column("reviewid")]}),
    ],
    start="2024-07-01",
    depends_on=["reviews.staging_reviews"],
    # BUG FIX: os.environ.get returns a *string* (or None); any non-empty
    # string -- including "false" or "0" -- is truthy, so the model would be
    # enabled whenever the variable was set at all. Parse it explicitly.
    enabled=os.environ.get("SNOWFLAKE_ENABLED", "").strip().lower()
    in ("1", "true", "yes"),
)
def execute(
    context: ExecutionContext,
    start: datetime,
    end: datetime,
    execution_time: datetime,
    **kwargs: t.Any,
) -> DataFrame:
    """Score staged reviews with Snowflake Cortex and publish to Iceberg.

    Reads the STAGING_REVIEWS Iceberg table through Snowflake, computes a
    sentiment score and an LLM-extracted classification for each review in
    the ``[start, end]`` interval, appends the result to the
    ``multiengine.predictions`` Iceberg table via the Glue catalog, and
    returns the Snowpark DataFrame for SQLMesh to materialize.

    Args:
        context: SQLMesh execution context (provides the Snowpark session).
        start: Inclusive lower bound of the interval being processed.
        end: Inclusive upper bound of the interval being processed.
        execution_time: Scheduler execution timestamp (unused here).
        **kwargs: Extra SQLMesh runtime arguments (unused).
    """
    print(start, end)  # visible in scheduler logs for debugging

    # Snowflake's external Iceberg tables must be refreshed to pick up files
    # written by other engines since the last refresh.
    context.snowpark.sql("ALTER ICEBERG TABLE REVIEWS.STAGING_REVIEWS REFRESH")

    df = context.snowpark.table("MULTIENGINE_DB.REVIEWS.STAGING_REVIEWS")

    # Incremental window filter on the model's time_column (inclusive bounds;
    # start/end are datetimes rendered into the SQL string, not user input).
    df = df.filter(
        f"(ingestion_timestamp >= '{start}') and (ingestion_timestamp <= '{end}')"
    ).select("reviewid", "review", "ingestion_timestamp")

    # Cortex sentiment score in [-1, 1] per review text.
    df = df.withColumn("sentiment", Sentiment(col("review")))

    # Cortex LLM extraction; the prompt asks for a strict JSON payload.
    df = df.withColumn(
        "classification",
        Complete(
            "llama3-8b",
            concat(
                lit("Extract author, book and character of the following <quote>"),
                col("review"),
                lit("""</quote>. Return only a json with the following format {author: <author>,
                book: <book>, character: <character>}. Return only JSON, no verbose text."""),
            ),
        ),
    )

    df = df.select("reviewid", "sentiment", "classification", "ingestion_timestamp")

    # Target Iceberg schema; the non-nullable fields are the grain of the
    # model (reviewid) and its time column.
    schema = pa.schema(
        [
            pa.field("REVIEWID", pa.string(), nullable=False),
            pa.field("SENTIMENT", pa.float64(), nullable=True),
            pa.field("CLASSIFICATION", pa.string(), nullable=True),
            pa.field("INGESTION_TIMESTAMP", pa.timestamp("us"), nullable=False),
        ]
    )

    catalog = load_catalog(
        "glue",
        **{
            "type": "glue",
            "s3.region": "eu-central-1",
            "s3.access-key-id": os.environ.get("AWS_ACCESS_KEY_ID"),
            "s3.secret-access-key": os.environ.get("AWS_SECRET_ACCESS_KEY"),
        },
    )

    # Create the target Iceberg table on first run only.
    tables = catalog.list_tables("multiengine")
    if ("multiengine", "predictions") not in tables:
        catalog.create_table(
            "multiengine.predictions",
            schema,
            location="s3://sumeo-parquet-data-lake/staging/predictions",
        )

    # Append this interval's rows to the Iceberg table.
    catalog.load_table("multiengine.predictions").append(
        pa.Table.from_pandas(df.to_pandas(), schema=schema)
    )

    # Refresh so Snowflake sees the files just written through the catalog.
    context.snowpark.sql("ALTER ICEBERG TABLE REVIEWS.PREDICTION REFRESH")

    return df
Loading