From d04e5369cce69c5b7f5f066c0f22c44c0f055899 Mon Sep 17 00:00:00 2001 From: Mark Kittisopikul Date: Thu, 13 Feb 2025 07:18:14 -0500 Subject: [PATCH 01/14] Add streaming decompression for ZSTD_CONTENTSIZE_UNKNOWN case --- numcodecs/zstd.pyx | 133 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 128 insertions(+), 5 deletions(-) diff --git a/numcodecs/zstd.pyx b/numcodecs/zstd.pyx index efd12fa2..f1a84011 100644 --- a/numcodecs/zstd.pyx +++ b/numcodecs/zstd.pyx @@ -8,12 +8,12 @@ from cpython.buffer cimport PyBUF_ANY_CONTIGUOUS, PyBUF_WRITEABLE from cpython.bytes cimport PyBytes_FromStringAndSize, PyBytes_AS_STRING - from .compat_ext cimport Buffer from .compat_ext import Buffer from .compat import ensure_contiguous_ndarray from .abc import Codec +from libc.stdlib cimport malloc, realloc, free cdef extern from "zstd.h": @@ -22,6 +22,23 @@ cdef extern from "zstd.h": struct ZSTD_CCtx_s: pass ctypedef ZSTD_CCtx_s ZSTD_CCtx + + struct ZSTD_DStream_s: + pass + ctypedef ZSTD_DStream_s ZSTD_DStream + + struct ZSTD_inBuffer_s: + const void* src + size_t size + size_t pos + ctypedef ZSTD_inBuffer_s ZSTD_inBuffer + + struct ZSTD_outBuffer_s: + void* dst + size_t size + size_t pos + ctypedef ZSTD_outBuffer_s ZSTD_outBuffer + cdef enum ZSTD_cParameter: ZSTD_c_compressionLevel=100 ZSTD_c_checksumFlag=201 @@ -37,12 +54,20 @@ cdef extern from "zstd.h": size_t dstCapacity, const void* src, size_t srcSize) nogil - size_t ZSTD_decompress(void* dst, size_t dstCapacity, const void* src, size_t compressedSize) nogil + size_t ZSTD_decompressStream(ZSTD_DStream* zds, + ZSTD_outBuffer* output, + ZSTD_inBuffer* input) nogil + + size_t ZSTD_DStreamOutSize() nogil + ZSTD_DStream* ZSTD_createDStream() nogil + size_t ZSTD_freeDStream(ZSTD_DStream* zds) nogil + size_t ZSTD_initDStream(ZSTD_DStream* zds) nogil + cdef long ZSTD_CONTENTSIZE_UNKNOWN cdef long ZSTD_CONTENTSIZE_ERROR unsigned long long ZSTD_getFrameContentSize(const void* src, @@ -56,7 +81,7 @@ cdef extern from "zstd.h": unsigned ZSTD_isError(size_t code) nogil - const char* ZSTD_getErrorName(size_t code) + const char* ZSTD_getErrorName(size_t code) nogil VERSION_NUMBER = ZSTD_versionNumber() @@ -156,7 +181,8 @@ def decompress(source, dest=None): source : bytes-like Compressed data. Can be any object supporting the buffer protocol. dest : array-like, optional - Object to decompress into. + Object to decompress into. If the content size is unknown, the + length of dest must match the decompressed size. Returns ------- @@ -180,9 +206,12 @@ def decompress(source, dest=None): # determine uncompressed size dest_size = ZSTD_getFrameContentSize(source_ptr, source_size) - if dest_size == 0 or dest_size == ZSTD_CONTENTSIZE_UNKNOWN or dest_size == ZSTD_CONTENTSIZE_ERROR: + if dest_size == 0 or dest_size == ZSTD_CONTENTSIZE_ERROR: raise RuntimeError('Zstd decompression error: invalid input data') + if dest_size == ZSTD_CONTENTSIZE_UNKNOWN and dest is None: + return stream_decompress(source_buffer) + # setup destination buffer if dest is None: # allocate memory @@ -192,6 +221,8 @@ def decompress(source, dest=None): arr = ensure_contiguous_ndarray(dest) dest_buffer = Buffer(arr, PyBUF_ANY_CONTIGUOUS | PyBUF_WRITEABLE) dest_ptr = dest_buffer.ptr + if dest_size == ZSTD_CONTENTSIZE_UNKNOWN: + dest_size = dest_buffer.nbytes if dest_buffer.nbytes < dest_size: raise ValueError('destination buffer too small; expected at least %s, ' 'got %s' % (dest_size, dest_buffer.nbytes)) @@ -217,6 +248,98 @@ def decompress(source, dest=None): return dest +cdef stream_decompress(Buffer source_buffer): + """Decompress data of unknown size + + Parameters + ---------- + source : Buffer + Compressed data buffer + + Returns + ------- + dest : bytes + Object containing decompressed data. + """ + + cdef: + char *source_ptr + void *dest_ptr + void *new_dst + Buffer dest_buffer = None + size_t source_size, dest_size, decompressed_size + size_t DEST_GROWTH_SIZE, status + ZSTD_inBuffer input + ZSTD_outBuffer output + ZSTD_DStream *zds + + # Recommended size for output buffer, guaranteed to flush at least + # one completely block in all circumstances + DEST_GROWTH_SIZE = ZSTD_DStreamOutSize(); + + source_ptr = source_buffer.ptr + source_size = source_buffer.nbytes + + # unknown content size, guess it is twice the size as the source + dest_size = source_size * 2 + + if dest_size < DEST_GROWTH_SIZE: + # minimum dest_size is DEST_GROWTH_SIZE + dest_size = DEST_GROWTH_SIZE + + dest_ptr = malloc(dest_size) + zds = ZSTD_createDStream() + + try: + + with nogil: + + status = ZSTD_initDStream(zds) + if ZSTD_isError(status): + error = ZSTD_getErrorName(status) + ZSTD_freeDStream(zds); + raise RuntimeError('Zstd stream decompression error on ZSTD_initDStream: %s' % error) + + input = ZSTD_inBuffer(source_ptr, source_size, 0) + output = ZSTD_outBuffer(dest_ptr, dest_size, 0) + + # Initialize to 1 to force a loop iteration + status = 1 + while(status > 0 or input.pos < input.size): + # Possible returned values of ZSTD_decompressStream: + # 0: frame is completely decoded and fully flushed + # error (<0) + # >0: suggested next input size + status = ZSTD_decompressStream(zds, &output, &input) + + if ZSTD_isError(status): + error = ZSTD_getErrorName(status) + raise RuntimeError('Zstd stream decompression error on ZSTD_decompressStream: %s' % error) + + # There is more to decompress, grow the buffer + if status > 0 and output.pos == output.size: + new_size = output.size + DEST_GROWTH_SIZE + + if new_size < output.size or new_size < DEST_GROWTH_SIZE: + raise RuntimeError('Zstd stream decompression error: output buffer overflow') + + new_dst = realloc(output.dst, new_size) + + if new_dst == NULL: + # output.dst freed in finally block + raise RuntimeError('Zstd stream decompression error on realloc: could not expand output buffer') + + output.dst = new_dst + output.size = new_size + + # Copy the output to a bytes object + dest = PyBytes_FromStringAndSize(output.dst, output.pos) + + finally: + ZSTD_freeDStream(zds) + free(output.dst) + + return dest class Zstd(Codec): """Codec providing compression using Zstandard. From 1096df7e01b1798f5d1248841846153c3156008a Mon Sep 17 00:00:00 2001 From: Mark Kittisopikul Date: Fri, 27 Jun 2025 15:43:20 -0400 Subject: [PATCH 02/14] Add tests and documentation for streaming Zstd --- docs/compression/zstd.rst | 3 +++ docs/release.rst | 2 ++ numcodecs/tests/test_zstd.py | 24 ++++++++++++++++++++++++ numcodecs/zstd.pyx | 4 +++- 4 files changed, 32 insertions(+), 1 deletion(-) diff --git a/docs/compression/zstd.rst b/docs/compression/zstd.rst index 611b0e83..51f9628d 100644 --- a/docs/compression/zstd.rst +++ b/docs/compression/zstd.rst @@ -7,6 +7,9 @@ Zstd .. autoattribute:: codec_id .. automethod:: encode .. automethod:: decode + .. note:: + If the compressed data does not contain the decompressed size, streaming + decompression will be used. .. automethod:: get_config .. automethod:: from_config diff --git a/docs/release.rst b/docs/release.rst index c5ba3919..32de52d7 100644 --- a/docs/release.rst +++ b/docs/release.rst @@ -19,6 +19,8 @@ Unreleased Improvements ~~~~~~~~~~~~ +* Add streaming decompression for ZSTD (:issue:`699`) + By :user:`Mark Kittisopikul `. * Raise a custom `UnknownCodecError` when trying to retrieve an unavailable codec. By :user:`Cas Wognum `. diff --git a/numcodecs/tests/test_zstd.py b/numcodecs/tests/test_zstd.py index de42d9e1..9cd364db 100644 --- a/numcodecs/tests/test_zstd.py +++ b/numcodecs/tests/test_zstd.py @@ -90,3 +90,27 @@ def test_native_functions(): assert Zstd.default_level() == 3 assert Zstd.min_level() == -131072 assert Zstd.max_level() == 22 + + +def test_streaming_decompression(): + codec = Zstd() + # Bytes from streaming compression + bytes_val = bytes(bytearray([ + 40, 181, 47, 253, 0, 88, 97, 0, 0, 72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 33, + ])) + dec = codec.decode(bytes_val) + assert dec == b'Hello World!' + + bytes2 = bytes(bytearray([ + 40, 181, 47, 253, 0, 88, 36, 2, 0, 164, 3, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, + 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, + 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 1, 0, 58, 252, 223, 115, 5, 5, 76, 0, 0, 8, + 115, 1, 0, 252, 255, 57, 16, 2, 76, 0, 0, 8, 107, 1, 0, 252, 255, 57, 16, 2, 76, 0, 0, 8, 99, 1, 0, 252, 255, 57, + 16, 2, 76, 0, 0, 8, 91, 1, 0, 252, 255, 57, 16, 2, 76, 0, 0, 8, 83, 1, 0, 252, 255, 57, 16, 2, 76, 0, 0, 8, 75, 1, + 0, 252, 255, 57, 16, 2, 76, 0, 0, 8, 67, 1, 0, 252, 255, 57, 16, 2, 76, 0, 0, 8, 117, 1, 0, 252, 255, 57, 16, 2, 76, + 0, 0, 8, 109, 1, 0, 252, 255, 57, 16, 2, 76, 0, 0, 8, 101, 1, 0, 252, 255, 57, 16, 2, 76, 0, 0, 8, 93, 1, 0, 252, + 255, 57, 16, 2, 76, 0, 0, 8, 85, 1, 0, 252, 255, 57, 16, 2, 76, 0, 0, 8, 77, 1, 0, 252, 255, 57, 16, 2, 77, 0, 0, 8, + 69, 1, 0, 252, 127, 29, 8, 1, + ])) + dec2 = codec.decode(bytes2) + assert dec2 == b'ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz' * 1024 * 32 diff --git a/numcodecs/zstd.pyx b/numcodecs/zstd.pyx index f1a84011..013fb981 100644 --- a/numcodecs/zstd.pyx +++ b/numcodecs/zstd.pyx @@ -182,7 +182,9 @@ def decompress(source, dest=None): Compressed data. Can be any object supporting the buffer protocol. dest : array-like, optional Object to decompress into. If the content size is unknown, the - length of dest must match the decompressed size. + length of dest must match the decompressed size. If the content size + is unknown and dest is not provided, streaming decompression will be + used. Returns ------- From a903ee5a04a1a3fa202d7aded09f3feb2e7fd421 Mon Sep 17 00:00:00 2001 From: Mark Kittisopikul Date: Fri, 27 Jun 2025 17:20:46 -0400 Subject: [PATCH 03/14] Add better tests from numcodecs.js --- numcodecs/tests/test_zstd.py | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/numcodecs/tests/test_zstd.py b/numcodecs/tests/test_zstd.py index 9cd364db..0ed7cb74 100644 --- a/numcodecs/tests/test_zstd.py +++ b/numcodecs/tests/test_zstd.py @@ -47,8 +47,8 @@ np.random.randint(0, 2**25, size=1000, dtype='u8').view('m8[m]'), np.random.randint(-(2**63), -(2**63) + 20, size=1000, dtype='i8').view('M8[ns]'), np.random.randint(-(2**63), -(2**63) + 20, size=1000, dtype='i8').view('m8[ns]'), - np.random.randint(-(2**63), -(2**63) + 20, size=1000, dtype='i8').view('M8[m]'), - np.random.randint(-(2**63), -(2**63) + 20, size=1000, dtype='i8').view('m8[m]'), + np.random.randint(-(2**63), -(2**63) + 20, dtype='i8').view('M8[m]'), + np.random.randint(-(2**63), -(2**63) + 20, dtype='i8').view('m8[m]'), ] @@ -93,15 +93,23 @@ def test_native_functions(): def test_streaming_decompression(): + # Test input frames with unknown frame content size codec = Zstd() - # Bytes from streaming compression + + # Encode bytes directly that were the result of streaming compression bytes_val = bytes(bytearray([ 40, 181, 47, 253, 0, 88, 97, 0, 0, 72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 33, ])) dec = codec.decode(bytes_val) assert dec == b'Hello World!' - bytes2 = bytes(bytearray([ + # Two consecutive frames given as input + bytes2 = bytes(bytearray(bytes_val * 2)) + dec2 = codec.decode(bytes2) + assert dec2 == b'Hello World!Hello World!' + + # Single long frame that decompresses to a large output + bytes3 = bytes(bytearray([ 40, 181, 47, 253, 0, 88, 36, 2, 0, 164, 3, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 1, 0, 58, 252, 223, 115, 5, 5, 76, 0, 0, 8, @@ -112,5 +120,10 @@ def test_streaming_decompression(): 255, 57, 16, 2, 76, 0, 0, 8, 85, 1, 0, 252, 255, 57, 16, 2, 76, 0, 0, 8, 77, 1, 0, 252, 255, 57, 16, 2, 77, 0, 0, 8, 69, 1, 0, 252, 127, 29, 8, 1, ])) - dec2 = codec.decode(bytes2) - assert dec2 == b'ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz' * 1024 * 32 + dec3 = codec.decode(bytes3) + assert dec3 == b'ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz' * 1024 * 32 + + # Garbage input results in an error + bytes4 = bytes(bytearray([0, 0, 0, 0, 0, 0, 0, 0])) + with pytest.raises(RuntimeError, match='Zstd decompression error: invalid input data'): + codec.decode(bytes4) \ No newline at end of file From d3049ca37611901747278df537dc32e5e36a48c1 Mon Sep 17 00:00:00 2001 From: Mark Kittisopikul Date: Fri, 27 Jun 2025 19:03:00 -0400 Subject: [PATCH 04/14] Adapt zstd.pyx streaming for Py_buffer --- numcodecs/zstd.pyx | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/numcodecs/zstd.pyx b/numcodecs/zstd.pyx index 5de558ca..913ea27e 100644 --- a/numcodecs/zstd.pyx +++ b/numcodecs/zstd.pyx @@ -8,8 +8,6 @@ from cpython.bytes cimport PyBytes_AS_STRING, PyBytes_FromStringAndSize from cpython.memoryview cimport PyMemoryView_GET_BUFFER -from .compat_ext cimport Buffer -from .compat_ext import Buffer from .compat_ext cimport PyBytes_RESIZE, ensure_continguous_memoryview from .compat import ensure_contiguous_ndarray @@ -222,7 +220,7 @@ def decompress(source, dest=None): raise RuntimeError('Zstd decompression error: invalid input data') if dest_size == ZSTD_CONTENTSIZE_UNKNOWN and dest is None: - return stream_decompress(source_buffer) + return stream_decompress(source_pb) # setup destination buffer if dest is None: @@ -230,8 +228,6 @@ def decompress(source, dest=None): dest_1d = dest = PyBytes_FromStringAndSize(NULL, dest_size) else: dest_1d = ensure_contiguous_ndarray(dest) - if dest_size == ZSTD_CONTENTSIZE_UNKNOWN: - dest_size = dest_buffer.nbytes # obtain dest memoryview dest_mv = memoryview(dest_1d) @@ -239,6 +235,9 @@ def decompress(source, dest=None): dest_ptr = dest_pb.buf dest_nbytes = dest_pb.len + if dest_size == ZSTD_CONTENTSIZE_UNKNOWN: + dest_size = dest_nbytes + # validate output buffer if dest_nbytes < dest_size: raise ValueError('destination buffer too small; expected at least %s, ' @@ -261,12 +260,12 @@ def decompress(source, dest=None): return dest -cdef stream_decompress(Buffer source_buffer): +cdef stream_decompress(const Py_buffer* source_pb): """Decompress data of unknown size Parameters ---------- - source : Buffer + source : Py_buffer Compressed data buffer Returns @@ -276,10 +275,9 @@ cdef stream_decompress(Buffer source_buffer): """ cdef: - char *source_ptr + const char *source_ptr void *dest_ptr void *new_dst - Buffer dest_buffer = None size_t source_size, dest_size, decompressed_size size_t DEST_GROWTH_SIZE, status ZSTD_inBuffer input @@ -290,8 +288,8 @@ cdef stream_decompress(Buffer source_buffer): # one completely block in all circumstances DEST_GROWTH_SIZE = ZSTD_DStreamOutSize(); - source_ptr = source_buffer.ptr - source_size = source_buffer.nbytes + source_ptr = source_pb.buf + source_size = source_pb.len # unknown content size, guess it is twice the size as the source dest_size = source_size * 2 From 834b74dccd079fb6ad138831823953a67468871a Mon Sep 17 00:00:00 2001 From: Mark Kittisopikul Date: Fri, 27 Jun 2025 19:20:13 -0400 Subject: [PATCH 05/14] Formatting with ruff --- numcodecs/tests/test_zstd.py | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/numcodecs/tests/test_zstd.py b/numcodecs/tests/test_zstd.py index 0ed7cb74..820668c6 100644 --- a/numcodecs/tests/test_zstd.py +++ b/numcodecs/tests/test_zstd.py @@ -97,9 +97,7 @@ def test_streaming_decompression(): codec = Zstd() # Encode bytes directly that were the result of streaming compression - bytes_val = bytes(bytearray([ - 40, 181, 47, 253, 0, 88, 97, 0, 0, 72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 33, - ])) + bytes_val = b'(\xb5/\xfd\x00Xa\x00\x00Hello World!' dec = codec.decode(bytes_val) assert dec == b'Hello World!' @@ -109,21 +107,11 @@ def test_streaming_decompression(): assert dec2 == b'Hello World!Hello World!' # Single long frame that decompresses to a large output - bytes3 = bytes(bytearray([ - 40, 181, 47, 253, 0, 88, 36, 2, 0, 164, 3, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, - 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, - 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 1, 0, 58, 252, 223, 115, 5, 5, 76, 0, 0, 8, - 115, 1, 0, 252, 255, 57, 16, 2, 76, 0, 0, 8, 107, 1, 0, 252, 255, 57, 16, 2, 76, 0, 0, 8, 99, 1, 0, 252, 255, 57, - 16, 2, 76, 0, 0, 8, 91, 1, 0, 252, 255, 57, 16, 2, 76, 0, 0, 8, 83, 1, 0, 252, 255, 57, 16, 2, 76, 0, 0, 8, 75, 1, - 0, 252, 255, 57, 16, 2, 76, 0, 0, 8, 67, 1, 0, 252, 255, 57, 16, 2, 76, 0, 0, 8, 117, 1, 0, 252, 255, 57, 16, 2, 76, - 0, 0, 8, 109, 1, 0, 252, 255, 57, 16, 2, 76, 0, 0, 8, 101, 1, 0, 252, 255, 57, 16, 2, 76, 0, 0, 8, 93, 1, 0, 252, - 255, 57, 16, 2, 76, 0, 0, 8, 85, 1, 0, 252, 255, 57, 16, 2, 76, 0, 0, 8, 77, 1, 0, 252, 255, 57, 16, 2, 77, 0, 0, 8, - 69, 1, 0, 252, 127, 29, 8, 1, - ])) + bytes3 = b'(\xb5/\xfd\x00X$\x02\x00\xa4\x03ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz\x01\x00:\xfc\xdfs\x05\x05L\x00\x00\x08s\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08k\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08c\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08[\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08S\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08K\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08C\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08u\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08m\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08e\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08]\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08U\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08M\x01\x00\xfc\xff9\x10\x02M\x00\x00\x08E\x01\x00\xfc\x7f\x1d\x08\x01' dec3 = codec.decode(bytes3) assert dec3 == b'ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz' * 1024 * 32 # Garbage input results in an error bytes4 = bytes(bytearray([0, 0, 0, 0, 0, 0, 0, 0])) with pytest.raises(RuntimeError, match='Zstd decompression error: invalid input data'): - codec.decode(bytes4) \ No newline at end of file + codec.decode(bytes4) From a2c7fb025844fab4e89a33c25044552ca0de9af2 Mon Sep 17 00:00:00 2001 From: Mark Kittisopikul Date: Sat, 28 Jun 2025 16:46:53 -0400 Subject: [PATCH 06/14] Fix zstd comparison of different signedness --- numcodecs/zstd.pyx | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/numcodecs/zstd.pyx b/numcodecs/zstd.pyx index 913ea27e..f93da633 100644 --- a/numcodecs/zstd.pyx +++ b/numcodecs/zstd.pyx @@ -203,6 +203,7 @@ def decompress(source, dest=None): char* dest_ptr size_t source_size, dest_size, decompressed_size size_t nbytes, cbytes, blocksize + size_t dest_nbytes # obtain source memoryview source_mv = ensure_continguous_memoryview(source) @@ -298,7 +299,7 @@ cdef stream_decompress(const Py_buffer* source_pb): # minimum dest_size is DEST_GROWTH_SIZE dest_size = DEST_GROWTH_SIZE - dest_ptr = malloc(dest_size) + dest_ptr = malloc(dest_size) zds = ZSTD_createDStream() try: From 0dcffe6b621594ae1c7e3dc4360f45253269950d Mon Sep 17 00:00:00 2001 From: Mark Kittisopikul Date: Thu, 3 Jul 2025 05:42:03 -0400 Subject: [PATCH 07/14] Undo change to unrelated test --- numcodecs/tests/test_zstd.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/numcodecs/tests/test_zstd.py b/numcodecs/tests/test_zstd.py index 820668c6..b669a315 100644 --- a/numcodecs/tests/test_zstd.py +++ b/numcodecs/tests/test_zstd.py @@ -47,8 +47,8 @@ np.random.randint(0, 2**25, size=1000, dtype='u8').view('m8[m]'), np.random.randint(-(2**63), -(2**63) + 20, size=1000, dtype='i8').view('M8[ns]'), np.random.randint(-(2**63), -(2**63) + 20, size=1000, dtype='i8').view('m8[ns]'), - np.random.randint(-(2**63), -(2**63) + 20, dtype='i8').view('M8[m]'), - np.random.randint(-(2**63), -(2**63) + 20, dtype='i8').view('m8[m]'), + np.random.randint(-(2**63), -(2**63) + 20, size=1000, dtype='i8').view('M8[m]'), + np.random.randint(-(2**63), -(2**63) + 20, size=1000, dtype='i8').view('m8[m]'), ] From 4edc73ad7b2458e2e7552df6b2856ffd54b6cf98 Mon Sep 17 00:00:00 2001 From: Mark Kittisopikul Date: Thu, 3 Jul 2025 16:32:34 -0400 Subject: [PATCH 08/14] Add zstd cli tests --- numcodecs/tests/test_zstd.py | 51 +++++++++++++++++++++++++++++++++--- 1 file changed, 48 insertions(+), 3 deletions(-) diff --git a/numcodecs/tests/test_zstd.py b/numcodecs/tests/test_zstd.py index b669a315..974faec3 100644 --- a/numcodecs/tests/test_zstd.py +++ b/numcodecs/tests/test_zstd.py @@ -1,6 +1,7 @@ import itertools import numpy as np +import subprocess import pytest try: @@ -96,22 +97,66 @@ def test_streaming_decompression(): # Test input frames with unknown frame content size codec = Zstd() + # If the zstd command line interface is available, check the bytes + cli = zstd_cli_available() + # Encode bytes directly that were the result of streaming compression bytes_val = b'(\xb5/\xfd\x00Xa\x00\x00Hello World!' dec = codec.decode(bytes_val) - assert dec == b'Hello World!' + dec_expected = b'Hello World!' + assert dec == dec_expected + if cli: + assert bytes_val == generate_zstd_streaming_bytes(dec_expected) + assert dec_expected == generate_zstd_streaming_bytes(bytes_val, decompress=True) # Two consecutive frames given as input bytes2 = bytes(bytearray(bytes_val * 2)) dec2 = codec.decode(bytes2) - assert dec2 == b'Hello World!Hello World!' + dec2_expected = b'Hello World!Hello World!' + assert dec2 == dec2_expected + if cli: + assert dec2_expected == generate_zstd_streaming_bytes(bytes2, decompress=True) # Single long frame that decompresses to a large output bytes3 = b'(\xb5/\xfd\x00X$\x02\x00\xa4\x03ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz\x01\x00:\xfc\xdfs\x05\x05L\x00\x00\x08s\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08k\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08c\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08[\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08S\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08K\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08C\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08u\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08m\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08e\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08]\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08U\x01\x00\xfc\xff9\x10\x02L\x00\x00\x08M\x01\x00\xfc\xff9\x10\x02M\x00\x00\x08E\x01\x00\xfc\x7f\x1d\x08\x01' dec3 = codec.decode(bytes3) - assert dec3 == b'ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz' * 1024 * 32 + dec3_expected = b'ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz' * 1024 * 32 + assert dec3 == dec3_expected + if cli: + assert bytes3 == generate_zstd_streaming_bytes(dec3_expected) + assert dec3_expected == generate_zstd_streaming_bytes(bytes3, decompress=True) # Garbage input results in an error bytes4 = bytes(bytearray([0, 0, 0, 0, 0, 0, 0, 0])) with pytest.raises(RuntimeError, match='Zstd decompression error: invalid input data'): codec.decode(bytes4) + +def generate_zstd_streaming_bytes(input: bytes, *, decompress: bool =False) -> bytes: + """ + Use the zstd command line interface to compress or decompress bytes in streaming mode. + """ + if decompress: + args = ["-d"] + else: + args = [] + + p = subprocess.run( + ["zstd", "--no-check", *args], + input=input, + capture_output=True + ) + return p.stdout + +def view_zstd_streaming_bytes(): + bytes_val = generate_zstd_streaming_bytes(b"Hello world!") + print(f" bytes_val = {bytes_val}") + + bytes3 = generate_zstd_streaming_bytes(b"ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz" * 1024 * 32) + print(f" bytes3 = {bytes3}") + +def zstd_cli_available() -> bool: + return not subprocess.run( + ["zstd", "-V"], + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL + ).returncode From 16cc9b3a805cb688280e962f1549f2f33b4c46c0 Mon Sep 17 00:00:00 2001 From: Mark Kittisopikul Date: Thu, 3 Jul 2025 16:34:27 -0400 Subject: [PATCH 09/14] Test Zstd against pyzstd --- numcodecs/tests/test_pyzstd.py | 50 ++++++++++++++++++++++++++++++++++ pyproject.toml | 1 + 2 files changed, 51 insertions(+) create mode 100644 numcodecs/tests/test_pyzstd.py diff --git a/numcodecs/tests/test_pyzstd.py b/numcodecs/tests/test_pyzstd.py new file mode 100644 index 00000000..ba9a24e2 --- /dev/null +++ b/numcodecs/tests/test_pyzstd.py @@ -0,0 +1,50 @@ +import pytest +import numpy as np +from typing import TYPE_CHECKING + +try: + from numcodecs.zstd import Zstd +except ImportError: # pragma: no cover + pytest.skip("numcodecs.zstd not available", allow_module_level=True) + +if TYPE_CHECKING: # pragma: no cover + import pyzstd +else: + pyzstd = pytest.importorskip("pyzstd") + +test_data = [ + b"Hello World!", + np.arange(113).tobytes(), + np.arange(10,15).tobytes(), + np.random.randint(3, 50, size=(53,)).tobytes() +] + +@pytest.mark.parametrize("input", test_data) +def test_pyzstd_simple(input): + z = Zstd() + assert z.decode(pyzstd.compress(input)) == input + assert pyzstd.decompress(z.encode(input)) == input + +@pytest.mark.xfail +@pytest.mark.parametrize("input", test_data) +def test_pyzstd_simple_multiple_frames(input): + z = Zstd() + assert z.decode(pyzstd.compress(input)*2) == input*2 + assert pyzstd.decompress(z.encode(input)*2) == input*2 + +@pytest.mark.parametrize("input", test_data) +def test_pyzstd_streaming(input): + pyzstd_c = pyzstd.ZstdCompressor() + pyzstd_d = pyzstd.ZstdDecompressor() + z = Zstd() + + d_bytes = input + pyzstd_c.compress(d_bytes) + c_bytes = pyzstd_c.flush() + assert z.decode(c_bytes) == d_bytes + assert pyzstd_d.decompress(z.encode(d_bytes)) == d_bytes + + # Test multiple streaming frames + assert z.decode(c_bytes*2) == d_bytes*2 + assert z.decode(c_bytes*3) == d_bytes*3 + assert z.decode(c_bytes*99) == d_bytes*99 diff --git a/pyproject.toml b/pyproject.toml index e7d8ff69..387603f3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -57,6 +57,7 @@ test = [ "coverage", "pytest", "pytest-cov", + "pyzstd" ] test_extras = [ "importlib_metadata", From 308caf32b4e38ef2b810abfd13a678f56f1d5a75 Mon Sep 17 00:00:00 2001 From: Mark Kittisopikul Date: Thu, 3 Jul 2025 16:37:39 -0400 Subject: [PATCH 10/14] Apply ruff --- numcodecs/tests/test_pyzstd.py | 23 +++++++++++++---------- numcodecs/tests/test_zstd.py | 19 +++++++++---------- 2 files changed, 22 insertions(+), 20 deletions(-) diff --git a/numcodecs/tests/test_pyzstd.py b/numcodecs/tests/test_pyzstd.py index ba9a24e2..5e08900d 100644 --- a/numcodecs/tests/test_pyzstd.py +++ b/numcodecs/tests/test_pyzstd.py @@ -13,31 +13,34 @@ pyzstd = pytest.importorskip("pyzstd") test_data = [ - b"Hello World!", - np.arange(113).tobytes(), - np.arange(10,15).tobytes(), - np.random.randint(3, 50, size=(53,)).tobytes() + b"Hello World!", + np.arange(113).tobytes(), + np.arange(10, 15).tobytes(), + np.random.randint(3, 50, size=(53,)).tobytes(), ] + @pytest.mark.parametrize("input", test_data) def test_pyzstd_simple(input): z = Zstd() assert z.decode(pyzstd.compress(input)) == input assert pyzstd.decompress(z.encode(input)) == input + @pytest.mark.xfail @pytest.mark.parametrize("input", test_data) def test_pyzstd_simple_multiple_frames(input): z = Zstd() - assert z.decode(pyzstd.compress(input)*2) == input*2 - assert pyzstd.decompress(z.encode(input)*2) == input*2 + assert z.decode(pyzstd.compress(input) * 2) == input * 2 + assert pyzstd.decompress(z.encode(input) * 2) == input * 2 + @pytest.mark.parametrize("input", test_data) def test_pyzstd_streaming(input): pyzstd_c = pyzstd.ZstdCompressor() pyzstd_d = pyzstd.ZstdDecompressor() z = Zstd() - + d_bytes = input pyzstd_c.compress(d_bytes) c_bytes = pyzstd_c.flush() @@ -45,6 +48,6 @@ def test_pyzstd_streaming(input): assert pyzstd_d.decompress(z.encode(d_bytes)) == d_bytes # Test multiple streaming frames - assert z.decode(c_bytes*2) == d_bytes*2 - assert z.decode(c_bytes*3) == d_bytes*3 - assert z.decode(c_bytes*99) == d_bytes*99 + assert z.decode(c_bytes * 2) == d_bytes * 2 + assert z.decode(c_bytes * 3) == d_bytes * 3 + assert z.decode(c_bytes * 99) == d_bytes * 99 diff --git a/numcodecs/tests/test_zstd.py b/numcodecs/tests/test_zstd.py index 974faec3..280773c9 100644 --- a/numcodecs/tests/test_zstd.py +++ b/numcodecs/tests/test_zstd.py @@ -131,7 +131,8 @@ def test_streaming_decompression(): with pytest.raises(RuntimeError, match='Zstd decompression error: invalid input data'): codec.decode(bytes4) -def generate_zstd_streaming_bytes(input: bytes, *, decompress: bool =False) -> bytes: + +def generate_zstd_streaming_bytes(input: bytes, *, decompress: bool = False) -> bytes: """ Use the zstd command line interface to compress or decompress bytes in streaming mode. """ @@ -140,23 +141,21 @@ def generate_zstd_streaming_bytes(input: bytes, *, decompress: bool =False) -> b else: args = [] - p = subprocess.run( - ["zstd", "--no-check", *args], - input=input, - capture_output=True - ) + p = subprocess.run(["zstd", "--no-check", *args], input=input, capture_output=True) return p.stdout + def view_zstd_streaming_bytes(): bytes_val = generate_zstd_streaming_bytes(b"Hello world!") print(f" bytes_val = {bytes_val}") - bytes3 = generate_zstd_streaming_bytes(b"ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz" * 1024 * 32) + bytes3 = generate_zstd_streaming_bytes( + b"ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\]^_`abcdefghijklmnopqrstuvwxyz" * 1024 * 32 + ) print(f" bytes3 = {bytes3}") + def zstd_cli_available() -> bool: return not subprocess.run( - ["zstd", "-V"], - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL + ["zstd", "-V"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL ).returncode From 1f5670fceb1565fb571022de78d28da5ac8ad1df Mon Sep 17 00:00:00 2001 From: Mark Kittisopikul Date: Thu, 3 Jul 2025 17:03:49 -0400 Subject: [PATCH 11/14] Fix zstd tests, coverage --- numcodecs/tests/test_pyzstd.py | 15 +++++++++++---- numcodecs/tests/test_zstd.py | 4 +++- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/numcodecs/tests/test_pyzstd.py b/numcodecs/tests/test_pyzstd.py index 5e08900d..6567fb40 100644 --- a/numcodecs/tests/test_pyzstd.py +++ b/numcodecs/tests/test_pyzstd.py @@ -1,7 +1,9 @@ -import pytest -import numpy as np +# Check Zstd against pyzstd package from typing import TYPE_CHECKING +import numpy as np +import pytest + try: from numcodecs.zstd import Zstd except ImportError: # pragma: no cover @@ -16,7 +18,7 @@ b"Hello World!", np.arange(113).tobytes(), np.arange(10, 15).tobytes(), - np.random.randint(3, 50, size=(53,)).tobytes(), + np.random.randint(3, 50, size=(53,), dtype=np.uint16).tobytes(), ] @@ -29,9 +31,14 @@ def test_pyzstd_simple(input): @pytest.mark.xfail @pytest.mark.parametrize("input", test_data) -def test_pyzstd_simple_multiple_frames(input): +def test_pyzstd_simple_multiple_frames_decode(input): z = Zstd() assert z.decode(pyzstd.compress(input) * 2) == input * 2 + + +@pytest.mark.parametrize("input", test_data) +def test_pyzstd_simple_multiple_frames_encode(input): + z = Zstd() assert pyzstd.decompress(z.encode(input) * 2) == input * 2 diff --git a/numcodecs/tests/test_zstd.py b/numcodecs/tests/test_zstd.py index 280773c9..73433d71 100644 --- a/numcodecs/tests/test_zstd.py +++ b/numcodecs/tests/test_zstd.py @@ -1,7 +1,7 @@ import itertools +import subprocess import numpy as np -import subprocess import pytest try: @@ -99,6 +99,8 @@ def test_streaming_decompression(): # If the zstd command line interface is available, check the bytes cli = zstd_cli_available() + if cli: + view_zstd_streaming_bytes() # Encode bytes directly that were the result of streaming compression bytes_val = b'(\xb5/\xfd\x00Xa\x00\x00Hello World!' From dcf29890c524862c510317a0407f87b60857b261 Mon Sep 17 00:00:00 2001 From: Mark Kittisopikul Date: Wed, 9 Jul 2025 22:17:09 -0400 Subject: [PATCH 12/14] Make imports not optional --- numcodecs/tests/test_pyzstd.py | 12 ++---------- numcodecs/tests/test_zstd.py | 7 +------ 2 files changed, 3 insertions(+), 16 deletions(-) diff --git a/numcodecs/tests/test_pyzstd.py b/numcodecs/tests/test_pyzstd.py index 6567fb40..52f0b58f 100644 --- a/numcodecs/tests/test_pyzstd.py +++ b/numcodecs/tests/test_pyzstd.py @@ -1,18 +1,10 @@ # Check Zstd against pyzstd package -from typing import TYPE_CHECKING import numpy as np import pytest +import pyzstd -try: - from numcodecs.zstd import Zstd -except ImportError: # pragma: no cover - pytest.skip("numcodecs.zstd not available", allow_module_level=True) - -if TYPE_CHECKING: # pragma: no cover - import pyzstd -else: - pyzstd = pytest.importorskip("pyzstd") +from numcodecs.zstd import Zstd test_data = [ b"Hello World!", diff --git a/numcodecs/tests/test_zstd.py b/numcodecs/tests/test_zstd.py index 73433d71..04b474df 100644 --- a/numcodecs/tests/test_zstd.py +++ b/numcodecs/tests/test_zstd.py @@ -4,12 +4,6 @@ import numpy as np import pytest -try: - from numcodecs.zstd import Zstd -except ImportError: # pragma: no cover - pytest.skip("numcodecs.zstd not available", allow_module_level=True) - - from numcodecs.tests.common import ( check_backwards_compatibility, check_config, @@ -18,6 +12,7 @@ check_err_encode_object_buffer, check_repr, ) +from numcodecs.zstd import Zstd codecs = [ Zstd(), From 78f26fcd0b79b515d76acaf7c31bf4b6143d7ad8 Mon Sep 17 00:00:00 2001 From: Mark Kittisopikul Date: Wed, 9 Jul 2025 22:17:49 -0400 Subject: [PATCH 13/14] Add EndlessZstdDecompressor tests --- numcodecs/tests/test_pyzstd.py | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/numcodecs/tests/test_pyzstd.py b/numcodecs/tests/test_pyzstd.py index 52f0b58f..f3cdd544 100644 --- a/numcodecs/tests/test_pyzstd.py +++ b/numcodecs/tests/test_pyzstd.py @@ -38,6 +38,7 @@ def test_pyzstd_simple_multiple_frames_encode(input): def test_pyzstd_streaming(input): pyzstd_c = pyzstd.ZstdCompressor() pyzstd_d = pyzstd.ZstdDecompressor() + pyzstd_e = pyzstd.EndlessZstdDecompressor() z = Zstd() d_bytes = input @@ -47,6 +48,11 @@ def test_pyzstd_streaming(input): assert pyzstd_d.decompress(z.encode(d_bytes)) == d_bytes # Test multiple streaming frames - assert z.decode(c_bytes * 2) == d_bytes * 2 - assert z.decode(c_bytes * 3) == d_bytes * 3 - assert z.decode(c_bytes * 99) == d_bytes * 99 + assert z.decode(c_bytes * 2) == pyzstd_e.decompress(c_bytes * 2) + assert z.decode(c_bytes * 3) == pyzstd_e.decompress(c_bytes * 3) + assert z.decode(c_bytes * 4) == pyzstd_e.decompress(c_bytes * 4) + assert z.decode(c_bytes * 5) == pyzstd_e.decompress(c_bytes * 5) + assert z.decode(c_bytes * 7) == pyzstd_e.decompress(c_bytes * 7) + assert z.decode(c_bytes * 11) == pyzstd_e.decompress(c_bytes * 11) + assert z.decode(c_bytes * 13) == pyzstd_e.decompress(c_bytes * 13) + assert z.decode(c_bytes * 99) == pyzstd_e.decompress(c_bytes * 99) From 313d534946796a53ee3d7074a1db0fe973819893 Mon Sep 17 00:00:00 2001 From: Mark Kittisopikul Date: Thu, 10 Jul 2025 16:55:12 -0400 Subject: [PATCH 14/14] Add docstrings to test_pyzstd.py --- numcodecs/tests/test_pyzstd.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/numcodecs/tests/test_pyzstd.py b/numcodecs/tests/test_pyzstd.py index f3cdd544..b9dd6db2 100644 --- a/numcodecs/tests/test_pyzstd.py +++ b/numcodecs/tests/test_pyzstd.py @@ -16,6 +16,10 @@ @pytest.mark.parametrize("input", test_data) def test_pyzstd_simple(input): + """ + Test if Zstd.[decode, encode] can perform the inverse operation to + pyzstd.[compress, decompress] in the simple case. + """ z = Zstd() assert z.decode(pyzstd.compress(input)) == input assert pyzstd.decompress(z.encode(input)) == input @@ -24,18 +28,33 @@ def test_pyzstd_simple(input): @pytest.mark.xfail @pytest.mark.parametrize("input", test_data) def test_pyzstd_simple_multiple_frames_decode(input): + """ + Test decompression of two concatenated frames of known sizes + + numcodecs.zstd.Zstd currently fails because it only assesses the size of the + first frame. Rather, it should keep iterating through all the frames until + the end of the input buffer. + """ z = Zstd() + assert pyzstd.decompress(pyzstd.compress(input) * 2) == input * 2 assert z.decode(pyzstd.compress(input) * 2) == input * 2 @pytest.mark.parametrize("input", test_data) def test_pyzstd_simple_multiple_frames_encode(input): + """ + Test if pyzstd can decompress two concatenated frames from Zstd.encode + """ z = Zstd() assert pyzstd.decompress(z.encode(input) * 2) == input * 2 @pytest.mark.parametrize("input", test_data) def test_pyzstd_streaming(input): + """ + Test if Zstd can decode a single frame and concatenated frames in streaming + mode where the decompressed size is not recorded in the frame header. + """ pyzstd_c = pyzstd.ZstdCompressor() pyzstd_d = pyzstd.ZstdDecompressor() pyzstd_e = pyzstd.EndlessZstdDecompressor()