diff --git a/pigz_python/pigz_python.py b/pigz_python/pigz_python.py index 4da266a..c061a1d 100644 --- a/pigz_python/pigz_python.py +++ b/pigz_python/pigz_python.py @@ -2,6 +2,7 @@ Functions and classes to speed up compression of files by utilizing multiple cores on a system. """ + import os import sys import time @@ -27,7 +28,7 @@ class PigzFile: # pylint: disable=too-many-instance-attributes - """ Class to implement Pigz functionality in Python """ + """Class to implement Pigz functionality in Python""" def __init__( self, @@ -128,25 +129,25 @@ def _write_header_id(self): self.output_file.write((0x8B).to_bytes(1, sys.byteorder)) def _write_header_cm(self): - """ Write the CM (compression method) to file header """ + """Write the CM (compression method) to file header""" self.output_file.write((8).to_bytes(1, sys.byteorder)) def _write_header_flg(self, flags): - """ Write FLG (FLaGs) """ + """Write FLG (FLaGs)""" self.output_file.write((flags).to_bytes(1, sys.byteorder)) def _write_header_mtime(self): - """ Write MTIME (Modification time) """ + """Write MTIME (Modification time)""" mtime = self._determine_mtime() self.output_file.write((mtime).to_bytes(4, sys.byteorder)) def _write_header_xfl(self): - """ Write XFL (eXtra FLags) """ + """Write XFL (eXtra FLags)""" extra_flags = self._determine_extra_flags(self.compression_level) self.output_file.write((extra_flags).to_bytes(1, sys.byteorder)) def _write_header_os(self): - """ Write OS """ + """Write OS""" os_number = self._determine_operating_system() self.output_file.write((os_number).to_bytes(1, sys.byteorder)) @@ -235,27 +236,30 @@ def _read_file(self): # Initialize this to 0 so our increment sets first chunk to 1 chunk_num = 0 with open(self.compression_target, "rb") as input_file: - while True: - chunk = input_file.read(self.blocksize) - # Break out of the loop if we didn't read anything - if not chunk: + chunk = input_file.read(self.blocksize) + while chunk: + self.input_size += len(chunk) + chunk_num += 1 + + # Peek ahead to determine if this is the last chunk + next_chunk = input_file.read(self.blocksize) + is_last = not next_chunk + + if is_last: with self._last_chunk_lock: self._last_chunk = chunk_num - break - self.input_size += len(chunk) - chunk_num += 1 - # Apply this chunk to the pool - self.pool.apply_async(self._process_chunk, (chunk_num, chunk)) + # Pass is_last directly to avoid race condition + self.pool.apply_async(self._process_chunk, (chunk_num, chunk, is_last)) + + chunk = next_chunk - def _process_chunk(self, chunk_num: int, chunk: bytes): + def _process_chunk(self, chunk_num: int, chunk: bytes, is_last: bool): """ Overall method to handle the chunk and pass it back to the write thread. This method is run on the pool. """ - with self._last_chunk_lock: - last_chunk = chunk_num == self._last_chunk - compressed_chunk = self._compress_chunk(chunk, last_chunk) + compressed_chunk = self._compress_chunk(chunk, is_last) self.chunk_queue.put((chunk_num, chunk, compressed_chunk)) def _compress_chunk(self, chunk: bytes, is_last_chunk: bool): @@ -355,6 +359,6 @@ def compress_file( blocksize=DEFAULT_BLOCK_SIZE_KB, workers=CPU_COUNT, ): - """ Helper function to call underlying class and compression method """ + """Helper function to call underlying class and compression method""" pigz_file = PigzFile(source_file, compresslevel, blocksize, workers) pigz_file.process_compression_target() diff --git a/tests/test_pigz_python.py b/tests/test_pigz_python.py index edb6732..c721edd 100644 --- a/tests/test_pigz_python.py +++ b/tests/test_pigz_python.py @@ -1,6 +1,7 @@ """ Unit tests for Pigz Python """ + import sys import unittest import zlib @@ -14,7 +15,7 @@ # pylint: disable=protected-access, too-many-public-methods class TestPigzPython(unittest.TestCase): - """ Unit tests for PigzPython class """ + """Unit tests for PigzPython class""" def setUp(self): """ @@ -422,11 +423,12 @@ def test_process_chunk(self): chunk = b"What can I do? I can't take up and down like this no more, babe I need to find out where I am Before I reach the stars Yeah, before I step on Mars" # noqa; pylint: disable=line-too-long compressed_chunk = b"Jamiroquai" self.pigz_file._last_chunk = chunk_num + is_last = True self.pigz_file._compress_chunk = MagicMock() self.pigz_file._compress_chunk.return_value = compressed_chunk self.pigz_file.chunk_queue = MagicMock() - self.pigz_file._process_chunk(chunk_num, chunk) + self.pigz_file._process_chunk(chunk_num, chunk, is_last) # Second arg is True since we've setup the test data as last chunk self.pigz_file._compress_chunk.assert_called_with(chunk, True)