From eca903872d928b366f38aa6e355df64e0df5d4bb Mon Sep 17 00:00:00 2001 From: Benjamin Drung Date: Fri, 24 Apr 2026 15:03:10 +0200 Subject: [PATCH 1/2] problem_report: introduce _create_compressed_attachment Refactor `problem_report.py` by introducing the helper function `_create_compressed_attachment` in preparation for following commits. Ensure that the type of `attach_value` is either `None` or `bytes`. Bug: https://launchpad.net/bugs/2148656 Signed-off-by: Benjamin Drung --- problem_report.py | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/problem_report.py b/problem_report.py index 63afb2eeb..366a09370 100644 --- a/problem_report.py +++ b/problem_report.py @@ -87,6 +87,12 @@ def __next__(self) -> Iterator[bytes]: return self.entry_iterator() +def _add_extension_if_missing(filename: str, extension: str) -> str: + if filename.endswith(extension): + return filename + return f"{filename}{extension}" + + def _base64_decoder(entry: Iterable[bytes]) -> Iterator[bytes]: for line in entry: try: @@ -95,6 +101,15 @@ def _base64_decoder(entry: Iterable[bytes]) -> Iterator[bytes]: raise MalformedProblemReport(str(error)) from None +def _create_compressed_attachment(name: str, value: bytes) -> email.mime.base.MIMEBase: + filename = _add_extension_if_missing(name, ".gz") + attachment = email.mime.base.MIMEBase("application", "gzip") + attachment.add_header("Content-Disposition", "attachment", filename=filename) + attachment.set_payload(value) + email.encoders.encode_base64(attachment) + return attachment + + def _strip_gzip_header(line: bytes) -> bytes: """Strip gzip header from line and return the rest.""" flags = line[3] @@ -829,7 +844,6 @@ def write_mime( # if it's a tuple, we have a file reference; read the contents # and gzip it elif not isinstance(v, bytes | str): - attach_value = "" if hasattr(v[0], "read"): f = v[0] # file-like object else: @@ -850,7 +864,7 @@ def write_mime( f.close() # binary value - elif 
self.is_binary(v): + elif isinstance(v, bytes) and self.is_binary(v): if k.endswith(".gz"): attach_value = v else: @@ -858,16 +872,7 @@ def write_mime( # if we have an attachment value, create an attachment if attach_value: - att = email.mime.base.MIMEBase("application", "gzip") - if k.endswith(".gz"): - att.add_header("Content-Disposition", "attachment", filename=k) - else: - att.add_header( - "Content-Disposition", "attachment", filename=k + ".gz" - ) - att.set_payload(attach_value) - email.encoders.encode_base64(att) - attachments.append(att) + attachments.append(_create_compressed_attachment(k, attach_value)) else: # plain text value size = len(v) From a36ff969434d3500341eea532ccd9081be7334d2 Mon Sep 17 00:00:00 2001 From: Benjamin Drung Date: Fri, 24 Apr 2026 15:04:56 +0200 Subject: [PATCH 2/2] problem_report: fix file extension for zstd compressed values When Apport is used with systemd-coredump, the CoreDump is zstd compressed (instead of gzip). When filing Launchpad bugs, the zstd compressed CoreDump is attached to the bug named `CoreDump.gz`. Support zstd compressed values in `ProblemReport.write_mime`. 
Tested with: ``` divide-by-zero PYTHONPATH=$(pwd) APPORT_LAUNCHPAD_INSTANCE=qastaging bin/apport-cli /var/crash/_usr_bin_divide-by-zero.1000.crash ``` Bug: https://launchpad.net/bugs/2148656 --- problem_report.py | 13 ++++- tests/unit/test_problem_report.py | 79 +++++++++++++++++++++++++++++++ 2 files changed, 90 insertions(+), 2 deletions(-) diff --git a/problem_report.py b/problem_report.py index 366a09370..876796027 100644 --- a/problem_report.py +++ b/problem_report.py @@ -102,14 +102,23 @@ def _base64_decoder(entry: Iterable[bytes]) -> Iterator[bytes]: def _create_compressed_attachment(name: str, value: bytes) -> email.mime.base.MIMEBase: - filename = _add_extension_if_missing(name, ".gz") - attachment = email.mime.base.MIMEBase("application", "gzip") + mime_subtype, extension = _derive_compression(name, value) + filename = _add_extension_if_missing(name, extension) + attachment = email.mime.base.MIMEBase("application", mime_subtype) attachment.add_header("Content-Disposition", "attachment", filename=filename) attachment.set_payload(value) email.encoders.encode_base64(attachment) return attachment +def _derive_compression(name: str, value: bytes) -> tuple[str, str]: + if value.startswith(GZIP_HEADER_START): + return ("gzip", ".gz") + if value.startswith(ZSTANDARD_MAGIC_NUMBER): + return ("zstd", ".zst") + raise ValueError(f"Unknown compression for {name}") + + def _strip_gzip_header(line: bytes) -> bytes: """Strip gzip header from line and return the rest.""" flags = line[3] diff --git a/tests/unit/test_problem_report.py b/tests/unit/test_problem_report.py index 186f1ed99..a3996924f 100644 --- a/tests/unit/test_problem_report.py +++ b/tests/unit/test_problem_report.py @@ -21,6 +21,13 @@ import problem_report BIN_DATA = b"ABABABABAB\0\0\0Z\x01\x02" +GZIP_BIN_DATA = ( + b"\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03str\x84B\x06" + b"\x06\x86(F&\x003\x95\xd4\x0b\x10\x00\x00\x00" +) +ZSTD_BIN_DATA = ( + 
b"(\xb5/\xfd$\x10\x81\x00\x00ABABABABAB\x00\x00\x00Z\x01\x02\xbc\xdf\xdd\xfd" +) class T(unittest.TestCase): # pylint: disable=too-many-public-methods @@ -578,6 +585,78 @@ def test_sorted_items(self) -> None: ], ) + @unittest.skipUnless(zstandard, "zstandard Python module not available") + def test_write_mime_binary_values(self) -> None: + """write_mime() for binary values (gzip and zstd compressed).""" + report = problem_report.ProblemReport(date="now!") + report["Data.gz"] = GZIP_BIN_DATA + report["GzipData"] = problem_report.CompressedValue( + compressed_value=GZIP_BIN_DATA + ) + report["ZstdData"] = problem_report.CompressedValue( + compressed_value=ZSTD_BIN_DATA + ) + + output = io.BytesIO() + report.write_mime(output) + output.seek(0) + + message = email.message_from_binary_file(output) + remaining_parts = message.walk() + + # first part is the multipart container + part = next(remaining_parts) + self.assertTrue(part.is_multipart()) + + # second part should be an inline text/plain attachment with all short + # fields + part = next(remaining_parts) + self.assertFalse(part.is_multipart()) + self.assertEqual(part.get_content_type(), "text/plain") + self.assertEqual(part.get_content_charset(), "utf-8") + self.assertIsNone(part.get_filename()) + self.assertEqual( + part.get_payload(decode=True), b"ProblemType: Crash\nDate: now!\n" + ) + + # third part should be the Data.gz as attachment + part = next(remaining_parts) + self.assertEqual(part.get_filename(), "Data.gz") + self.assertFalse(part.is_multipart()) + self.assertEqual(part.get_content_type(), "application/gzip") + self.assertIsNone(part.get_content_charset()) + self.assertEqual(part.get_payload(decode=True), GZIP_BIN_DATA) + + # fourth part should be the GzipData as attachment + part = next(remaining_parts) + self.assertEqual(part.get_filename(), "GzipData.gz") + self.assertFalse(part.is_multipart()) + self.assertEqual(part.get_content_type(), "application/gzip") + 
self.assertIsNone(part.get_content_charset()) + self.assertEqual(part.get_payload(decode=True), GZIP_BIN_DATA) + + # fifth part should be the ZstdData as attachment + part = next(remaining_parts) + self.assertEqual(part.get_filename(), "ZstdData.zst") + self.assertFalse(part.is_multipart()) + self.assertEqual(part.get_content_type(), "application/zstd") + self.assertIsNone(part.get_content_charset()) + self.assertEqual(part.get_payload(decode=True), ZSTD_BIN_DATA) + + with self.assertRaises(StopIteration): + next(remaining_parts) + + def test_write_mime_invalid_compressed_binary(self) -> None: + """write_mime() for invalid compressed binary values.""" + report = problem_report.ProblemReport() + report["InvalidData.gz"] = b"\0X" + + output = io.BytesIO() + with self.assertRaisesRegex( + ValueError, "^Unknown compression for InvalidData.gz$" + ): + report.write_mime(output) + def test_write_mime_text(self) -> None: """write_mime() for text values.""" pr = problem_report.ProblemReport(date="now!")