Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions dagger/dag_creator/airflow/operators/aws_athena_operator.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#

from uuid import uuid4
from os import path

from dagger.dag_creator.airflow.operators.dagger_base_operator import DaggerBaseOperator
from airflow.utils.decorators import apply_defaults
Expand Down Expand Up @@ -60,7 +61,8 @@ def __init__(self, query, database, s3_tmp_results_location, s3_output_location,
super(AWSAthenaOperator, self).__init__(*args, **kwargs)
self.query = query
self.database = database
self.output_location = s3_tmp_results_location
# Make sure that all queries are writing to a different path to avoid Slow Down exceptions
self.s3_tmp_results_location = path.join(s3_tmp_results_location, self. str(uuid4().hex))
self.s3_output_location = s3_output_location
self.s3_output_bucket = s3_output_location.split('/')[2]
self.s3_output_path = '/'.join(s3_output_location.split('/')[3:])
Expand Down Expand Up @@ -140,7 +142,7 @@ def execute(self, context):
self.hook.delete_s3_location(self.s3_output_bucket, self.s3_output_path, self.database, self.output_table)

self.query_execution_context['Database'] = self.database
self.result_configuration['OutputLocation'] = self.output_location
self.result_configuration['OutputLocation'] = self.s3_tmp_results_location
query = self.extend_query(self.query)
self.log.info(f"Running query\n{query}")
self.query_execution_id = self.hook.run_query(query, self.query_execution_context,
Expand Down