3 changes: 3 additions & 0 deletions .github/workflows/docker-build-push.yml
@@ -4,6 +4,9 @@ on:
push:
branches:
- '**'
paths:
- 'src/**'
- 'Docker/**'
workflow_dispatch:

jobs:
2 changes: 2 additions & 0 deletions .gitignore
@@ -0,0 +1,2 @@
.env
.mypy_cache
4 changes: 2 additions & 2 deletions Docker/Dockerfile
@@ -32,13 +32,13 @@ RUN ln -sf ${PREFIX}/bin/python3.11 /usr/local/bin/python \
###########################################
# Stage 2: Get entrypoint from official Spark
###########################################
-FROM apache/spark:3.5.6 AS spark-official
+FROM apache/spark:3.5.7 AS spark-official

###########################################
# Stage 3: Spark + Delta + Cloud connectors
###########################################
FROM ubuntu:22.04 AS spark-base
-ARG SPARK_VERSION=3.5.6
+ARG SPARK_VERSION=3.5.7
ARG HADOOP_VERSION=3
ARG DELTA_VERSION=3.2.1
ENV DEBIAN_FRONTEND=noninteractive
104 changes: 104 additions & 0 deletions src/bronze/get_full_tables.py
@@ -0,0 +1,104 @@
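# Bronze layer: full extract of the listed MySQL tables over JDBC, written as Delta tables to S3.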
from pyspark.sql import SparkSession #type:ignore
import pyspark.sql.functions as F #type:ignore
import argparse
import os
import logging

logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler()
]
)
def get_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument("--savepath", type = str,required= True, help = "The S3 bucket intended for the data to be stored")
parser.add_argument("--undesired_column", type = str,required= True, help = " the undesired column for a table")
args = parser.parse_args()
return args

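# Build a SparkSession configured for S3A access and Delta Lake, pointing spark.jars at the connector jars under /opt/spark/jars.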
def get_spark_session(S3_ACCESS_KEY: str,S3_SECRET_KEY: str , S3_ENDPOINT: str) -> SparkSession:

spark = SparkSession.builder \
.appName("incremental_table_ingestion") \
.config("spark.jars", "/opt/spark/jars/hadoop-aws-3.3.4.jar,/opt/spark/jars/aws-java-sdk-bundle-1.12.375.jar,/opt/spark/jars/delta-spark_2.12-3.2.1.jar,/opt/spark/jars/delta-storage-3.2.1.jar,/opt/spark/jars/delta-kernel-api-3.2.1.jar,/opt/spark/jars/mysql-connector-j-8.3.0.jar") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
.config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY) \
.config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY) \
.config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT) \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.getOrCreate()
return spark

###################################################################################
# GET MYSQL CREDENTIALS #
###################################################################################
def main() -> None:
MYSQL_DATABASE = os.getenv("MYSQL_DATABASE")
MYSQL_HOST = os.getenv("MYSQL_HOST")
MYSQL_PORT = os.getenv("MYSQL_PORT")
MYSQL_USER = os.getenv("MYSQL_USER")
MYSQL_SECRET = os.getenv("MYSQL_SECRET")
jdbc_url = f"jdbc:mysql://{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}"



###################################################################################
# GET S3 CREDENTIALS #
###################################################################################
S3_ACCESS_KEY = str(os.getenv("S3_ACCESS_KEY"))
S3_SECRET_KEY = str(os.getenv("S3_SECRET_KEY"))
S3_ENDPOINT = str(os.getenv("S3_ENDPOINT"))

args = get_args()
S3_SAVEPATH = args.savepath
undesired_column = args.undesired_column

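    # Source MySQL tables to extract in full.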
TABLES = [
"course_overviews_courseoverview",
"student_courseenrollment",
"certificates_generatedcertificate",
"student_courseaccessrole",
"auth_userprofile",
"student_userattribute",
"organizations_organization",
"auth_user"
]

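    # Read each table over JDBC, add ingestion metadata columns, and append it to its Delta path under the save path.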
for table in TABLES:

logging.info(f"getting table {table}")
try:

spark = get_spark_session(S3_ACCESS_KEY=S3_ACCESS_KEY,S3_SECRET_KEY=S3_SECRET_KEY,S3_ENDPOINT=S3_ENDPOINT)

df = spark.read.format("jdbc") \
.option("url", jdbc_url) \
.option("user", MYSQL_USER) \
.option("password", MYSQL_SECRET) \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("dbtable", table) \
.load()
if table == "auth_user":
df = df.drop(undesired_column)

df = df.withColumn("ingestion_date", F.current_timestamp()) \
.withColumn("source_name", F.lit(table))
if table == "auth_user" and undesired_column and undesired_column in df.columns:
raise Exception("THE undesired column stills in the dataframe")
output_path = f"{S3_SAVEPATH}/{table}"

df.write.format("delta").mode("append").save(output_path)

logging.info(f"Data saved as Delta table to {output_path}")

except Exception as e:
logging.error(f"Pipeline failed: {e}")
spark.stop()


if __name__=="__main__":
main()
198 changes: 198 additions & 0 deletions src/bronze/incremental_load.py
@@ -0,0 +1,198 @@
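# Bronze layer: initial month-by-month backfill or incremental load of a single MySQL table into Delta on S3,
# driven by a per-table metadata file that stores the last ingestion timestamp.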
from datetime import datetime
from pyspark.sql import SparkSession # type: ignore
import pyspark.sql.functions as F # type: ignore
import pyspark.sql.types as T # type: ignore
import argparse
import logging
import os
from typing import Tuple

logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler()
]
)




def get_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument("--savepath", type = str,required= True, help = "The S3 bucket intended for the data to be stored")
parser.add_argument("--metadatapath", type = str, required = True, help ="The S3 bucket that contains stores the metada for the process")
parser.add_argument("--table", type = str, required = True, help ="The S3 bucket that contains stores the metada for the process")
parser.add_argument("--first_ingestion_flag",type = int,default=0,help="flag to indicate if it is the first ingestion on regular ingestion")
args = parser.parse_args()
return args

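# Persist the last ingestion timestamp for a table as a single-row CSV so the next run can resume from it.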
def update_metadata(metadatapath: str ,spark: SparkSession,table:str,last_date:str) -> bool:
try:
tmp_df = spark.createDataFrame(
[
(table, last_date)
],
T.StructType([T.StructField("table_name", T.StringType(), True),T.StructField("last_date", T.StringType(), True),]),)

tmp_df.coalesce(1).write.format("csv").mode("overwrite").save(f"{metadatapath}/{table}/last_updated_date")
return True
except Exception as e:
logging.error(f"Exception {e}")
return False



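# Same S3A / Delta Lake session setup as in get_full_tables.py.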
def get_spark_session(S3_ACCESS_KEY: str,S3_SECRET_KEY: str , S3_ENDPOINT: str) -> SparkSession:

spark = SparkSession.builder \
.appName("incremental_table_ingestion") \
.config("spark.jars", "/opt/spark/jars/hadoop-aws-3.3.4.jar,/opt/spark/jars/aws-java-sdk-bundle-1.12.375.jar,/opt/spark/jars/delta-spark_2.12-3.2.1.jar,/opt/spark/jars/delta-storage-3.2.1.jar,/opt/spark/jars/delta-kernel-api-3.2.1.jar,/opt/spark/jars/mysql-connector-j-8.3.0.jar") \
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\
.config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY) \
.config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY) \
.config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT) \
.config("spark.hadoop.fs.s3a.path.style.access", "true") \
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.getOrCreate()
return spark









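# Backfill the table month by month starting from January 2019 so each JDBC query stays small: the first slice
# overwrites the Delta path, later slices are appended, all partitioned by year and month. Returns a success flag
# and the timestamp used to seed the metadata.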
def full_initial_ingestion(spark: SparkSession, table: str, savepath: str, jdbc_url:str, MYSQL_USER:str, MYSQL_SECRET:str) -> Tuple[bool, str]:
years = [i for i in range(2019,2100)]
months = [i for i in range(1,13)]
current_year = datetime.now().year
current_month = datetime.now().month
path = f"{savepath}/{table}"

for year in years:
for month in months:

if (year == 2019 and month ==1):
query = f"(SELECT * FROM {table} WHERE YEAR(created) = {year} AND MONTH(created) = {month}) AS limited_table"
logging.info(query)
df = spark.read.format("jdbc") \
.option("url", jdbc_url) \
.option("user", MYSQL_USER) \
.option("password", MYSQL_SECRET) \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("dbtable", query) \
.load()

df = df.withColumn("created", F.date_format("created", "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")) \
.withColumn("year", F.year(F.col("created"))) \
.withColumn("month", F.month(F.col("created")))

df = df.withColumn("ingestion_date", F.current_timestamp()) \
.withColumn("source_name", F.lit(table))

df.write.format("delta") \
.mode("overwrite") \
.partitionBy("year", "month") \
.save(path)
continue

if (year > current_year) or (year == current_year and month > current_month):
break
else:

query = f"(SELECT * FROM {table} WHERE YEAR(created) = {year} AND MONTH(created) = {month}) AS limited_table"
logging.info(query)
new_df = spark.read.format("jdbc") \
.option("url", jdbc_url) \
.option("user", MYSQL_USER) \
.option("password", MYSQL_SECRET) \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("dbtable", query) \
.load()
if (year == current_year and month == current_month):
last_update = datetime.now().isoformat()
incremental_df = new_df.withColumn("created", F.date_format("created", "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")) \
.withColumn("year", F.year(F.col("created"))) \
.withColumn("month", F.month(F.col("created")))

incremental_df = incremental_df.withColumn("ingestion_date", F.current_timestamp()).withColumn("source_name", F.lit(table))

# Append new partitions directly
incremental_df.write.format("delta").mode("append").partitionBy("year", "month").save(path)
return (True, last_update)

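# Read the stored last-ingestion timestamp for the table from the metadata CSV; returns False when no record exists.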
def get_metadata(metadatapath: str, spark: SparkSession, table:str) -> str | bool:
metadatapath = f"{metadatapath}/{table}/last_updated_date"
customSchema = T.StructType([
T.StructField("table_name", T.StringType(), True),
T.StructField("last_date", T.StringType(), True)
])
row = spark.read.csv(metadatapath,schema=customSchema).filter( F.col("table_name") == table).first()
if row is None or row["last_date"] is None:
return False
return str(row["last_date"])

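# Pull only the rows created since the last recorded timestamp, append them to the Delta table, and return the new watermark.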
def delta_load(spark: SparkSession, jdbc_url:str, MYSQL_USER:str, MYSQL_SECRET:str,last_updated:str,table:str,savepath: str) -> Tuple[bool, str]:
path = f"{savepath}/{table}"
query = f"(SELECT * FROM {table} WHERE created >= '{last_updated}') AS limited_table"
logging.info(query)
new_df = spark.read.format("jdbc") \
.option("url", jdbc_url) \
.option("user", MYSQL_USER) \
.option("password", MYSQL_SECRET) \
.option("driver", "com.mysql.cj.jdbc.Driver") \
.option("dbtable", query) \
.load()
last_update = datetime.now().isoformat()
incremental_df = new_df.withColumn("created", F.date_format("created", "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")) \
.withColumn("year", F.year(F.col("created"))) \
.withColumn("month", F.month(F.col("created")))

incremental_df = incremental_df.withColumn("ingestion_date", F.current_timestamp()).withColumn("source_name", F.lit(table))

# Append new partitions directly
incremental_df.write.format("delta").mode("append").partitionBy("year", "month").save(path)

return (True,last_update)


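# Entry point: run a full backfill when --first_ingestion_flag is 1, otherwise an incremental load, and update the
# metadata on success.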
def main() -> None:

MYSQL_DATABASE = os.getenv("MYSQL_DATABASE")
MYSQL_HOST = os.getenv("MYSQL_HOST")
MYSQL_PORT = os.getenv("MYSQL_PORT")
MYSQL_USER = str(os.getenv("MYSQL_USER"))
MYSQL_SECRET = str(os.getenv("MYSQL_SECRET"))
jdbc_url = f"jdbc:mysql://{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}"

S3_ACCESS_KEY = str(os.getenv("S3_ACCESS_KEY"))
S3_SECRET_KEY = str(os.getenv("S3_SECRET_KEY"))
S3_ENDPOINT = str(os.getenv("S3_ENDPOINT"))

args = get_args()
savepath = args.savepath
metadata = args.metadatapath
is_full_ingestion_flag = args.first_ingestion_flag
table = args.table
spark = get_spark_session(S3_ACCESS_KEY,S3_SECRET_KEY,S3_ENDPOINT)
if is_full_ingestion_flag == 1:
result = full_initial_ingestion(spark,table,savepath,jdbc_url,MYSQL_USER,MYSQL_SECRET)
logging.info(result)
if result[0]:
update_metadata(metadatapath=metadata,spark=spark,table=table,last_date=result[1])
if is_full_ingestion_flag == 0:
last_date = get_metadata(metadatapath=metadata,spark=spark,table=table)
        if last_date is False:
            raise Exception("No last-ingestion date found in the metadata")
last_date = str(last_date)
result = delta_load(spark=spark,jdbc_url=jdbc_url,MYSQL_USER=MYSQL_USER,MYSQL_SECRET=MYSQL_SECRET,last_updated=last_date,table=table,savepath=savepath)
if result[0]:
update_metadata(metadatapath=metadata,spark=spark,table=table,last_date=result[1])
spark.stop()

if __name__=="__main__":
main()
1 change: 0 additions & 1 deletion src/tmp.txt

This file was deleted.