diff --git a/dagger/dag_creator/airflow/operators/aws_athena_operator.py b/dagger/dag_creator/airflow/operators/aws_athena_operator.py index 149b076..15e1943 100644 --- a/dagger/dag_creator/airflow/operators/aws_athena_operator.py +++ b/dagger/dag_creator/airflow/operators/aws_athena_operator.py @@ -19,6 +19,7 @@ # from uuid import uuid4 +from os import path from dagger.dag_creator.airflow.operators.dagger_base_operator import DaggerBaseOperator from airflow.utils.decorators import apply_defaults @@ -60,7 +61,8 @@ def __init__(self, query, database, s3_tmp_results_location, s3_output_location, super(AWSAthenaOperator, self).__init__(*args, **kwargs) self.query = query self.database = database - self.output_location = s3_tmp_results_location + # Make sure that all queries are writing to a different path to avoid Slow Down exceptions + self.s3_tmp_results_location = path.join(s3_tmp_results_location, uuid4().hex) self.s3_output_location = s3_output_location self.s3_output_bucket = s3_output_location.split('/')[2] self.s3_output_path = '/'.join(s3_output_location.split('/')[3:]) @@ -140,7 +142,7 @@ def execute(self, context): self.hook.delete_s3_location(self.s3_output_bucket, self.s3_output_path, self.database, self.output_table) self.query_execution_context['Database'] = self.database - self.result_configuration['OutputLocation'] = self.output_location + self.result_configuration['OutputLocation'] = self.s3_tmp_results_location query = self.extend_query(self.query) self.log.info(f"Running query\n{query}") self.query_execution_id = self.hook.run_query(query, self.query_execution_context,