diff --git a/terraform/account-wide-infrastructure/dev/glue.tf b/terraform/account-wide-infrastructure/dev/glue.tf
index 9f52c1f9a..11212f43f 100644
--- a/terraform/account-wide-infrastructure/dev/glue.tf
+++ b/terraform/account-wide-infrastructure/dev/glue.tf
@@ -2,5 +2,6 @@ module "dev-glue" {
   is_enabled     = var.enable_reporting
   source         = "../modules/glue"
   name_prefix    = "nhsd-nrlf--dev"
+  schedule       = false
   python_version = 3
 }
diff --git a/terraform/account-wide-infrastructure/modules/glue/glue.tf b/terraform/account-wide-infrastructure/modules/glue/glue.tf
index 64a3c5d99..aced0651d 100644
--- a/terraform/account-wide-infrastructure/modules/glue/glue.tf
+++ b/terraform/account-wide-infrastructure/modules/glue/glue.tf
@@ -46,6 +46,9 @@ resource "aws_glue_crawler" "log_crawler" {
   s3_target {
     path = "s3://${aws_s3_bucket.target-data-bucket.id}/producer_upsertDocumentReference/"
   }
+  s3_target {
+    path = "s3://${aws_s3_bucket.target-data-bucket.id}/spine_sspDocumentRetrieval/"
+  }
   schema_change_policy {
     delete_behavior = "LOG"
   }
@@ -56,6 +59,7 @@ resource "aws_glue_crawler" "log_crawler" {
     }
   })
 }
+
 resource "aws_glue_trigger" "log_trigger" {
   count = var.is_enabled ? 1 : 0
 
@@ -66,6 +70,18 @@ resource "aws_glue_trigger" "log_trigger" {
   }
 }
 
+resource "aws_glue_trigger" "glue_trigger" {
+  count = var.schedule && var.is_enabled ? 1 : 0
+
+  name     = "${var.name_prefix}-glue-trigger"
+  type     = "SCHEDULED"
+  schedule = "cron(0 1 * * ? *)"
+
+  actions {
+    job_name = aws_glue_job.glue_job[0].name
+  }
+}
+
 resource "aws_glue_job" "glue_job" {
   count = var.is_enabled ? 1 : 0
 
diff --git a/terraform/account-wide-infrastructure/modules/glue/src/main.py b/terraform/account-wide-infrastructure/modules/glue/src/main.py
index 712be8877..06b7aa519 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/main.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/main.py
@@ -5,7 +5,7 @@
 from awsglue.utils import getResolvedOptions
 from pipeline import LogPipeline
 from pyspark.sql import SparkSession
-from transformations import dtype_conversion, rename_cols, resolve_dupes
+from transformations import dtype_conversion, format_ssp, rename_cols, resolve_dupes
 
 # Spark and Glue Context initialization
 spark = SparkSession.builder.config("spark.sql.caseSensitive", "true").getOrCreate()
@@ -37,6 +37,7 @@
     "producer--updateDocumentReference",
     "producer--deleteDocumentReference",
     "producer--createDocumentReference",
+    "s2c",
 ]
 
 # Initialize ETL process
@@ -49,7 +50,7 @@
     host_prefixes=host_prefixes,
     job_name=args["job_name"],
     partition_cols=partition_cols,
-    transformations=[rename_cols, resolve_dupes, dtype_conversion],
+    transformations=[rename_cols, resolve_dupes, dtype_conversion, format_ssp],
 )
 
 # Run the job
diff --git a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
index 7c1b1597c..6082a99d5 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
@@ -81,7 +81,7 @@ def extract_dynamic(self):
                 },
                 format="json",
             ).filter(
-                f=lambda x, n=name: (x["host"].endswith(n))
+                f=lambda x, n=name: (x["host"] is not None and n in x["host"])
                 and (x["time"] > last_runtime)
             )
 
@@ -95,7 +95,7 @@ def extract_dynamic(self):
                     "groupSize": "134217728",
                 },
                 format="json",
-            ).filter(f=lambda x, n=name: x["host"].endswith(n))
+            ).filter(f=lambda x, n=name: (x["host"] is not None and n in x["host"]))
 
         return data
 
@@ -107,7 +107,7 @@ def transform(self, dataframe, name):
         )
         for transformation in self.transformations:
             self.logger.info(f"Applying transformation: {transformation.__name__}")
-            dataframe = transformation(dataframe, self.logger)
+            dataframe = transformation(dataframe, self.logger, name)
         return dataframe
 
     def load(self, data):
@@ -115,15 +115,21 @@ def load(self, data):
         self.logger.info(f"Loading data into {self.target_path} as Parquet")
         for name, dataframe in data.items():
             name = name.replace("--", "_")
+            if name == "s2c":
+                name = "spine_sspDocumentRetrieval"
             try:
+                if dataframe.rdd.isEmpty():
+                    self.logger.info(f"{name} dataframe has no rows. Skipping.")
+                    continue
+
                 self.logger.info(
                     f"Attempting to load dataframe {name} into {self.target_path}{name}"
                 )
                 dataframe.write.mode("append").partitionBy(
                     *self.partition_cols
                 ).parquet(f"{self.target_path}{name}")
-            except:
-                self.logger.info(f"{name} dataframe has no rows. Skipping.")
+            except Exception as e:
+                self.logger.info(f"{name} failed to write with error: {e}")
 
     def trigger_crawler(self):
         self.glue.start_crawler(Name=f"{self.name_prefix}-log-crawler")
diff --git a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
index 425e5a061..1b890a2cc 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
@@ -15,7 +15,48 @@
 from pyspark.sql.types import NullType
 
 
-def resolve_dupes(df, logger):
+def format_ssp(df, logger, name):
+    if name != "s2c":
+        logger.info(f"Not SSP logs, returning df: {name}")
+        return df
+
+    if df.rdd.isEmpty():
+        logger.info(f"{name} dataframe has no rows. Skipping format_ssp.")
+        return df
+
+    logger.info("Processing SSP logs")
+    no_ods_code = df.filter(col("logReference") != "SSP0001").select(
+        "time",
+        "host",
+        "internalID",
+        "logReference",
+        "interaction",
+        "responseCode",
+        "responseErrorMessage",
+        "totalDuration",
+    )
+    ods_code = df.filter(col("logReference") == "SSP0001").select(
+        "sspFrom",
+        "fromOrgName",
+        "fromOdsCode",
+        "fromPostCode",
+        "sspTo",
+        "toOrgName",
+        "toOdsCode",
+        "toPostCode",
+        "internalID",
+    )
+
+    df = no_ods_code.join(ods_code, on="internalID", how="left")
+
+    return df
+
+
+def resolve_dupes(df, logger, name):
+    if df.rdd.isEmpty():
+        logger.info(f"{name} dataframe has no rows. Skipping resolve_dupes.")
+        return df
+
     column_groups = defaultdict(list)
     for column_name in df.columns:
         normalised_name = column_name.lower().rstrip("_")
@@ -27,7 +68,9 @@ def resolve_dupes(df, logger):
         if len(original_names) == 1:
             final_select_exprs.append(col(original_names[0]).alias(lower_name))
         else:
-            logger.info(f"Resolving duplicate group '{lower_name}': {original_names}")
+            logger.info(
+                f"Resolving duplicate group '{lower_name}': {original_names} for df: {name}"
+            )
 
             merge_logic = lambda col1, col2: when(
                 col1.isNull() | col2.isNull(), coalesce(col1, col2)
@@ -40,34 +83,50 @@ def resolve_dupes(df, logger):
     return df.select(*final_select_exprs)
 
 
-def rename_cols(df, logger):
-    logger.info("Replacing '.' with '_'")
+def rename_cols(df, logger, name):
+    if df.rdd.isEmpty():
+        logger.info(f"{name} dataframe has no rows. Skipping rename_cols.")
+        return df
+
+    logger.info(f"Replacing '.' with '_' for df: {name}")
     for col_name in df.columns:
         df = df.withColumnRenamed(col_name, col_name.replace(".", "_"))
     return df
 
 
-def dtype_conversion(df, logger):
+def dtype_conversion(df, logger, name):
+    if df.rdd.isEmpty():
+        logger.info(f"{name} dataframe has no rows. Skipping dtype_conversion.")
+        return df
     try:
-        logger.info("Formatting event_timestamp")
-        df = (
-            df.withColumn(
+        logger.info(f"Formatting event_timestamp, time and date columns for df: {name}")
+        if "event_timestamp" in df.columns:
+            df = df.withColumn(
                 "event_timestamp_cleaned",
                 regexp_replace(col("event_timestamp"), ",", "."),
-            )
-            .withColumn(
+            ).withColumn(
                 "event_timestamp",
                 to_timestamp(
                     col("event_timestamp_cleaned"), "yyyy-MM-dd HH:mm:ss.SSSZ"
                 ),
             )
-            .withColumn("time", from_unixtime(col("time")).cast("timestamp"))
-            .withColumn("date", to_date(col("time")))
-        )
-        df = df.drop("event_timestamp_cleaned")
+            df = df.drop("event_timestamp_cleaned")
+
+        if "time" in df.columns:
+            df = df.withColumn(
+                "time", from_unixtime(col("time")).cast("timestamp")
+            ).withColumn("date", to_date(col("time")))
+
+        if "_time" in df.columns:
+            df = df.withColumn(
+                "time", to_timestamp(col("_time"), "yyyy-MM-dd HH:mm:ss.SSSZ")
+            ).withColumn("date", to_date(col("time")))
+
+            df = df.drop("_time")
+
     except Exception as e:
-        logger.info(f"Failed formatting of timestamp column with error: {e}")
+        logger.info(f"Failed formatting of timestamp columns with error: {e}")
 
     logger.info("Handling Null Type columns")
     select_exprs = []
diff --git a/terraform/account-wide-infrastructure/modules/glue/vars.tf b/terraform/account-wide-infrastructure/modules/glue/vars.tf
index ae3281303..d96d06c41 100644
--- a/terraform/account-wide-infrastructure/modules/glue/vars.tf
+++ b/terraform/account-wide-infrastructure/modules/glue/vars.tf
@@ -28,3 +28,9 @@ variable "is_enabled" {
   description = "Flag to enable or disable the Glue module"
   default     = true
 }
+
+variable "schedule" {
+  type        = bool
+  description = "Flag to enable or disable the Glue schedule"
+  default     = false
+}
diff --git a/terraform/account-wide-infrastructure/prod/glue.tf b/terraform/account-wide-infrastructure/prod/glue.tf
index 34c7f540d..9e956720b 100644
--- a/terraform/account-wide-infrastructure/prod/glue.tf
+++ b/terraform/account-wide-infrastructure/prod/glue.tf
@@ -2,5 +2,6 @@ module "prod-glue" {
   is_enabled     = var.enable_reporting
   source         = "../modules/glue"
   name_prefix    = "nhsd-nrlf--prod"
+  schedule       = true
   python_version = 3
 }
diff --git a/terraform/account-wide-infrastructure/test/glue.tf b/terraform/account-wide-infrastructure/test/glue.tf
index 86e714de3..3256ba574 100644
--- a/terraform/account-wide-infrastructure/test/glue.tf
+++ b/terraform/account-wide-infrastructure/test/glue.tf
@@ -2,5 +2,6 @@ module "test-glue" {
   is_enabled     = var.enable_reporting
   source         = "../modules/glue"
   name_prefix    = "nhsd-nrlf--test"
+  schedule       = false
   python_version = 3
 }
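A minimal local sketch of how the reworked three-argument transformation signature can be exercised outside Glue, assuming a local pyspark installation, Python's standard logging in place of the Glue logger, modules/glue/src on PYTHONPATH, and invented sample values; it shows the "s2c" split-and-rejoin on internalID that format_ssp performs.

# Local sketch only (not part of the patch). Assumptions: local pyspark,
# logging.Logger standing in for the Glue logger, modules/glue/src on the
# path, and made-up sample values.
import logging

from pyspark.sql import Row, SparkSession

from transformations import format_ssp

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ssp-sketch")

spark = SparkSession.builder.master("local[1]").getOrCreate()

# One SSP0001 row (carries the org/ODS details) and one non-SSP0001 row
# (carries timing and response details), sharing the same internalID.
rows = [
    Row(time=1700000000, host="spine-s2c", internalID="abc-123",
        logReference="SSP0002", interaction="GET /DocumentReference",
        responseCode="200", responseErrorMessage="", totalDuration=120,
        sspFrom=None, fromOrgName=None, fromOdsCode=None, fromPostCode=None,
        sspTo=None, toOrgName=None, toOdsCode=None, toPostCode=None),
    Row(time=1700000001, host="spine-s2c", internalID="abc-123",
        logReference="SSP0001", interaction=None, responseCode=None,
        responseErrorMessage=None, totalDuration=None, sspFrom="from-asid",
        fromOrgName="Org A", fromOdsCode="A1234", fromPostCode="LS1 1AA",
        sspTo="to-asid", toOrgName="Org B", toOdsCode="B5678",
        toPostCode="LS2 2BB"),
]
df = spark.createDataFrame(rows)

# Non-"s2c" names pass straight through; "s2c" frames are split on
# logReference == "SSP0001" and re-joined on internalID, so each retrieval
# row gains the ODS columns.
result = format_ssp(df, logger, "s2c")
result.show(truncate=False)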