A lightweight custom Spark exception logger, written in Python as a decorator class, to log the details of exceptions that occur during a job run.
A detailed technical blog on this: Building a Lightweight SparkExceptionLogger
To run it locally, check the instructions in the Playground README.md.
The main motivations behind this custom Spark logger are:
- To make debugging of Spark jobs faster.
- To log all occurred exceptions in a table that can be queried via any query engine, such as DuckDB, Athena, or Trino.
- To integrate easily into existing scripts with minimal code changes.
The logger records the following details:
- `process_name` - Functional process name. Can be defined at script level.
- `sub_process` - Functional sub-process name. Can be defined at script level. Default is an empty string.
- `script_name` - Name of the .py script.
- `cluster_id` - ID of the Spark cluster the job is submitted on. For example, for a job running on EMR, the EMR ID goes here.
- `application_id` - Spark application ID of the job.
- `status` - Final status of the Spark job: `completed` or `failed`.
- `start_ts` - Start timestamp of the Spark job run.
- `end_ts` - End timestamp of the Spark job run.
- `error` - Short description of the error, in case the job status is `failed`.
- `error_desc` - Full stack trace of the error, in case the job status is `failed`.
- `time_taken` - Total time taken by the script execution.
- `execution_dt` - Execution date of the Spark job.
SparkExceptionLogger supports writing logs into:
- A table, defined via the `LOG_TABLE` parameter in the decorator class.
- A common path, defined via the `LOG_PATH` parameter in the decorator class.
A sample CREATE TABLE statement for the log table can be found in batch_logs.hql, in case users want to write logs directly into a table. If the log table is defined under a different name, update `LOG_TABLE` in `SparkExceptionLogger.py` to the new table name.
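For reference, a table matching the columns listed above can be created as sketched below. This is not the statement from batch_logs.hql; the table name `batch_logs` and the column types are assumptions inferred from the column descriptions, so defer to the .hql file for the authoritative DDL.

```python
# Hypothetical DDL for the log table, run through Spark SQL.
# Table name and column types are assumptions; see batch_logs.hql for the
# actual statement shipped with the repo.
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

spark.sql("""
    CREATE TABLE IF NOT EXISTS batch_logs (
        process_name   STRING,
        sub_process    STRING,
        script_name    STRING,
        cluster_id     STRING,
        application_id STRING,
        status         STRING,
        start_ts       TIMESTAMP,
        end_ts         TIMESTAMP,
        error          STRING,
        error_desc     STRING,
        time_taken     STRING,
        execution_dt   DATE
    )
""")
```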
It is defined as a decorator class to make it easy to integrate into scripts with the least amount of code changes.
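Conceptually, the decorator class wraps the main function, timestamps the run, and writes one log row on success or failure. The skeleton below is a minimal sketch of that shape, not the actual implementation; the `_write_log` helper and anything beyond the documented constructor parameters are assumptions.

```python
# Minimal sketch of a decorator-class logger (assumed shape, not the real code).
import functools
import traceback
from datetime import datetime

class SparkExceptionLogger:
    def __init__(self, spark, process_name, script_name, sub_process="", args_tracking=False):
        self.spark = spark
        self.process_name = process_name
        self.script_name = script_name
        self.sub_process = sub_process
        self.args_tracking = args_tracking

    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start_ts = datetime.now()
            try:
                result = func(*args, **kwargs)
                self._write_log(start_ts, datetime.now(), "completed", "", "")
                return result
            except Exception as exc:
                # error = exception class name, error_desc = full stack trace
                self._write_log(start_ts, datetime.now(), "failed",
                                type(exc).__name__, traceback.format_exc())
                raise

        return wrapper

    def _write_log(self, start_ts, end_ts, status, error, error_desc):
        # Hypothetical helper: build a one-row DataFrame with the logged
        # columns and append it to the log table or log path.
        ...
```

Because `__call__` returns the wrapper, applying `@SparkExceptionLogger(...)` to a function is all the integration a script needs.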
A typical integration looks like this:

```python
# quaterly_sales.py
from pyspark.sql import SparkSession
from SparkExceptionLogger import SparkExceptionLogger

process_name = "sales"
app_name = "quaterly"

spark = SparkSession.builder.master("yarn").enableHiveSupport().getOrCreate()

@SparkExceptionLogger(spark, process_name, __file__, sub_process=app_name)
def quaterly_sales_process():
    try:
        # all the Spark code: transformations and other logic
        print("Transformation and other code stuffs.")
    except Exception as e:
        raise e

if __name__ == "__main__":
    quaterly_sales_process()
```

```bash
# sample spark-submit
spark-submit --master yarn --deploy-mode cluster \
  --conf spark.yarn.maxAttempts=1 \
  --driver-memory=4g --conf spark.driver.cores=1 \
  --executor-cores 5 --conf spark.executor.instances=4 \
  --conf spark.dynamicAllocation.minExecutors=2 \
  --conf spark.dynamicAllocation.maxExecutors=10 \
  --executor-memory 14g \
  --py-files <path-to-folder>/SparkExceptionLogger.py \
  <path-to-pyfiles>/quaterly_sales.py
```

To overwrite or define a custom process name by concatenating other parameters, `args_tracking` must be set to `True`:
```python
# quaterly_sales.py
import argparse

from pyspark.sql import SparkSession
from SparkExceptionLogger import SparkExceptionLogger

# process name will be overwritten by the value passed in args
process_name = "dummy_process"
# sub process name will be overwritten if passed via arguments
app_name = "quaterly"

spark = SparkSession.builder.master("yarn").enableHiveSupport().getOrCreate()

@SparkExceptionLogger(spark, process_name, __file__, sub_process=app_name, args_tracking=True)
def quaterly_sales_process(args):
    try:
        # all the Spark code: transformations and other logic
        print("Transformation and other code stuffs.")
    except Exception as e:
        raise e

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("-p", "--process_name", type=str, required=True, help="Process Name")
    parser.add_argument("-sp", "--sub_process_name", type=str, required=False, help="Sub Process Name")
    parser.add_argument("-x", "--argument_x", type=int, required=False, help="Other Argument X")
    args = parser.parse_args()
    # calling main function
    quaterly_sales_process(args)
```

```bash
# sample spark-submit
spark-submit --master yarn --deploy-mode cluster \
  --conf spark.yarn.maxAttempts=1 \
  --driver-memory=4g --conf spark.driver.cores=1 \
  --executor-cores 5 --conf spark.executor.instances=4 \
  --conf spark.dynamicAllocation.minExecutors=2 \
  --conf spark.dynamicAllocation.maxExecutors=10 \
  --executor-memory 14g \
  --py-files <path-to-folder>/SparkExceptionLogger.py \
  <path-to-pyfiles>/quaterly_sales.py -p sales -sp q_region_sales
```

- Defining a custom `process_name` or `sub_process` name by concatenating other fields can be achieved by modifying the `_update_process` method in `SparkExceptionLogger.py` (see the sketch after this list).
- `args_tracking` must be set to `True` for reading `process_name`, `sub_process`, or any other argument from the method parameters.
- `@SparkExceptionLogger` only needs to be defined on the main function that is called on script execution, NOT on the helper methods.
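Since this README does not show the body of `_update_process`, the sketch below only illustrates the kind of change meant here. The method signature and the argparse attribute names (`process_name`, `sub_process_name`, `argument_x`, matching the sample script above) are assumptions, not the actual source.

```python
# Hypothetical customization of _update_process in SparkExceptionLogger.py.
def _update_process(self, args):
    # overwrite process_name when it is passed as an argument
    if getattr(args, "process_name", None):
        self.process_name = args.process_name
    # build a custom sub_process by concatenating other fields,
    # e.g. "q_region_sales_7" when --argument_x 7 is also passed
    if getattr(args, "sub_process_name", None):
        self.sub_process = args.sub_process_name
        if getattr(args, "argument_x", None) is not None:
            self.sub_process = f"{args.sub_process_name}_{args.argument_x}"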
SparkExceptionLogger now supports `service_name` and logging to a path instead of a table.
- To enable logging to a path instead of a table:
  - `log_to_path`: Set this to `True`. Default: `False`.
  - `log_path`: Set this to the path where logs will be written. Default: `"s3://spark-logs-bucket/logs/"`.
  - The default `log_path` can be updated via `LOG_PATH` in `SparkExceptionLogger.py`.
- To run it on a service other than EMR, `service_name` can take other values (see the sketch after this list):
  - Default: `EMR` => populates the `cluster_id` column with the EMR ID.
  - `local` => populates the `cluster_id` column with the driver hostname.
  - Currently, any other value just fetches the hostname of the driver.
  - The main idea behind the `service_name` parameter is to extend the logger to fetch the `cluster_id` for other services in the future.
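A minimal sketch of how the `cluster_id` lookup could branch on `service_name`. The helper name `_get_cluster_id` is assumed, and reading the cluster ID from `/mnt/var/lib/info/job-flow.json` is a common EMR convention, not necessarily what the actual implementation does.

```python
# Hypothetical cluster_id resolution; the real code in SparkExceptionLogger.py
# may differ.
import json
import socket

def _get_cluster_id(service_name: str) -> str:
    if service_name == "EMR":
        # On EMR, the cluster (job flow) ID is available on every node in
        # /mnt/var/lib/info/job-flow.json -- an EMR convention, assumed here.
        with open("/mnt/var/lib/info/job-flow.json") as f:
            return json.load(f)["jobFlowId"]
    # "local" and any other value currently fall back to the driver hostname.
    return socket.gethostname()
```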
Sample entries from the log table:

| process_name | sub_process | script_name | cluster_id | application_id | status | start_ts | end_ts | error | error_desc | time_taken | execution_dt |
|---|---|---|---|---|---|---|---|---|---|---|---|
| sales | quaterly | quaterly_sales.py | j-LM8CHLQYLJ2Q | application_1715654774853_0436 | failed | 2024-05-14 06:08:32.947 | 2024-05-14 06:08:40.669 | AnalysisException | Table or view not found: blogs_db.daily_sales; 'UnresolvedRelation [blogs_db, daily_sales], [], false | 7 secs | 2024-05-14 |
| sales | quaterly | quaterly_sales.py | j-LM8CHLQYLJ2Q | application_1715654774853_0441 | failed | 2024-05-14 06:14:38.922 | 2024-05-14 06:14:41.992 | AnalysisException | User: arn:aws:sts::aws-acct:assumed-role/role-emr-ec2-service/i-0dd9f8257a143b9c3 is not authorized to perform: glue:GetDatabase on resource: arn:aws:glue:region:aws-acct:database/test_db because no identity-based policy allows the glue:GetDatabase action (Service: AWSGlue; Status Code: 400; Error Code: AccessDeniedException; Request ID: b0a1d47a-1fe8-42dc-8f9a-6b1b60d10423; Proxy: null) | 3 secs | 2024-05-14 |
| sales | q_region_sales | quaterly_sales.py | j-LM8CHLQYLJ2Q | application_1715654774853_0396 | completed | 2024-05-14 05:58:37.491 | 2024-05-14 05:58:42.495 | | | 5 secs | 2024-05-14 |
| wsl | rwa_engine | job.py | j-NYHXOTMH2O74 | application_1730770422001_0882 | failed | 2024-11-05 10:29:48.809 | 2024-11-05 10:30:55.753 | AnalysisException | org.apache.hadoop.hive.ql.metadata.HiveException: Unable to alter table. Update table failed due to concurrent modifications. (Service: AWSGlue; Status Code: 400; Error Code: ConcurrentModificationException; Request ID: af79ffb4-baaf-47be-aba0-6e340bc549df; Proxy: null) | 1 min 6 secs | 2024-11-05 |
| wsl | postcalc | exposures.py | j-NYHXOTMH2O74 | application_1730770422001_1262 | failed | 2024-11-05 11:14:12.126 | 2024-11-05 11:15:08.824 | AnalysisException | Found duplicate column(s) in the right attributes: defaulted_asset_flag | 56 secs | 2024-11-05 |
| wsl | set_id | dim_run_rk.py | j-NYHXOTMH2O74 | application_1730770422001_0293 | failed | 2024-11-05 06:32:23.581 | 2024-11-05 06:33:01.585 | AttributeError | 'NoneType' object has no attribute 'strip' | 38 secs | 2024-11-05 |
| wsl | rwa_engine | job.py | j-NYHXOTMH2O74 | application_1730770422001_1190 | failed | 2024-11-05 11:00:05.463 | 2024-11-05 11:01:09.779 | AnalysisException | org.apache.hadoop.hive.ql.metadata.HiveException: Unable to alter table. Update table failed due to concurrent modifications. (Service: AWSGlue; Status Code: 400; Error Code: ConcurrentModificationException; Request ID: 84891aad-1a60-4312-a4f7-27fa87918bb5; Proxy: null) | 1 min 4 secs | 2024-11-05 |
| audit | count_amount | count_amount_check.py | j-NYHXOTMH2O74 | application_1730770422001_0750 | failed | 2024-11-05 09:51:25.433 | 2024-11-05 09:52:09.112 | TypeError | Column is not iterable | 43 secs | 2024-11-05 |
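Since the logs land in a regular table, failed runs can be pulled with a plain query. A quick sketch with PySpark, assuming the log table is named `batch_logs` (an assumption based on batch_logs.hql):

```python
# Query the failures for a given execution date, most recent first.
# The table name "batch_logs" is an assumption based on batch_logs.hql.
from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()

spark.sql("""
    SELECT process_name, sub_process, script_name, error, error_desc, time_taken
    FROM batch_logs
    WHERE status = 'failed'
      AND execution_dt = '2024-11-05'
    ORDER BY start_ts DESC
""").show(truncate=False)
```

The same table can be queried from DuckDB, Athena, or Trino, which is what makes the single shared log table useful for debugging across jobs.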