diff --git a/pyvertica/batch.py b/pyvertica/batch.py index 573a51c..631de29 100644 --- a/pyvertica/batch.py +++ b/pyvertica/batch.py @@ -3,8 +3,6 @@ import logging import os import tempfile -import threading -import taskthread from Queue import Queue from functools import wraps @@ -35,66 +33,6 @@ def inner_func(self, *args, **kwargs): return inner_func -class Query(object): - """ - An object that executes the ``COPY`` query for batch loading. - - :param cursor: - A :py:mod:`!pyodbc` cursor object. - - :param sql_query_str: - A ``str`` representing the ``COPY`` query. - - :param semaphore_obj: - An instance of :py:class:`!threading.Semaphore`. - - :param fifo_path: - A ``str`` representing the path of the fifo file. - - :param exc_queue: - An instance of class:`Queue.Queue` instance to put exceptions in. - - """ - - daemon = True - """ - If the script is interrupted (^C) it will kill all threads marked as daemon - if only those are left. - """ - - def __init__( - self, cursor, sql_query_str, fifo_path, exc_queue): - super(Query, self).__init__() - self.cursor = cursor - self.sql_query_str = sql_query_str - self.exc_queue = exc_queue - self.fifo_path = fifo_path - - def run_query(self): - """ - Handle executing the SQL query. - - This method is intended to be called on a task_thread. - - """ - logger.debug('Query started with SQL statement: {0}'.format( - self.sql_query_str)) - try: - self.cursor.execute(self.sql_query_str) - except Exception as e: - logger.exception('Something unexpected happened') - - # the exception will be re-raised in the main thread - self.exc_queue.put(e) - - # we need to consume the fifo, to make sure it isn't blocking the - # write (and thus hanging forever). - for line in codecs.open(self.fifo_path, 'r', 'utf-8'): - pass - - logger.debug('Query done') - - class VerticaBatch(object): """ Object for writing multiple records to Vertica in a batch. @@ -236,7 +174,7 @@ def __init__( self.copy_options_dict.update(copy_options) self._batch_initialized = False self._multi_batch = multi_batch - + self._csv_file_obj = None self._total_count = 0 self._batch_count = 0 @@ -271,73 +209,38 @@ def _truncate_table(self): self._cursor.execute('TRUNCATE TABLE {0}'.format(self._table_name)) def _initialize_batch(self): - - self._query_exc_queue = Queue() - - # create FIFO - self._fifo_path = os.path.join(tempfile.mkdtemp(), 'fifo') - os.mkfifo(self._fifo_path) - # create rejected file obj if self.copy_options_dict['REJECTEDFILE']: self._rejected_file_obj = tempfile.NamedTemporaryFile(bufsize=0) - self._query = Query( - self._cursor, - self._get_sql_lcopy_str(), - self._fifo_path, - self._query_exc_queue, - ) - - self._query_thread = taskthread.TaskThread(self._query.run_query) - - # Start the thread so run_task can be called - self._query_thread.start() self._batch_initialized = True def _start_batch(self): """ Start the batch. - This will create the FIFO file, a temporary file for the rejected - inserts and this will setup and start the :py:class:`.QueryThread`. - """ self._in_batch = True self._batch_count = 0 if not self._batch_initialized: self._initialize_batch() - self._query_thread.run_task() - - logger.debug('Opening FIFO') - self._fifo_obj = codecs.open(self._fifo_path, 'w', 'utf-8') + self._csv_file_obj = tempfile.NamedTemporaryFile('w') + temp_file_path = self._csv_file_obj.name + self._csv_file_obj.close() + self._csv_file_obj = codecs.open(temp_file_path, 'w', 'utf-8') logger.debug('Batch started') @require_started_batch def _end_batch(self): - """ - End the batch. - - This waits for the current query to finish, and executes - close_batch if multi_batch is false. + self._csv_file_obj.close() + self._cursor.execute(self._get_sql_lcopy_str()) + temp_file_path = self._csv_file_obj.name + os.remove(temp_file_path) - """ ended_clean = True - - logger.debug('Closing FIFO') - # The Query task will stop when there is nothing writing to - # the fifo. This should force the current task to end. - self._fifo_obj.close() - - logger.debug('Waiting for COPY Query to finish') - if not self._query_thread.join_task(2): - logger.warn('Error shutting down task thread!') - else: - logger.debug('Query task finished') - if not self._multi_batch: - ended_clean = self.close_batch() and ended_clean + ended_clean = self.close_batch() self._in_batch = False return ended_clean @@ -345,26 +248,17 @@ def _end_batch(self): def close_batch(self): """ Close out the batch. - - This will remove the FIFO file and stop the ``taskthread.TaskThread``. - """ + try: + if self._csv_file_obj: + temp_file_path = self._csv_file_obj.name + os.remove(temp_file_path) + except: + pass + self._in_batch = False ended_clean = True - logger.debug('Terminating thread') - self._query_thread.join(2) - if self._query_thread.is_alive(): - ended_clean = False - logging.error('Terminating thread timed out!') - - os.remove(self._fifo_path) - os.rmdir(os.path.dirname(self._fifo_path)) - logger.debug('Batch ended') - - if not self._query_exc_queue.empty(): - raise self._query_exc_queue.get() - self._batch_initialized = False return ended_clean @@ -430,8 +324,7 @@ def _get_sql_lcopy_str(self): if self._column_list: output_str += ' ({0})'.format(', '.join(self._column_list)) - # fifo path - output_str += " FROM LOCAL '{0}'".format(self._fifo_path) + output_str += " FROM LOCAL '{0}'".format(self._csv_file_obj.name) # rejected file if self.copy_options_dict['REJECTEDFILE']: @@ -526,7 +419,7 @@ def insert_lists(self, value_lists, row_count=1): strings = (self._single_list_to_string(value_list, suffix=suffix) for value_list in value_lists) - self._fifo_obj.write( + self._csv_file_obj.write( "".join(strings) ) self._total_count += row_count @@ -560,7 +453,7 @@ def insert_line(self, line_str): if __debug__: logger.debug(u'Inserting line: {0}'.format(line_str)) - self._fifo_obj.write( + self._csv_file_obj.write( line_str + self.copy_options_dict['RECORD TERMINATOR']) self._total_count += 1 @@ -578,7 +471,7 @@ def insert_raw(self, raw_str): if __debug__: logger.debug(u'Inserting raw: {0}'.format(raw_str)) - self._fifo_obj.write(raw_str) + self._csv_file_obj.write(raw_str) self._total_count += 1 self._batch_count += 1