From 53e2cc05eff9a2309e90411efc318796a63b0ee3 Mon Sep 17 00:00:00 2001 From: bguise987 Date: Thu, 22 Jan 2026 16:49:45 -0500 Subject: [PATCH 1/3] Fix race condition causing intermittent gzip corruption The _process_chunk method checked _last_chunk to determine whether to use Z_FINISH, but _last_chunk wasn't set until after the read thread submitted the final chunk. This caused the last chunk to sometimes be compressed with Z_SYNC_FLUSH instead of Z_FINISH, producing invalid gzip files with unterminated deflate streams (00 00 FF FF marker). Fix by peeking ahead in _read_file to determine is_last before submitting to the pool, and passing the flag directly to _process_chunk. Co-Authored-By: Claude Opus 4.5 --- pigz_python/pigz_python.py | 31 ++++++++++++++++++------------- 1 file changed, 18 insertions(+), 13 deletions(-) diff --git a/pigz_python/pigz_python.py b/pigz_python/pigz_python.py index 4da266a..c2d31e2 100644 --- a/pigz_python/pigz_python.py +++ b/pigz_python/pigz_python.py @@ -235,27 +235,32 @@ def _read_file(self): # Initialize this to 0 so our increment sets first chunk to 1 chunk_num = 0 with open(self.compression_target, "rb") as input_file: - while True: - chunk = input_file.read(self.blocksize) - # Break out of the loop if we didn't read anything - if not chunk: + chunk = input_file.read(self.blocksize) + while chunk: + self.input_size += len(chunk) + chunk_num += 1 + + # Peek ahead to determine if this is the last chunk + next_chunk = input_file.read(self.blocksize) + is_last = not next_chunk + + if is_last: with self._last_chunk_lock: self._last_chunk = chunk_num - break - self.input_size += len(chunk) - chunk_num += 1 - # Apply this chunk to the pool - self.pool.apply_async(self._process_chunk, (chunk_num, chunk)) + # Pass is_last directly to avoid race condition + self.pool.apply_async( + self._process_chunk, (chunk_num, chunk, is_last) + ) + + chunk = next_chunk - def _process_chunk(self, chunk_num: int, chunk: bytes): + def _process_chunk(self, chunk_num: int, chunk: bytes, is_last: bool): """ Overall method to handle the chunk and pass it back to the write thread. This method is run on the pool. """ - with self._last_chunk_lock: - last_chunk = chunk_num == self._last_chunk - compressed_chunk = self._compress_chunk(chunk, last_chunk) + compressed_chunk = self._compress_chunk(chunk, is_last) self.chunk_queue.put((chunk_num, chunk, compressed_chunk)) def _compress_chunk(self, chunk: bytes, is_last_chunk: bool): From e66093be32ca85762136f2518897f48cd9eda670 Mon Sep 17 00:00:00 2001 From: bguise987 Date: Thu, 22 Jan 2026 17:40:36 -0500 Subject: [PATCH 2/3] Update tests to reflect passing in is_last to _process_chunk --- tests/test_pigz_python.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_pigz_python.py b/tests/test_pigz_python.py index edb6732..5a2f597 100644 --- a/tests/test_pigz_python.py +++ b/tests/test_pigz_python.py @@ -422,11 +422,12 @@ def test_process_chunk(self): chunk = b"What can I do? I can't take up and down like this no more, babe I need to find out where I am Before I reach the stars Yeah, before I step on Mars" # noqa; pylint: disable=line-too-long compressed_chunk = b"Jamiroquai" self.pigz_file._last_chunk = chunk_num + is_last = True self.pigz_file._compress_chunk = MagicMock() self.pigz_file._compress_chunk.return_value = compressed_chunk self.pigz_file.chunk_queue = MagicMock() - self.pigz_file._process_chunk(chunk_num, chunk) + self.pigz_file._process_chunk(chunk_num, chunk, is_last) # Second arg is True since we've setup the test data as last chunk self.pigz_file._compress_chunk.assert_called_with(chunk, True) From 9e384b4ef833cae2dd5f417026d42af372eaeb65 Mon Sep 17 00:00:00 2001 From: bguise987 Date: Thu, 22 Jan 2026 17:47:59 -0500 Subject: [PATCH 3/3] Black formatting --- pigz_python/pigz_python.py | 19 +++++++++---------- tests/test_pigz_python.py | 3 ++- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pigz_python/pigz_python.py b/pigz_python/pigz_python.py index c2d31e2..c061a1d 100644 --- a/pigz_python/pigz_python.py +++ b/pigz_python/pigz_python.py @@ -2,6 +2,7 @@ Functions and classes to speed up compression of files by utilizing multiple cores on a system. """ + import os import sys import time @@ -27,7 +28,7 @@ class PigzFile: # pylint: disable=too-many-instance-attributes - """ Class to implement Pigz functionality in Python """ + """Class to implement Pigz functionality in Python""" def __init__( self, @@ -128,25 +129,25 @@ def _write_header_id(self): self.output_file.write((0x8B).to_bytes(1, sys.byteorder)) def _write_header_cm(self): - """ Write the CM (compression method) to file header """ + """Write the CM (compression method) to file header""" self.output_file.write((8).to_bytes(1, sys.byteorder)) def _write_header_flg(self, flags): - """ Write FLG (FLaGs) """ + """Write FLG (FLaGs)""" self.output_file.write((flags).to_bytes(1, sys.byteorder)) def _write_header_mtime(self): - """ Write MTIME (Modification time) """ + """Write MTIME (Modification time)""" mtime = self._determine_mtime() self.output_file.write((mtime).to_bytes(4, sys.byteorder)) def _write_header_xfl(self): - """ Write XFL (eXtra FLags) """ + """Write XFL (eXtra FLags)""" extra_flags = self._determine_extra_flags(self.compression_level) self.output_file.write((extra_flags).to_bytes(1, sys.byteorder)) def _write_header_os(self): - """ Write OS """ + """Write OS""" os_number = self._determine_operating_system() self.output_file.write((os_number).to_bytes(1, sys.byteorder)) @@ -249,9 +250,7 @@ def _read_file(self): self._last_chunk = chunk_num # Pass is_last directly to avoid race condition - self.pool.apply_async( - self._process_chunk, (chunk_num, chunk, is_last) - ) + self.pool.apply_async(self._process_chunk, (chunk_num, chunk, is_last)) chunk = next_chunk @@ -360,6 +359,6 @@ def compress_file( blocksize=DEFAULT_BLOCK_SIZE_KB, workers=CPU_COUNT, ): - """ Helper function to call underlying class and compression method """ + """Helper function to call underlying class and compression method""" pigz_file = PigzFile(source_file, compresslevel, blocksize, workers) pigz_file.process_compression_target() diff --git a/tests/test_pigz_python.py b/tests/test_pigz_python.py index 5a2f597..c721edd 100644 --- a/tests/test_pigz_python.py +++ b/tests/test_pigz_python.py @@ -1,6 +1,7 @@ """ Unit tests for Pigz Python """ + import sys import unittest import zlib @@ -14,7 +15,7 @@ # pylint: disable=protected-access, too-many-public-methods class TestPigzPython(unittest.TestCase): - """ Unit tests for PigzPython class """ + """Unit tests for PigzPython class""" def setUp(self): """