diff --git a/terraform/account-wide-infrastructure/modules/glue/glue.tf b/terraform/account-wide-infrastructure/modules/glue/glue.tf
index b2c4e262e..e7cd3b810 100644
--- a/terraform/account-wide-infrastructure/modules/glue/glue.tf
+++ b/terraform/account-wide-infrastructure/modules/glue/glue.tf
@@ -51,7 +51,8 @@ resource "aws_glue_job" "glue_job" {
     "--datalake-formats"             = "delta"
     "--source_path"                  = "s3://${aws_s3_bucket.source-data-bucket.id}/"     # Specify the source S3 path
     "--target_path"                  = "s3://${aws_s3_bucket.target-data-bucket.id}/logs" # Specify the destination S3 path
-    "--job_name"                     = "poc-glue-job"
+    "--job_name"                     = "${var.name_prefix}-glue-job"
+    "--partition_cols"               = "date"
     "--enable-continuous-log-filter" = "true"
     "--enable-metrics"               = "true"
     "--extra-py-files"               = "s3://${aws_s3_bucket.code-bucket.id}/src.zip"
diff --git a/terraform/account-wide-infrastructure/modules/glue/src/main.py b/terraform/account-wide-infrastructure/modules/glue/src/main.py
index 416cef5ef..5450da0cd 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/main.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/main.py
@@ -6,7 +6,9 @@
 from transformations import dtype_conversion, flatten_df, logSchema
 
 # Get arguments from AWS Glue job
-args = getResolvedOptions(sys.argv, ["job_name", "source_path", "target_path"])
+args = getResolvedOptions(
+    sys.argv, ["job_name", "source_path", "target_path", "partition_cols"]
+)
 
 # Start Glue context
 sc = SparkContext()
@@ -19,6 +21,7 @@
     source_path=args["source_path"],
     target_path=args["target_path"],
     schema=logSchema,
+    job_name=args["job_name"],
     partition_cols=partition_cols,
     transformations=[flatten_df, dtype_conversion],
 )
diff --git a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
index e1ba22215..2fb30ff91 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
@@ -1,3 +1,4 @@
+import boto3
 from instances import GlueContextSingleton, LoggerSingleton
 
 
@@ -8,6 +9,7 @@ def __init__(
         source_path,
         target_path,
         schema,
+        job_name,
         partition_cols=[],
         transformations=[],
     ):
@@ -20,6 +22,12 @@ def __init__(
         self.schema = schema
         self.partition_cols = partition_cols
         self.transformations = transformations
+        self.glue = boto3.client(
+            service_name="glue",
+            region_name="eu-west-2",
+            endpoint_url="https://glue.eu-west-2.amazonaws.com",
+        )
+        self.name_prefix = "-".join(job_name.split("-")[:4])
 
     def run(self):
         """Runs ETL"""
@@ -31,6 +39,8 @@ def run(self):
             self.logger.info("Data transformed successfully.")
             self.load(df)
             self.logger.info(f"Data loaded into {self.target_path}.")
+            self.logger.info("Trigger glue crawler")
+            self.trigger_crawler()
         except Exception as e:
             self.logger.error(f"ETL process failed: {e}")
             raise e
@@ -57,3 +67,6 @@ def load(self, dataframe):
         dataframe.write.mode("append").partitionBy(*self.partition_cols).parquet(
             self.target_path
         )
+
+    def trigger_crawler(self):
+        self.glue.start_crawler(Name=f"{self.name_prefix}-log-crawler")
diff --git a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
index 64bb4abe6..35e2a6b45 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
@@ -1,15 +1,21 @@
-from pyspark.sql.functions import to_timestamp
+from pyspark.sql.functions import (
+    col,
+    from_unixtime,
+    regexp_replace,
+    to_date,
+    to_timestamp,
+)
 from pyspark.sql.types import (
     BooleanType,
+    DoubleType,
     StringType,
     StructField,
     StructType,
-    TimestampType,
 )
 
 logSchema = StructType(
     [
-        StructField("time", TimestampType(), True),
+        StructField("time", DoubleType(), True),
         StructField("index", StringType(), True),
         StructField("host", StringType(), True),
         StructField("source", StringType(), True),
@@ -60,17 +66,35 @@
 
 
 def flatten_df(df):
-    cols = []
-    for c in df.dtypes:
-        if "struct" in c[1]:
-            nested_col = c[0]
-        else:
-            cols.append(c[0])
-    return df.select(*cols, f"{nested_col}.*")
+    def flatten(schema, prefix=""):
+        """
+        Recursively traverse the schema to extract all nested fields.
+        """
+        fields = []
+        for field in schema.fields:
+            name = f"{prefix}.{field.name}" if prefix else field.name
+            if isinstance(field.dataType, StructType):
+                fields += flatten(field.dataType, name)
+            else:
+                alias_name = name.replace(".", "_")
+                fields.append((name, alias_name))
+        return fields
+
+    flat_columns = flatten(df.schema)
+
+    return df.select([col(c).alias(n) for c, n in flat_columns])
 
 
 def dtype_conversion(df):
-    df = df.withColumn(
-        "timestamp", to_timestamp(df["timestamp"], "yyyy-MM-dd HH:mm:ss,SSSXXX")
+    df = (
+        df.withColumn(
+            "event_timestamp_cleaned", regexp_replace(col("event_timestamp"), ",", ".")
+        )
+        .withColumn(
+            "event_timestamp",
+            to_timestamp(col("event_timestamp_cleaned"), "yyyy-MM-dd HH:mm:ss.SSSZ"),
+        )
+        .withColumn("time", from_unixtime(col("time")).cast("timestamp"))
+        .withColumn("date", to_date(col("time")))
     )
-    return df
+    return df.drop("event_timestamp_cleaned")