From 6e15f38129aca446b2053e9d89f8936dce32440c Mon Sep 17 00:00:00 2001 From: Guillermo Ambrosio Date: Thu, 26 May 2016 16:27:09 -0600 Subject: [PATCH 1/3] remove threads from batch insert code tested multiple times and worked fine. --- pyvertica/batch.py | 156 +++++++-------------------------------------- 1 file changed, 24 insertions(+), 132 deletions(-) diff --git a/pyvertica/batch.py b/pyvertica/batch.py index 573a51c..4dec6c2 100644 --- a/pyvertica/batch.py +++ b/pyvertica/batch.py @@ -3,8 +3,6 @@ import logging import os import tempfile -import threading -import taskthread from Queue import Queue from functools import wraps @@ -34,67 +32,6 @@ def inner_func(self, *args, **kwargs): return func(self, *args, **kwargs) return inner_func - -class Query(object): - """ - An object that executes the ``COPY`` query for batch loading. - - :param cursor: - A :py:mod:`!pyodbc` cursor object. - - :param sql_query_str: - A ``str`` representing the ``COPY`` query. - - :param semaphore_obj: - An instance of :py:class:`!threading.Semaphore`. - - :param fifo_path: - A ``str`` representing the path of the fifo file. - - :param exc_queue: - An instance of class:`Queue.Queue` instance to put exceptions in. - - """ - - daemon = True - """ - If the script is interrupted (^C) it will kill all threads marked as daemon - if only those are left. - """ - - def __init__( - self, cursor, sql_query_str, fifo_path, exc_queue): - super(Query, self).__init__() - self.cursor = cursor - self.sql_query_str = sql_query_str - self.exc_queue = exc_queue - self.fifo_path = fifo_path - - def run_query(self): - """ - Handle executing the SQL query. - - This method is intended to be called on a task_thread. - - """ - logger.debug('Query started with SQL statement: {0}'.format( - self.sql_query_str)) - try: - self.cursor.execute(self.sql_query_str) - except Exception as e: - logger.exception('Something unexpected happened') - - # the exception will be re-raised in the main thread - self.exc_queue.put(e) - - # we need to consume the fifo, to make sure it isn't blocking the - # write (and thus hanging forever). - for line in codecs.open(self.fifo_path, 'r', 'utf-8'): - pass - - logger.debug('Query done') - - class VerticaBatch(object): """ Object for writing multiple records to Vertica in a batch. @@ -236,7 +173,7 @@ def __init__( self.copy_options_dict.update(copy_options) self._batch_initialized = False self._multi_batch = multi_batch - + self._csv_file_obj = None self._total_count = 0 self._batch_count = 0 @@ -271,73 +208,38 @@ def _truncate_table(self): self._cursor.execute('TRUNCATE TABLE {0}'.format(self._table_name)) def _initialize_batch(self): - - self._query_exc_queue = Queue() - - # create FIFO - self._fifo_path = os.path.join(tempfile.mkdtemp(), 'fifo') - os.mkfifo(self._fifo_path) - # create rejected file obj if self.copy_options_dict['REJECTEDFILE']: self._rejected_file_obj = tempfile.NamedTemporaryFile(bufsize=0) - self._query = Query( - self._cursor, - self._get_sql_lcopy_str(), - self._fifo_path, - self._query_exc_queue, - ) - - self._query_thread = taskthread.TaskThread(self._query.run_query) - - # Start the thread so run_task can be called - self._query_thread.start() self._batch_initialized = True def _start_batch(self): """ Start the batch. - This will create the FIFO file, a temporary file for the rejected - inserts and this will setup and start the :py:class:`.QueryThread`. - """ self._in_batch = True self._batch_count = 0 if not self._batch_initialized: self._initialize_batch() - self._query_thread.run_task() - - logger.debug('Opening FIFO') - self._fifo_obj = codecs.open(self._fifo_path, 'w', 'utf-8') - + self._csv_file_obj = tempfile.NamedTemporaryFile('w') + temp_file_path= self._csv_file_obj.name + self._csv_file_obj.close() + self._csv_file_obj = codecs.open(temp_file_path, 'w', 'utf-8') + logger.debug('Batch started') @require_started_batch def _end_batch(self): - """ - End the batch. + self._csv_file_obj.close() + self._cursor.execute(self._get_sql_lcopy_str()) + temp_file_path = self._csv_file_obj.name + os.remove(temp_file_path) - This waits for the current query to finish, and executes - close_batch if multi_batch is false. - - """ ended_clean = True - - logger.debug('Closing FIFO') - # The Query task will stop when there is nothing writing to - # the fifo. This should force the current task to end. - self._fifo_obj.close() - - logger.debug('Waiting for COPY Query to finish') - if not self._query_thread.join_task(2): - logger.warn('Error shutting down task thread!') - else: - logger.debug('Query task finished') - if not self._multi_batch: - ended_clean = self.close_batch() and ended_clean + ended_clean = self.close_batch() self._in_batch = False return ended_clean @@ -345,26 +247,17 @@ def _end_batch(self): def close_batch(self): """ Close out the batch. - - This will remove the FIFO file and stop the ``taskthread.TaskThread``. - """ - + try: + if self._csv_file_obj: + temp_file_path = self._csv_file_obj.name + os.remove(temp_file_path) + except: + pass + + self._in_batch = False ended_clean = True - logger.debug('Terminating thread') - self._query_thread.join(2) - if self._query_thread.is_alive(): - ended_clean = False - logging.error('Terminating thread timed out!') - - os.remove(self._fifo_path) - os.rmdir(os.path.dirname(self._fifo_path)) - logger.debug('Batch ended') - - if not self._query_exc_queue.empty(): - raise self._query_exc_queue.get() - self._batch_initialized = False return ended_clean @@ -430,8 +323,7 @@ def _get_sql_lcopy_str(self): if self._column_list: output_str += ' ({0})'.format(', '.join(self._column_list)) - # fifo path - output_str += " FROM LOCAL '{0}'".format(self._fifo_path) + output_str += " FROM LOCAL '{0}'".format(self._csv_file_obj.name) # rejected file if self.copy_options_dict['REJECTEDFILE']: @@ -526,7 +418,7 @@ def insert_lists(self, value_lists, row_count=1): strings = (self._single_list_to_string(value_list, suffix=suffix) for value_list in value_lists) - self._fifo_obj.write( + self._csv_file_obj.write( "".join(strings) ) self._total_count += row_count @@ -560,7 +452,7 @@ def insert_line(self, line_str): if __debug__: logger.debug(u'Inserting line: {0}'.format(line_str)) - self._fifo_obj.write( + self._csv_file_obj.write( line_str + self.copy_options_dict['RECORD TERMINATOR']) self._total_count += 1 @@ -578,7 +470,7 @@ def insert_raw(self, raw_str): if __debug__: logger.debug(u'Inserting raw: {0}'.format(raw_str)) - self._fifo_obj.write(raw_str) + self._csv_file_obj.write(raw_str) self._total_count += 1 self._batch_count += 1 @@ -691,4 +583,4 @@ def get_cursor(self): Instance of :py:class:`!pyodbc.Cursor`. """ - return self._connection.cursor() + return self._connection.cursor() From 6c88f9c326bc915bd017b39f090028d5a905d8ed Mon Sep 17 00:00:00 2001 From: Guillermo Ambrosio Date: Thu, 26 May 2016 16:36:59 -0600 Subject: [PATCH 2/3] fix bad whitespace stuff --- pyvertica/batch.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/pyvertica/batch.py b/pyvertica/batch.py index 4dec6c2..91e978c 100644 --- a/pyvertica/batch.py +++ b/pyvertica/batch.py @@ -32,6 +32,7 @@ def inner_func(self, *args, **kwargs): return func(self, *args, **kwargs) return inner_func + class VerticaBatch(object): """ Object for writing multiple records to Vertica in a batch. @@ -224,10 +225,10 @@ def _start_batch(self): self._initialize_batch() self._csv_file_obj = tempfile.NamedTemporaryFile('w') - temp_file_path= self._csv_file_obj.name + temp_file_path = self._csv_file_obj.name self._csv_file_obj.close() self._csv_file_obj = codecs.open(temp_file_path, 'w', 'utf-8') - + logger.debug('Batch started') @require_started_batch @@ -583,4 +584,4 @@ def get_cursor(self): Instance of :py:class:`!pyodbc.Cursor`. """ - return self._connection.cursor() + return self._connection.cursor() From c6eef8c12f21119ca465d4ed729c52412c6c3f44 Mon Sep 17 00:00:00 2001 From: Guillermo Ambrosio Date: Thu, 26 May 2016 22:42:03 -0600 Subject: [PATCH 3/3] fix bad whitespace stuff --- pyvertica/batch.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyvertica/batch.py b/pyvertica/batch.py index 91e978c..631de29 100644 --- a/pyvertica/batch.py +++ b/pyvertica/batch.py @@ -254,8 +254,8 @@ def close_batch(self): temp_file_path = self._csv_file_obj.name os.remove(temp_file_path) except: - pass - + pass + self._in_batch = False ended_clean = True logger.debug('Batch ended')