@@ -51,7 +51,8 @@ resource "aws_glue_job" "glue_job" {
     "--datalake-formats" = "delta"
     "--source_path" = "s3://${aws_s3_bucket.source-data-bucket.id}/" # Specify the source S3 path
     "--target_path" = "s3://${aws_s3_bucket.target-data-bucket.id}/logs" # Specify the destination S3 path
-    "--job_name" = "poc-glue-job"
+    "--job_name" = "${var.name_prefix}-glue-job"
+    "--partition_cols" = "date"
     "--enable-continuous-log-filter" = "true"
     "--enable-metrics" = "true"
     "--extra-py-files" = "s3://${aws_s3_bucket.code-bucket.id}/src.zip"
@@ -6,7 +6,9 @@
 from transformations import dtype_conversion, flatten_df, logSchema

 # Get arguments from AWS Glue job
-args = getResolvedOptions(sys.argv, ["job_name", "source_path", "target_path"])
+args = getResolvedOptions(
+    sys.argv, ["job_name", "source_path", "target_path", "partition_cols"]
+)

 # Start Glue context
 sc = SparkContext()
@@ -19,6 +21,7 @@
     source_path=args["source_path"],
     target_path=args["target_path"],
     schema=logSchema,
+    job_name=args["job_name"],
     partition_cols=partition_cols,
     transformations=[flatten_df, dtype_conversion],
 )
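Note that `getResolvedOptions` hands every job argument back as a plain string, so the `--partition_cols` default set in Terraform ("date") has to be turned into the list that reaches the constructor as `partition_cols` above. That parsing step sits outside this hunk; a minimal sketch of what it could look like, assuming a comma-separated convention (the variable name and the `split` are assumptions, not code from this PR):

```python
# Hypothetical sketch, not part of this diff: Glue passes "--partition_cols"
# through as a string such as "date" or "date,host", so split it into a list
# before constructing the ETL job.
partition_cols = [c.strip() for c in args["partition_cols"].split(",") if c.strip()]
```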
@@ -1,3 +1,4 @@
+import boto3
 from instances import GlueContextSingleton, LoggerSingleton


@@ -8,6 +9,7 @@ def __init__(
         source_path,
         target_path,
         schema,
+        job_name,
         partition_cols=[],
         transformations=[],
     ):
@@ -20,6 +22,12 @@ def __init__(
         self.schema = schema
         self.partition_cols = partition_cols
         self.transformations = transformations
+        self.glue = boto3.client(
+            service_name="glue",
+            region_name="eu-west-2",
+            endpoint_url="https://glue.eu-west-2.amazonaws.com",
+        )
+        self.name_prefix = "-".join(job_name.split("-")[:4])

     def run(self):
         """Runs ETL"""
@@ -31,6 +39,8 @@ def run(self):
             self.logger.info("Data transformed successfully.")
             self.load(df)
             self.logger.info(f"Data loaded into {self.target_path}.")
+            self.logger.info("Trigger glue crawler")
+            self.trigger_crawler()
         except Exception as e:
             self.logger.error(f"ETL process failed: {e}")
             raise e
@@ -57,3 +67,6 @@ def load(self, dataframe):
         dataframe.write.mode("append").partitionBy(*self.partition_cols).parquet(
             self.target_path
         )
+
+    def trigger_crawler(self):
+        self.glue.start_crawler(Name=f"{self.name_prefix}-log-crawler")
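The crawler name is rebuilt from the job name: `"-".join(job_name.split("-")[:4])` keeps the first four dash-separated tokens, which implicitly assumes `var.name_prefix` is itself four tokens long (e.g. project-team-env-stage). A small illustration with an invented value:

```python
# Illustration only; the job name below is made up. With the Terraform default
# "--job_name" = "${var.name_prefix}-glue-job", the first four tokens should
# reproduce the prefix that the crawler name is built from.
job_name = "acme-logs-platform-dev-glue-job"     # hypothetical value
name_prefix = "-".join(job_name.split("-")[:4])  # "acme-logs-platform-dev"
crawler_name = f"{name_prefix}-log-crawler"      # "acme-logs-platform-dev-log-crawler"
```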
@@ -1,15 +1,21 @@
-from pyspark.sql.functions import to_timestamp
+from pyspark.sql.functions import (
+    col,
+    from_unixtime,
+    regexp_replace,
+    to_date,
+    to_timestamp,
+)
 from pyspark.sql.types import (
     BooleanType,
+    DoubleType,
     StringType,
     StructField,
     StructType,
-    TimestampType,
 )

 logSchema = StructType(
     [
-        StructField("time", TimestampType(), True),
+        StructField("time", DoubleType(), True),
         StructField("index", StringType(), True),
         StructField("host", StringType(), True),
         StructField("source", StringType(), True),
@@ -60,17 +66,35 @@


 def flatten_df(df):
-    cols = []
-    for c in df.dtypes:
-        if "struct" in c[1]:
-            nested_col = c[0]
-        else:
-            cols.append(c[0])
-    return df.select(*cols, f"{nested_col}.*")
+    def flatten(schema, prefix=""):
+        """
+        Recursively traverse the schema to extract all nested fields.
+        """
+        fields = []
+        for field in schema.fields:
+            name = f"{prefix}.{field.name}" if prefix else field.name
+            if isinstance(field.dataType, StructType):
+                fields += flatten(field.dataType, name)
+            else:
+                alias_name = name.replace(".", "_")
+                fields.append((name, alias_name))
+        return fields
+
+    flat_columns = flatten(df.schema)
+
+    return df.select([col(c).alias(n) for c, n in flat_columns])


 def dtype_conversion(df):
-    df = df.withColumn(
-        "timestamp", to_timestamp(df["timestamp"], "yyyy-MM-dd HH:mm:ss,SSSXXX")
+    df = (
+        df.withColumn(
+            "event_timestamp_cleaned", regexp_replace(col("event_timestamp"), ",", ".")
+        )
+        .withColumn(
+            "event_timestamp",
+            to_timestamp(col("event_timestamp_cleaned"), "yyyy-MM-dd HH:mm:ss.SSSZ"),
+        )
+        .withColumn("time", from_unixtime(col("time")).cast("timestamp"))
+        .withColumn("date", to_date(col("time")))
     )
-    return df
+    return df.drop("event_timestamp_cleaned")
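The rewritten `flatten_df` walks the schema recursively, so structs nested at any depth are flattened and each leaf gets a unique underscore-joined alias, whereas the old version only handled a single top-level struct column (and silently kept just the last one it saw). A self-contained example of the expected behaviour, with an invented schema and row:

```python
# Standalone illustration; the sample schema and values are invented, and the
# import assumes the module name used in this PR.
from pyspark.sql import SparkSession
from transformations import flatten_df

spark = SparkSession.builder.master("local[1]").getOrCreate()
df = spark.createDataFrame(
    [("idx1", ("ok", (200,)))],  # nested tuples fill the struct fields positionally
    "index string, event struct<status: string, detail: struct<code: int>>",
)
flatten_df(df).show()
# columns: index, event_status, event_detail_code -> ("idx1", "ok", 200)
```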