From 5fd0284ab1c1d995349e170fb5a4e54b41d1e044 Mon Sep 17 00:00:00 2001
From: jackleary
Date: Mon, 28 Apr 2025 01:22:52 +0100
Subject: [PATCH 1/8] NRL-1411 use dynamic frame to infer schema

---
 .../modules/glue/src/main.py            | 22 +++++--
 .../modules/glue/src/pipeline.py        | 62 +++++++++++--------
 .../modules/glue/src/transformations.py | 35 +----------
 3 files changed, 55 insertions(+), 64 deletions(-)

diff --git a/terraform/account-wide-infrastructure/modules/glue/src/main.py b/terraform/account-wide-infrastructure/modules/glue/src/main.py
index 27619bbc5..64f616b59 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/main.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/main.py
@@ -1,11 +1,9 @@
 import sys
 
 from awsglue.utils import getResolvedOptions
-from consumer_schemas import consumerSchemaList
 from pipeline import LogPipeline
-from producer_schemas import producerSchemaList
 from pyspark.context import SparkContext
-from transformations import dtype_conversion, flatten_df, resolve_dupes
+from transformations import dtype_conversion, rename_cols, resolve_dupes
 
 # Get arguments from AWS Glue job
 args = getResolvedOptions(
@@ -17,17 +15,29 @@
 
 partition_cols = args["partition_cols"].split(",") if "partition_cols" in args else []
 
-consumerSchemaList.update(producerSchemaList)
+host_prefixes = [
+    "consumer--countDocumentReference",
+    "consumer--searchPostDocumentReference",
+    "consumer--searchDocumentReference",
+    "consumer--readDocumentReference",
+    "producer--searchPostDocumentReference",
+    "producer--searchDocumentReference",
+    "producer--readDocumentReference",
+    "producer--upsertDocumentReference",
+    "producer--updateDocumentReference",
+    "producer--deleteDocumentReference",
+    "producer--createDocumentReference",
+]
 
 # Initialize ETL process
 etl_job = LogPipeline(
     spark_context=sc,
     source_path=args["source_path"],
     target_path=args["target_path"],
-    schemas=consumerSchemaList,
+    host_prefixes=host_prefixes,
     job_name=args["job_name"],
     partition_cols=partition_cols,
-    transformations=[flatten_df, resolve_dupes, dtype_conversion],
+    transformations=[rename_cols, resolve_dupes, dtype_conversion],
 )
 
 # Run the job
diff --git a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
index 5ea40ffb8..74a50dab6 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
@@ -2,7 +2,6 @@
 
 import boto3
 from instances import GlueContextSingleton, LoggerSingleton
-from pyspark.sql.functions import col
 
 
 class LogPipeline:
@@ -11,7 +10,7 @@ def __init__(
         spark_context,
         source_path,
         target_path,
-        schemas,
+        host_prefixes,
         job_name,
         partition_cols=[],
         transformations=[],
@@ -22,7 +21,7 @@ def __init__(
         self.logger = LoggerSingleton().logger
         self.source_path = source_path
         self.target_path = target_path
-        self.schemas = schemas
+        self.host_prefixes = host_prefixes
         self.partition_cols = partition_cols
         self.transformations = transformations
         self.glue = boto3.client(
@@ -37,10 +36,10 @@ def run(self):
         """Runs ETL"""
         try:
             self.logger.info("ETL Process started.")
-            data = self.extract()
+            data = self.extract_dynamic()
             self.logger.info(f"Data extracted from {self.source_path}.")
             for name, df in data.items():
-                data[name] = self.transform(df)
+                data[name] = self.transform(df, name)
             self.logger.info("Data transformed successfully.")
             self.load(data)
             self.logger.info(f"Data loaded into {self.target_path}.")
@@ -51,6 +50,7 @@ def run(self):
             raise e
 
     def get_last_run(self):
+        self.logger.info(f"Retrieving last successful runtime.")
         all_runs = self.glue.get_job_runs(JobName=self.job_name)
         if not all_runs["JobRuns"]:
             return None
@@ -59,28 +59,41 @@ def get_last_run(self):
             if run["JobRunState"] == "SUCCEEDED":
                 return time.mktime(run["StartedOn"].timetuple())
 
-    def extract(self):
+        return None
+
+    def extract_dynamic(self):
         """Extract JSON data from S3"""
-        self.logger.info(f"Extracting data from {self.source_path} as JSON")
         last_runtime = self.get_last_run()
         data = {}
-        for name, schema in self.schemas.items():
+        data_source = self.glueContext.getSource("s3", paths=[self.source_path])
+        data_source.setFormat("json")
+        self.logger.info(f"Extracting data from {self.source_path} as JSON")
+        for name in self.host_prefixes:
             if last_runtime:
-                data[name] = (
-                    self.spark.read.option("recursiveFileLookup", "true")
-                    .schema(schema)
-                    .json(self.source_path)
-                ).where((col("host").contains(name)) & (col("time") > last_runtime))
+                data[name] = self.glueContext.create_dynamic_frame.from_options(
+                    connection_type="s3",
+                    connection_options={"paths": [self.source_path], "recurse": True},
+                    format="json",
+                ).filter(
+                    f=lambda x: (x["host"].endswith(name))
+                    and (x["time"] > last_runtime)
+                )
+
             else:
-                data[name] = (
-                    self.spark.read.option("recursiveFileLookup", "true")
-                    .schema(schema)
-                    .json(self.source_path)
-                ).where(col("host").contains(name))
+                data[name] = self.glueContext.create_dynamic_frame.from_options(
+                    connection_type="s3",
+                    connection_options={"paths": [self.source_path], "recurse": True},
+                    format="json",
+                ).filter(f=lambda x: x["host"].endswith(name))
+
         return data
 
-    def transform(self, dataframe):
+    def transform(self, dataframe, name):
         """Apply a list of transformations on the dataframe"""
+        self.spark.conf.set("spark.sql.caseSensitive", True)
+        dataframe = (
+            dataframe.relationalize("root", f"./tmp/{name}").select("root").toDF()
+        )
         for transformation in self.transformations:
             self.logger.info(f"Applying transformation: {transformation.__name__}")
             dataframe = transformation(dataframe)
@@ -90,13 +103,10 @@ def load(self, data):
         """Load transformed data into Parquet format"""
         self.logger.info(f"Loading data into {self.target_path} as Parquet")
         for name, dataframe in data.items():
-            if dataframe.na.drop().count() > 0:
-                name = name.replace("--", "_")
-                dataframe.write.mode("append").partitionBy(
-                    *self.partition_cols
-                ).parquet(f"{self.target_path}{name}")
-            else:
-                self.logger.info(f"Dataframe {name} is null, skipping")
+            name = name.replace("--", "_")
+            dataframe.write.mode("append").partitionBy(*self.partition_cols).parquet(
+                f"{self.target_path}{name}"
+            )
 
     def trigger_crawler(self):
         self.glue.start_crawler(Name=f"{self.name_prefix}-log-crawler")
diff --git a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
index 3258dfa9a..0d7b63c3f 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
@@ -2,7 +2,6 @@
     coalesce,
     col,
     concat,
-    explode_outer,
     from_unixtime,
     lit,
     regexp_replace,
@@ -10,7 +9,6 @@
     to_timestamp,
     when,
 )
-from pyspark.sql.types import ArrayType, StructType
 
 
 def resolve_dupes(df):
@@ -33,36 +31,9 @@ def resolve_dupes(df):
     return df
 
 
-def flatten_df(df):
-    complex_fields = dict(
-        [
-            (field.name, field.dataType)
-            for field in df.schema.fields
-            if isinstance(field.dataType, ArrayType)
-            or isinstance(field.dataType, StructType)
-        ]
-    )
-    while len(complex_fields) != 0:
-        col_name = list(complex_fields.keys())[0]
-
-        if isinstance(complex_fields[col_name], StructType):
-            expanded = [
-                col(col_name + "." + k).alias(col_name + "_" + k)
-                for k in [n.name for n in complex_fields[col_name]]
-            ]
-            df = df.select("*", *expanded).drop(col_name)
-
-        elif isinstance(complex_fields[col_name], ArrayType):
-            df = df.withColumn(col_name, explode_outer(col_name))
-
-        complex_fields = dict(
-            [
-                (field.name, field.dataType)
-                for field in df.schema.fields
-                if isinstance(field.dataType, ArrayType)
-                or isinstance(field.dataType, ArrayType)
-            ]
-        )
+def rename_cols(df):
+    for col_name in df.columns:
+        df = df.withColumnRenamed(col_name, col_name.replace(".", "_"))
     return df
 
 

From d8f37eed02ef180edeebd4e622825b45fb463203 Mon Sep 17 00:00:00 2001
From: jackleary
Date: Mon, 28 Apr 2025 01:23:31 +0100
Subject: [PATCH 2/8] NRL-1411 update iam role to allow logging, increase number of workers for performance

---
 .../account-wide-infrastructure/modules/glue/glue.tf |  2 +-
 .../account-wide-infrastructure/modules/glue/iam.tf  | 10 ++++++++++
 2 files changed, 11 insertions(+), 1 deletion(-)

diff --git a/terraform/account-wide-infrastructure/modules/glue/glue.tf b/terraform/account-wide-infrastructure/modules/glue/glue.tf
index 20f9ac16c..e36433b5c 100644
--- a/terraform/account-wide-infrastructure/modules/glue/glue.tf
+++ b/terraform/account-wide-infrastructure/modules/glue/glue.tf
@@ -68,7 +68,7 @@ resource "aws_glue_job" "glue_job" {
   worker_type       = "G.1X"
   timeout           = 2880
   max_retries       = 0
-  number_of_workers = 2
+  number_of_workers = 4
   command {
     name            = "glueetl"
     python_version  = var.python_version
diff --git a/terraform/account-wide-infrastructure/modules/glue/iam.tf b/terraform/account-wide-infrastructure/modules/glue/iam.tf
index 6d33a1b78..9a97a3a8f 100644
--- a/terraform/account-wide-infrastructure/modules/glue/iam.tf
+++ b/terraform/account-wide-infrastructure/modules/glue/iam.tf
@@ -81,6 +81,16 @@ data "aws_iam_policy_document" "glue_service" {
     effect = "Allow"
   }
 
+  statement {
+    actions = [
+      "cloudwatch:*",
+    ]
+    resources = [
+      "*"
+    ]
+    effect = "Allow"
+  }
+
   statement {
     actions = [
       "iam:PassRole",

From 22003b37951ca807b2745847a5592b4042521176 Mon Sep 17 00:00:00 2001
From: jackleary
Date: Mon, 28 Apr 2025 01:43:14 +0100
Subject: [PATCH 3/8] NRL-1411 syntax fix

---
 .../modules/glue/src/pipeline.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
index 74a50dab6..94e260921 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
@@ -65,12 +65,12 @@ def extract_dynamic(self):
         """Extract JSON data from S3"""
         last_runtime = self.get_last_run()
         data = {}
-        data_source = self.glueContext.getSource("s3", paths=[self.source_path])
+        data_source = self.glue_context.getSource("s3", paths=[self.source_path])
         data_source.setFormat("json")
         self.logger.info(f"Extracting data from {self.source_path} as JSON")
         for name in self.host_prefixes:
             if last_runtime:
-                data[name] = self.glueContext.create_dynamic_frame.from_options(
+                data[name] = self.glue_context.create_dynamic_frame.from_options(
                     connection_type="s3",
                     connection_options={"paths": [self.source_path], "recurse": True},
                     format="json",
                 ).filter(
                     f=lambda x: (x["host"].endswith(name))
                     and (x["time"] > last_runtime)
                 )
 
             else:
-                data[name] = self.glueContext.create_dynamic_frame.from_options(
+                data[name] = self.glue_context.create_dynamic_frame.from_options(
                     connection_type="s3",
                     connection_options={"paths": [self.source_path], "recurse": True},
                     format="json",
                 ).filter(f=lambda x: x["host"].endswith(name))
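
Aside: a minimal local sketch (only a plain pyspark install is assumed; the column names and values below are invented for illustration) of why rename_cols in PATCH 1 swaps dots for underscores. Flattening nested JSON, as relationalize does for struct fields, tends to produce dotted column names, which Spark only resolves when back-quoted and which Parquet-backed tables handle poorly:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("rename-cols-sketch").getOrCreate()

# Dotted names similar to what flattening a nested log record can produce.
df = spark.createDataFrame(
    [("consumer--readDocumentReference", 200)],
    ["detail.host", "detail.status"],
)

# df.select("detail.host") fails because the dot is parsed as struct-field
# access; the column can only be referenced with back-ticks.
print(df.select("`detail.host`").first())

# The same loop rename_cols uses: replace dots so plain references work again.
for col_name in df.columns:
    df = df.withColumnRenamed(col_name, col_name.replace(".", "_"))
print(df.columns)  # ['detail_host', 'detail_status']

spark.stop()
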
From c1c9dc6ae262c3d6fbb68c826176d37992b4314d Mon Sep 17 00:00:00 2001
From: jackleary
Date: Mon, 28 Apr 2025 09:06:29 +0100
Subject: [PATCH 4/8] NRL-1411 convert null types to strings

---
 .../modules/glue/src/transformations.py | 14 +++++++++++++-
 1 file changed, 13 insertions(+), 1 deletion(-)

diff --git a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
index 0d7b63c3f..b7443ed35 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
@@ -9,6 +9,7 @@
     to_timestamp,
     when,
 )
+from pyspark.sql.types import NullType
 
 
 def resolve_dupes(df):
@@ -49,4 +50,15 @@ def dtype_conversion(df):
         .withColumn("time", from_unixtime(col("time")).cast("timestamp"))
         .withColumn("date", to_date(col("time")))
     )
-    return df.drop("event_timestamp_cleaned")
+
+    df = df.drop("event_timestamp_cleaned")
+
+    select_exprs = []
+    for column_name in df.columns:
+        column_type = df.schema[column_name].dataType
+        if isinstance(column_type, NullType):
+            select_exprs.append(col(column_name).cast("string").alias(column_name))
+        else:
+            select_exprs.append(col(column_name))
+
+    return df.select(*select_exprs)

From 3755b2377992048ba06ca4922008065593b534a4 Mon Sep 17 00:00:00 2001
From: jackleary
Date: Mon, 28 Apr 2025 09:29:58 +0100
Subject: [PATCH 5/8] NRL-1411 pass through loop var to lambda

---
 .../modules/glue/src/pipeline.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
index 94e260921..e98f6dba1 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
@@ -50,7 +50,7 @@ def run(self):
             raise e
 
     def get_last_run(self):
-        self.logger.info(f"Retrieving last successful runtime.")
+        self.logger.info("Retrieving last successful runtime.")
         all_runs = self.glue.get_job_runs(JobName=self.job_name)
         if not all_runs["JobRuns"]:
             return None
@@ -75,7 +75,7 @@ def extract_dynamic(self):
                     connection_options={"paths": [self.source_path], "recurse": True},
                     format="json",
                 ).filter(
-                    f=lambda x: (x["host"].endswith(name))
+                    f=lambda x, n=name: (x["host"].endswith(n))
                     and (x["time"] > last_runtime)
                 )
 
@@ -84,7 +84,7 @@ def extract_dynamic(self):
                     connection_type="s3",
                     connection_options={"paths": [self.source_path], "recurse": True},
                     format="json",
-                ).filter(f=lambda x: x["host"].endswith(name))
+                ).filter(f=lambda x, n=name: x["host"].endswith(n))
 
         return data
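
Aside: a minimal pure-Python sketch (no Glue or Spark needed; the host value is invented) of the late-binding closure behaviour that PATCH 5 works around by binding the loop variable as a lambda default argument:

host_prefixes = ["consumer--readDocumentReference", "producer--createDocumentReference"]
record = {"host": "example-host--consumer--readDocumentReference"}

# Late binding: every lambda closes over the *variable* `name`, so after the
# loop they all see its final value and filter on the last prefix only.
late_bound = {}
for name in host_prefixes:
    late_bound[name] = lambda x: x["host"].endswith(name)

# Default-argument binding (the fix): `n` captures the value of `name` at
# definition time, giving one distinct predicate per prefix.
value_bound = {}
for name in host_prefixes:
    value_bound[name] = lambda x, n=name: x["host"].endswith(n)

print(late_bound["consumer--readDocumentReference"](record))   # False
print(value_bound["consumer--readDocumentReference"](record))  # True
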
From 164157a4f0127f0c02adcacec0eb4fda9aae93ac Mon Sep 17 00:00:00 2001
From: jackleary
Date: Mon, 28 Apr 2025 10:01:45 +0100
Subject: [PATCH 6/8] NRL-1411 attempt to improve efficiency

---
 .../modules/glue/src/pipeline.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
index e98f6dba1..5028d3c18 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
@@ -104,9 +104,9 @@ def load(self, data):
         self.logger.info(f"Loading data into {self.target_path} as Parquet")
         for name, dataframe in data.items():
             name = name.replace("--", "_")
-            dataframe.write.mode("append").partitionBy(*self.partition_cols).parquet(
-                f"{self.target_path}{name}"
-            )
+            dataframe.coalesce(1).write.mode("append").partitionBy(
+                *self.partition_cols
+            ).parquet(f"{self.target_path}{name}")
 
     def trigger_crawler(self):
         self.glue.start_crawler(Name=f"{self.name_prefix}-log-crawler")

From 551c08457a08aee99d46610255588edca84b9fc6 Mon Sep 17 00:00:00 2001
From: jackleary
Date: Thu, 1 May 2025 16:03:14 +0100
Subject: [PATCH 7/8] NRL-1411 Handle empty dataframes

---
 .../modules/glue/src/pipeline.py        |  9 ++++--
 .../modules/glue/src/transformations.py | 28 +++++++++++--------
 2 files changed, 23 insertions(+), 14 deletions(-)

diff --git a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
index 5028d3c18..f018911ad 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
@@ -104,9 +104,12 @@ def load(self, data):
         self.logger.info(f"Loading data into {self.target_path} as Parquet")
         for name, dataframe in data.items():
             name = name.replace("--", "_")
-            dataframe.coalesce(1).write.mode("append").partitionBy(
-                *self.partition_cols
-            ).parquet(f"{self.target_path}{name}")
+            try:
+                dataframe.coalesce(1).write.mode("append").partitionBy(
+                    *self.partition_cols
+                ).parquet(f"{self.target_path}{name}")
+            except:
+                self.logger.info(f"{name} dataframe has no rows. Skipping.")
 
     def trigger_crawler(self):
         self.glue.start_crawler(Name=f"{self.name_prefix}-log-crawler")
diff --git a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
index b7443ed35..3b23d6515 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
@@ -39,19 +39,25 @@ def rename_cols(df):
 
 
 def dtype_conversion(df):
-    df = (
-        df.withColumn(
-            "event_timestamp_cleaned", regexp_replace(col("event_timestamp"), ",", ".")
+    try:
+        df = (
+            df.withColumn(
+                "event_timestamp_cleaned",
+                regexp_replace(col("event_timestamp"), ",", "."),
+            )
+            .withColumn(
+                "event_timestamp",
+                to_timestamp(
+                    col("event_timestamp_cleaned"), "yyyy-MM-dd HH:mm:ss.SSSZ"
+                ),
+            )
+            .withColumn("time", from_unixtime(col("time")).cast("timestamp"))
+            .withColumn("date", to_date(col("time")))
         )
-        .withColumn(
-            "event_timestamp",
-            to_timestamp(col("event_timestamp_cleaned"), "yyyy-MM-dd HH:mm:ss.SSSZ"),
-        )
-        .withColumn("time", from_unixtime(col("time")).cast("timestamp"))
-        .withColumn("date", to_date(col("time")))
-    )
 
-    df = df.drop("event_timestamp_cleaned")
+        df = df.drop("event_timestamp_cleaned")
+    except:
+        ...
 
     select_exprs = []
     for column_name in df.columns:
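
Aside: a minimal local sketch (plain pyspark assumed; the column name is invented) of why the select-expression loop introduced in PATCH 4 casts NullType columns to string before the load step. A column that is null on every row is typed as NullType, which the Parquet writer rejects:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from pyspark.sql.types import NullType

spark = SparkSession.builder.master("local[1]").appName("nulltype-sketch").getOrCreate()

# An all-null column is given the NullType ("void") data type.
df = spark.range(3).withColumn("correlation_id", lit(None))
print(isinstance(df.schema["correlation_id"].dataType, NullType))  # True

# Writing this frame as Parquet raises an AnalysisException along the lines of
# "Parquet data source does not support void data type", so cast such columns
# to string first - the same rewrite dtype_conversion applies.
select_exprs = [
    col(c).cast("string").alias(c) if isinstance(df.schema[c].dataType, NullType) else col(c)
    for c in df.columns
]
df.select(*select_exprs).printSchema()  # correlation_id is now a nullable string

spark.stop()
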
From 807525754aef0e9e624dbb0244e26ffe971951c4 Mon Sep 17 00:00:00 2001
From: jackleary
Date: Thu, 1 May 2025 16:03:52 +0100
Subject: [PATCH 8/8] NRL-1411 Specify actions in iam role

---
 terraform/account-wide-infrastructure/modules/glue/iam.tf | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/terraform/account-wide-infrastructure/modules/glue/iam.tf b/terraform/account-wide-infrastructure/modules/glue/iam.tf
index 9a97a3a8f..097fe0386 100644
--- a/terraform/account-wide-infrastructure/modules/glue/iam.tf
+++ b/terraform/account-wide-infrastructure/modules/glue/iam.tf
@@ -63,7 +63,6 @@ data "aws_iam_policy_document" "glue_service" {
 
     resources = [
       "arn:aws:logs:*:*:*:/aws-glue/*",
-      # "arn:aws:logs:*:*:*:/customlogs/*"
     ]
 
     effect = "Allow"
@@ -83,7 +82,9 @@ data "aws_iam_policy_document" "glue_service" {
 
   statement {
     actions = [
-      "cloudwatch:*",
+      "cloudwatch:Get*",
+      "cloudwatch:List*",
+      "cloudwatch:Put*",
    ]
    resources = [
      "*"
@@ -97,7 +98,7 @@ data "aws_iam_policy_document" "glue_service" {
     ]
     effect = "Allow"
     resources = [
-      "*"
+      "arn:aws:iam::*:role/AWSGlueServiceRole*"
     ]
   }
 }
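
Aside: a minimal local sketch (plain pyspark assumed; the path and values are invented) of the coalesce(1) + partitionBy write pattern PATCH 6 switches to — one Parquet file per partition value rather than one per Spark task, at the cost of funnelling each write through a single task:

from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").appName("partition-write-sketch").getOrCreate()

df = spark.createDataFrame(
    [("2025-04-28", "readDocumentReference", 3), ("2025-04-29", "createDocumentReference", 1)],
    ["date", "operation", "requests"],
)

(
    df.coalesce(1)            # collapse to a single partition before writing
    .write.mode("append")     # matches the pipeline's append-only load
    .partitionBy("date")      # one date=... sub-directory per distinct value
    .parquet("/tmp/partition-write-sketch")
)

spark.stop()
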