Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions Glue/etl.py
Original file line number Diff line number Diff line change
@@ -1 +1,51 @@
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, avg, date_format
from pyspark.sql.window import Window
from awsglue.context import GlueContext
from pyspark.sql.functions import concat
from pyspark.sql.functions import lit


# Create Spark and Glue contexts
sc = SparkContext()
spark = SparkSession.builder.getOrCreate()
glueContext = GlueContext(sparkContext=sc, sparkSession=spark)

# Extract data from tables using Glue catalog
branch = glueContext.create_dynamic_frame.from_catalog(database="coding_interview_glue_database", table_name="mydatabase_public_branch")
bank = glueContext.create_dynamic_frame.from_catalog(database="coding_interview_glue_database", table_name="mydatabase_public_bank")
loans = glueContext.create_dynamic_frame.from_catalog(database="coding_interview_glue_database", table_name="mydatabase_public_loans")
client = glueContext.create_dynamic_frame.from_catalog(database="coding_interview_glue_database", table_name="mydatabase_public_client")
account = glueContext.create_dynamic_frame.from_catalog(database="coding_interview_glue_database", table_name="mydatabase_public_account")

# Convert DynamicFrames to DataFrames
branch_df = branch.toDF()
bank_df = bank.toDF()
loans_df = loans.toDF()
client_df = client.toDF()
account_df = account.toDF()

# Join tables
joined_df = branch_df.join(bank_df, branch_df["idbank_bank"] == bank_df["idbank"]) \
.join(client_df, client_df["idbranch_branch"] == branch_df["idbranch"]) \
.join(account_df, (account_df["idclient_client"] == client_df["idclient"])) \
.join(loans_df, loans_df["idaccount_account"] == account_df["idaccount"]) \
.select(bank_df["name"].alias("bank_name"),loans_df["amount"],loans_df["loan_data"])

# Calculate moving average of loan amounts per branch over the last three months
window_spec = Window.partitionBy(joined_df["bank_name"]).orderBy(joined_df["loan_data"].desc()).rowsBetween(-2, 0)
loan_avg_df = joined_df.withColumn("moving_avg", avg(col("amount")).over(window_spec))

# Extract Bank Name, Year, and Month from the loan_date column
loan_avg_df = loan_avg_df.withColumn("year_month", date_format(col("loan_data"), "yyyyMM"))

# Write separate monthly output files partitioned by Bank Name, Year, and Month
output_path = "s3://coding-interview-bucket-riaan-annandale/output-folder/"
loan_avg_df.withColumn("file_name", concat(loan_avg_df["bank_name"], lit("_"), loan_avg_df["year_month"], lit(".csv"))) \
.write.partitionBy("bank_name", "year_month") \
.mode("append") \
.csv(output_path, header=True)
3 changes: 3 additions & 0 deletions SQL/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# SQL Scripts

I have added some dummy data in this directory in order to seed the aurora database. I have left them in here for completeness
116 changes: 116 additions & 0 deletions SQL/ddl.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
-- Database generated with pgModeler (PostgreSQL Database Modeler).
-- pgModeler version: 1.0.3
-- PostgreSQL version: 15.0
-- Project Site: pgmodeler.io
-- Model Author: ---

-- Database creation must be performed outside a multi lined SQL file.
-- These commands were put in this file only as a convenience.
--
-- object: mydatabase | type: DATABASE --
-- DROP DATABASE IF EXISTS mydatabase;
-- CREATE DATABASE mydatabase;
-- ddl-end --


-- object: public.bank | type: TABLE --
-- DROP TABLE IF EXISTS public.bank;
CREATE TABLE public.bank (
idbank integer NOT NULL GENERATED ALWAYS AS IDENTITY ,
name varchar(100),
capitalization integer,
CONSTRAINT bank_pk PRIMARY KEY (idbank)
)
TABLESPACE pg_default;
-- ddl-end --
ALTER TABLE public.bank OWNER TO postgres;
-- ddl-end --

-- object: public.branch | type: TABLE --
-- DROP TABLE IF EXISTS public.branch;
CREATE TABLE public.branch (
idbranch integer NOT NULL GENERATED ALWAYS AS IDENTITY ,
address varchar(255),
idbank_bank integer,
CONSTRAINT branch_pk PRIMARY KEY (idbranch)
)
TABLESPACE pg_default;
-- ddl-end --
ALTER TABLE public.branch OWNER TO postgres;
-- ddl-end --

-- object: bank_fk | type: CONSTRAINT --
-- ALTER TABLE public.branch DROP CONSTRAINT IF EXISTS bank_fk CASCADE;
ALTER TABLE public.branch ADD CONSTRAINT bank_fk FOREIGN KEY (idbank_bank)
REFERENCES public.bank (idbank) MATCH FULL
ON DELETE SET NULL ON UPDATE CASCADE;
-- ddl-end --

