diff --git a/terraform/account-wide-infrastructure/modules/glue/glue.tf b/terraform/account-wide-infrastructure/modules/glue/glue.tf
index 20f9ac16c..e36433b5c 100644
--- a/terraform/account-wide-infrastructure/modules/glue/glue.tf
+++ b/terraform/account-wide-infrastructure/modules/glue/glue.tf
@@ -68,7 +68,7 @@ resource "aws_glue_job" "glue_job" {
   worker_type       = "G.1X"
   timeout           = 2880
   max_retries       = 0
-  number_of_workers = 2
+  number_of_workers = 4
   command {
     name           = "glueetl"
     python_version = var.python_version
diff --git a/terraform/account-wide-infrastructure/modules/glue/iam.tf b/terraform/account-wide-infrastructure/modules/glue/iam.tf
index 6d33a1b78..097fe0386 100644
--- a/terraform/account-wide-infrastructure/modules/glue/iam.tf
+++ b/terraform/account-wide-infrastructure/modules/glue/iam.tf
@@ -63,7 +63,6 @@ data "aws_iam_policy_document" "glue_service" {
 
     resources = [
       "arn:aws:logs:*:*:*:/aws-glue/*",
-      # "arn:aws:logs:*:*:*:/customlogs/*"
     ]
 
     effect = "Allow"
@@ -81,13 +80,25 @@ data "aws_iam_policy_document" "glue_service" {
     effect = "Allow"
   }
 
+  statement {
+    actions = [
+      "cloudwatch:Get*",
+      "cloudwatch:List*",
+      "cloudwatch:Put*",
+    ]
+    resources = [
+      "*"
+    ]
+    effect = "Allow"
+  }
+
   statement {
     actions = [
       "iam:PassRole",
     ]
     effect = "Allow"
     resources = [
-      "*"
+      "arn:aws:iam::*:role/AWSGlueServiceRole*"
     ]
   }
 }
diff --git a/terraform/account-wide-infrastructure/modules/glue/src/main.py b/terraform/account-wide-infrastructure/modules/glue/src/main.py
index 27619bbc5..64f616b59 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/main.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/main.py
@@ -1,11 +1,9 @@
 import sys
 
 from awsglue.utils import getResolvedOptions
-from consumer_schemas import consumerSchemaList
 from pipeline import LogPipeline
-from producer_schemas import producerSchemaList
 from pyspark.context import SparkContext
-from transformations import dtype_conversion, flatten_df, resolve_dupes
+from transformations import dtype_conversion, rename_cols, resolve_dupes
 
 # Get arguments from AWS Glue job
 args = getResolvedOptions(
@@ -17,17 +15,29 @@
 
 partition_cols = args["partition_cols"].split(",") if "partition_cols" in args else []
 
-consumerSchemaList.update(producerSchemaList)
+host_prefixes = [
+    "consumer--countDocumentReference",
+    "consumer--searchPostDocumentReference",
+    "consumer--searchDocumentReference",
+    "consumer--readDocumentReference",
+    "producer--searchPostDocumentReference",
+    "producer--searchDocumentReference",
+    "producer--readDocumentReference",
+    "producer--upsertDocumentReference",
+    "producer--updateDocumentReference",
+    "producer--deleteDocumentReference",
+    "producer--createDocumentReference",
+]
 
 # Initialize ETL process
 etl_job = LogPipeline(
     spark_context=sc,
     source_path=args["source_path"],
     target_path=args["target_path"],
-    schemas=consumerSchemaList,
+    host_prefixes=host_prefixes,
     job_name=args["job_name"],
     partition_cols=partition_cols,
-    transformations=[flatten_df, resolve_dupes, dtype_conversion],
+    transformations=[rename_cols, resolve_dupes, dtype_conversion],
 )
 
 # Run the job
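Note on the `host_prefixes` list above: despite the name, `LogPipeline.extract_dynamic` (below) matches them as suffixes of each record's `host` field via `endswith`. A minimal sketch of that predicate; the sample host strings and timestamps are invented for illustration, not values from this repo:

```python
# Standalone illustration of the host matching used by extract_dynamic.
# The sample host values below are assumptions, not real log data.
host_prefixes = [
    "consumer--readDocumentReference",
    "producer--createDocumentReference",
]

records = [
    {"host": "some-env--consumer--readDocumentReference", "time": 1700000000},
    {"host": "some-env--producer--createDocumentReference", "time": 1700000100},
]

for prefix in host_prefixes:
    # A record is kept when its host ends with the configured prefix string.
    matched = [r for r in records if r["host"].endswith(prefix)]
    print(prefix, "->", len(matched), "record(s)")
```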
diff --git a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
index 5ea40ffb8..f018911ad 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/pipeline.py
@@ -2,7 +2,6 @@
 
 import boto3
 from instances import GlueContextSingleton, LoggerSingleton
-from pyspark.sql.functions import col
 
 
 class LogPipeline:
@@ -11,7 +10,7 @@ def __init__(
         spark_context,
         source_path,
         target_path,
-        schemas,
+        host_prefixes,
         job_name,
         partition_cols=[],
         transformations=[],
@@ -22,7 +21,7 @@ def __init__(
         self.logger = LoggerSingleton().logger
         self.source_path = source_path
         self.target_path = target_path
-        self.schemas = schemas
+        self.host_prefixes = host_prefixes
         self.partition_cols = partition_cols
         self.transformations = transformations
         self.glue = boto3.client(
@@ -37,10 +36,10 @@ def run(self):
         """Runs ETL"""
         try:
             self.logger.info("ETL Process started.")
-            data = self.extract()
+            data = self.extract_dynamic()
             self.logger.info(f"Data extracted from {self.source_path}.")
             for name, df in data.items():
-                data[name] = self.transform(df)
+                data[name] = self.transform(df, name)
             self.logger.info("Data transformed successfully.")
             self.load(data)
             self.logger.info(f"Data loaded into {self.target_path}.")
@@ -51,6 +50,7 @@ def run(self):
             raise e
 
     def get_last_run(self):
+        self.logger.info("Retrieving last successful runtime.")
         all_runs = self.glue.get_job_runs(JobName=self.job_name)
         if not all_runs["JobRuns"]:
             return None
@@ -59,28 +59,41 @@ def get_last_run(self):
             if run["JobRunState"] == "SUCCEEDED":
                 return time.mktime(run["StartedOn"].timetuple())
 
-    def extract(self):
+        return None
+
+    def extract_dynamic(self):
         """Extract JSON data from S3"""
-        self.logger.info(f"Extracting data from {self.source_path} as JSON")
         last_runtime = self.get_last_run()
         data = {}
-        for name, schema in self.schemas.items():
+        data_source = self.glue_context.getSource("s3", paths=[self.source_path])
+        data_source.setFormat("json")
+        self.logger.info(f"Extracting data from {self.source_path} as JSON")
+        for name in self.host_prefixes:
             if last_runtime:
-                data[name] = (
-                    self.spark.read.option("recursiveFileLookup", "true")
-                    .schema(schema)
-                    .json(self.source_path)
-                ).where((col("host").contains(name)) & (col("time") > last_runtime))
+                data[name] = self.glue_context.create_dynamic_frame.from_options(
+                    connection_type="s3",
+                    connection_options={"paths": [self.source_path], "recurse": True},
+                    format="json",
+                ).filter(
+                    f=lambda x, n=name: (x["host"].endswith(n))
+                    and (x["time"] > last_runtime)
+                )
+
             else:
-                data[name] = (
-                    self.spark.read.option("recursiveFileLookup", "true")
-                    .schema(schema)
-                    .json(self.source_path)
-                ).where(col("host").contains(name))
+                data[name] = self.glue_context.create_dynamic_frame.from_options(
+                    connection_type="s3",
+                    connection_options={"paths": [self.source_path], "recurse": True},
+                    format="json",
+                ).filter(f=lambda x, n=name: x["host"].endswith(n))
+
         return data
 
-    def transform(self, dataframe):
+    def transform(self, dataframe, name):
         """Apply a list of transformations on the dataframe"""
+        self.spark.conf.set("spark.sql.caseSensitive", True)
+        dataframe = (
+            dataframe.relationalize("root", f"./tmp/{name}").select("root").toDF()
+        )
         for transformation in self.transformations:
             self.logger.info(f"Applying transformation: {transformation.__name__}")
             dataframe = transformation(dataframe)
@@ -90,13 +103,13 @@ def load(self, data):
         """Load transformed data into Parquet format"""
         self.logger.info(f"Loading data into {self.target_path} as Parquet")
         for name, dataframe in data.items():
-            if dataframe.na.drop().count() > 0:
-                name = name.replace("--", "_")
-                dataframe.write.mode("append").partitionBy(
+            name = name.replace("--", "_")
+            try:
+                dataframe.coalesce(1).write.mode("append").partitionBy(
                     *self.partition_cols
                 ).parquet(f"{self.target_path}{name}")
-            else:
-                self.logger.info(f"Dataframe {name} is null, skipping")
+            except Exception:
+                self.logger.info(f"{name} dataframe has no rows. Skipping.")
 
     def trigger_crawler(self):
         self.glue.start_crawler(Name=f"{self.name_prefix}-log-crawler")
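The `transform` change above leans on `DynamicFrame.relationalize` to flatten nested JSON instead of the removed `flatten_df`. A minimal sketch of that step, assuming an AWS Glue job environment where `awsglue` is importable; the nested `request` payload and frame name are invented for illustration:

```python
# Sketch of the relationalize step used in LogPipeline.transform.
# Runs only inside an AWS Glue environment (awsglue libraries available).
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext
from pyspark.sql import Row

glue_context = GlueContext(SparkContext.getOrCreate())
spark = glue_context.spark_session

# Invented nested log record: relationalize flattens the struct into dotted
# columns ("request.path", "request.status") on the "root" table.
df = spark.createDataFrame(
    [Row(host="producer--readDocumentReference",
         request=Row(path="/DocumentReference", status=200))]
)
dyf = DynamicFrame.fromDF(df, glue_context, "sample_logs")

flat = dyf.relationalize("root", "/tmp/relationalize-staging").select("root").toDF()
flat.show(truncate=False)  # dotted names are what rename_cols later rewrites to "_"
```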
diff --git a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
index 3258dfa9a..3b23d6515 100644
--- a/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
+++ b/terraform/account-wide-infrastructure/modules/glue/src/transformations.py
@@ -2,7 +2,6 @@
     coalesce,
     col,
     concat,
-    explode_outer,
     from_unixtime,
     lit,
     regexp_replace,
@@ -10,7 +9,7 @@
     to_timestamp,
     when,
 )
-from pyspark.sql.types import ArrayType, StructType
+from pyspark.sql.types import NullType
 
 
 def resolve_dupes(df):
@@ -33,49 +32,39 @@ def resolve_dupes(df):
     return df
 
 
-def flatten_df(df):
-    complex_fields = dict(
-        [
-            (field.name, field.dataType)
-            for field in df.schema.fields
-            if isinstance(field.dataType, ArrayType)
-            or isinstance(field.dataType, StructType)
-        ]
-    )
-    while len(complex_fields) != 0:
-        col_name = list(complex_fields.keys())[0]
-
-        if isinstance(complex_fields[col_name], StructType):
-            expanded = [
-                col(col_name + "." + k).alias(col_name + "_" + k)
-                for k in [n.name for n in complex_fields[col_name]]
-            ]
-            df = df.select("*", *expanded).drop(col_name)
-
-        elif isinstance(complex_fields[col_name], ArrayType):
-            df = df.withColumn(col_name, explode_outer(col_name))
-
-        complex_fields = dict(
-            [
-                (field.name, field.dataType)
-                for field in df.schema.fields
-                if isinstance(field.dataType, ArrayType)
-                or isinstance(field.dataType, ArrayType)
-            ]
-        )
+def rename_cols(df):
+    for col_name in df.columns:
+        df = df.withColumnRenamed(col_name, col_name.replace(".", "_"))
     return df
 
 
 def dtype_conversion(df):
-    df = (
-        df.withColumn(
-            "event_timestamp_cleaned", regexp_replace(col("event_timestamp"), ",", ".")
+    try:
+        df = (
+            df.withColumn(
+                "event_timestamp_cleaned",
+                regexp_replace(col("event_timestamp"), ",", "."),
+            )
+            .withColumn(
+                "event_timestamp",
+                to_timestamp(
+                    col("event_timestamp_cleaned"), "yyyy-MM-dd HH:mm:ss.SSSZ"
+                ),
+            )
+            .withColumn("time", from_unixtime(col("time")).cast("timestamp"))
+            .withColumn("date", to_date(col("time")))
         )
-        .withColumn(
-            "event_timestamp",
-            to_timestamp(col("event_timestamp_cleaned"), "yyyy-MM-dd HH:mm:ss.SSSZ"),
-        )
-        .withColumn("time", from_unixtime(col("time")).cast("timestamp"))
-        .withColumn("date", to_date(col("time")))
-    )
-    return df.drop("event_timestamp_cleaned")
+
+        df = df.drop("event_timestamp_cleaned")
+    except Exception:
+        pass
+
+    select_exprs = []
+    for column_name in df.columns:
+        column_type = df.schema[column_name].dataType
+        if isinstance(column_type, NullType):
+            select_exprs.append(col(column_name).cast("string").alias(column_name))
+        else:
+            select_exprs.append(col(column_name))
+
+    return df.select(*select_exprs)
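The new tail of `dtype_conversion` exists because Parquet cannot store Spark's `NullType`, which is what a column can end up as when every value in it is null. A standalone PySpark sketch of that cast, runnable outside Glue; the `empty_field` column is a made-up example:

```python
# Demonstrates why NullType columns are cast to string before writing Parquet.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from pyspark.sql.types import NullType

spark = SparkSession.builder.master("local[1]").getOrCreate()

# lit(None) yields a NullType column; writing it to Parquet as-is would fail.
df = spark.range(2).withColumn("empty_field", lit(None))

exprs = [
    col(c).cast("string").alias(c)
    if isinstance(df.schema[c].dataType, NullType)
    else col(c)
    for c in df.columns
]
df.select(*exprs).printSchema()  # empty_field is now string, safe for Parquet
```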