From 509fa6293b4eecb037bb875cb2ff955d3e630724 Mon Sep 17 00:00:00 2001
From: jackleary
Date: Mon, 20 Jan 2025 14:43:08 +0000
Subject: [PATCH 1/8] NRL-1268 update flatten func to traverse schema for all nested structs

---
 .../modules/glue/src/transformations.py | 25 +++++++++++++------
 1 file changed, 17 insertions(+), 8 deletions(-)

diff --git a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
index 64bb4abe6..625e26475 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
@@ -1,4 +1,4 @@
-from pyspark.sql.functions import to_timestamp
+from pyspark.sql.functions import col, to_timestamp
 from pyspark.sql.types import (
     BooleanType,
     StringType,
@@ -60,13 +60,22 @@ def flatten_df(df):
-    cols = []
-    for c in df.dtypes:
-        if "struct" in c[1]:
-            nested_col = c[0]
-        else:
-            cols.append(c[0])
-    return df.select(*cols, f"{nested_col}.*")
+    def flatten(schema, prefix=""):
+        """
+        Recursively traverse the schema to extract all nested fields.
+        """
+        fields = []
+        for field in schema.fields:
+            name = f"{prefix}.{field.name}" if prefix else field.name
+            if isinstance(field.dataType, StructType):
+                fields += flatten(field.dataType, name)
+            else:
+                fields.append((name, field.name))
+        return fields
+
+    flat_columns = flatten(df.schema)
+
+    return df.select([col(c).alias(n) for c, n in flat_columns])
 
 
 def dtype_conversion(df):

From 0266f6e32d13d78be8c93583c055ada94d1b8198 Mon Sep 17 00:00:00 2001
From: jackleary
Date: Mon, 20 Jan 2025 15:39:50 +0000
Subject: [PATCH 2/8] NRL-1268 update naming convention of cols

---
 .../modules/glue/src/transformations.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
index 625e26475..238971c52 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
@@ -70,7 +70,8 @@ def flatten(schema, prefix=""):
             if isinstance(field.dataType, StructType):
                 fields += flatten(field.dataType, name)
             else:
-                fields.append((name, field.name))
+                alias_name = name.replace(".", "_")
+                fields.append((name, alias_name))
         return fields
 
     flat_columns = flatten(df.schema)

From 27c717b28f19cd449ec667179e1aebc58527865f Mon Sep 17 00:00:00 2001
From: jackleary
Date: Mon, 20 Jan 2025 15:44:25 +0000
Subject: [PATCH 3/8] NRL-1268 update naming convention of cols

---
 .../modules/glue/src/transformations.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
index 238971c52..f401bb17a 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
@@ -81,6 +81,7 @@ def flatten(schema, prefix=""):
 
 def dtype_conversion(df):
     df = df.withColumn(
-        "timestamp", to_timestamp(df["timestamp"], "yyyy-MM-dd HH:mm:ss,SSSXXX")
+        "event_timestamp",
+        to_timestamp(df["event_timestamp"], "yyyy-MM-dd HH:mm:ss,SSSXXX"),
     )
     return df

From bc01aeaae339541fd6f10dc91b02727ad111964d Mon Sep 17 00:00:00 2001
From: jackleary
Date: Mon, 20 Jan 2025 16:13:27 +0000
Subject: [PATCH 4/8]
 NRL-1268 auto trigger crawler

---
 .../modules/glue/src/main.py     |  1 +
 .../modules/glue/src/pipeline.py | 16 ++++++++++++++++
 2 files changed, 17 insertions(+)

diff --git a/terraform/account-wide-infrastructure/modules/glue/src/main.py b/terraform/account-wide-infrastructure/modules/glue/src/main.py
index 416cef5ef..e6012caf6 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/main.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/main.py
@@ -19,6 +19,7 @@
     source_path=args["source_path"],
     target_path=args["target_path"],
     schema=logSchema,
+    job_name=args["job_name"],
    partition_cols=partition_cols,
    transformations=[flatten_df, dtype_conversion],
 )

diff --git a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
index e1ba22215..81155fcb7 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
@@ -1,3 +1,4 @@
+import boto3
 from instances import GlueContextSingleton, LoggerSingleton
 
 
@@ -8,6 +9,7 @@ def __init__(
         self,
         source_path,
         target_path,
         schema,
+        job_name,
         partition_cols=[],
         transformations=[],
     ):
@@ -20,6 +22,12 @@ def __init__(
         self.schema = schema
         self.partition_cols = partition_cols
         self.transformations = transformations
+        self.glue = boto3.client(
+            service_name="glue",
+            region_name="eu-west-2",
+            endpoint_url="your-endpoint-url",
+        )
+        self.name_prefix = "-".join(job_name.split("-")[:3])
 
     def run(self):
         """Runs ETL"""
@@ -31,6 +39,8 @@ def run(self):
             self.logger.info("Data transformed successfully.")
             self.load(df)
             self.logger.info(f"Data loaded into {self.target_path}.")
+            self.logger.info("Trigger glue crawler")
+            self.trigger_crawler()
         except Exception as e:
             self.logger.error(f"ETL process failed: {e}")
             raise e
@@ -57,3 +67,9 @@ def load(self, dataframe):
         dataframe.write.mode("append").partitionBy(*self.partition_cols).parquet(
             self.target_path
         )
+
+    def trigger_crawler(self):
+        try:
+            self.glue.start_crawler(Name=f"{self.name_prefix}-log-crawler")
+        except Exception as e:
+            raise e

From fabb03f1125961481a127658906660949a6dc4eb Mon Sep 17 00:00:00 2001
From: jackleary
Date: Mon, 20 Jan 2025 16:22:26 +0000
Subject: [PATCH 5/8] NRL-1268 auto trigger crawler

---
 .../account-wide-infrastructure/modules/glue/src/pipeline.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
index 81155fcb7..89b89a606 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
@@ -25,7 +25,7 @@ def __init__(
         self.glue = boto3.client(
             service_name="glue",
             region_name="eu-west-2",
-            endpoint_url="your-endpoint-url",
+            endpoint_url="https://glue.eu-west-2.amazonaws.com",
         )
         self.name_prefix = "-".join(job_name.split("-")[:3])
 

From 8896c052b4f52be50cada5d6e31d7269dc96f6dd Mon Sep 17 00:00:00 2001
From: jackleary
Date: Mon, 20 Jan 2025 16:24:10 +0000
Subject: [PATCH 6/8] NRL-1268 remove unnecessary try except

---
 .../account-wide-infrastructure/modules/glue/src/pipeline.py | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
index 89b89a606..d035e7a3a 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
@@ -69,7 +69,4 @@ def load(self, dataframe):
         )
 
     def trigger_crawler(self):
-        try:
-            self.glue.start_crawler(Name=f"{self.name_prefix}-log-crawler")
-        except Exception as e:
-            raise e
+        self.glue.start_crawler(Name=f"{self.name_prefix}-log-crawler")

From 3ad2c3cac3f272d54ddb15beccaf0091a2ded43b Mon Sep 17 00:00:00 2001
From: jackleary
Date: Mon, 20 Jan 2025 17:00:09 +0000
Subject: [PATCH 7/8] NRL-1268 crawler auto start configured

---
 terraform/account-wide-infrastructure/modules/glue/glue.tf   | 2 +-
 .../account-wide-infrastructure/modules/glue/src/pipeline.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/terraform/account-wide-infrastructure/modules/glue/glue.tf b/terraform/account-wide-infrastructure/modules/glue/glue.tf
index b2c4e262e..c41efb538 100644
--- a/terraform/account-wide-infrastructure/modules/glue/glue.tf
+++ b/terraform/account-wide-infrastructure/modules/glue/glue.tf
@@ -51,7 +51,7 @@ resource "aws_glue_job" "glue_job" {
     "--datalake-formats"             = "delta"
     "--source_path"                  = "s3://${aws_s3_bucket.source-data-bucket.id}/"     # Specify the source S3 path
     "--target_path"                  = "s3://${aws_s3_bucket.target-data-bucket.id}/logs" # Specify the destination S3 path
-    "--job_name"                     = "poc-glue-job"
+    "--job_name"                     = "${var.name_prefix}-glue-job"
     "--enable-continuous-log-filter" = "true"
     "--enable-metrics"               = "true"
     "--extra-py-files"               = "s3://${aws_s3_bucket.code-bucket.id}/src.zip"

diff --git a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
index d035e7a3a..2fb30ff91 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
@@ -27,7 +27,7 @@ def __init__(
             region_name="eu-west-2",
             endpoint_url="https://glue.eu-west-2.amazonaws.com",
         )
-        self.name_prefix = "-".join(job_name.split("-")[:3])
+        self.name_prefix = "-".join(job_name.split("-")[:4])
 
     def run(self):
         """Runs ETL"""

From 47205c1bab00a702de4492e0f91fcf636bd10cb3 Mon Sep 17 00:00:00 2001
From: jackleary
Date: Tue, 21 Jan 2025 14:25:50 +0000
Subject: [PATCH 8/8] NRL-1268 configure timestamp and partition by date

---
 .../modules/glue/glue.tf                |  1 +
 .../modules/glue/src/main.py            |  4 ++-
 .../modules/glue/src/transformations.py | 27 ++++++++++++++-----
 3 files changed, 24 insertions(+), 8 deletions(-)

diff --git a/terraform/account-wide-infrastructure/modules/glue/glue.tf b/terraform/account-wide-infrastructure/modules/glue/glue.tf
index c41efb538..e7cd3b810 100644
--- a/terraform/account-wide-infrastructure/modules/glue/glue.tf
+++ b/terraform/account-wide-infrastructure/modules/glue/glue.tf
@@ -52,6 +52,7 @@ resource "aws_glue_job" "glue_job" {
     "--source_path"                  = "s3://${aws_s3_bucket.source-data-bucket.id}/"     # Specify the source S3 path
     "--target_path"                  = "s3://${aws_s3_bucket.target-data-bucket.id}/logs" # Specify the destination S3 path
     "--job_name"                     = "${var.name_prefix}-glue-job"
+    "--partition_cols"               = "date"
     "--enable-continuous-log-filter" = "true"
     "--enable-metrics"               = "true"
     "--extra-py-files"               = "s3://${aws_s3_bucket.code-bucket.id}/src.zip"

diff --git a/terraform/account-wide-infrastructure/modules/glue/src/main.py b/terraform/account-wide-infrastructure/modules/glue/src/main.py
index e6012caf6..5450da0cd 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/main.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/main.py
@@ -6,7 +6,9 @@
 from transformations import dtype_conversion, flatten_df, logSchema
 
 # Get arguments from AWS Glue job
-args = getResolvedOptions(sys.argv, ["job_name", "source_path", "target_path"])
+args = getResolvedOptions(
+    sys.argv, ["job_name", "source_path", "target_path", "partition_cols"]
+)
 
 # Start Glue context
 sc = SparkContext()

diff --git a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
index f401bb17a..35e2a6b45 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
@@ -1,15 +1,21 @@
-from pyspark.sql.functions import col, to_timestamp
+from pyspark.sql.functions import (
+    col,
+    from_unixtime,
+    regexp_replace,
+    to_date,
+    to_timestamp,
+)
 from pyspark.sql.types import (
     BooleanType,
+    DoubleType,
     StringType,
     StructField,
     StructType,
-    TimestampType,
 )
 
 logSchema = StructType(
     [
-        StructField("time", TimestampType(), True),
+        StructField("time", DoubleType(), True),
         StructField("index", StringType(), True),
         StructField("host", StringType(), True),
         StructField("source", StringType(), True),
@@ -80,8 +86,15 @@ def flatten(schema, prefix=""):
 
 
 def dtype_conversion(df):
-    df = df.withColumn(
-        "event_timestamp",
-        to_timestamp(df["event_timestamp"], "yyyy-MM-dd HH:mm:ss,SSSXXX"),
+    df = (
+        df.withColumn(
+            "event_timestamp_cleaned", regexp_replace(col("event_timestamp"), ",", ".")
+        )
+        .withColumn(
+            "event_timestamp",
+            to_timestamp(col("event_timestamp_cleaned"), "yyyy-MM-dd HH:mm:ss.SSSZ"),
+        )
+        .withColumn("time", from_unixtime(col("time")).cast("timestamp"))
+        .withColumn("date", to_date(col("time")))
     )
-    return df
+    return df.drop("event_timestamp_cleaned")
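
Note (illustration only, not part of the patch series): the recursive flatten introduced in PATCH 1/8 and given underscore aliases in PATCH 2/8 can be exercised on its own with nothing but pyspark.sql.types; the schema and field names below are invented for the example.

from pyspark.sql.types import StringType, StructField, StructType

def flatten(schema, prefix=""):
    # Walk every field; recurse into structs, otherwise record a pair of
    # (full dotted path, underscore-separated alias), as the patched
    # flatten_df does before selecting col(path).alias(alias).
    fields = []
    for field in schema.fields:
        name = f"{prefix}.{field.name}" if prefix else field.name
        if isinstance(field.dataType, StructType):
            fields += flatten(field.dataType, name)
        else:
            fields.append((name, name.replace(".", "_")))
    return fields

# Hypothetical nested schema, two levels deep, purely for demonstration.
example_schema = StructType([
    StructField("id", StringType()),
    StructField("details", StructType([
        StructField("status", StringType()),
        StructField("meta", StructType([StructField("source", StringType())])),
    ])),
])

print(flatten(example_schema))
# [('id', 'id'), ('details.status', 'details_status'),
#  ('details.meta.source', 'details_meta_source')]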