-- object: public.client | type: TABLE --
-- DROP TABLE IF EXISTS public.client;
CREATE TABLE public.client (
idclient integer NOT NULL GENERATED ALWAYS AS IDENTITY ,
name varchar(50),
surname varchar(50),
idbranch_branch integer,
CONSTRAINT client_pk PRIMARY KEY (idclient)
)
TABLESPACE pg_default;
-- ddl-end --
ALTER TABLE public.client OWNER TO postgres;
-- ddl-end --

-- object: branch_fk | type: CONSTRAINT --
-- ALTER TABLE public.client DROP CONSTRAINT IF EXISTS branch_fk CASCADE;
ALTER TABLE public.client ADD CONSTRAINT branch_fk FOREIGN KEY (idbranch_branch)
REFERENCES public.branch (idbranch) MATCH FULL
ON DELETE SET NULL ON UPDATE CASCADE;
-- ddl-end --

-- object: public.account | type: TABLE --
-- DROP TABLE IF EXISTS public.account;
CREATE TABLE public.account (
idaccount integer NOT NULL GENERATED ALWAYS AS IDENTITY ,
balance integer,
open_data date,
idclient_client integer,
CONSTRAINT account_pk PRIMARY KEY (idaccount)
)
TABLESPACE pg_default;
-- ddl-end --
ALTER TABLE public.account OWNER TO postgres;
-- ddl-end --

-- object: public.loans | type: TABLE --
-- DROP TABLE IF EXISTS public.loans;
CREATE TABLE public.loans (
idloan integer NOT NULL GENERATED ALWAYS AS IDENTITY ,
amount integer,
loan_data date,
idaccount_account integer,
CONSTRAINT loans_pk PRIMARY KEY (idloan)
)
TABLESPACE pg_default;
-- ddl-end --
ALTER TABLE public.loans OWNER TO postgres;
-- ddl-end --

-- object: client_fk | type: CONSTRAINT --
-- ALTER TABLE public.account DROP CONSTRAINT IF EXISTS client_fk CASCADE;
ALTER TABLE public.account ADD CONSTRAINT client_fk FOREIGN KEY (idclient_client)
REFERENCES public.client (idclient) MATCH FULL
ON DELETE SET NULL ON UPDATE CASCADE;
-- ddl-end --

-- object: account_uq | type: CONSTRAINT --
-- ALTER TABLE public.account DROP CONSTRAINT IF EXISTS account_uq CASCADE;
ALTER TABLE public.account ADD CONSTRAINT account_uq UNIQUE (idclient_client);
-- ddl-end --

-- object: account_fk | type: CONSTRAINT --
-- ALTER TABLE public.loans DROP CONSTRAINT IF EXISTS account_fk CASCADE;
ALTER TABLE public.loans ADD CONSTRAINT account_fk FOREIGN KEY (idaccount_account)
REFERENCES public.account (idaccount) MATCH FULL
ON DELETE SET NULL ON UPDATE CASCADE;
-- ddl-end --

60 changes: 60 additions & 0 deletions SQL/insert.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
-- Insert statements for Bank table
INSERT INTO public.bank ("name", "capitalization") VALUES
('Bank A', 100000),
('Bank B', 200000),
('Bank C', 150000),
('Bank D', 180000),
('Bank E', 120000);

-- Insert statements for Branch table
INSERT INTO public.branch ("address", "idbank_bank") VALUES
('Branch A1', 1),
('Branch A2', 1),
('Branch B1', 2),
('Branch B2', 2),
('Branch C1', 3),
('Branch C2', 3),
('Branch D1', 4),
('Branch D2', 4),
('Branch E1', 5),
('Branch E2', 5);

-- Insert statements for Client table
INSERT INTO public.client ("name", "surname", "idbranch_branch") VALUES
('John', 'Doe', 1),
('Jane', 'Smith', 1),
('Michael', 'Johnson', 2),
('Sarah', 'Williams', 2),
('David', 'Brown', 3),
('Emily', 'Davis', 3),
('Daniel', 'Miller', 4),
('Olivia', 'Taylor', 4),
('Alexander', 'Anderson', 5),
('Sophia', 'Moore', 5);

-- Insert statements for Account table
INSERT INTO public.account ("balance", "open_data", "idclient_client") VALUES
(10000, '2023-01-01', 1),
(20000, '2023-01-01', 2),
(15000, '2023-02-01', 3),
(25000, '2023-02-01', 4),
(12000, '2023-03-01', 5),
(18000, '2023-03-01', 6),
(22000, '2023-04-01', 7),
(16000, '2023-04-01', 8),
(19000, '2023-05-01', 9),
(21000, '2023-05-01', 10);

-- Insert statements for Loans table
INSERT INTO public.loans ("amount", "loan_date", "idaccount_account") VALUES
(5000, '2023-01-15', 1),
(8000, '2023-01-20', 2),
(6000, '2023-02-10', 3),
(7000, '2023-02-25', 4),
(4000, '2023-03-05', 5),
(9000, '2023-03-15', 6),
(3500, '2023-04-07', 7),
(5500, '2023-04-25', 8),
(7500, '2023-05-10', 9),
(6500, '2023-05-22', 10);

Loading