Merged
1 change: 1 addition & 0 deletions terraform/account-wide-infrastructure/dev/glue.tf
@@ -2,5 +2,6 @@ module "dev-glue" {
is_enabled = var.enable_reporting
source = "../modules/glue"
name_prefix = "nhsd-nrlf--dev"
schedule = false
python_version = 3
}
16 changes: 16 additions & 0 deletions terraform/account-wide-infrastructure/modules/glue/glue.tf
@@ -46,6 +46,9 @@ resource "aws_glue_crawler" "log_crawler" {
s3_target {
path = "s3://${aws_s3_bucket.target-data-bucket.id}/producer_upsertDocumentReference/"
}
s3_target {
path = "s3://${aws_s3_bucket.target-data-bucket.id}/spine_sspDocumentRetrieval/"
}
schema_change_policy {
delete_behavior = "LOG"
}
@@ -56,6 +59,7 @@ resource "aws_glue_crawler" "log_crawler" {
}
})
}

resource "aws_glue_trigger" "log_trigger" {
count = var.is_enabled ? 1 : 0

@@ -66,6 +70,18 @@ resource "aws_glue_trigger" "log_trigger" {
}
}

resource "aws_glue_trigger" "glue_trigger" {
count = var.schedule && var.is_enabled ? 1 : 0

name = "${var.name_prefix}-glue-trigger"
type = "SCHEDULED"
schedule = "cron(0 1 * * ? *)"

actions {
job_name = aws_glue_job.glue_job[0].name
}
}

resource "aws_glue_job" "glue_job" {
count = var.is_enabled ? 1 : 0

@@ -5,7 +5,7 @@
from awsglue.utils import getResolvedOptions
from pipeline import LogPipeline
from pyspark.sql import SparkSession
from transformations import dtype_conversion, rename_cols, resolve_dupes
from transformations import dtype_conversion, format_ssp, rename_cols, resolve_dupes

# Spark and Glue Context initialization
spark = SparkSession.builder.config("spark.sql.caseSensitive", "true").getOrCreate()
@@ -37,6 +37,7 @@
"producer--updateDocumentReference",
"producer--deleteDocumentReference",
"producer--createDocumentReference",
"s2c",
]

# Initialize ETL process
@@ -49,7 +50,7 @@
host_prefixes=host_prefixes,
job_name=args["job_name"],
partition_cols=partition_cols,
transformations=[rename_cols, resolve_dupes, dtype_conversion],
transformations=[rename_cols, resolve_dupes, dtype_conversion, format_ssp],
)

# Run the job
@@ -81,7 +81,7 @@ def extract_dynamic(self):
},
format="json",
).filter(
f=lambda x, n=name: (x["host"].endswith(n))
f=lambda x, n=name: (x["host"] is not None and n in x["host"])
and (x["time"] > last_runtime)
)

@@ -95,7 +95,7 @@ def extract_dynamic(self):
"groupSize": "134217728",
},
format="json",
).filter(f=lambda x, n=name: x["host"].endswith(n))
).filter(f=lambda x, n=name: (x["host"] is not None and n in x["host"]))

return data
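
For context, the change from x["host"].endswith(n) to a null-safe substring check alters two behaviours: records with no host value no longer raise, and hosts where the prefix is not the final token still match. A minimal standalone sketch (plain Python, record values made up for illustration) of the two predicates:

# Illustrative only, not part of this PR: why the null-safe substring predicate
# is more robust than the old endswith() check. Record values below are made up.
records = [
    {"host": "producer--createDocumentReference", "time": 1700000100},
    {"host": None, "time": 1700000200},              # .endswith() would raise here
    {"host": "s2c--retrieval", "time": 1700000300},  # a suffix defeats endswith("s2c")
]
name, last_runtime = "s2c", 1700000000

old_predicate = lambda x, n=name: x["host"].endswith(n) and (x["time"] > last_runtime)
new_predicate = lambda x, n=name: (x["host"] is not None and n in x["host"]) and (
    x["time"] > last_runtime
)

# old_predicate(records[1]) raises AttributeError: 'NoneType' object has no attribute 'endswith'
print([r["host"] for r in records if new_predicate(r)])  # ['s2c--retrieval']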

@@ -107,23 +107,29 @@ def transform(self, dataframe, name):
)
for transformation in self.transformations:
self.logger.info(f"Applying transformation: {transformation.__name__}")
dataframe = transformation(dataframe, self.logger)
dataframe = transformation(dataframe, self.logger, name)
return dataframe
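
Note that every transformation passed to LogPipeline must now accept the source name as a third argument, even if it ignores it. An illustrative, hypothetical transformation conforming to the new signature:

# Illustrative only, not part of this PR: the minimal signature a custom
# transformation needs after this change.
def noop_transform(df, logger, name):
    logger.info(f"noop_transform called for {name}")
    return df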

def load(self, data):
"""Load transformed data into Parquet format"""
self.logger.info(f"Loading data into {self.target_path} as Parquet")
for name, dataframe in data.items():
name = name.replace("--", "_")
if name == "s2c":
name = "spine_sspDocumentRetrieval"
try:
if dataframe.rdd.isEmpty():
self.logger.info(f"{name} dataframe has no rows. Skipping.")
continue

self.logger.info(
f"Attempting to load dataframe {name} into {self.target_path}{name}"
)
dataframe.write.mode("append").partitionBy(
*self.partition_cols
).parquet(f"{self.target_path}{name}")
except:
self.logger.info(f"{name} dataframe has no rows. Skipping.")
except Exception as e:
self.logger.info(f"{name} failed to write with error: {e}")

def trigger_crawler(self):
self.glue.start_crawler(Name=f"{self.name_prefix}-log-crawler")
@@ -15,7 +15,48 @@
from pyspark.sql.types import NullType


def resolve_dupes(df, logger):
def format_ssp(df, logger, name):
if name != "s2c":
logger.info(f"Not SSP logs, returning df: {name}")
return df

if df.rdd.isEmpty():
logger.info(f"{name} dataframe has no rows. Skipping format_ssp.")
return df

logger.info("Processing SSP logs")
no_ods_code = df.filter(col("logReference") != "SSP0001").select(
"time",
"host",
"internalID",
"logReference",
"interaction",
"responseCode",
"responseErrorMessage",
"totalDuration",
)
ods_code = df.filter(col("logReference") == "SSP0001").select(
"sspFrom",
"fromOrgName",
"fromOdsCode",
"fromPostCode",
"sspTo",
"toOrgName",
"toOdsCode",
"toPostCode",
"internalID",
)

df = no_ods_code.join(ods_code, on="internalID", how="left")

return df
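
As a toy illustration (local Spark session, reduced column set, made-up values) of the join that format_ssp performs: SSP0001 rows carry the organisation/ODS-code fields, every other SSP row carries the request/response fields, and the left join on internalID recombines them while keeping events that have no matching SSP0001 row.

# Illustrative sketch only, not part of this PR.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

events = spark.createDataFrame(
    [("abc-1", "SSP0002", 200), ("abc-2", "SSP0003", 404)],
    ["internalID", "logReference", "responseCode"],
)
ods = spark.createDataFrame(
    [("abc-1", "X26", "Y05")],
    ["internalID", "fromOdsCode", "toOdsCode"],
)

# abc-2 has no SSP0001 row, so its ODS-code columns come back as null
events.join(ods, on="internalID", how="left").show()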


def resolve_dupes(df, logger, name):
if df.rdd.isEmpty():
logger.info(f"{name} dataframe has no rows. Skipping resolve_dupes.")
return df

column_groups = defaultdict(list)
for column_name in df.columns:
normalised_name = column_name.lower().rstrip("_")
@@ -27,7 +68,9 @@ def resolve_dupes(df, logger):
if len(original_names) == 1:
final_select_exprs.append(col(original_names[0]).alias(lower_name))
else:
logger.info(f"Resolving duplicate group '{lower_name}': {original_names}")
logger.info(
f"Resolving duplicate group '{lower_name}': {original_names} for df: {name}"
)

merge_logic = lambda col1, col2: when(
col1.isNull() | col2.isNull(), coalesce(col1, col2)
@@ -40,34 +83,50 @@
return df.select(*final_select_exprs)


def rename_cols(df, logger):
logger.info("Replacing '.' with '_'")
def rename_cols(df, logger, name):
if df.rdd.isEmpty():
logger.info(f"{name} dataframe has no rows. Skipping rename_cols.")
return df

logger.info(f"Replacing '.' with '_' for df: {name}")
for col_name in df.columns:
df = df.withColumnRenamed(col_name, col_name.replace(".", "_"))
return df


def dtype_conversion(df, logger):
def dtype_conversion(df, logger, name):
if df.rdd.isEmpty():
logger.info(f"{name} dataframe has no rows. Skipping dtype_conversion.")
return df
try:
logger.info("Formatting event_timestamp")
df = (
df.withColumn(
logger.info(f"Formatting event_timestamp, time and date columns for df: {name}")
if "event_timestamp" in df.columns:
df = df.withColumn(
"event_timestamp_cleaned",
regexp_replace(col("event_timestamp"), ",", "."),
)
.withColumn(
).withColumn(
"event_timestamp",
to_timestamp(
col("event_timestamp_cleaned"), "yyyy-MM-dd HH:mm:ss.SSSZ"
),
)
.withColumn("time", from_unixtime(col("time")).cast("timestamp"))
.withColumn("date", to_date(col("time")))
)

df = df.drop("event_timestamp_cleaned")
df = df.drop("event_timestamp_cleaned")

if "time" in df.columns:
df = df.withColumn(
"time", from_unixtime(col("time")).cast("timestamp")
).withColumn("date", to_date(col("time")))

if "_time" in df.columns:
df = df.withColumn(
"time", to_timestamp(col("_time"), "yyyy-MM-dd HH:mm:ss.SSSZ")
).withColumn("date", to_date(col("time")))

df = df.drop("_time")

except Exception as e:
logger.info(f"Failed formatting of timestamp column with error: {e}")
logger.info(f"Failed formatting of timestamp columns with error: {e}")

logger.info("Handling Null Type columns")
select_exprs = []
6 changes: 6 additions & 0 deletions terraform/account-wide-infrastructure/modules/glue/vars.tf
@@ -28,3 +28,9 @@ variable "is_enabled" {
description = "Flag to enable or disable the Glue module"
default = true
}

variable "schedule" {
type = bool
description = "Flag to enable or disable the Glue schedule"
default = false
}
1 change: 1 addition & 0 deletions terraform/account-wide-infrastructure/prod/glue.tf
@@ -2,5 +2,6 @@ module "prod-glue" {
is_enabled = var.enable_reporting
source = "../modules/glue"
name_prefix = "nhsd-nrlf--prod"
schedule = true
python_version = 3
}
1 change: 1 addition & 0 deletions terraform/account-wide-infrastructure/test/glue.tf
@@ -2,5 +2,6 @@ module "test-glue" {
is_enabled = var.enable_reporting
source = "../modules/glue"
name_prefix = "nhsd-nrlf--test"
schedule = false
python_version = 3
}