From c0e4a83259cf41d92a1c6db4f472fed0b14adf33 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vitor=20Souza=20Pi=C3=B1a?= Date: Mon, 3 Nov 2025 16:03:07 +0000 Subject: [PATCH 1/5] adding the ex script to test the docker image --- src/test.py | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 src/test.py diff --git a/src/test.py b/src/test.py new file mode 100644 index 0000000..bcd7921 --- /dev/null +++ b/src/test.py @@ -0,0 +1,6 @@ +from pyspark.sql import SparkSession + +spark = SparkSession.builder.appName("test").getOrCreate() +df = spark.range(1, 101) +print(f"Count: {df.count()}") +spark.stop() From 9cdce210fc1b1cf1bec95fe8c915dd09a4aaf553 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vitor=20Souza=20Pi=C3=B1a?= Date: Mon, 10 Nov 2025 14:18:10 +0000 Subject: [PATCH 2/5] features:adding ingestion script from full tables --- src/bronze/get_full_tables.py | 73 +++++++++++++++++++++++++++++++++++ src/test.py | 6 --- src/tmp.txt | 1 - 3 files changed, 73 insertions(+), 7 deletions(-) create mode 100644 src/bronze/get_full_tables.py delete mode 100644 src/test.py delete mode 100644 src/tmp.txt diff --git a/src/bronze/get_full_tables.py b/src/bronze/get_full_tables.py new file mode 100644 index 0000000..387d5e3 --- /dev/null +++ b/src/bronze/get_full_tables.py @@ -0,0 +1,73 @@ +from pyspark.sql import SparkSession +import pyspark.sql.functions as F +import os +import logging + +logging.basicConfig(level=logging.INFO) +################################################################################### +# GET MYSQL CREDENTIALS # +################################################################################### + +MYSQL_DATABASE = os.getenv("MYSQL_DATABASE") +MYSQL_HOST = os.getenv("MYSQL_HOST") +MYSQL_PORT = os.getenv("MYSQL_PORT") +MYSQL_USER = os.getenv("MYSQL_USER") +MYSQL_SECRET = os.getenv("MYSQL_SECRET") +jdbc_url = f"jdbc:mysql://{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}" +SPARK_HOME = os.environ.get("SPARK_HOME", "/opt/spark") + + +################################################################################### +# GET S3 CREDENTIALS # +################################################################################### +S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY") +S3_SECRET_KEY = os.getenv("S3_SECRET_KEY") +S3_ENDPOINT = os.getenv("S3_ENDPOINT") +S3_SAVEPATH = os.getenv("S3_SAVEPATH") + +TABLES = [ +"course_overviews_courseoverview", +"student_courseenrollment", +"certificates_generatedcertificate", +"student_courseaccessrole"] + +for table in TABLES: + logging.info(f"getting table {table}") + try: + + spark = SparkSession.builder \ + .appName("MyApp") \ + .config("spark.jars", ",".join([ + f"{SPARK_HOME}/jars/hadoop-aws-3.3.4.jar", + f"{SPARK_HOME}/jars/aws-java-sdk-bundle-1.12.375.jar", + f"{SPARK_HOME}/jars/delta-spark_2.12-3.2.1.jar", + f"{SPARK_HOME}/jars/delta-storage-3.2.1.jar", + f"{SPARK_HOME}/jars/delta-kernel-api-3.2.1.jar", + f"{SPARK_HOME}/jars/mysql-connector-j-8.3.0.jar", + ]))\ + .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\ + .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\ + .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY) \ + .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY) \ + .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT) \ + .config("spark.hadoop.fs.s3a.path.style.access", "true") \ + .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \ + .getOrCreate() + + df = spark.read.format("jdbc") \ + .option("url", jdbc_url) \ + .option("user", 
MYSQL_USER) \ + .option("password", MYSQL_SECRET) \ + .option("driver", "com.mysql.cj.jdbc.Driver") \ + .option("dbtable", table) \ + .load() + + df = df.withColumn("ingestion_date", F.current_timestamp()) \ + .withColumn("source_name", F.lit(table)) + output_path = S3_SAVEPATH + table + #df.write.format("delta").mode("overwrite").save(output_path) + + logger.info(f"Data saved as Delta table to {output_path}") + spark.stop() + except Exception as e: + logger.error(f"Pipeline failed: {e}") diff --git a/src/test.py b/src/test.py deleted file mode 100644 index bcd7921..0000000 --- a/src/test.py +++ /dev/null @@ -1,6 +0,0 @@ -from pyspark.sql import SparkSession - -spark = SparkSession.builder.appName("test").getOrCreate() -df = spark.range(1, 101) -print(f"Count: {df.count()}") -spark.stop() diff --git a/src/tmp.txt b/src/tmp.txt deleted file mode 100644 index 1348a35..0000000 --- a/src/tmp.txt +++ /dev/null @@ -1 +0,0 @@ -temp file \ No newline at end of file From 17b1aa7b1a6ceca5e1bbc674459a5b529a7924e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vitor=20Souza=20Pi=C3=B1a?= Date: Mon, 10 Nov 2025 14:24:46 +0000 Subject: [PATCH 3/5] fix: resolving the wrong spark version --- Docker/Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Docker/Dockerfile b/Docker/Dockerfile index 5d94ed2..3206e08 100644 --- a/Docker/Dockerfile +++ b/Docker/Dockerfile @@ -32,13 +32,13 @@ RUN ln -sf ${PREFIX}/bin/python3.11 /usr/local/bin/python \ ########################################### # Stage 2: Get entrypoint from official Spark ########################################### -FROM apache/spark:3.5.6 AS spark-official +FROM apache/spark:3.5.7 AS spark-official ########################################### # Stage 3: Spark + Delta + Cloud connectors ########################################### FROM ubuntu:22.04 AS spark-base -ARG SPARK_VERSION=3.5.6 +ARG SPARK_VERSION=3.5.7 ARG HADOOP_VERSION=3 ARG DELTA_VERSION=3.2.1 ENV DEBIAN_FRONTEND=noninteractive From 9542705a00d2334460bcdf02a345c73509944b8d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vitor=20Souza=20Pi=C3=B1a?= Date: Wed, 12 Nov 2025 15:04:29 +0000 Subject: [PATCH 4/5] fix: resolve missing imports and comments --- src/bronze/get_full_tables.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/src/bronze/get_full_tables.py b/src/bronze/get_full_tables.py index 387d5e3..67998f4 100644 --- a/src/bronze/get_full_tables.py +++ b/src/bronze/get_full_tables.py @@ -1,9 +1,9 @@ -from pyspark.sql import SparkSession -import pyspark.sql.functions as F +from pyspark.sql import SparkSession #type:ignore +import pyspark.sql.functions as F #type:ignore import os import logging -logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) ################################################################################### # GET MYSQL CREDENTIALS # ################################################################################### @@ -29,7 +29,8 @@ "course_overviews_courseoverview", "student_courseenrollment", "certificates_generatedcertificate", -"student_courseaccessrole"] +"student_courseaccessrole" +] for table in TABLES: logging.info(f"getting table {table}") @@ -64,8 +65,9 @@ df = df.withColumn("ingestion_date", F.current_timestamp()) \ .withColumn("source_name", F.lit(table)) - output_path = S3_SAVEPATH + table - #df.write.format("delta").mode("overwrite").save(output_path) + output_path = f"{S3_SAVEPATH}/{table}" + + df.write.format("delta").mode("overwrite").save(output_path) logger.info(f"Data 
saved as Delta table to {output_path}") spark.stop() From 8ba166af6baba5d39adbac131eb29404fd4bc18d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vitor=20Souza=20Pi=C3=B1a?= Date: Mon, 17 Nov 2025 11:32:18 +0000 Subject: [PATCH 5/5] feature: adding the incremental load and ajusting the full table --- .github/workflows/docker-build-push.yml | 3 + .gitignore | 2 + src/bronze/get_full_tables.py | 143 ++++++++++------- src/bronze/incremental_load.py | 198 ++++++++++++++++++++++++ 4 files changed, 289 insertions(+), 57 deletions(-) create mode 100644 .gitignore create mode 100644 src/bronze/incremental_load.py diff --git a/.github/workflows/docker-build-push.yml b/.github/workflows/docker-build-push.yml index 3544a68..87b7af1 100644 --- a/.github/workflows/docker-build-push.yml +++ b/.github/workflows/docker-build-push.yml @@ -4,6 +4,9 @@ on: push: branches: - '**' + paths: + - 'src/**' + - 'Docker/**' workflow_dispatch: jobs: diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..de045ba --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +.env +.mypy_cache \ No newline at end of file diff --git a/src/bronze/get_full_tables.py b/src/bronze/get_full_tables.py index 67998f4..a2745e9 100644 --- a/src/bronze/get_full_tables.py +++ b/src/bronze/get_full_tables.py @@ -1,51 +1,28 @@ from pyspark.sql import SparkSession #type:ignore import pyspark.sql.functions as F #type:ignore +import argparse import os import logging -logger = logging.getLogger(__name__) -################################################################################### -# GET MYSQL CREDENTIALS # -################################################################################### +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler() + ] +) +def get_args() -> argparse.Namespace: + parser = argparse.ArgumentParser() + parser.add_argument("--savepath", type = str,required= True, help = "The S3 bucket intended for the data to be stored") + parser.add_argument("--undesired_column", type = str,required= True, help = " the undesired column for a table") + args = parser.parse_args() + return args -MYSQL_DATABASE = os.getenv("MYSQL_DATABASE") -MYSQL_HOST = os.getenv("MYSQL_HOST") -MYSQL_PORT = os.getenv("MYSQL_PORT") -MYSQL_USER = os.getenv("MYSQL_USER") -MYSQL_SECRET = os.getenv("MYSQL_SECRET") -jdbc_url = f"jdbc:mysql://{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}" -SPARK_HOME = os.environ.get("SPARK_HOME", "/opt/spark") - - -################################################################################### -# GET S3 CREDENTIALS # -################################################################################### -S3_ACCESS_KEY = os.getenv("S3_ACCESS_KEY") -S3_SECRET_KEY = os.getenv("S3_SECRET_KEY") -S3_ENDPOINT = os.getenv("S3_ENDPOINT") -S3_SAVEPATH = os.getenv("S3_SAVEPATH") - -TABLES = [ -"course_overviews_courseoverview", -"student_courseenrollment", -"certificates_generatedcertificate", -"student_courseaccessrole" -] - -for table in TABLES: - logging.info(f"getting table {table}") - try: +def get_spark_session(S3_ACCESS_KEY: str,S3_SECRET_KEY: str , S3_ENDPOINT: str) -> SparkSession: - spark = SparkSession.builder \ - .appName("MyApp") \ - .config("spark.jars", ",".join([ - f"{SPARK_HOME}/jars/hadoop-aws-3.3.4.jar", - f"{SPARK_HOME}/jars/aws-java-sdk-bundle-1.12.375.jar", - f"{SPARK_HOME}/jars/delta-spark_2.12-3.2.1.jar", - f"{SPARK_HOME}/jars/delta-storage-3.2.1.jar", - f"{SPARK_HOME}/jars/delta-kernel-api-3.2.1.jar", - 
f"{SPARK_HOME}/jars/mysql-connector-j-8.3.0.jar", - ]))\ + spark = SparkSession.builder \ + .appName("incremental_table_ingestion") \ + .config("spark.jars", "/opt/spark/jars/hadoop-aws-3.3.4.jar,/opt/spark/jars/aws-java-sdk-bundle-1.12.375.jar,/opt/spark/jars/delta-spark_2.12-3.2.1.jar,/opt/spark/jars/delta-storage-3.2.1.jar,/opt/spark/jars/delta-kernel-api-3.2.1.jar,/opt/spark/jars/mysql-connector-j-8.3.0.jar") \ .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\ .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\ .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY) \ @@ -54,22 +31,74 @@ .config("spark.hadoop.fs.s3a.path.style.access", "true") \ .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \ .getOrCreate() + return spark + +################################################################################### +# GET MYSQL CREDENTIALS # +################################################################################### +def main() -> None: + MYSQL_DATABASE = os.getenv("MYSQL_DATABASE") + MYSQL_HOST = os.getenv("MYSQL_HOST") + MYSQL_PORT = os.getenv("MYSQL_PORT") + MYSQL_USER = os.getenv("MYSQL_USER") + MYSQL_SECRET = os.getenv("MYSQL_SECRET") + jdbc_url = f"jdbc:mysql://{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}" + - df = spark.read.format("jdbc") \ - .option("url", jdbc_url) \ - .option("user", MYSQL_USER) \ - .option("password", MYSQL_SECRET) \ - .option("driver", "com.mysql.cj.jdbc.Driver") \ - .option("dbtable", table) \ - .load() - - df = df.withColumn("ingestion_date", F.current_timestamp()) \ - .withColumn("source_name", F.lit(table)) - output_path = f"{S3_SAVEPATH}/{table}" + + ################################################################################### + # GET S3 CREDENTIALS # + ################################################################################### + S3_ACCESS_KEY = str(os.getenv("S3_ACCESS_KEY")) + S3_SECRET_KEY = str(os.getenv("S3_SECRET_KEY")) + S3_ENDPOINT = str(os.getenv("S3_ENDPOINT")) + + args = get_args() + S3_SAVEPATH = args.savepath + undesired_column = args.undesired_column + + TABLES = [ + "course_overviews_courseoverview", + "student_courseenrollment", + "certificates_generatedcertificate", + "student_courseaccessrole", + "auth_userprofile", + "student_userattribute", + "organizations_organization", + "auth_user" + ] + + for table in TABLES: + + logging.info(f"getting table {table}") + try: - df.write.format("delta").mode("overwrite").save(output_path) + spark = get_spark_session(S3_ACCESS_KEY=S3_ACCESS_KEY,S3_SECRET_KEY=S3_SECRET_KEY,S3_ENDPOINT=S3_ENDPOINT) + + df = spark.read.format("jdbc") \ + .option("url", jdbc_url) \ + .option("user", MYSQL_USER) \ + .option("password", MYSQL_SECRET) \ + .option("driver", "com.mysql.cj.jdbc.Driver") \ + .option("dbtable", table) \ + .load() + if table == "auth_user": + df = df.drop(undesired_column) + + df = df.withColumn("ingestion_date", F.current_timestamp()) \ + .withColumn("source_name", F.lit(table)) + if table == "auth_user" and undesired_column and undesired_column in df.columns: + raise Exception("THE undesired column stills in the dataframe") + output_path = f"{S3_SAVEPATH}/{table}" + + df.write.format("delta").mode("append").save(output_path) + + logging.info(f"Data saved as Delta table to {output_path}") + + except Exception as e: + logging.error(f"Pipeline failed: {e}") + spark.stop() + - logger.info(f"Data saved as Delta table to {output_path}") - spark.stop() - except Exception as e: - 
logger.error(f"Pipeline failed: {e}") +if __name__=="__main__": + main() \ No newline at end of file diff --git a/src/bronze/incremental_load.py b/src/bronze/incremental_load.py new file mode 100644 index 0000000..d472c95 --- /dev/null +++ b/src/bronze/incremental_load.py @@ -0,0 +1,198 @@ +from datetime import datetime +from pyspark.sql import SparkSession # type: ignore +import pyspark.sql.functions as F # type: ignore +import pyspark.sql.types as T # type: ignore +import argparse +import logging +import os +from typing import List, Union, Optional,Tuple + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(levelname)s - %(message)s', + handlers=[ + logging.StreamHandler() + ] +) + + + + +def get_args() -> argparse.Namespace: + parser = argparse.ArgumentParser() + parser.add_argument("--savepath", type = str,required= True, help = "The S3 bucket intended for the data to be stored") + parser.add_argument("--metadatapath", type = str, required = True, help ="The S3 bucket that contains stores the metada for the process") + parser.add_argument("--table", type = str, required = True, help ="The S3 bucket that contains stores the metada for the process") + parser.add_argument("--first_ingestion_flag",type = int,default=0,help="flag to indicate if it is the first ingestion on regular ingestion") + args = parser.parse_args() + return args + +def update_metadata(metadatapath: str ,spark: SparkSession,table:str,last_date:str) -> bool: + try: + tmp_df = spark.createDataFrame( + [ + (table, last_date) + ], + T.StructType([T.StructField("table_name", T.StringType(), True),T.StructField("last_date", T.StringType(), True),]),) + + tmp_df.coalesce(1).write.format("csv").mode("overwrite").save(f"{metadatapath}/{table}/last_updated_date") + return True + except Exception as e: + logging.error(f"Exception {e}") + return False + + + +def get_spark_session(S3_ACCESS_KEY: str,S3_SECRET_KEY: str , S3_ENDPOINT: str) -> SparkSession: + + spark = SparkSession.builder \ + .appName("incremental_table_ingestion") \ + .config("spark.jars", "/opt/spark/jars/hadoop-aws-3.3.4.jar,/opt/spark/jars/aws-java-sdk-bundle-1.12.375.jar,/opt/spark/jars/delta-spark_2.12-3.2.1.jar,/opt/spark/jars/delta-storage-3.2.1.jar,/opt/spark/jars/delta-kernel-api-3.2.1.jar,/opt/spark/jars/mysql-connector-j-8.3.0.jar") \ + .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")\ + .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")\ + .config("spark.hadoop.fs.s3a.access.key", S3_ACCESS_KEY) \ + .config("spark.hadoop.fs.s3a.secret.key", S3_SECRET_KEY) \ + .config("spark.hadoop.fs.s3a.endpoint", S3_ENDPOINT) \ + .config("spark.hadoop.fs.s3a.path.style.access", "true") \ + .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \ + .getOrCreate() + return spark + + + + + + + + + +def full_initial_ingestion(spark: SparkSession, table: str, savepath: str, jdbc_url:str, MYSQL_USER:str, MYSQL_SECRET:str) -> Tuple[bool, str]: + years = [i for i in range(2019,2100)] + months = [i for i in range(1,13)] + current_year = datetime.now().year + current_month = datetime.now().month + path = f"{savepath}/{table}" + + for year in years: + for month in months: + + if (year == 2019 and month ==1): + query = f"(SELECT * FROM {table} WHERE YEAR(created) = {year} AND MONTH(created) = {month}) AS limited_table" + logging.info(query) + df = spark.read.format("jdbc") \ + .option("url", jdbc_url) \ + .option("user", MYSQL_USER) \ + .option("password", MYSQL_SECRET) \ 
+ .option("driver", "com.mysql.cj.jdbc.Driver") \ + .option("dbtable", query) \ + .load() + + df = df.withColumn("created", F.date_format("created", "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")) \ + .withColumn("year", F.year(F.col("created"))) \ + .withColumn("month", F.month(F.col("created"))) + + df = df.withColumn("ingestion_date", F.current_timestamp()) \ + .withColumn("source_name", F.lit(table)) + + df.write.format("delta") \ + .mode("overwrite") \ + .partitionBy("year", "month") \ + .save(path) + continue + + if (year > current_year) or (year == current_year and month > current_month): + break + else: + + query = f"(SELECT * FROM {table} WHERE YEAR(created) = {year} AND MONTH(created) = {month}) AS limited_table" + logging.info(query) + new_df = spark.read.format("jdbc") \ + .option("url", jdbc_url) \ + .option("user", MYSQL_USER) \ + .option("password", MYSQL_SECRET) \ + .option("driver", "com.mysql.cj.jdbc.Driver") \ + .option("dbtable", query) \ + .load() + if (year == current_year and month == current_month): + last_update = datetime.now().isoformat() + incremental_df = new_df.withColumn("created", F.date_format("created", "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")) \ + .withColumn("year", F.year(F.col("created"))) \ + .withColumn("month", F.month(F.col("created"))) + + incremental_df = incremental_df.withColumn("ingestion_date", F.current_timestamp()).withColumn("source_name", F.lit(table)) + + # Append new partitions directly + incremental_df.write.format("delta").mode("append").partitionBy("year", "month").save(path) + return (True, last_update) + +def get_metadata(metadatapath: str, spark: SparkSession, table:str) -> str | bool: + metadatapath = f"{metadatapath}/{table}/last_updated_date" + customSchema = T.StructType([ + T.StructField("table_name", T.StringType(), True), + T.StructField("last_date", T.StringType(), True) + ]) + row = spark.read.csv(metadatapath,schema=customSchema).filter( F.col("table_name") == table).first() + if row is None or row["last_date"] is None: + return False + return str(row["last_date"]) + +def delta_load(spark: SparkSession, jdbc_url:str, MYSQL_USER:str, MYSQL_SECRET:str,last_updated:str,table:str,savepath: str) -> Tuple[bool, str]: + path = f"{savepath}/{table}" + query = f"(SELECT * FROM {table} WHERE created >= '{last_updated}') AS limited_table" + logging.info(query) + new_df = spark.read.format("jdbc") \ + .option("url", jdbc_url) \ + .option("user", MYSQL_USER) \ + .option("password", MYSQL_SECRET) \ + .option("driver", "com.mysql.cj.jdbc.Driver") \ + .option("dbtable", query) \ + .load() + last_update = datetime.now().isoformat() + incremental_df = new_df.withColumn("created", F.date_format("created", "yyyy-MM-dd'T'HH:mm:ss.SSSSSS")) \ + .withColumn("year", F.year(F.col("created"))) \ + .withColumn("month", F.month(F.col("created"))) + + incremental_df = incremental_df.withColumn("ingestion_date", F.current_timestamp()).withColumn("source_name", F.lit(table)) + + # Append new partitions directly + incremental_df.write.format("delta").mode("append").partitionBy("year", "month").save(path) + + return (True,last_update) + + +def main() -> None: + + MYSQL_DATABASE = os.getenv("MYSQL_DATABASE") + MYSQL_HOST = os.getenv("MYSQL_HOST") + MYSQL_PORT = os.getenv("MYSQL_PORT") + MYSQL_USER = str(os.getenv("MYSQL_USER")) + MYSQL_SECRET = str(os.getenv("MYSQL_SECRET")) + jdbc_url = f"jdbc:mysql://{MYSQL_HOST}:{MYSQL_PORT}/{MYSQL_DATABASE}" + + S3_ACCESS_KEY = str(os.getenv("S3_ACCESS_KEY")) + S3_SECRET_KEY = str(os.getenv("S3_SECRET_KEY")) + S3_ENDPOINT = 
str(os.getenv("S3_ENDPOINT")) + + args = get_args() + savepath = args.savepath + metadata = args.metadatapath + is_full_ingestion_flag = args.first_ingestion_flag + table = args.table + spark = get_spark_session(S3_ACCESS_KEY,S3_SECRET_KEY,S3_ENDPOINT) + if is_full_ingestion_flag == 1: + result = full_initial_ingestion(spark,table,savepath,jdbc_url,MYSQL_USER,MYSQL_SECRET) + logging.info(result) + if result[0]: + update_metadata(metadatapath=metadata,spark=spark,table=table,last_date=result[1]) + if is_full_ingestion_flag == 0: + last_date = get_metadata(metadatapath=metadata,spark=spark,table=table) + if last_date == False: + raise Exception("No date Found") + last_date = str(last_date) + result = delta_load(spark=spark,jdbc_url=jdbc_url,MYSQL_USER=MYSQL_USER,MYSQL_SECRET=MYSQL_SECRET,last_updated=last_date,table=table,savepath=savepath) + if result[0]: + update_metadata(metadatapath=metadata,spark=spark,table=table,last_date=result[1]) + spark.stop() + +if __name__=="__main__": + main() \ No newline at end of file
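Note on the bookkeeping added in PATCH 5/5: incremental_load.py tracks its high-water mark per table as a single-row CSV written by update_metadata and read back by get_metadata before each delta load. Below is a minimal local sketch of that round trip, assuming only that pyspark is installed; the local path /tmp/metadata is an illustrative stand-in for the S3 metadata bucket passed via --metadatapath, not something defined in the patches.

    from datetime import datetime

    from pyspark.sql import SparkSession
    import pyspark.sql.functions as F
    import pyspark.sql.types as T

    spark = SparkSession.builder.appName("metadata_roundtrip_sketch").getOrCreate()

    metadatapath = "/tmp/metadata"            # stand-in for the S3 metadata path
    table = "student_courseenrollment"        # one of the tables from the patch
    last_date = datetime.now().isoformat()    # high-water mark to persist

    schema = T.StructType([
        T.StructField("table_name", T.StringType(), True),
        T.StructField("last_date", T.StringType(), True),
    ])

    # What update_metadata does: overwrite a single-row CSV with the latest mark.
    spark.createDataFrame([(table, last_date)], schema) \
        .coalesce(1) \
        .write.format("csv").mode("overwrite") \
        .save(f"{metadatapath}/{table}/last_updated_date")

    # What get_metadata does: read it back and extract the last ingested timestamp.
    row = spark.read.csv(f"{metadatapath}/{table}/last_updated_date", schema=schema) \
        .filter(F.col("table_name") == table) \
        .first()
    print(row["last_date"] if row else "no metadata found")

    spark.stop()

The design keeps state outside the Delta tables themselves: because delta_load writes with mode("append") and update_metadata only runs after a successful load, a failed run simply re-reads the same window on retry, so downstream consumers should expect possible duplicate rows for that window.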