Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Log Schema Generation

The Glue script uses pyspark to process log data. Due to the structure of each json document inside of a log group differing, we need to account for this variance.

The notebook provides a way to automatically generate a pyspark schema for a log group without manual intervention. Point it at the desired group, and hit run all, then copy and paste the output into either producer_schema.py or consumer_schema.py.
6 changes: 3 additions & 3 deletions terraform/account-wide-infrastructure/modules/glue/glue.tf
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ resource "aws_glue_crawler" "log_crawler" {
path = "${aws_s3_bucket.target-data-bucket.id}/producer_updateDocumentReference/"
}
s3_target {
path = "${aws_s3_bucket.target-data-bucket.id}/producer_upsertDocumentReference//"
path = "${aws_s3_bucket.target-data-bucket.id}/producer_upsertDocumentReference/"
}
schema_change_policy {
delete_behavior = "LOG"
Expand All @@ -64,10 +64,10 @@ resource "aws_glue_job" "glue_job" {
name = "${var.name_prefix}-glue-job"
role_arn = aws_iam_role.glue_service_role.arn
description = "Transfer logs from source to bucket"
glue_version = "4.0"
glue_version = "5.0"
worker_type = "G.1X"
timeout = 2880
max_retries = 1
max_retries = 0
number_of_workers = 2
command {
name = "glueetl"
Expand Down
10 changes: 10 additions & 0 deletions terraform/account-wide-infrastructure/modules/glue/iam.tf
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,16 @@ data "aws_iam_policy_document" "glue_service" {

effect = "Allow"
}

statement {
actions = [
"iam:PassRole",
]
effect = "Allow"
resources = [
"*"
]
}
}

resource "aws_iam_policy" "glue_service" {
Expand Down
18 changes: 18 additions & 0 deletions terraform/account-wide-infrastructure/modules/glue/locals.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
locals {
s3 = {
transition_storage = {
infrequent_access = {
storage_class = "STANDARD_IA"
days = 150
}
glacier = {
storage_class = "GLACIER"
days = 200
}
}

expiration = {
days = 1095
}
}
}
29 changes: 29 additions & 0 deletions terraform/account-wide-infrastructure/modules/glue/s3.tf
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,35 @@ resource "aws_s3_bucket_public_access_block" "source-data-bucket-public-access-b
restrict_public_buckets = true
}

resource "aws_s3_bucket_lifecycle_configuration" "source-data-bucket-lifecycle" {
bucket = aws_s3_bucket.source-data-bucket.id


rule {
id = "bucket-versioning-rule"
status = "Enabled"

transition {
days = local.s3.transition_storage.infrequent_access.days
storage_class = local.s3.transition_storage.infrequent_access.storage_class
}
transition {
days = local.s3.transition_storage.glacier.days
storage_class = local.s3.transition_storage.glacier.storage_class
}
expiration {
days = local.s3.expiration.days
}
}
}

resource "aws_s3_bucket_versioning" "source-data-bucket-versioning" {
bucket = aws_s3_bucket.source-data-bucket.id
versioning_configuration {
status = "Enabled"
}
}


# S3 Bucket for Processed Data
resource "aws_s3_bucket" "target-data-bucket" {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ class GlueContextSingleton:
def __new__(cls, spark_context):
if not cls._instance:
cls._instance = super().__new__(cls)
cls._instance.spark = SparkSession.builder.getOrCreate()
cls._instance.spark = SparkSession.builder.config(
"spark.sql.caseSensitive", "true"
).getOrCreate()
cls._instance.context = GlueContext(spark_context)
return cls._instance

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from pipeline import LogPipeline
from producer_schemas import producerSchemaList
from pyspark.context import SparkContext
from transformations import dtype_conversion, flatten_df
from transformations import dtype_conversion, flatten_df, resolve_dupes

# Get arguments from AWS Glue job
args = getResolvedOptions(
Expand All @@ -27,7 +27,7 @@
schemas=consumerSchemaList,
job_name=args["job_name"],
partition_cols=partition_cols,
transformations=[flatten_df, dtype_conversion],
transformations=[flatten_df, resolve_dupes, dtype_conversion],
)

# Run the job
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import time

import boto3
from instances import GlueContextSingleton, LoggerSingleton
from pyspark.sql.functions import col
Expand Down Expand Up @@ -28,6 +30,7 @@ def __init__(
region_name="eu-west-2",
endpoint_url="https://glue.eu-west-2.amazonaws.com",
)
self.job_name = job_name
self.name_prefix = "-".join(job_name.split("-")[:4])

def run(self):
Expand All @@ -47,16 +50,33 @@ def run(self):
self.logger.error(f"ETL process failed: {e}")
raise e

def get_last_run(self):
all_runs = self.glue.get_job_runs(JobName=self.job_name)
if not all_runs["JobRuns"]:
return None

for run in all_runs["JobRuns"]:
if run["JobRunState"] == "SUCCEEDED":
return time.mktime(run["StartedOn"].timetuple())

def extract(self):
"""Extract JSON data from S3"""
self.logger.info(f"Extracting data from {self.source_path} as JSON")
last_runtime = self.get_last_run()
data = {}
for name, schema in self.schemas.items():
data[name] = (
self.spark.read.option("recursiveFileLookup", "true")
.schema(schema)
.json(self.source_path)
).where(col("host").contains(name))
if last_runtime:
data[name] = (
self.spark.read.option("recursiveFileLookup", "true")
.schema(schema)
.json(self.source_path)
).where((col("host").contains(name)) & (col("time") > last_runtime))
else:
data[name] = (
self.spark.read.option("recursiveFileLookup", "true")
.schema(schema)
.json(self.source_path)
).where(col("host").contains(name))
return data

def transform(self, dataframe):
Expand Down
Loading
Loading