Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 24 additions & 20 deletions pigz_python/pigz_python.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
Functions and classes to speed up compression of files by utilizing
multiple cores on a system.
"""

import os
import sys
import time
Expand All @@ -27,7 +28,7 @@


class PigzFile: # pylint: disable=too-many-instance-attributes
""" Class to implement Pigz functionality in Python """
"""Class to implement Pigz functionality in Python"""

def __init__(
self,
Expand Down Expand Up @@ -128,25 +129,25 @@ def _write_header_id(self):
self.output_file.write((0x8B).to_bytes(1, sys.byteorder))

def _write_header_cm(self):
""" Write the CM (compression method) to file header """
"""Write the CM (compression method) to file header"""
self.output_file.write((8).to_bytes(1, sys.byteorder))

def _write_header_flg(self, flags):
""" Write FLG (FLaGs) """
"""Write FLG (FLaGs)"""
self.output_file.write((flags).to_bytes(1, sys.byteorder))

def _write_header_mtime(self):
""" Write MTIME (Modification time) """
"""Write MTIME (Modification time)"""
mtime = self._determine_mtime()
self.output_file.write((mtime).to_bytes(4, sys.byteorder))

def _write_header_xfl(self):
""" Write XFL (eXtra FLags) """
"""Write XFL (eXtra FLags)"""
extra_flags = self._determine_extra_flags(self.compression_level)
self.output_file.write((extra_flags).to_bytes(1, sys.byteorder))

def _write_header_os(self):
""" Write OS """
"""Write OS"""
os_number = self._determine_operating_system()
self.output_file.write((os_number).to_bytes(1, sys.byteorder))

Expand Down Expand Up @@ -235,27 +236,30 @@ def _read_file(self):
# Initialize this to 0 so our increment sets first chunk to 1
chunk_num = 0
with open(self.compression_target, "rb") as input_file:
while True:
chunk = input_file.read(self.blocksize)
# Break out of the loop if we didn't read anything
if not chunk:
chunk = input_file.read(self.blocksize)
while chunk:
self.input_size += len(chunk)
chunk_num += 1

# Peek ahead to determine if this is the last chunk
next_chunk = input_file.read(self.blocksize)
is_last = not next_chunk

if is_last:
with self._last_chunk_lock:
self._last_chunk = chunk_num
break

self.input_size += len(chunk)
chunk_num += 1
# Apply this chunk to the pool
self.pool.apply_async(self._process_chunk, (chunk_num, chunk))
# Pass is_last directly to avoid race condition
self.pool.apply_async(self._process_chunk, (chunk_num, chunk, is_last))

chunk = next_chunk

def _process_chunk(self, chunk_num: int, chunk: bytes):
def _process_chunk(self, chunk_num: int, chunk: bytes, is_last: bool):
"""
Overall method to handle the chunk and pass it back to the write thread.
This method is run on the pool.
"""
with self._last_chunk_lock:
last_chunk = chunk_num == self._last_chunk
compressed_chunk = self._compress_chunk(chunk, last_chunk)
compressed_chunk = self._compress_chunk(chunk, is_last)
self.chunk_queue.put((chunk_num, chunk, compressed_chunk))

def _compress_chunk(self, chunk: bytes, is_last_chunk: bool):
Expand Down Expand Up @@ -355,6 +359,6 @@ def compress_file(
blocksize=DEFAULT_BLOCK_SIZE_KB,
workers=CPU_COUNT,
):
""" Helper function to call underlying class and compression method """
"""Helper function to call underlying class and compression method"""
pigz_file = PigzFile(source_file, compresslevel, blocksize, workers)
pigz_file.process_compression_target()
6 changes: 4 additions & 2 deletions tests/test_pigz_python.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Unit tests for Pigz Python
"""

import sys
import unittest
import zlib
Expand All @@ -14,7 +15,7 @@

# pylint: disable=protected-access, too-many-public-methods
class TestPigzPython(unittest.TestCase):
""" Unit tests for PigzPython class """
"""Unit tests for PigzPython class"""

def setUp(self):
"""
Expand Down Expand Up @@ -422,11 +423,12 @@ def test_process_chunk(self):
chunk = b"What can I do? I can't take up and down like this no more, babe I need to find out where I am Before I reach the stars Yeah, before I step on Mars" # noqa; pylint: disable=line-too-long
compressed_chunk = b"Jamiroquai"
self.pigz_file._last_chunk = chunk_num
is_last = True
self.pigz_file._compress_chunk = MagicMock()
self.pigz_file._compress_chunk.return_value = compressed_chunk
self.pigz_file.chunk_queue = MagicMock()

self.pigz_file._process_chunk(chunk_num, chunk)
self.pigz_file._process_chunk(chunk_num, chunk, is_last)

# Second arg is True since we've set up the test data as the last chunk
self.pigz_file._compress_chunk.assert_called_with(chunk, True)
Expand Down