Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 21 additions & 128 deletions pyvertica/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@
import logging
import os
import tempfile
import threading
import taskthread
from Queue import Queue
from functools import wraps

Expand Down Expand Up @@ -35,66 +33,6 @@ def inner_func(self, *args, **kwargs):
return inner_func


class Query(object):
"""
An object that executes the ``COPY`` query for batch loading.

:param cursor:
A :py:mod:`!pyodbc` cursor object.

:param sql_query_str:
A ``str`` representing the ``COPY`` query.

:param semaphore_obj:
An instance of :py:class:`!threading.Semaphore`.

:param fifo_path:
A ``str`` representing the path of the fifo file.

:param exc_queue:
An instance of class:`Queue.Queue` instance to put exceptions in.

"""

daemon = True
"""
If the script is interrupted (^C) it will kill all threads marked as daemon
if only those are left.
"""

def __init__(
self, cursor, sql_query_str, fifo_path, exc_queue):
super(Query, self).__init__()
self.cursor = cursor
self.sql_query_str = sql_query_str
self.exc_queue = exc_queue
self.fifo_path = fifo_path

def run_query(self):
"""
Handle executing the SQL query.

This method is intended to be called on a task_thread.

"""
logger.debug('Query started with SQL statement: {0}'.format(
self.sql_query_str))
try:
self.cursor.execute(self.sql_query_str)
except Exception as e:
logger.exception('Something unexpected happened')

# the exception will be re-raised in the main thread
self.exc_queue.put(e)

# we need to consume the fifo, to make sure it isn't blocking the
# write (and thus hanging forever).
for line in codecs.open(self.fifo_path, 'r', 'utf-8'):
pass

logger.debug('Query done')


class VerticaBatch(object):
"""
Object for writing multiple records to Vertica in a batch.
Expand Down Expand Up @@ -236,7 +174,7 @@ def __init__(
self.copy_options_dict.update(copy_options)
self._batch_initialized = False
self._multi_batch = multi_batch

self._csv_file_obj = None
self._total_count = 0
self._batch_count = 0

Expand Down Expand Up @@ -271,100 +209,56 @@ def _truncate_table(self):
self._cursor.execute('TRUNCATE TABLE {0}'.format(self._table_name))

def _initialize_batch(self):

self._query_exc_queue = Queue()

# create FIFO
self._fifo_path = os.path.join(tempfile.mkdtemp(), 'fifo')
os.mkfifo(self._fifo_path)

# create rejected file obj
if self.copy_options_dict['REJECTEDFILE']:
self._rejected_file_obj = tempfile.NamedTemporaryFile(bufsize=0)
self._query = Query(
self._cursor,
self._get_sql_lcopy_str(),
self._fifo_path,
self._query_exc_queue,
)

self._query_thread = taskthread.TaskThread(self._query.run_query)

# Start the thread so run_task can be called
self._query_thread.start()
self._batch_initialized = True

def _start_batch(self):
"""
Start the batch.

This will create the FIFO file, a temporary file for the rejected
inserts and this will setup and start the :py:class:`.QueryThread`.

"""
self._in_batch = True
self._batch_count = 0
if not self._batch_initialized:
self._initialize_batch()

self._query_thread.run_task()

logger.debug('Opening FIFO')
self._fifo_obj = codecs.open(self._fifo_path, 'w', 'utf-8')
self._csv_file_obj = tempfile.NamedTemporaryFile('w')
temp_file_path = self._csv_file_obj.name
self._csv_file_obj.close()
self._csv_file_obj = codecs.open(temp_file_path, 'w', 'utf-8')

logger.debug('Batch started')

@require_started_batch
def _end_batch(self):
"""
End the batch.

This waits for the current query to finish, and executes
close_batch if multi_batch is false.
self._csv_file_obj.close()
self._cursor.execute(self._get_sql_lcopy_str())
temp_file_path = self._csv_file_obj.name
os.remove(temp_file_path)

"""
ended_clean = True

logger.debug('Closing FIFO')
# The Query task will stop when there is nothing writing to
# the fifo. This should force the current task to end.
self._fifo_obj.close()

logger.debug('Waiting for COPY Query to finish')
if not self._query_thread.join_task(2):
logger.warn('Error shutting down task thread!')
else:
logger.debug('Query task finished')

if not self._multi_batch:
ended_clean = self.close_batch() and ended_clean
ended_clean = self.close_batch()

self._in_batch = False
return ended_clean

def close_batch(self):
"""
Close out the batch.

This will remove the FIFO file and stop the ``taskthread.TaskThread``.

"""
try:
if self._csv_file_obj:
temp_file_path = self._csv_file_obj.name
os.remove(temp_file_path)
except:
pass

self._in_batch = False
ended_clean = True
logger.debug('Terminating thread')
self._query_thread.join(2)
if self._query_thread.is_alive():
ended_clean = False
logging.error('Terminating thread timed out!')

os.remove(self._fifo_path)
os.rmdir(os.path.dirname(self._fifo_path))

logger.debug('Batch ended')

if not self._query_exc_queue.empty():
raise self._query_exc_queue.get()

self._batch_initialized = False
return ended_clean

Expand Down Expand Up @@ -430,8 +324,7 @@ def _get_sql_lcopy_str(self):
if self._column_list:
output_str += ' ({0})'.format(', '.join(self._column_list))

# fifo path
output_str += " FROM LOCAL '{0}'".format(self._fifo_path)
output_str += " FROM LOCAL '{0}'".format(self._csv_file_obj.name)

# rejected file
if self.copy_options_dict['REJECTEDFILE']:
Expand Down Expand Up @@ -526,7 +419,7 @@ def insert_lists(self, value_lists, row_count=1):
strings = (self._single_list_to_string(value_list,
suffix=suffix)
for value_list in value_lists)
self._fifo_obj.write(
self._csv_file_obj.write(
"".join(strings)
)
self._total_count += row_count
Expand Down Expand Up @@ -560,7 +453,7 @@ def insert_line(self, line_str):
if __debug__:
logger.debug(u'Inserting line: {0}'.format(line_str))

self._fifo_obj.write(
self._csv_file_obj.write(
line_str + self.copy_options_dict['RECORD TERMINATOR'])

self._total_count += 1
Expand All @@ -578,7 +471,7 @@ def insert_raw(self, raw_str):
if __debug__:
logger.debug(u'Inserting raw: {0}'.format(raw_str))

self._fifo_obj.write(raw_str)
self._csv_file_obj.write(raw_str)

self._total_count += 1
self._batch_count += 1
Expand Down