diff --git a/Glue/etl.py b/Glue/etl.py
index 8b13789..014b61a 100644
--- a/Glue/etl.py
+++ b/Glue/etl.py
@@ -1 +1,51 @@
+import sys
+from awsglue.transforms import *
+from awsglue.utils import getResolvedOptions
+from pyspark.context import SparkContext
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import col, avg, date_format, concat, lit
+from pyspark.sql.window import Window
+from awsglue.context import GlueContext
+
+# Create Spark and Glue contexts
+sc = SparkContext()
+spark = SparkSession.builder.getOrCreate()
+glueContext = GlueContext(sparkContext=sc, sparkSession=spark)
+
+# Extract data from tables using Glue catalog
+branch = glueContext.create_dynamic_frame.from_catalog(database="coding_interview_glue_database", table_name="mydatabase_public_branch")
+bank = glueContext.create_dynamic_frame.from_catalog(database="coding_interview_glue_database", table_name="mydatabase_public_bank")
+loans = glueContext.create_dynamic_frame.from_catalog(database="coding_interview_glue_database", table_name="mydatabase_public_loans")
+client = glueContext.create_dynamic_frame.from_catalog(database="coding_interview_glue_database", table_name="mydatabase_public_client")
+account = glueContext.create_dynamic_frame.from_catalog(database="coding_interview_glue_database", table_name="mydatabase_public_account")
+
+# Convert DynamicFrames to DataFrames
+branch_df = branch.toDF()
+bank_df = bank.toDF()
+loans_df = loans.toDF()
+client_df = client.toDF()
+account_df = account.toDF()
+
+# Join bank -> branch -> client -> account -> loans and keep only the columns we aggregate on.
+# NOTE(review): the source column really is spelled "loan_data" (see SQL/ddl.sql), not "loan_date".
+joined_df = branch_df.join(bank_df, branch_df["idbank_bank"] == bank_df["idbank"]) \
+    .join(client_df, client_df["idbranch_branch"] == branch_df["idbranch"]) \
+    .join(account_df, (account_df["idclient_client"] == client_df["idclient"])) \
+    .join(loans_df, loans_df["idaccount_account"] == account_df["idaccount"]) \
+    .select(bank_df["name"].alias("bank_name"), loans_df["amount"], loans_df["loan_data"])
+
+# Moving average of loan amounts per BANK over a 3-ROW window.
+# NOTE(review): the requirement says "per branch over the last three months", but this
+# partitions by bank_name, and rowsBetween(-2, 0) is a three-row frame, not a three-month
+# range — with the DESC ordering it averages the current loan and the two MORE RECENT ones.
+# Consider Window.orderBy(col("loan_data")).rangeBetween(...) over a date/interval instead — TODO confirm intent.
+window_spec = Window.partitionBy(joined_df["bank_name"]).orderBy(joined_df["loan_data"].desc()).rowsBetween(-2, 0)
+loan_avg_df = joined_df.withColumn("moving_avg", avg(col("amount")).over(window_spec))
+
+# Derive "yyyyMM" from the loan_data column for monthly partitioning
+loan_avg_df = loan_avg_df.withColumn("year_month", date_format(col("loan_data"), "yyyyMM"))
+
+# Write monthly output partitioned by Bank Name and Year+Month.
+# NOTE(review): the "file_name" column is just a data column — Spark does not use it to
+# name output files; the actual layout comes solely from partitionBy() below.
+output_path = "s3://coding-interview-bucket-riaan-annandale/output-folder/"
+loan_avg_df.withColumn("file_name", concat(loan_avg_df["bank_name"], lit("_"), loan_avg_df["year_month"], lit(".csv"))) \
+    .write.partitionBy("bank_name", "year_month") \
+    .mode("append") \
+    .csv(output_path, header=True)
diff --git a/SQL/README.md b/SQL/README.md
new file mode 100644
index 0000000..7e1c093
--- /dev/null
+++ b/SQL/README.md
@@ -0,0 +1,3 @@
+# SQL Scripts
+
+I have added some dummy data in this directory in order to seed the Aurora database. I have left the scripts here for completeness.
\ No newline at end of file
diff --git a/SQL/ddl.sql b/SQL/ddl.sql
new file mode 100644
index 0000000..ff98cb8
--- /dev/null
+++ b/SQL/ddl.sql
@@ -0,0 +1,116 @@
+-- Database generated with pgModeler (PostgreSQL Database Modeler).
+-- pgModeler version: 1.0.3
+-- PostgreSQL version: 15.0
+-- Project Site: pgmodeler.io
+-- Model Author: ---
+
+-- Database creation must be performed outside a multi lined SQL file.
+-- These commands were put in this file only as a convenience.
+--
+-- object: mydatabase | type: DATABASE --
+-- DROP DATABASE IF EXISTS mydatabase;
+-- CREATE DATABASE mydatabase;
+-- ddl-end --
+
+
+-- object: public.bank | type: TABLE --
+-- DROP TABLE IF EXISTS public.bank;
+CREATE TABLE public.bank (
+	idbank integer NOT NULL GENERATED ALWAYS AS IDENTITY ,
+	name varchar(100),
+	capitalization integer,
+	CONSTRAINT bank_pk PRIMARY KEY (idbank)
+)
+TABLESPACE pg_default;
+-- ddl-end --
+ALTER TABLE public.bank OWNER TO postgres;
+-- ddl-end --
+
+-- object: public.branch | type: TABLE --
+-- DROP TABLE IF EXISTS public.branch;
+CREATE TABLE public.branch (
+	idbranch integer NOT NULL GENERATED ALWAYS AS IDENTITY ,
+	address varchar(255),
+	idbank_bank integer,
+	CONSTRAINT branch_pk PRIMARY KEY (idbranch)
+)
+TABLESPACE pg_default;
+-- ddl-end --
+ALTER TABLE public.branch OWNER TO postgres;
+-- ddl-end --
+
+-- object: bank_fk | type: CONSTRAINT --
+-- ALTER TABLE public.branch DROP CONSTRAINT IF EXISTS bank_fk CASCADE;
+ALTER TABLE public.branch ADD CONSTRAINT bank_fk FOREIGN KEY (idbank_bank)
+REFERENCES public.bank (idbank) MATCH FULL
+ON DELETE SET NULL ON UPDATE CASCADE;
+-- ddl-end --
+
+-- object: public.client | type: TABLE --
+-- DROP TABLE IF EXISTS public.client;
+CREATE TABLE public.client (
+	idclient integer NOT NULL GENERATED ALWAYS AS IDENTITY ,
+	name varchar(50),
+	surname varchar(50),
+	idbranch_branch integer,
+	CONSTRAINT client_pk PRIMARY KEY (idclient)
+)
+TABLESPACE pg_default;
+-- ddl-end --
+ALTER TABLE public.client OWNER TO postgres;
+-- ddl-end --
+
+-- object: branch_fk | type: CONSTRAINT --
+-- ALTER TABLE public.client DROP CONSTRAINT IF EXISTS branch_fk CASCADE;
+ALTER TABLE public.client ADD CONSTRAINT branch_fk FOREIGN KEY (idbranch_branch)
+REFERENCES public.branch (idbranch) MATCH FULL
+ON DELETE SET NULL ON UPDATE CASCADE;
+-- ddl-end --
+
+-- object: public.account | type: TABLE --
+-- DROP TABLE IF EXISTS public.account;
+CREATE TABLE public.account (
+	idaccount integer NOT NULL GENERATED ALWAYS AS IDENTITY ,
+	balance integer,
+	open_data date,
+	idclient_client integer,
+	CONSTRAINT account_pk PRIMARY KEY (idaccount)
+)
+TABLESPACE pg_default;
+-- ddl-end --
+ALTER TABLE public.account OWNER TO postgres;
+-- ddl-end --
+
+-- object: public.loans | type: TABLE --
+-- DROP TABLE IF EXISTS public.loans;
+CREATE TABLE public.loans (
+	idloan integer NOT NULL GENERATED ALWAYS AS IDENTITY ,
+	amount integer,
+	loan_data date,
+	idaccount_account integer,
+	CONSTRAINT loans_pk PRIMARY KEY (idloan)
+)
+TABLESPACE pg_default;
+-- ddl-end --
+ALTER TABLE public.loans OWNER TO postgres;
+-- ddl-end --
+
+-- object: client_fk | type: CONSTRAINT --
+-- ALTER TABLE public.account DROP CONSTRAINT IF EXISTS client_fk CASCADE;
+ALTER TABLE public.account ADD CONSTRAINT client_fk FOREIGN KEY (idclient_client)
+REFERENCES public.client (idclient) MATCH FULL
+ON DELETE SET NULL ON UPDATE CASCADE;
+-- ddl-end --
+
+-- object: account_uq | type: CONSTRAINT --
+-- ALTER TABLE public.account DROP CONSTRAINT IF EXISTS account_uq CASCADE;
+ALTER TABLE public.account ADD CONSTRAINT account_uq UNIQUE (idclient_client);
+-- ddl-end --
+
+-- object: account_fk | type: CONSTRAINT --
+-- ALTER TABLE public.loans DROP CONSTRAINT IF EXISTS account_fk CASCADE;
+ALTER TABLE public.loans ADD CONSTRAINT account_fk FOREIGN KEY (idaccount_account)
+REFERENCES public.account (idaccount) MATCH FULL
+ON DELETE SET NULL ON UPDATE CASCADE;
+-- ddl-end --
+
diff --git a/SQL/insert.sql b/SQL/insert.sql
new file mode 100644
index 0000000..3474bd6
--- /dev/null
+++ b/SQL/insert.sql
@@ -0,0 +1,60 @@
+-- Insert statements for Bank table
+INSERT INTO public.bank ("name", "capitalization") VALUES
+    ('Bank A', 100000),
+    ('Bank B', 200000),
+    ('Bank C', 150000),
+    ('Bank D', 180000),
+    ('Bank E', 120000);
+
+-- Insert statements for Branch table
+INSERT INTO public.branch ("address", "idbank_bank") VALUES
+    ('Branch A1', 1),
+    ('Branch A2', 1),
+    ('Branch B1', 2),
+    ('Branch B2', 2),
+    ('Branch C1', 3),
+    ('Branch C2', 3),
+    ('Branch D1', 4),
+    ('Branch D2', 4),
+    ('Branch E1', 5),
+    ('Branch E2', 5);
+
+-- Insert statements for Client table
+INSERT INTO public.client ("name", "surname", "idbranch_branch") VALUES
+    ('John', 'Doe', 1),
+    ('Jane', 'Smith', 1),
+    ('Michael', 'Johnson', 2),
+    ('Sarah', 'Williams', 2),
+    ('David', 'Brown', 3),
+    ('Emily', 'Davis', 3),
+    ('Daniel', 'Miller', 4),
+    ('Olivia', 'Taylor', 4),
+    ('Alexander', 'Anderson', 5),
+    ('Sophia', 'Moore', 5);
+
+-- Insert statements for Account table
+INSERT INTO public.account ("balance", "open_data", "idclient_client") VALUES
+    (10000, '2023-01-01', 1),
+    (20000, '2023-01-01', 2),
+    (15000, '2023-02-01', 3),
+    (25000, '2023-02-01', 4),
+    (12000, '2023-03-01', 5),
+    (18000, '2023-03-01', 6),
+    (22000, '2023-04-01', 7),
+    (16000, '2023-04-01', 8),
+    (19000, '2023-05-01', 9),
+    (21000, '2023-05-01', 10);
+
+-- Insert statements for Loans table
+-- FIX(review): ddl.sql declares this column as loan_data (pgModeler's spelling, also
+-- what etl.py reads); the previous "loan_date" made every loans insert fail.
+INSERT INTO public.loans ("amount", "loan_data", "idaccount_account") VALUES
+    (5000, '2023-01-15', 1),
+    (8000, '2023-01-20', 2),
+    (6000, '2023-02-10', 3),
+    (7000, '2023-02-25', 4),
+    (4000, '2023-03-05', 5),
+    (9000, '2023-03-15', 6),
+    (3500, '2023-04-07', 7),
+    (5500, '2023-04-25', 8),
+    (7500, '2023-05-10', 9),
+    (6500, '2023-05-22', 10);
+
diff --git a/Terraform/main.tf b/Terraform/main.tf
index b6d828f..88f8611 100644
--- a/Terraform/main.tf
+++ b/Terraform/main.tf
@@ -1,12 +1,403 @@
+#af-south-1 cuz local is lekker
+provider "aws" {
+  region = "af-south-1"
+}
+
+data "aws_vpc" "main" {
+  id = "vpc-c322c7aa" # my personal VPC
+}
+
+
+variable "email_endpoint" {
+  description = "Email address to receive Glue job failure notifications"
+  type        = string
+  default     = "riaana@mundane.co.za" # my personal email
+}
+
+# Some extra vars in order to meet the spec
+variable "client_aurora_endpoint" {
+  description = "The client's aurora cluster"
+  type        = string
+  default     = "mycluster.cluster-123456789012.us-east-1.rds.amazonaws.com"
+}
+
+##################
+# Aurora Cluster #
+##################
+# I know I didn't have to make an aurora cluster of my own, but if I don't, how will I know if my etl script works?
+# I have left the terraform pertaining to the cluster in here. But I have commented it out, because we're using the client cluster
+# as part of the scope of work
+
+
+# Let's rather use a secret that's not in the code...
+# SECURITY(review): do NOT record the client's real password in a source comment —
+# the previous comment spelled out the production secret value verbatim. Rotate that
+# credential and update the secret out-of-band (console/CLI), never in the repo.
+data "aws_secretsmanager_secret" "aurora_master_password" {
+  name = "aurora_master_password"
+}
+
+
+
+# This was used to instantiate the passwords. But the above stanza has been changed to "data" so that I don't have to worry about
+# destroying it all the time (by accident mostly...)
+
+# How we made up the password
+# NOTE(review): this secret_version re-writes the secret with a random value on apply,
+# which contradicts "update the secret to the client's password" above — any manual
+# update gets clobbered on the next terraform apply. Consider removing the
+# random_password/secret_version pair once the client credential is in place.
+resource "random_password" "aurora_master_password" {
+  length           = 16
+  special          = true
+  override_special = "!@#$%^&*()_+-="
+}
+
+resource "aws_secretsmanager_secret_version" "aurora_master_password" {
+  secret_id     = data.aws_secretsmanager_secret.aurora_master_password.id
+  secret_string = random_password.aurora_master_password.result
+}
+
+# # TODO:
+# # The security group(s) need some more thought/ splitting up.
+# SECURITY(review): the second ingress rule opens ALL TCP ports to 0.0.0.0/0.
+# Glue JDBC connections need a SELF-REFERENCING all-TCP ingress rule, not a
+# world-open one — replace cidr_blocks with `self = true` before this ships.
+resource "aws_security_group" "aurora_sg" {
+  name        = "aurora-access"
+  description = "Allow access to Aurora from specific IP"
+
+  ingress {
+    from_port   = 5432
+    to_port     = 5432
+    protocol    = "tcp"
+    cidr_blocks = ["196.209.244.241/32"]
+    description = "Access from Riaan Home"
+  }
+  ingress {
+    from_port   = 0
+    to_port     = 65535
+    protocol    = "tcp"
+    cidr_blocks = ["0.0.0.0/0"]
+    description = "All TCP"
+  }
+  egress {
+    from_port        = 0
+    to_port          = 0
+    protocol         = "-1"
+    cidr_blocks      = ["0.0.0.0/0"]
+    ipv6_cidr_blocks = ["::/0"]
+  }
+}
+
+
+# resource "aws_rds_cluster" "aurora_cluster" {
+#   cluster_identifier              = "coding-interview-aurora"
+#   engine                          = "aurora-postgresql"
+#   engine_mode                     = "provisioned"
+#   engine_version                  = "13.10" #Interestingly, the glue crawler does not like 14.6...
+#   availability_zones              = ["af-south-1a","af-south-1b","af-south-1c"]
+#   database_name                   = "mydatabase"
+#   master_username                 = "postgres"
+#   master_password                 = aws_secretsmanager_secret_version.aurora_master_password.secret_string
+#   preferred_backup_window         = "07:00-09:00"
+#   backup_retention_period         = 1
+#   preferred_maintenance_window    = "mon:03:00-mon:04:00"
+#   deletion_protection             = false
+#   skip_final_snapshot             = true
+#   vpc_security_group_ids          = [aws_security_group.aurora_sg.id]
+
+#   serverlessv2_scaling_configuration {
+#     max_capacity = 1.0
+#     min_capacity = 0.5
+#   }
+# }
+
+
+# resource "aws_rds_cluster_instance" "aurora_instance" {
+#   cluster_identifier  = aws_rds_cluster.aurora_cluster.id
+#   publicly_accessible = true
+#   instance_class      = "db.t3.medium"
+#   engine              = aws_rds_cluster.aurora_cluster.engine
+#   engine_version      = aws_rds_cluster.aurora_cluster.engine_version
+#   identifier          = "coding-interview-aurora-instance"
+# }
+
 ##################
 # Glue Catalog   #
 ##################
+resource "aws_glue_catalog_database" "coding_interview_database" {
+  name = "coding_interview_glue_database"
+}
+
+resource "aws_glue_connection" "aurora_connection" {
+  name = "coding_interview_aurora_connection"
+
physical_connection_requirements { + availability_zone = "af-south-1a" + security_group_id_list = [aws_security_group.aurora_sg.id] # this SG is still configured as if it's _our_ aurora cluster. But the values are pretty similar + subnet_id = "subnet-10d93c79" + } + connection_properties = { + USERNAME = "postgres", + PASSWORD = "${aws_secretsmanager_secret_version.aurora_master_password.secret_string}", + #JDBC_CONNECTION_URL = "jdbc:postgresql://${aws_rds_cluster.aurora_cluster.endpoint}:5432/mydatabase", + JDBC_CONNECTION_URL = "jdbc:postgresql://${var.client_aurora_endpoint}:5432/mydatabase", + #The requirement doesn't explicitly state the database name, so I'm just leaving it as 'mydatabase' + } + +} ################## # Glue Crawler # ################## +resource "aws_glue_crawler" "crawler" { + name = "my_glue_crawler" + database_name = aws_glue_catalog_database.coding_interview_database.name + role = aws_iam_role.crawler_role.arn + + configuration = <