diff --git a/api/target/bg-jobs/sbt_58a96643/job-9/target/0f7d0e36/fdb70ed2/scala-spark-template_2.12-1.0.jar b/api/target/bg-jobs/sbt_58a96643/job-9/target/0f7d0e36/fdb70ed2/scala-spark-template_2.12-1.0.jar new file mode 100644 index 0000000..e3fb8a5 Binary files /dev/null and b/api/target/bg-jobs/sbt_58a96643/job-9/target/0f7d0e36/fdb70ed2/scala-spark-template_2.12-1.0.jar differ diff --git a/api/target/bg-jobs/sbt_6d243890/job-9/target/0f7d0e36/fdb70ed2/scala-spark-template_2.12-1.0.jar b/api/target/bg-jobs/sbt_6d243890/job-9/target/0f7d0e36/fdb70ed2/scala-spark-template_2.12-1.0.jar new file mode 100644 index 0000000..e3fb8a5 Binary files /dev/null and b/api/target/bg-jobs/sbt_6d243890/job-9/target/0f7d0e36/fdb70ed2/scala-spark-template_2.12-1.0.jar differ diff --git a/metrics-pipeline/metrics_pipeline/.DS_Store b/metrics-pipeline/metrics_pipeline/.DS_Store index d22def3..fa68957 100644 Binary files a/metrics-pipeline/metrics_pipeline/.DS_Store and b/metrics-pipeline/metrics_pipeline/.DS_Store differ diff --git a/metrics-pipeline/metrics_pipeline/.env b/metrics-pipeline/metrics_pipeline/.env deleted file mode 100644 index dbfa6eb..0000000 --- a/metrics-pipeline/metrics_pipeline/.env +++ /dev/null @@ -1,2 +0,0 @@ -POSTGRES_URI="postgresql://neondb_owner:Y4G0PaUwdkgH@ep-orange-darkness-a2al3uis.eu-central-1.aws.neon.tech/neondb?sslmode=require" -GOOGLE_CLOUD_API_KEY="ewogICJ0eXBlIjogInNlcnZpY2VfYWNjb3VudCIsCiAgInByb2plY3RfaWQiOiAibGljZW5zb3ItbWV0cmljcyIsCiAgInByaXZhdGVfa2V5X2lkIjogIjM4YmJmYzJkZDU3NjNhNWQzNzQ1NWRiOWIxMTc4Nzk2NTczMmM4ZTUiLAogICJwcml2YXRlX2tleSI6ICItLS0tLUJFR0lOIFBSSVZBVEUgS0VZLS0tLS1cbk1JSUV2d0lCQURBTkJna3Foa2lHOXcwQkFRRUZBQVNDQktrd2dnU2xBZ0VBQW9JQkFRQ3JKSlk2TjhBcVpVNXpcbjRWSko1Y2dwaXdNaytzM2NqMGJVajZRV3ROSVRuQUZlM2p5ZVNoeE1wYWlkbW85ZWNiNkZXYlZrVWRyNDRoTktcbmF0UmZ2MmEvSisrRXRza3c3ZG55RjVGdVcyaWxxRS90MDVhdnM4VXphb3NGSS9VZUtBMzY5T0lLMWllSGhWcGFcbnFSaTlnUWFuU3dPVWJUU1V1aUhNUzZpdFdRcCtUNzZia3hwZXFuL0tJbmZQTmR1QWRwaDRwd0JJNUQ3b28rbVpcbmJhZWdXc01QMGdqMGZaQzR3a3g4bUdiWW1JTnF6MEpaRDIrSTRnUEFvTDNDSU11dkxRT2pXQUdFQi81RHhSRHpcblhWbSswNmtHS1RXNXRpSjFtUGMyRmw2UmRnZG9Zb3dENzExWWpHOUF0eTRJSmU0Wk1uOENHcnVHRVh5Z0VZeUJcbllkSnNuckJ0QWdNQkFBRUNnZ0VBUFBiY0F1SVFOS29Bejl1d2RmMkNGM1piQ09YRXhuQk5jWE8zcHBVRURwTXdcbms4eER1TGs2cW4wS2V4WHZRMXZ1Vzg5UWcrdDZ2dFM5dlhFZnRQbllEbmtaNCtNKytReUphaWxHRjVNOVRHR1FcbkUyNmZpM2hVNDVOczhVaFZUVWhFYnFTa0R1c3BpcWVRdjZIajI4R21EcHIwcS85WXp6QWMwUXNVaTZlZnlzMWpcbjc4aFRBMmRablJuMU5jK3lIVDJEUjRVSnVHTk9QdDlINHpZM0VIMUVaam80SThueXF4TUxsbUVZbWxXbG9VeEdcbnlVMEk3MzU4OGw0TGVGaHkxUEhzT0JGK0M4ZldqOWJSMTBrTXU1UytoYWJuajYxcTJZN3hwS2tOS3psOEQvdkNcbkp5Y2hTZFduSzVpb2UzVHdFN0dka2UvTWQ0ZzB6Nk1USFNzTWwzZllxd0tCZ1FEdTlIL05pZGZYNURkcXQ0enFcbjhFR3BYTU1WaE9nQUdHRU5XYWlzekxkeVVVcEZIQk5NNVl2ejV0ZWxSeXFXWHV0Q09rV0l5Y1ZMOURUamtwYXNcbjd1OVJHNTlRZ29WVHM2UVVPZnhJR2REclNOMGpyeTdzNzErY2U2MXAvaEFFZHVMYzV5K1RiYjRqM2taTXdOWWVcbmFlTXYwSXZlTEhOMDVHMDdhZW5XOUtLaVR3S0JnUUMzV2NsRWNnL0xnK2xOM3BGbkVnaXVnblJnYnhhcENWSXFcblp0eHdHazJHd1lSbVpxWituS3FmNllvNVRhNDFGSytubkJSUlMyY2dYbjdoTEF3L2psZ2ZlWDhoazY5Q2tyN1hcbmxRWUlDWVJtQ3RTOVFOdW1kZDcyTzFUcTIyelJiQ3lVTXF3STN5cVh3Z3E0K25MUVBodlJKR1Vwd1dsTnBZWjRcbmlGMUhKMkcrZ3dLQmdRRGkyM2IySUxhdGZUbnJjK1V5S3ZSN29OaUk5b3YzZ3kyb3FnVk1RSEtzOG1ZTFpKTWlcblB3OTJlQ01sQTRKNERoZFY0ZEtnQWNid3pDRG1LUVlwbWhIVU0rQTFaQ1RHQ1ZkdDZIc056SXlldzZkR3VJdFpcbllBeXFtSFJUbTJPRGlZYnA2QVl2OWxFODVrcGhsZlh5Rzk4WFJ1dHkrMHFGb0ZQZnd6YkEzN0lEV3dLQmdRQ0JcbmR3RExOODgzWVZtb0JuU2RWdnFTWHNOV0pKclhtU0ZQbDNvb0hpcUg4TFZRcVVML1BCaXUrZlVFS1huTk9XSWVcbjgvTFV4RzE1U2NCRnR4aWUrQi81ZVl6dlpKem9ZVDRvYzYzaWx2WEtKL08zL3NnYWJqaVZuYWVFZnBRRC9HSXRcbjh1blBDZVhGYXRxdmF3a1BTZ3ZTVGVTdzYwVXo4cUZWRXRUcW5VQkRpUUtCZ1FEbFR6cyswMlVNY25SYW5PU29cblhzNjV5VjFiYUJDUnJaL2ZNak55ZjZhSVd6MnR2OXV5VUVTa0pxR0EzMVVsdHNWUXZvSzVaNzc4UjgvQVdxM0hcbkV3a2pqVExZMVRibUhUTU9qaUZOY08yWHowbWZRekdmMjlZMUJ1UVZheDV4S2syR0RiUVU3NFJISm1mSmkxYVVcbk01NHg3bjVtVEw0TW8zTytxRE5qOFEydEhBPT1cbi0tLS0tRU5EIFBSSVZBVEUgS0VZLS0tLS1cbiIsCiAgImNsaWVudF9lbWFpbCI6ICI4MzE4Mjg2NjAzMTYtY29tcHV0ZUBkZXZlbG9wZXIuZ3NlcnZpY2VhY2NvdW50LmNvbSIsCiAgImNsaWVudF9pZCI6ICIxMDc2NTE1NDQ0MDUwODU4NzAzMTUiLAogICJhdXRoX3VyaSI6ICJodHRwczovL2FjY291bnRzLmdvb2dsZS5jb20vby9vYXV0aDIvYXV0aCIsCiAgInRva2VuX3VyaSI6ICJodHRwczovL29hdXRoMi5nb29nbGVhcGlzLmNvbS90b2tlbiIsCiAgImF1dGhfcHJvdmlkZXJfeDUwOV9jZXJ0X3VybCI6ICJodHRwczovL3d3dy5nb29nbGVhcGlzLmNvbS9vYXV0aDIvdjEvY2VydHMiLAogICJjbGllbnRfeDUwOV9jZXJ0X3VybCI6ICJodHRwczovL3d3dy5nb29nbGVhcGlzLmNvbS9yb2JvdC92MS9tZXRhZGF0YS94NTA5LzgzMTgyODY2MDMxNi1jb21wdXRlJTQwZGV2ZWxvcGVyLmdzZXJ2aWNlYWNjb3VudC5jb20iLAogICJ1bml2ZXJzZV9kb21haW4iOiAiZ29vZ2xlYXBpcy5jb20iCn0K" \ No newline at end of file diff --git a/metrics-pipeline/metrics_pipeline/.gitignore b/metrics-pipeline/metrics_pipeline/.gitignore new file mode 100644 index 0000000..0d70d30 --- /dev/null +++ b/metrics-pipeline/metrics_pipeline/.gitignore @@ -0,0 +1,4 @@ +.env +.env* +.venv/ +pipeline/**/__pycache__ diff --git a/metrics-pipeline/metrics_pipeline/.python-version b/metrics-pipeline/metrics_pipeline/.python-version new file mode 100644 index 0000000..375f5ca --- /dev/null +++ b/metrics-pipeline/metrics_pipeline/.python-version @@ -0,0 +1 @@ +3.11.6 diff --git a/metrics-pipeline/metrics_pipeline/dagster_def/assets/constants.py b/metrics-pipeline/metrics_pipeline/dagster_def/assets/constants.py deleted file mode 100644 index 0b2e860..0000000 --- a/metrics-pipeline/metrics_pipeline/dagster_def/assets/constants.py +++ /dev/null @@ -1,6 +0,0 @@ -LATEST_PROCESSED_TIME_PATH = "metrics_pipeline/data/state/latest_processed_time.csv" -BATCH_ID_PATH = "metrics_pipeline/data/state/batch_id.csv" - -PLAYS_FILE_PATH = "metrics_pipeline/data/raw/plays.parquet" -LOCATIONS_FILE_PATH = "metrics_pipeline/data/raw/locations.parquet" -TRACKS_FILE_PATH = "metrics_pipeline/data/raw/tracks.parquet" \ No newline at end of file diff --git a/metrics-pipeline/metrics_pipeline/dagster_def/data/state/batch_id.csv b/metrics-pipeline/metrics_pipeline/dagster_def/data/state/batch_id.csv deleted file mode 100644 index 88c84a7..0000000 --- a/metrics-pipeline/metrics_pipeline/dagster_def/data/state/batch_id.csv +++ /dev/null @@ -1 +0,0 @@ -721caf41-6fec-49b2-9599-52bad8f27722 \ No newline at end of file diff --git a/metrics-pipeline/metrics_pipeline/dagster_def/data/state/latest_processed_time.csv b/metrics-pipeline/metrics_pipeline/dagster_def/data/state/latest_processed_time.csv deleted file mode 100644 index 37c3d3e..0000000 --- a/metrics-pipeline/metrics_pipeline/dagster_def/data/state/latest_processed_time.csv +++ /dev/null @@ -1,2 +0,0 @@ -,latest_processed_time -0,2025-01-01 15:00:00+00:00 diff --git a/metrics-pipeline/metrics_pipeline/dagster_def/resources/__init__.py b/metrics-pipeline/metrics_pipeline/dagster_def/resources/__init__.py deleted file mode 100644 index 410a693..0000000 --- a/metrics-pipeline/metrics_pipeline/dagster_def/resources/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -from dagster import EnvVar -from dagster_gcp import BigQueryResource - -bigquery_resource = BigQueryResource( - project="licensor-metrics", - gcp_credentials=EnvVar("GOOGLE_CLOUD_API_KEY") -) \ No newline at end of file diff --git a/metrics-pipeline/metrics_pipeline/encoded_creds.txt b/metrics-pipeline/metrics_pipeline/encoded_creds.txt deleted file mode 100644 index 41c8e97..0000000 --- a/metrics-pipeline/metrics_pipeline/encoded_creds.txt +++ /dev/null @@ -1 +0,0 @@ -ewogICJ0eXBlIjogInNlcnZpY2VfYWNjb3VudCIsCiAgInByb2plY3RfaWQiOiAibGljZW5zb3ItbWV0cmljcyIsCiAgInByaXZhdGVfa2V5X2lkIjogIjM4YmJmYzJkZDU3NjNhNWQzNzQ1NWRiOWIxMTc4Nzk2NTczMmM4ZTUiLAogICJwcml2YXRlX2tleSI6ICItLS0tLUJFR0lOIFBSSVZBVEUgS0VZLS0tLS1cbk1JSUV2d0lCQURBTkJna3Foa2lHOXcwQkFRRUZBQVNDQktrd2dnU2xBZ0VBQW9JQkFRQ3JKSlk2TjhBcVpVNXpcbjRWSko1Y2dwaXdNaytzM2NqMGJVajZRV3ROSVRuQUZlM2p5ZVNoeE1wYWlkbW85ZWNiNkZXYlZrVWRyNDRoTktcbmF0UmZ2MmEvSisrRXRza3c3ZG55RjVGdVcyaWxxRS90MDVhdnM4VXphb3NGSS9VZUtBMzY5T0lLMWllSGhWcGFcbnFSaTlnUWFuU3dPVWJUU1V1aUhNUzZpdFdRcCtUNzZia3hwZXFuL0tJbmZQTmR1QWRwaDRwd0JJNUQ3b28rbVpcbmJhZWdXc01QMGdqMGZaQzR3a3g4bUdiWW1JTnF6MEpaRDIrSTRnUEFvTDNDSU11dkxRT2pXQUdFQi81RHhSRHpcblhWbSswNmtHS1RXNXRpSjFtUGMyRmw2UmRnZG9Zb3dENzExWWpHOUF0eTRJSmU0Wk1uOENHcnVHRVh5Z0VZeUJcbllkSnNuckJ0QWdNQkFBRUNnZ0VBUFBiY0F1SVFOS29Bejl1d2RmMkNGM1piQ09YRXhuQk5jWE8zcHBVRURwTXdcbms4eER1TGs2cW4wS2V4WHZRMXZ1Vzg5UWcrdDZ2dFM5dlhFZnRQbllEbmtaNCtNKytReUphaWxHRjVNOVRHR1FcbkUyNmZpM2hVNDVOczhVaFZUVWhFYnFTa0R1c3BpcWVRdjZIajI4R21EcHIwcS85WXp6QWMwUXNVaTZlZnlzMWpcbjc4aFRBMmRablJuMU5jK3lIVDJEUjRVSnVHTk9QdDlINHpZM0VIMUVaam80SThueXF4TUxsbUVZbWxXbG9VeEdcbnlVMEk3MzU4OGw0TGVGaHkxUEhzT0JGK0M4ZldqOWJSMTBrTXU1UytoYWJuajYxcTJZN3hwS2tOS3psOEQvdkNcbkp5Y2hTZFduSzVpb2UzVHdFN0dka2UvTWQ0ZzB6Nk1USFNzTWwzZllxd0tCZ1FEdTlIL05pZGZYNURkcXQ0enFcbjhFR3BYTU1WaE9nQUdHRU5XYWlzekxkeVVVcEZIQk5NNVl2ejV0ZWxSeXFXWHV0Q09rV0l5Y1ZMOURUamtwYXNcbjd1OVJHNTlRZ29WVHM2UVVPZnhJR2REclNOMGpyeTdzNzErY2U2MXAvaEFFZHVMYzV5K1RiYjRqM2taTXdOWWVcbmFlTXYwSXZlTEhOMDVHMDdhZW5XOUtLaVR3S0JnUUMzV2NsRWNnL0xnK2xOM3BGbkVnaXVnblJnYnhhcENWSXFcblp0eHdHazJHd1lSbVpxWituS3FmNllvNVRhNDFGSytubkJSUlMyY2dYbjdoTEF3L2psZ2ZlWDhoazY5Q2tyN1hcbmxRWUlDWVJtQ3RTOVFOdW1kZDcyTzFUcTIyelJiQ3lVTXF3STN5cVh3Z3E0K25MUVBodlJKR1Vwd1dsTnBZWjRcbmlGMUhKMkcrZ3dLQmdRRGkyM2IySUxhdGZUbnJjK1V5S3ZSN29OaUk5b3YzZ3kyb3FnVk1RSEtzOG1ZTFpKTWlcblB3OTJlQ01sQTRKNERoZFY0ZEtnQWNid3pDRG1LUVlwbWhIVU0rQTFaQ1RHQ1ZkdDZIc056SXlldzZkR3VJdFpcbllBeXFtSFJUbTJPRGlZYnA2QVl2OWxFODVrcGhsZlh5Rzk4WFJ1dHkrMHFGb0ZQZnd6YkEzN0lEV3dLQmdRQ0JcbmR3RExOODgzWVZtb0JuU2RWdnFTWHNOV0pKclhtU0ZQbDNvb0hpcUg4TFZRcVVML1BCaXUrZlVFS1huTk9XSWVcbjgvTFV4RzE1U2NCRnR4aWUrQi81ZVl6dlpKem9ZVDRvYzYzaWx2WEtKL08zL3NnYWJqaVZuYWVFZnBRRC9HSXRcbjh1blBDZVhGYXRxdmF3a1BTZ3ZTVGVTdzYwVXo4cUZWRXRUcW5VQkRpUUtCZ1FEbFR6cyswMlVNY25SYW5PU29cblhzNjV5VjFiYUJDUnJaL2ZNak55ZjZhSVd6MnR2OXV5VUVTa0pxR0EzMVVsdHNWUXZvSzVaNzc4UjgvQVdxM0hcbkV3a2pqVExZMVRibUhUTU9qaUZOY08yWHowbWZRekdmMjlZMUJ1UVZheDV4S2syR0RiUVU3NFJISm1mSmkxYVVcbk01NHg3bjVtVEw0TW8zTytxRE5qOFEydEhBPT1cbi0tLS0tRU5EIFBSSVZBVEUgS0VZLS0tLS1cbiIsCiAgImNsaWVudF9lbWFpbCI6ICI4MzE4Mjg2NjAzMTYtY29tcHV0ZUBkZXZlbG9wZXIuZ3NlcnZpY2VhY2NvdW50LmNvbSIsCiAgImNsaWVudF9pZCI6ICIxMDc2NTE1NDQ0MDUwODU4NzAzMTUiLAogICJhdXRoX3VyaSI6ICJodHRwczovL2FjY291bnRzLmdvb2dsZS5jb20vby9vYXV0aDIvYXV0aCIsCiAgInRva2VuX3VyaSI6ICJodHRwczovL29hdXRoMi5nb29nbGVhcGlzLmNvbS90b2tlbiIsCiAgImF1dGhfcHJvdmlkZXJfeDUwOV9jZXJ0X3VybCI6ICJodHRwczovL3d3dy5nb29nbGVhcGlzLmNvbS9vYXV0aDIvdjEvY2VydHMiLAogICJjbGllbnRfeDUwOV9jZXJ0X3VybCI6ICJodHRwczovL3d3dy5nb29nbGVhcGlzLmNvbS9yb2JvdC92MS9tZXRhZGF0YS94NTA5LzgzMTgyODY2MDMxNi1jb21wdXRlJTQwZGV2ZWxvcGVyLmdzZXJ2aWNlYWNjb3VudC5jb20iLAogICJ1bml2ZXJzZV9kb21haW4iOiAiZ29vZ2xlYXBpcy5jb20iCn0K diff --git a/metrics-pipeline/metrics_pipeline/dagster_def/__init__.py b/metrics-pipeline/metrics_pipeline/pipeline/__init__.py similarity index 80% rename from metrics-pipeline/metrics_pipeline/dagster_def/__init__.py rename to metrics-pipeline/metrics_pipeline/pipeline/__init__.py index d104776..8895e54 100644 --- a/metrics-pipeline/metrics_pipeline/dagster_def/__init__.py +++ b/metrics-pipeline/metrics_pipeline/pipeline/__init__.py @@ -1,15 +1,16 @@ from dagster_dbt import DbtCliResource from dagster import Definitions, load_assets_from_modules from .assets.dbt import licensor_metrics_dbt_assets, licensor_metrics_project -from .assets import bigquery, common, postgres +from .assets import bigquery, common, postgres, redis from .resources import bigquery_resource common_assets = load_assets_from_modules([common]) postgres_assets = load_assets_from_modules([postgres]) bigquery_assets = load_assets_from_modules([bigquery]) +redis_assets = load_assets_from_modules([redis]) defs = Definitions( - assets=[licensor_metrics_dbt_assets, *common_assets, *postgres_assets, *bigquery_assets], + assets=[licensor_metrics_dbt_assets, *common_assets, *postgres_assets, *bigquery_assets, *redis_assets], resources={ "dbt": DbtCliResource(project_dir=licensor_metrics_project), "bigquery": bigquery_resource diff --git a/metrics-pipeline/metrics_pipeline/dagster_def/assets/__init__.py b/metrics-pipeline/metrics_pipeline/pipeline/assets/__init__.py similarity index 100% rename from metrics-pipeline/metrics_pipeline/dagster_def/assets/__init__.py rename to metrics-pipeline/metrics_pipeline/pipeline/assets/__init__.py diff --git a/metrics-pipeline/metrics_pipeline/dagster_def/assets/bigquery.py b/metrics-pipeline/metrics_pipeline/pipeline/assets/bigquery.py similarity index 100% rename from metrics-pipeline/metrics_pipeline/dagster_def/assets/bigquery.py rename to metrics-pipeline/metrics_pipeline/pipeline/assets/bigquery.py diff --git a/metrics-pipeline/metrics_pipeline/dagster_def/assets/common.py b/metrics-pipeline/metrics_pipeline/pipeline/assets/common.py similarity index 100% rename from metrics-pipeline/metrics_pipeline/dagster_def/assets/common.py rename to metrics-pipeline/metrics_pipeline/pipeline/assets/common.py diff --git a/metrics-pipeline/metrics_pipeline/pipeline/assets/constants.py b/metrics-pipeline/metrics_pipeline/pipeline/assets/constants.py new file mode 100644 index 0000000..bb73e3c --- /dev/null +++ b/metrics-pipeline/metrics_pipeline/pipeline/assets/constants.py @@ -0,0 +1,6 @@ +LATEST_PROCESSED_TIME_PATH = "pipeline/data/state/latest_processed_time.csv" +BATCH_ID_PATH = "pipeline/data/state/batch_id.csv" + +PLAYS_FILE_PATH = "pipeline/data/raw/plays.parquet" +LOCATIONS_FILE_PATH = "pipeline/data/raw/locations.parquet" +TRACKS_FILE_PATH = "pipeline/data/raw/tracks.parquet" \ No newline at end of file diff --git a/metrics-pipeline/metrics_pipeline/dagster_def/assets/dbt.py b/metrics-pipeline/metrics_pipeline/pipeline/assets/dbt.py similarity index 100% rename from metrics-pipeline/metrics_pipeline/dagster_def/assets/dbt.py rename to metrics-pipeline/metrics_pipeline/pipeline/assets/dbt.py diff --git a/metrics-pipeline/metrics_pipeline/dagster_def/assets/postgres.py b/metrics-pipeline/metrics_pipeline/pipeline/assets/postgres.py similarity index 100% rename from metrics-pipeline/metrics_pipeline/dagster_def/assets/postgres.py rename to metrics-pipeline/metrics_pipeline/pipeline/assets/postgres.py diff --git a/metrics-pipeline/metrics_pipeline/pipeline/assets/redis.py b/metrics-pipeline/metrics_pipeline/pipeline/assets/redis.py new file mode 100644 index 0000000..6d0f833 --- /dev/null +++ b/metrics-pipeline/metrics_pipeline/pipeline/assets/redis.py @@ -0,0 +1,62 @@ +import os +import typing +import pandas as pd +from dagster import asset, MaterializeResult +from loguru import logger +from dagster_gcp import BigQueryResource, BigQueryError +from upstash_redis.errors import UpstashError +from ..resources import redis_resource +from .dbt import licensor_metrics_dbt_assets + + +def get_metrics( + bigquery: BigQueryResource, table_name: str +) -> typing.Union[pd.DataFrame, None]: + try: + sql: str = f"SELECT * FROM `user_activity.{table_name}`" + df: pd.DataFrame = pd.DataFrame() + + with bigquery.get_client() as bqclient: + df: pd.DataFrame = bqclient.query(query=sql).result().to_dataframe() + + if df.empty: + raise pd.errors.DataError(f"DataFrame for {table_name} turned out empty!") + + return df + except BigQueryError as e: + logger.error(e) + return None + + +@asset(deps=[licensor_metrics_dbt_assets], group_name="serve") +def redis_cache(bigquery: BigQueryResource) -> MaterializeResult: + + marts_tables = [ + "mart_country_market_share", + "mart_top_track", + "mart_top_track_per_country", + "mart_total_hours_played", + ] + + successful_insertions: list[str] = [] + + for table in marts_tables: + try: + incoming_df = get_metrics(bigquery=bigquery, table_name=table) + + df_json = incoming_df.to_json() + insertion_result = redis_resource.set(key=table, value=df_json) + if not insertion_result: + raise UpstashError(f"Error inserting value {table} into the cache.") + + successful_insertions.append(table) + except UpstashError as e: + logger.error(e) + return MaterializeResult( + metadata={ + "Status": f"Error in {table}", + "Successful Insertions": ",".join(successful_insertions), + } + ) + + return MaterializeResult(metadata={"Status": "Success"}) diff --git a/metrics-pipeline/metrics_pipeline/pipeline/data/state/batch_id.csv b/metrics-pipeline/metrics_pipeline/pipeline/data/state/batch_id.csv new file mode 100644 index 0000000..9d39be2 --- /dev/null +++ b/metrics-pipeline/metrics_pipeline/pipeline/data/state/batch_id.csv @@ -0,0 +1 @@ +5941d45a-4d80-41d7-8261-d6e15884c25d \ No newline at end of file diff --git a/metrics-pipeline/metrics_pipeline/pipeline/data/state/latest_processed_time.csv b/metrics-pipeline/metrics_pipeline/pipeline/data/state/latest_processed_time.csv new file mode 100644 index 0000000..fd63aee --- /dev/null +++ b/metrics-pipeline/metrics_pipeline/pipeline/data/state/latest_processed_time.csv @@ -0,0 +1,2 @@ +,latest_processed_time +0,2025-01-03 12:00:00+00:00 diff --git a/metrics-pipeline/metrics_pipeline/pipeline/resources/__init__.py b/metrics-pipeline/metrics_pipeline/pipeline/resources/__init__.py new file mode 100644 index 0000000..4cd9c6b --- /dev/null +++ b/metrics-pipeline/metrics_pipeline/pipeline/resources/__init__.py @@ -0,0 +1,12 @@ +import os +from dagster import EnvVar +from dagster_gcp import BigQueryResource +from upstash_redis import Redis + +bigquery_resource = BigQueryResource( + project="licensor-metrics", gcp_credentials=EnvVar("GOOGLE_CLOUD_API_KEY") +) + +redis_resource = Redis( + url=os.environ["UPSTASH_REDIS_URI"], token=os.environ["UPSTASH_REDIS_TOKEN"] +) diff --git a/metrics-pipeline/metrics_pipeline/dagster_def/schedules/__init__.py b/metrics-pipeline/metrics_pipeline/pipeline/schedules/__init__.py similarity index 100% rename from metrics-pipeline/metrics_pipeline/dagster_def/schedules/__init__.py rename to metrics-pipeline/metrics_pipeline/pipeline/schedules/__init__.py diff --git a/metrics-pipeline/metrics_pipeline/pyproject.toml b/metrics-pipeline/metrics_pipeline/pyproject.toml index cd1eafc..402ecf2 100644 --- a/metrics-pipeline/metrics_pipeline/pyproject.toml +++ b/metrics-pipeline/metrics_pipeline/pyproject.toml @@ -1,5 +1,5 @@ [project] -name = "metrics_pipeline" +name = "pipeline" version = "0.1.0" description = "Add your description here" readme = "README.md" @@ -26,5 +26,5 @@ requires = ["setuptools"] build-backend = "setuptools.build_meta" [tool.dagster] -module_name = "metrics_pipeline" -code_location_name = "metrics_pipeline" \ No newline at end of file +module_name = "pipeline" +code_location_name = "pipeline" \ No newline at end of file diff --git a/metrics-pipeline/metrics_pipeline/requirements.txt b/metrics-pipeline/metrics_pipeline/requirements.txt index aa86c7b..727e097 100644 --- a/metrics-pipeline/metrics_pipeline/requirements.txt +++ b/metrics-pipeline/metrics_pipeline/requirements.txt @@ -7,6 +7,7 @@ pyarrow connectorx dagster-gcp dagster-gcp-pandas +upstash_redis dbt-core dbt-bigquery dbt-postgres \ No newline at end of file diff --git a/metrics-pipeline/metrics_pipeline/setup.py b/metrics-pipeline/metrics_pipeline/setup.py index e9bf32b..d5d2abb 100644 --- a/metrics-pipeline/metrics_pipeline/setup.py +++ b/metrics-pipeline/metrics_pipeline/setup.py @@ -19,6 +19,7 @@ "pyarrow", "connectorx", "loguru", + "upstash_redis", "dbt-snowflake<1.9", "dbt-snowflake<1.9", "dbt-clickhouse<1.9",