diff --git a/.github/workflows/docker-build-push.yml b/.github/workflows/docker-build-push.yml
index 3544a68..87b7af1 100644
--- a/.github/workflows/docker-build-push.yml
+++ b/.github/workflows/docker-build-push.yml
@@ -4,6 +4,9 @@ on:
   push:
     branches:
       - '**'
+    paths:
+      - 'src/**'
+      - 'Docker/**'
   workflow_dispatch:
 
 jobs:
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..de045ba
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,2 @@
+.env
+.mypy_cache
\ No newline at end of file
diff --git a/Docker/Dockerfile b/Docker/Dockerfile
index 5d94ed2..3206e08 100644
--- a/Docker/Dockerfile
+++ b/Docker/Dockerfile
@@ -32,13 +32,13 @@ RUN ln -sf ${PREFIX}/bin/python3.11 /usr/local/bin/python \
 ###########################################
 # Stage 2: Get entrypoint from official Spark
 ###########################################
-FROM apache/spark:3.5.6 AS spark-official
+FROM apache/spark:3.5.7 AS spark-official
 
 ###########################################
 # Stage 3: Spark + Delta + Cloud connectors
 ###########################################
 FROM ubuntu:22.04 AS spark-base
-ARG SPARK_VERSION=3.5.6
+ARG SPARK_VERSION=3.5.7
 ARG HADOOP_VERSION=3
 ARG DELTA_VERSION=3.2.1
 ENV DEBIAN_FRONTEND=noninteractive
diff --git a/src/bronze/get_full_tables.py b/src/bronze/get_full_tables.py
new file mode 100644
index 0000000..a2745e9
--- /dev/null
+++ b/src/bronze/get_full_tables.py
@@ -0,0 +1,104 @@
+from pyspark.sql import SparkSession  # type: ignore
+import pyspark.sql.functions as F  # type: ignore
+import argparse
+import os
+import logging
+
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.StreamHandler()
+    ]
+)
+
+
+def get_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--savepath", type=str, required=True, help="S3 path where the extracted tables are stored")
+    parser.add_argument("--undesired_column", type=str, required=True, help="Column to drop from the auth_user table before saving")
+    args = parser.parse_args()
+    return args
+
+
+def get_spark_session(S3_ACCESS_KEY: str, S3_SECRET_KEY: str, S3_ENDPOINT: str) -> SparkSession:
+    spark = SparkSession.builder \
+        .appName("full_table_ingestion") \
+        .config("spark.jars", "/opt/spark/jars/hadoop-aws-3.3.4.jar,/opt/spark/jars/aws-java-sdk-bundle-1.12.375.jar,/opt/spark/jars/delta-spark_2.12-3.2.1.jar,/opt/spark/jars/delta-storage-3.2.1.jar,/opt/spark/jars/delta-kernel-api-3.2.1.jar,/opt/spark/jars/mysql-connector-j-8.3.0.jar") \
+        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
+        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
+        .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY) \
+        .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY) \
+        .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT) \
+        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
+        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
+        .getOrCreate()
+    return spark
+
+
+###################################################################################
+#                             GET MYSQL CREDENTIALS                               #
+###################################################################################
+def main() -> None:
+    MYSQL_DATABASE = os.getenv("MYSQL_DATABASE")
+    MYSQL_HOST = os.getenv("MYSQL_HOST")
+    MYSQL_PORT = os.getenv("MYSQL_PORT")
+    MYSQL_USER = os.getenv("MYSQL_USER")
+    MYSQL_SECRET = os.getenv("MYSQL_SECRET")
+    jdbc_url = f"jdbc:mysql://{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}"
+
+    ###################################################################################
+    #                              GET S3 CREDENTIALS                                 #
+    ###################################################################################
+    S3_ACCESS_KEY = str(os.getenv("S3_ACCESS_KEY"))
+    S3_SECRET_KEY = str(os.getenv("S3_SECRET_KEY"))
+    S3_ENDPOINT = str(os.getenv("S3_ENDPOINT"))
+
+    args = get_args()
+    S3_SAVEPATH = args.savepath
+    undesired_column = args.undesired_column
+
+    TABLES = [
+        "course_overviews_courseoverview",
+        "student_courseenrollment",
+        "certificates_generatedcertificate",
+        "student_courseaccessrole",
+        "auth_userprofile",
+        "student_userattribute",
+        "organizations_organization",
+        "auth_user"
+    ]
+
+    # Create the session once so a failure while building it cannot leave
+    # `spark` undefined inside the per-table error handler.
+    spark = get_spark_session(S3_ACCESS_KEY=S3_ACCESS_KEY, S3_SECRET_KEY=S3_SECRET_KEY, S3_ENDPOINT=S3_ENDPOINT)
+
+    try:
+        for table in TABLES:
+            logging.info(f"getting table {table}")
+            try:
+                df = spark.read.format("jdbc") \
+                    .option("url", jdbc_url) \
+                    .option("user", MYSQL_USER) \
+                    .option("password", MYSQL_SECRET) \
+                    .option("driver", "com.mysql.cj.jdbc.Driver") \
+                    .option("dbtable", table) \
+                    .load()
+
+                if table == "auth_user":
+                    df = df.drop(undesired_column)
+
+                df = df.withColumn("ingestion_date", F.current_timestamp()) \
+                    .withColumn("source_name", F.lit(table))
+
+                if table == "auth_user" and undesired_column and undesired_column in df.columns:
+                    raise Exception("The undesired column is still present in the dataframe")
+
+                output_path = f"{S3_SAVEPATH}/{table}"
+                df.write.format("delta").mode("append").save(output_path)
+
+                logging.info(f"Data saved as Delta table to {output_path}")
+
+            except Exception as e:
+                logging.error(f"Pipeline failed for table {table}: {e}")
+    finally:
+        spark.stop()
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/src/bronze/incremental_load.py b/src/bronze/incremental_load.py
new file mode 100644
index 0000000..d472c95
--- /dev/null
+++ b/src/bronze/incremental_load.py
@@ -0,0 +1,198 @@
+from datetime import datetime
+from pyspark.sql import SparkSession  # type: ignore
+import pyspark.sql.functions as F  # type: ignore
+import pyspark.sql.types as T  # type: ignore
+import argparse
+import logging
+import os
+from typing import Tuple
+
+logging.basicConfig(
+    level=logging.INFO,
+    format='%(asctime)s - %(levelname)s - %(message)s',
+    handlers=[
+        logging.StreamHandler()
+    ]
+)
+
+
+def get_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--savepath", type=str, required=True, help="S3 path where the data is stored")
+    parser.add_argument("--metadatapath", type=str, required=True, help="S3 path that stores the metadata for the process")
+    parser.add_argument("--table", type=str, required=True, help="Name of the MySQL table to ingest")
+    parser.add_argument("--first_ingestion_flag", type=int, default=0, help="1 for the first (full historical) ingestion, 0 for a regular incremental ingestion")
+    args = parser.parse_args()
+    return args
+
+
+def update_metadata(metadatapath: str, spark: SparkSession, table: str, last_date: str) -> bool:
+    try:
+        tmp_df = spark.createDataFrame(
+            [
+                (table, last_date)
+            ],
+            T.StructType([
+                T.StructField("table_name", T.StringType(), True),
+                T.StructField("last_date", T.StringType(), True),
+            ]),
+        )
+        tmp_df.coalesce(1).write.format("csv").mode("overwrite").save(f"{metadatapath}/{table}/last_updated_date")
+        return True
+    except Exception as e:
+        logging.error(f"Exception {e}")
+        return False
+
+
+def get_spark_session(S3_ACCESS_KEY: str, S3_SECRET_KEY: str, S3_ENDPOINT: str) -> SparkSession:
+    spark = SparkSession.builder \
+        .appName("incremental_table_ingestion") \
+        .config("spark.jars", "/opt/spark/jars/hadoop-aws-3.3.4.jar,/opt/spark/jars/aws-java-sdk-bundle-1.12.375.jar,/opt/spark/jars/delta-spark_2.12-3.2.1.jar,/opt/spark/jars/delta-storage-3.2.1.jar,/opt/spark/jars/delta-kernel-api-3.2.1.jar,/opt/spark/jars/mysql-connector-j-8.3.0.jar") \
+        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
+        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
+        .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY) \
+        .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY) \
+        .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT) \
+        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
+        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
+        .getOrCreate()
+    return spark
+
+
+def full_initial_ingestion(spark: SparkSession, table: str, savepath: str, jdbc_url: str, MYSQL_USER: str, MYSQL_SECRET: str) -> Tuple[bool, str]:
+    years = [i for i in range(2019, 2100)]
+    months = [i for i in range(1, 13)]
+    current_year = datetime.now().year
+    current_month = datetime.now().month
+    path = f"{savepath}/{table}"
+
+    for year in years:
+        for month in months:
+
+            if year == 2019 and month == 1:
+                # First slice: overwrite so a re-run starts from a clean Delta table.
+                query = f"(SELECT * FROM {table} WHERE YEAR(created) = {year} AND MONTH(created) = {month}) AS limited_table"
+                logging.info(query)
+                df = spark.read.format("jdbc") \
+                    .option("url", jdbc_url) \
+                    .option("user", MYSQL_USER) \
+                    .option("password", MYSQL_SECRET) \
+                    .option("driver", "com.mysql.cj.jdbc.Driver") \
+                    .option("dbtable", query) \
+                    .load()
+
+                df = df.withColumn("created", F.date_format("created", "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")) \
+                    .withColumn("year", F.year(F.col("created"))) \
+                    .withColumn("month", F.month(F.col("created")))
+
+                df = df.withColumn("ingestion_date", F.current_timestamp()) \
+                    .withColumn("source_name", F.lit(table))
+
+                df.write.format("delta") \
+                    .mode("overwrite") \
+                    .partitionBy("year", "month") \
+                    .save(path)
+                continue
+
+            if (year > current_year) or (year == current_year and month > current_month):
+                break
+            else:
+                query = f"(SELECT * FROM {table} WHERE YEAR(created) = {year} AND MONTH(created) = {month}) AS limited_table"
+                logging.info(query)
+                new_df = spark.read.format("jdbc") \
+                    .option("url", jdbc_url) \
+                    .option("user", MYSQL_USER) \
+                    .option("password", MYSQL_SECRET) \
+                    .option("driver", "com.mysql.cj.jdbc.Driver") \
+                    .option("dbtable", query) \
+                    .load()
+
+                incremental_df = new_df.withColumn("created", F.date_format("created", "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")) \
+                    .withColumn("year", F.year(F.col("created"))) \
+                    .withColumn("month", F.month(F.col("created")))
+
+                incremental_df = incremental_df.withColumn("ingestion_date", F.current_timestamp()).withColumn("source_name", F.lit(table))
+
+                # Append this month's partitions directly
+                incremental_df.write.format("delta").mode("append").partitionBy("year", "month").save(path)
+
+                if year == current_year and month == current_month:
+                    # Reached the current month: record the cutoff for the next incremental run.
+                    last_update = datetime.now().isoformat()
+                    return (True, last_update)
+
+    return (False, "")
+
+
+def get_metadata(metadatapath: str, spark: SparkSession, table: str) -> str | bool:
+    metadatapath = f"{metadatapath}/{table}/last_updated_date"
+    customSchema = T.StructType([
+        T.StructField("table_name", T.StringType(), True),
+        T.StructField("last_date", T.StringType(), True)
+    ])
+    row = spark.read.csv(metadatapath, schema=customSchema).filter(
+        F.col("table_name") == table).first()
+    if row is None or row["last_date"] is None:
+        return False
+    return str(row["last_date"])
+
+
+def delta_load(spark: SparkSession, jdbc_url: str, MYSQL_USER: str, MYSQL_SECRET: str, last_updated: str, table: str, savepath: str) -> Tuple[bool, str]:
+    path = f"{savepath}/{table}"
+    query = f"(SELECT * FROM {table} WHERE created >= '{last_updated}') AS limited_table"
+    logging.info(query)
+    new_df = spark.read.format("jdbc") \
+        .option("url", jdbc_url) \
+        .option("user", MYSQL_USER) \
+        .option("password", MYSQL_SECRET) \
+        .option("driver", "com.mysql.cj.jdbc.Driver") \
+        .option("dbtable", query) \
+        .load()
+    last_update = datetime.now().isoformat()
+
+    incremental_df = new_df.withColumn("created", F.date_format("created", "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")) \
+        .withColumn("year", F.year(F.col("created"))) \
+        .withColumn("month", F.month(F.col("created")))
+
+    incremental_df = incremental_df.withColumn("ingestion_date", F.current_timestamp()).withColumn("source_name", F.lit(table))
+
+    # Append new partitions directly
+    incremental_df.write.format("delta").mode("append").partitionBy("year", "month").save(path)
+
+    return (True, last_update)
+
+
+def main() -> None:
+    MYSQL_DATABASE = os.getenv("MYSQL_DATABASE")
+    MYSQL_HOST = os.getenv("MYSQL_HOST")
+    MYSQL_PORT = os.getenv("MYSQL_PORT")
+    MYSQL_USER = str(os.getenv("MYSQL_USER"))
+    MYSQL_SECRET = str(os.getenv("MYSQL_SECRET"))
+    jdbc_url = f"jdbc:mysql://{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}"
+
+    S3_ACCESS_KEY = str(os.getenv("S3_ACCESS_KEY"))
+    S3_SECRET_KEY = str(os.getenv("S3_SECRET_KEY"))
+    S3_ENDPOINT = str(os.getenv("S3_ENDPOINT"))
+
+    args = get_args()
+    savepath = args.savepath
+    metadata = args.metadatapath
+    is_full_ingestion_flag = args.first_ingestion_flag
+    table = args.table
+
+    spark = get_spark_session(S3_ACCESS_KEY, S3_SECRET_KEY, S3_ENDPOINT)
+
+    if is_full_ingestion_flag == 1:
+        result = full_initial_ingestion(spark, table, savepath, jdbc_url, MYSQL_USER, MYSQL_SECRET)
+        logging.info(result)
+        if result[0]:
+            update_metadata(metadatapath=metadata, spark=spark, table=table, last_date=result[1])
+
+    if is_full_ingestion_flag == 0:
+        last_date = get_metadata(metadatapath=metadata, spark=spark, table=table)
+        if last_date is False:
+            raise Exception("No date found in the metadata path")
+        last_date = str(last_date)
+        result = delta_load(spark=spark, jdbc_url=jdbc_url, MYSQL_USER=MYSQL_USER, MYSQL_SECRET=MYSQL_SECRET, last_updated=last_date, table=table, savepath=savepath)
+        if result[0]:
+            update_metadata(metadatapath=metadata, spark=spark, table=table, last_date=result[1])
+
+    spark.stop()
+
+
+if __name__ == "__main__":
+    main()
\ No newline at end of file
diff --git a/src/tmp.txt b/src/tmp.txt
deleted file mode 100644
index 1348a35..0000000
--- a/src/tmp.txt
+++ /dev/null
@@ -1 +0,0 @@
-temp file
\ No newline at end of file
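
Usage sketch (assumed, outside the diff itself): with --first_ingestion_flag 1, incremental_load.py runs the month-by-month historical backfill and writes the cutoff date to <metadatapath>/<table>/last_updated_date; with the default 0 it reads that cutoff and appends only rows with created >= cutoff. Both new scripts expect MYSQL_DATABASE, MYSQL_HOST, MYSQL_PORT, MYSQL_USER, MYSQL_SECRET, S3_ACCESS_KEY, S3_SECRET_KEY and S3_ENDPOINT in the environment. The read-back check below is a minimal sketch, not part of this PR: the bucket path is a hypothetical placeholder, and the import assumes src/bronze is on PYTHONPATH.

    import os

    # Reuse the session factory added in this PR (hadoop-aws + Delta configs).
    from incremental_load import get_spark_session

    spark = get_spark_session(
        str(os.getenv("S3_ACCESS_KEY")),
        str(os.getenv("S3_SECRET_KEY")),
        str(os.getenv("S3_ENDPOINT")),
    )

    # Hypothetical location: "<--savepath>/<--table>" as written by the loaders.
    bronze_path = "s3a://bronze-bucket/tables/student_courseenrollment"

    df = spark.read.format("delta").load(bronze_path)

    # The loaders add ingestion_date/source_name, and incremental_load.py
    # partitions by year/month, so per-partition counts are a quick sanity check.
    df.groupBy("source_name", "year", "month").count().orderBy("year", "month").show(truncate=False)

    spark.stop()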