From c129e38cffac1ebd8e6d991bac1f7f2c068be204 Mon Sep 17 00:00:00 2001 From: jackleary Date: Wed, 16 Jul 2025 11:54:04 +0100 Subject: [PATCH 1/9] NRL-1346 Handle SSP logs --- .../modules/glue/glue.tf | 3 + .../modules/glue/src/main.py | 5 +- .../modules/glue/src/pipeline.py | 12 +-- .../modules/glue/src/transformations.py | 77 +++++++++++++++---- 4 files changed, 75 insertions(+), 22 deletions(-) diff --git a/terraform/account-wide-infrastructure/modules/glue/glue.tf b/terraform/account-wide-infrastructure/modules/glue/glue.tf index 64a3c5d99..587106014 100644 --- a/terraform/account-wide-infrastructure/modules/glue/glue.tf +++ b/terraform/account-wide-infrastructure/modules/glue/glue.tf @@ -46,6 +46,9 @@ resource "aws_glue_crawler" "log_crawler" { s3_target { path = "s3://${aws_s3_bucket.target-data-bucket.id}/producer_upsertDocumentReference/" } + s3_target { + path = "s3://${aws_s3_bucket.target-data-bucket.id}/ssp/" + } schema_change_policy { delete_behavior = "LOG" } diff --git a/terraform/account-wide-infrastructure/modules/glue/src/main.py b/terraform/account-wide-infrastructure/modules/glue/src/main.py index 712be8877..06b7aa519 100644 --- a/terraform/account-wide-infrastructure/modules/glue/src/main.py +++ b/terraform/account-wide-infrastructure/modules/glue/src/main.py @@ -5,7 +5,7 @@ from awsglue.utils import getResolvedOptions from pipeline import LogPipeline from pyspark.sql import SparkSession -from transformations import dtype_conversion, rename_cols, resolve_dupes +from transformations import dtype_conversion, format_ssp, rename_cols, resolve_dupes # Spark and Glue Context initialization spark = SparkSession.builder.config("spark.sql.caseSensitive", "true").getOrCreate() @@ -37,6 +37,7 @@ "producer--updateDocumentReference", "producer--deleteDocumentReference", "producer--createDocumentReference", + "s2c", ] # Initialize ETL process @@ -49,7 +50,7 @@ host_prefixes=host_prefixes, job_name=args["job_name"], partition_cols=partition_cols, - transformations=[rename_cols, resolve_dupes, dtype_conversion], + transformations=[rename_cols, resolve_dupes, dtype_conversion, format_ssp], ) # Run the job diff --git a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py index 7c1b1597c..4732c6607 100644 --- a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py +++ b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py @@ -81,7 +81,7 @@ def extract_dynamic(self): }, format="json", ).filter( - f=lambda x, n=name: (x["host"].endswith(n)) + f=lambda x, n=name: (x["host"] is not None and n in x["host"]) and (x["time"] > last_runtime) ) @@ -95,7 +95,7 @@ def extract_dynamic(self): "groupSize": "134217728", }, format="json", - ).filter(f=lambda x, n=name: x["host"].endswith(n)) + ).filter(f=lambda x, n=name: (x["host"] is not None and n in x["host"])) return data @@ -107,7 +107,7 @@ def transform(self, dataframe, name): ) for transformation in self.transformations: self.logger.info(f"Applying transformation: {transformation.__name__}") - dataframe = transformation(dataframe, self.logger) + dataframe = transformation(dataframe, self.logger, name) return dataframe def load(self, data): @@ -115,6 +115,8 @@ def load(self, data): self.logger.info(f"Loading data into {self.target_path} as Parquet") for name, dataframe in data.items(): name = name.replace("--", "_") + if name == "s2c": + name = "ssp" try: self.logger.info( f"Attempting to load dataframe {name} into {self.target_path}{name}" @@ -122,8 +124,8 @@ def load(self, data): dataframe.write.mode("append").partitionBy( *self.partition_cols ).parquet(f"{self.target_path}{name}") - except: - self.logger.info(f"{name} dataframe has no rows. Skipping.") + except Exception as e: + self.logger.info(f"{name} failed to write with error: {e}") def trigger_crawler(self): self.glue.start_crawler(Name=f"{self.name_prefix}-log-crawler") diff --git a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py index 425e5a061..cf539e56d 100644 --- a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py +++ b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py @@ -15,7 +15,43 @@ from pyspark.sql.types import NullType -def resolve_dupes(df, logger): +def format_ssp(df, logger, name): + if name != "s2c": + logger.info(f"Not SSP logs, returning df: {name}") + return df + + logger.info(f"Processing SSP logs") + noODSCode = df.filter(col("logReference") != "SSP0001") + ODSCode = df.filter(col("logReference") == "SSP0001") + + noODSCode = noODSCode.select( + "time", + "host", + "internalID", + "logReference", + "interaction", + "responseCode", + "responseErrorMessage", + "totalDuration", + ) + ODSCode = ODSCode.select( + "sspFrom", + "fromOrgName", + "fromOdsCode", + "fromPostCode", + "sspTo", + "toOrgName", + "toOdsCode", + "toPostCode", + "internalID", + ) + + df = noODSCode.join(ODSCode, on="internalID", how="left") + + return df + + +def resolve_dupes(df, logger, name): column_groups = defaultdict(list) for column_name in df.columns: normalised_name = column_name.lower().rstrip("_") @@ -27,7 +63,9 @@ def resolve_dupes(df, logger): if len(original_names) == 1: final_select_exprs.append(col(original_names[0]).alias(lower_name)) else: - logger.info(f"Resolving duplicate group '{lower_name}': {original_names}") + logger.info( + f"Resolving duplicate group '{lower_name}': {original_names} for df: {name}" + ) merge_logic = lambda col1, col2: when( col1.isNull() | col2.isNull(), coalesce(col1, col2) @@ -40,34 +78,43 @@ def resolve_dupes(df, logger): return df.select(*final_select_exprs) -def rename_cols(df, logger): - logger.info("Replacing '.' with '_'") +def rename_cols(df, logger, name): + logger.info(f"Replacing '.' with '_' for df: {name}") for col_name in df.columns: df = df.withColumnRenamed(col_name, col_name.replace(".", "_")) return df -def dtype_conversion(df, logger): +def dtype_conversion(df, logger, name): try: - logger.info("Formatting event_timestamp") - df = ( - df.withColumn( + logger.info(f"Formatting event_timestamp, time and date columns for df: {name}") + if "event_timestamp" in df.columns: + df = df.withColumn( "event_timestamp_cleaned", regexp_replace(col("event_timestamp"), ",", "."), - ) - .withColumn( + ).withColumn( "event_timestamp", to_timestamp( col("event_timestamp_cleaned"), "yyyy-MM-dd HH:mm:ss.SSSZ" ), ) - .withColumn("time", from_unixtime(col("time")).cast("timestamp")) - .withColumn("date", to_date(col("time"))) - ) - df = df.drop("event_timestamp_cleaned") + df = df.drop("event_timestamp_cleaned") + + if "time" in df.columns: + df = df.withColumn( + "time", from_unixtime(col("time")).cast("timestamp") + ).withColumn("date", to_date(col("time"))) + + if "_time" in df.columns: + df = df.withColumn( + "time", to_timestamp(col("_time"), "yyyy-MM-dd HH:mm:ss.SSSZ") + ).withColumn("date", to_date(col("time"))) + + df = df.drop("_time") + except Exception as e: - logger.info(f"Failed formatting of timestamp column with error: {e}") + logger.info(f"Failed formatting of timestamp columns with error: {e}") logger.info("Handling Null Type columns") select_exprs = [] From 3b3d035c71fd3d17ee10492abdb1705e66ffc9f0 Mon Sep 17 00:00:00 2001 From: jackleary Date: Wed, 16 Jul 2025 16:48:16 +0100 Subject: [PATCH 2/9] NRL-1346 Improved error handling --- .../account-wide-infrastructure/modules/glue/src/pipeline.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py index 4732c6607..c2635cc27 100644 --- a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py +++ b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py @@ -118,6 +118,10 @@ def load(self, data): if name == "s2c": name = "ssp" try: + if dataframe.rdd.isEmpty(): + self.logger.info(f"{name} dataframe has no rows. Skipping.") + continue + self.logger.info( f"Attempting to load dataframe {name} into {self.target_path}{name}" ) From 079bcfd78499961cdfe4f340a53ecf516891ad36 Mon Sep 17 00:00:00 2001 From: jackleary Date: Wed, 16 Jul 2025 17:16:51 +0100 Subject: [PATCH 3/9] NRL-1346 Skip transformations if dfs are empty --- .../modules/glue/src/transformations.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py index cf539e56d..28cdcde62 100644 --- a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py +++ b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py @@ -20,6 +20,10 @@ def format_ssp(df, logger, name): logger.info(f"Not SSP logs, returning df: {name}") return df + if df.rdd.isEmpty(): + logger.info(f"{name} dataframe has no rows. Skipping format_ssp.") + return df + logger.info(f"Processing SSP logs") noODSCode = df.filter(col("logReference") != "SSP0001") ODSCode = df.filter(col("logReference") == "SSP0001") @@ -52,6 +56,10 @@ def format_ssp(df, logger, name): def resolve_dupes(df, logger, name): + if df.rdd.isEmpty(): + logger.info(f"{name} dataframe has no rows. Skipping resolve_dupes.") + return df + column_groups = defaultdict(list) for column_name in df.columns: normalised_name = column_name.lower().rstrip("_") @@ -79,6 +87,10 @@ def resolve_dupes(df, logger, name): def rename_cols(df, logger, name): + if df.rdd.isEmpty(): + logger.info(f"{name} dataframe has no rows. Skipping rename_cols.") + return df + logger.info(f"Replacing '.' with '_' for df: {name}") for col_name in df.columns: df = df.withColumnRenamed(col_name, col_name.replace(".", "_")) @@ -86,6 +98,9 @@ def rename_cols(df, logger, name): def dtype_conversion(df, logger, name): + if df.rdd.isEmpty(): + logger.info(f"{name} dataframe has no rows. Skipping dtype_conversion.") + return df try: logger.info(f"Formatting event_timestamp, time and date columns for df: {name}") if "event_timestamp" in df.columns: From a3f47fef8d1d5ce119825d1c26ae95d0968a0ac7 Mon Sep 17 00:00:00 2001 From: jackleary Date: Thu, 17 Jul 2025 08:58:56 +0100 Subject: [PATCH 4/9] NRL-1346 Sonar changes --- .../modules/glue/src/transformations.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py index 28cdcde62..3787a7516 100644 --- a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py +++ b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py @@ -24,11 +24,11 @@ def format_ssp(df, logger, name): logger.info(f"{name} dataframe has no rows. Skipping format_ssp.") return df - logger.info(f"Processing SSP logs") - noODSCode = df.filter(col("logReference") != "SSP0001") - ODSCode = df.filter(col("logReference") == "SSP0001") + logger.info("Processing SSP logs") + noodsCode = df.filter(col("logReference") != "SSP0001") + odsCode = df.filter(col("logReference") == "SSP0001") - noODSCode = noODSCode.select( + noodsCode = noodsCode.select( "time", "host", "internalID", @@ -38,7 +38,7 @@ def format_ssp(df, logger, name): "responseErrorMessage", "totalDuration", ) - ODSCode = ODSCode.select( + odsCode = odsCode.select( "sspFrom", "fromOrgName", "fromOdsCode", @@ -50,7 +50,7 @@ def format_ssp(df, logger, name): "internalID", ) - df = noODSCode.join(ODSCode, on="internalID", how="left") + df = noodsCode.join(odsCode, on="internalID", how="left") return df From dac9ff4192db0942d3c56fd78b5f238838925be8 Mon Sep 17 00:00:00 2001 From: jackleary Date: Thu, 17 Jul 2025 08:59:46 +0100 Subject: [PATCH 5/9] NRL-1346 Sonar changes --- .../modules/glue/src/transformations.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py index 3787a7516..ca1a7ce4f 100644 --- a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py +++ b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py @@ -25,10 +25,10 @@ def format_ssp(df, logger, name): return df logger.info("Processing SSP logs") - noodsCode = df.filter(col("logReference") != "SSP0001") + noOdsCode = df.filter(col("logReference") != "SSP0001") odsCode = df.filter(col("logReference") == "SSP0001") - noodsCode = noodsCode.select( + noOdsCode = noOdsCode.select( "time", "host", "internalID", @@ -50,7 +50,7 @@ def format_ssp(df, logger, name): "internalID", ) - df = noodsCode.join(odsCode, on="internalID", how="left") + df = noOdsCode.join(odsCode, on="internalID", how="left") return df From 6acfbf4673bf9ef8d1992ffd3b4a5d1baff7bdb2 Mon Sep 17 00:00:00 2001 From: jackleary Date: Thu, 17 Jul 2025 09:03:17 +0100 Subject: [PATCH 6/9] NRL-1346 Table name update --- terraform/account-wide-infrastructure/modules/glue/glue.tf | 2 +- .../account-wide-infrastructure/modules/glue/src/pipeline.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/terraform/account-wide-infrastructure/modules/glue/glue.tf b/terraform/account-wide-infrastructure/modules/glue/glue.tf index 587106014..0bac81081 100644 --- a/terraform/account-wide-infrastructure/modules/glue/glue.tf +++ b/terraform/account-wide-infrastructure/modules/glue/glue.tf @@ -47,7 +47,7 @@ resource "aws_glue_crawler" "log_crawler" { path = "s3://${aws_s3_bucket.target-data-bucket.id}/producer_upsertDocumentReference/" } s3_target { - path = "s3://${aws_s3_bucket.target-data-bucket.id}/ssp/" + path = "s3://${aws_s3_bucket.target-data-bucket.id}/spine_sspDocumentRetrieval/" } schema_change_policy { delete_behavior = "LOG" diff --git a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py index c2635cc27..6082a99d5 100644 --- a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py +++ b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py @@ -116,7 +116,7 @@ def load(self, data): for name, dataframe in data.items(): name = name.replace("--", "_") if name == "s2c": - name = "ssp" + name = "spine_sspDocumentRetrieval" try: if dataframe.rdd.isEmpty(): self.logger.info(f"{name} dataframe has no rows. Skipping.") From 03651d23e10b9396a256e78a6a7ae7baf24f9cae Mon Sep 17 00:00:00 2001 From: jackleary Date: Thu, 17 Jul 2025 09:07:44 +0100 Subject: [PATCH 7/9] NRL-1346 No camel case --- .../modules/glue/src/transformations.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py index ca1a7ce4f..22b1dd111 100644 --- a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py +++ b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py @@ -25,10 +25,10 @@ def format_ssp(df, logger, name): return df logger.info("Processing SSP logs") - noOdsCode = df.filter(col("logReference") != "SSP0001") - odsCode = df.filter(col("logReference") == "SSP0001") + no_ods_code = df.filter(col("logReference") != "SSP0001") + ods_code = df.filter(col("logReference") == "SSP0001") - noOdsCode = noOdsCode.select( + no_ods_code = no_ods_code.select( "time", "host", "internalID", @@ -38,7 +38,7 @@ def format_ssp(df, logger, name): "responseErrorMessage", "totalDuration", ) - odsCode = odsCode.select( + ods_code = ods_code.select( "sspFrom", "fromOrgName", "fromOdsCode", @@ -50,7 +50,7 @@ def format_ssp(df, logger, name): "internalID", ) - df = noOdsCode.join(odsCode, on="internalID", how="left") + df = no_ods_code.join(ods_code, on="internalID", how="left") return df From 5386726a16929724313c0f348337e9e292fd5197 Mon Sep 17 00:00:00 2001 From: jackleary Date: Tue, 22 Jul 2025 18:38:28 +0100 Subject: [PATCH 8/9] NRL-1346 Create and set glue trigger --- terraform/account-wide-infrastructure/dev/glue.tf | 1 + .../modules/glue/glue.tf | 13 +++++++++++++ .../modules/glue/vars.tf | 6 ++++++ terraform/account-wide-infrastructure/prod/glue.tf | 1 + terraform/account-wide-infrastructure/test/glue.tf | 1 + 5 files changed, 22 insertions(+) diff --git a/terraform/account-wide-infrastructure/dev/glue.tf b/terraform/account-wide-infrastructure/dev/glue.tf index 9f52c1f9a..9a658448b 100644 --- a/terraform/account-wide-infrastructure/dev/glue.tf +++ b/terraform/account-wide-infrastructure/dev/glue.tf @@ -2,5 +2,6 @@ module "dev-glue" { is_enabled = var.enable_reporting source = "../modules/glue" name_prefix = "nhsd-nrlf--dev" + schedule = true python_version = 3 } diff --git a/terraform/account-wide-infrastructure/modules/glue/glue.tf b/terraform/account-wide-infrastructure/modules/glue/glue.tf index 0bac81081..3d899379a 100644 --- a/terraform/account-wide-infrastructure/modules/glue/glue.tf +++ b/terraform/account-wide-infrastructure/modules/glue/glue.tf @@ -59,6 +59,7 @@ resource "aws_glue_crawler" "log_crawler" { } }) } + resource "aws_glue_trigger" "log_trigger" { count = var.is_enabled ? 1 : 0 @@ -69,6 +70,18 @@ resource "aws_glue_trigger" "log_trigger" { } } +resource "aws_glue_trigger" "glue_trigger" { + count = var.schedule && var.is_enabled ? 1 : 0 + + name = "${var.name_prefix}-glue-trigger" + type = "SCHEDULED" + schedule = "cron(0 0 1 * * *)" + + actions { + crawler_name = aws_glue_crawler.glue_job[0].name + } +} + resource "aws_glue_job" "glue_job" { count = var.is_enabled ? 1 : 0 diff --git a/terraform/account-wide-infrastructure/modules/glue/vars.tf b/terraform/account-wide-infrastructure/modules/glue/vars.tf index ae3281303..d96d06c41 100644 --- a/terraform/account-wide-infrastructure/modules/glue/vars.tf +++ b/terraform/account-wide-infrastructure/modules/glue/vars.tf @@ -28,3 +28,9 @@ variable "is_enabled" { description = "Flag to enable or disable the Glue module" default = true } + +variable "schedule" { + type = bool + description = "Flag to enable or disable the Glue schedule" + default = false +} diff --git a/terraform/account-wide-infrastructure/prod/glue.tf b/terraform/account-wide-infrastructure/prod/glue.tf index 34c7f540d..9e956720b 100644 --- a/terraform/account-wide-infrastructure/prod/glue.tf +++ b/terraform/account-wide-infrastructure/prod/glue.tf @@ -2,5 +2,6 @@ module "prod-glue" { is_enabled = var.enable_reporting source = "../modules/glue" name_prefix = "nhsd-nrlf--prod" + schedule = true python_version = 3 } diff --git a/terraform/account-wide-infrastructure/test/glue.tf b/terraform/account-wide-infrastructure/test/glue.tf index 86e714de3..3256ba574 100644 --- a/terraform/account-wide-infrastructure/test/glue.tf +++ b/terraform/account-wide-infrastructure/test/glue.tf @@ -2,5 +2,6 @@ module "test-glue" { is_enabled = var.enable_reporting source = "../modules/glue" name_prefix = "nhsd-nrlf--test" + schedule = false python_version = 3 } From 1a97292b02a3c1368b9f454e54b6a8032d98033b Mon Sep 17 00:00:00 2001 From: jackleary Date: Tue, 22 Jul 2025 19:01:26 +0100 Subject: [PATCH 9/9] NRL-1346 Syntax fixes --- terraform/account-wide-infrastructure/dev/glue.tf | 2 +- terraform/account-wide-infrastructure/modules/glue/glue.tf | 4 ++-- .../modules/glue/src/transformations.py | 7 ++----- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/terraform/account-wide-infrastructure/dev/glue.tf b/terraform/account-wide-infrastructure/dev/glue.tf index 9a658448b..11212f43f 100644 --- a/terraform/account-wide-infrastructure/dev/glue.tf +++ b/terraform/account-wide-infrastructure/dev/glue.tf @@ -2,6 +2,6 @@ module "dev-glue" { is_enabled = var.enable_reporting source = "../modules/glue" name_prefix = "nhsd-nrlf--dev" - schedule = true + schedule = false python_version = 3 } diff --git a/terraform/account-wide-infrastructure/modules/glue/glue.tf b/terraform/account-wide-infrastructure/modules/glue/glue.tf index 3d899379a..aced0651d 100644 --- a/terraform/account-wide-infrastructure/modules/glue/glue.tf +++ b/terraform/account-wide-infrastructure/modules/glue/glue.tf @@ -75,10 +75,10 @@ resource "aws_glue_trigger" "glue_trigger" { name = "${var.name_prefix}-glue-trigger" type = "SCHEDULED" - schedule = "cron(0 0 1 * * *)" + schedule = "cron(0 1 * * ? *)" actions { - crawler_name = aws_glue_crawler.glue_job[0].name + job_name = aws_glue_job.glue_job[0].name } } diff --git a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py index 22b1dd111..1b890a2cc 100644 --- a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py +++ b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py @@ -25,10 +25,7 @@ def format_ssp(df, logger, name): return df logger.info("Processing SSP logs") - no_ods_code = df.filter(col("logReference") != "SSP0001") - ods_code = df.filter(col("logReference") == "SSP0001") - - no_ods_code = no_ods_code.select( + no_ods_code = df.filter(col("logReference") != "SSP0001").select( "time", "host", "internalID", @@ -38,7 +35,7 @@ def format_ssp(df, logger, name): "responseErrorMessage", "totalDuration", ) - ods_code = ods_code.select( + ods_code = df.filter(col("logReference") == "SSP0001").select( "sspFrom", "fromOrgName", "fromOdsCode",