Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
Binary file not shown.
Binary file modified metrics-pipeline/metrics_pipeline/.DS_Store
Binary file not shown.
2 changes: 0 additions & 2 deletions metrics-pipeline/metrics_pipeline/.env

This file was deleted.

4 changes: 4 additions & 0 deletions metrics-pipeline/metrics_pipeline/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
.env
.env*
.venv/
pipeline/**/__pycache__
1 change: 1 addition & 0 deletions metrics-pipeline/metrics_pipeline/.python-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
3.11.6

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

1 change: 0 additions & 1 deletion metrics-pipeline/metrics_pipeline/encoded_creds.txt

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
from dagster_dbt import DbtCliResource
from dagster import Definitions, load_assets_from_modules
from .assets.dbt import licensor_metrics_dbt_assets, licensor_metrics_project
from .assets import bigquery, common, postgres
from .assets import bigquery, common, postgres, redis
from .resources import bigquery_resource

common_assets = load_assets_from_modules([common])
postgres_assets = load_assets_from_modules([postgres])
bigquery_assets = load_assets_from_modules([bigquery])
redis_assets = load_assets_from_modules([redis])

defs = Definitions(
assets=[licensor_metrics_dbt_assets, *common_assets, *postgres_assets, *bigquery_assets],
assets=[licensor_metrics_dbt_assets, *common_assets, *postgres_assets, *bigquery_assets, *redis_assets],
resources={
"dbt": DbtCliResource(project_dir=licensor_metrics_project),
"bigquery": bigquery_resource
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
# Filesystem locations used by the pipeline, relative to the process's
# working directory.

# Incremental-load checkpoint files (presumably: last processed timestamp
# and current batch UUID — confirm against the writers of these files).
LATEST_PROCESSED_TIME_PATH = "pipeline/data/state/latest_processed_time.csv"
BATCH_ID_PATH = "pipeline/data/state/batch_id.csv"

# Raw extracted datasets, stored as Parquet.
PLAYS_FILE_PATH = "pipeline/data/raw/plays.parquet"
LOCATIONS_FILE_PATH = "pipeline/data/raw/locations.parquet"
TRACKS_FILE_PATH = "pipeline/data/raw/tracks.parquet"
62 changes: 62 additions & 0 deletions metrics-pipeline/metrics_pipeline/pipeline/assets/redis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import os
import typing
import pandas as pd
from dagster import asset, MaterializeResult
from loguru import logger
from dagster_gcp import BigQueryResource, BigQueryError
from upstash_redis.errors import UpstashError
from ..resources import redis_resource
from .dbt import licensor_metrics_dbt_assets


def get_metrics(
    bigquery: BigQueryResource, table_name: str
) -> typing.Union[pd.DataFrame, None]:
    """Fetch the full contents of a BigQuery mart table as a DataFrame.

    Args:
        bigquery: Dagster BigQuery resource used to open a client.
        table_name: Table name inside the ``user_activity`` dataset. Only
            trusted, hard-coded names should be passed: the value is
            interpolated directly into the SQL string.

    Returns:
        The table contents as a DataFrame, or ``None`` when the query fails
        or the table is empty.
    """
    sql: str = f"SELECT * FROM `user_activity.{table_name}`"
    try:
        with bigquery.get_client() as bqclient:
            df: pd.DataFrame = bqclient.query(query=sql).result().to_dataframe()

        if df.empty:
            # An empty mart would cache a useless payload downstream, so
            # treat it the same as a query failure.
            raise pd.errors.DataError(f"DataFrame for {table_name} turned out empty!")

        return df
    except (BigQueryError, pd.errors.DataError) as e:
        # The original version only caught BigQueryError, letting the
        # empty-table DataError escape to the caller; catch both so the
        # contract is uniformly "None on failure".
        logger.error(e)
        return None


@asset(deps=[licensor_metrics_dbt_assets], group_name="serve")
def redis_cache(bigquery: BigQueryResource) -> MaterializeResult:
    """Serialize each dbt mart table to JSON and cache it in Upstash Redis.

    Iterates the mart tables in order; on the first failure (query error,
    missing data, or a rejected Redis SET) it stops and reports which tables
    were cached successfully in the materialization metadata.

    Returns:
        A ``MaterializeResult`` whose metadata records either overall success
        or the failing table plus the tables cached before it.
    """
    marts_tables = [
        "mart_country_market_share",
        "mart_top_track",
        "mart_top_track_per_country",
        "mart_total_hours_played",
    ]

    successful_insertions: list[str] = []

    for table in marts_tables:
        try:
            incoming_df = get_metrics(bigquery=bigquery, table_name=table)
            if incoming_df is None:
                # get_metrics returns None on query failure; the original
                # code proceeded to call ``None.to_json()`` and crashed with
                # an AttributeError that the except clause below never caught.
                raise UpstashError(f"No data available for {table}; cache not updated.")

            df_json = incoming_df.to_json()
            insertion_result = redis_resource.set(key=table, value=df_json)
            if not insertion_result:
                raise UpstashError(f"Error inserting value {table} into the cache.")

            successful_insertions.append(table)
        except UpstashError as e:
            logger.error(e)
            return MaterializeResult(
                metadata={
                    "Status": f"Error in {table}",
                    "Successful Insertions": ",".join(successful_insertions),
                }
            )

    return MaterializeResult(metadata={"Status": "Success"})
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
5941d45a-4d80-41d7-8261-d6e15884c25d
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
,latest_processed_time
0,2025-01-03 12:00:00+00:00
12 changes: 12 additions & 0 deletions metrics-pipeline/metrics_pipeline/pipeline/resources/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import os
from dagster import EnvVar
from dagster_gcp import BigQueryResource
from upstash_redis import Redis

# Shared resource instances wired into the Dagster Definitions.

# BigQuery client resource; credentials are resolved lazily at run time via
# Dagster's EnvVar indirection.
bigquery_resource = BigQueryResource(
    project="licensor-metrics", gcp_credentials=EnvVar("GOOGLE_CLOUD_API_KEY")
)

# Upstash Redis client. NOTE(review): unlike the BigQuery resource above,
# this reads os.environ eagerly at import time and raises KeyError if either
# variable is unset — consider lazy resolution or explicit validation for
# consistency with the EnvVar pattern.
redis_resource = Redis(
    url=os.environ["UPSTASH_REDIS_URI"], token=os.environ["UPSTASH_REDIS_TOKEN"]
)
6 changes: 3 additions & 3 deletions metrics-pipeline/metrics_pipeline/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[project]
name = "metrics_pipeline"
name = "pipeline"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
Expand All @@ -26,5 +26,5 @@ requires = ["setuptools"]
build-backend = "setuptools.build_meta"

[tool.dagster]
module_name = "metrics_pipeline"
code_location_name = "metrics_pipeline"
module_name = "pipeline"
code_location_name = "pipeline"
1 change: 1 addition & 0 deletions metrics-pipeline/metrics_pipeline/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ pyarrow
connectorx
dagster-gcp
dagster-gcp-pandas
upstash_redis
dbt-core
dbt-bigquery
dbt-postgres
1 change: 1 addition & 0 deletions metrics-pipeline/metrics_pipeline/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"pyarrow",
"connectorx",
"loguru",
"upstash_redis",
"dbt-snowflake<1.9",
"dbt-clickhouse<1.9",
Expand Down
Loading