diff --git a/problem_report.py b/problem_report.py index 63afb2eeb..876796027 100644 --- a/problem_report.py +++ b/problem_report.py @@ -87,6 +87,12 @@ def __next__(self) -> Iterator[bytes]: return self.entry_iterator() +def _add_extension_if_missing(filename: str, extension: str) -> str: + if filename.endswith(extension): + return filename + return f"{filename}{extension}" + + def _base64_decoder(entry: Iterable[bytes]) -> Iterator[bytes]: for line in entry: try: @@ -95,6 +101,24 @@ def _base64_decoder(entry: Iterable[bytes]) -> Iterator[bytes]: raise MalformedProblemReport(str(error)) from None +def _create_compressed_attachment(name: str, value: bytes) -> email.mime.base.MIMEBase: + mime_subtype, extension = _derive_compression(name, value) + filename = _add_extension_if_missing(name, extension) + attachment = email.mime.base.MIMEBase("application", mime_subtype) + attachment.add_header("Content-Disposition", "attachment", filename=filename) + attachment.set_payload(value) + email.encoders.encode_base64(attachment) + return attachment + + +def _derive_compression(name: str, value: bytes) -> tuple[str, str]: + if value.startswith(GZIP_HEADER_START): + return ("gzip", ".gz") + if value.startswith(ZSTANDARD_MAGIC_NUMBER): + return ("zstd", ".zst") + raise ValueError(f"Unknown compression for {name}") + + def _strip_gzip_header(line: bytes) -> bytes: """Strip gzip header from line and return the rest.""" flags = line[3] @@ -829,7 +853,6 @@ def write_mime( # if it's a tuple, we have a file reference; read the contents # and gzip it elif not isinstance(v, bytes | str): - attach_value = "" if hasattr(v[0], "read"): f = v[0] # file-like object else: @@ -850,7 +873,7 @@ def write_mime( f.close() # binary value - elif self.is_binary(v): + elif isinstance(v, bytes) and self.is_binary(v): if k.endswith(".gz"): attach_value = v else: @@ -858,16 +881,7 @@ def write_mime( # if we have an attachment value, create an attachment if attach_value: - att = 
email.mime.base.MIMEBase("application", "gzip") - if k.endswith(".gz"): - att.add_header("Content-Disposition", "attachment", filename=k) - else: - att.add_header( - "Content-Disposition", "attachment", filename=k + ".gz" - ) - att.set_payload(attach_value) - email.encoders.encode_base64(att) - attachments.append(att) + attachments.append(_create_compressed_attachment(k, attach_value)) else: # plain text value size = len(v) diff --git a/tests/unit/test_problem_report.py b/tests/unit/test_problem_report.py index 186f1ed99..a3996924f 100644 --- a/tests/unit/test_problem_report.py +++ b/tests/unit/test_problem_report.py @@ -21,6 +21,13 @@ import problem_report BIN_DATA = b"ABABABABAB\0\0\0Z\x01\x02" +GZIP_BIN_DATA = ( + b"\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03str\x84B\x06" + b"\x06\x86(F&\x003\x95\xd4\x0b\x10\x00\x00\x00" +) +ZSTD_BIN_DATA = ( + b"(\xb5/\xfd$\x10\x81\x00\x00ABABABABAB\x00\x00\x00Z\x01\x02\xbc\xdf\xdd\xfd" +) class T(unittest.TestCase): # pylint: disable=too-many-public-methods @@ -578,6 +585,78 @@ def test_sorted_items(self) -> None: ], ) + @unittest.skipUnless(zstandard, "zstandard Python module not available") + def test_write_mime_binary_values(self) -> None: + """write_mime() for binary values (gzip and zstd compressed).""" + report = problem_report.ProblemReport(date="now!") + report["Data.gz"] = GZIP_BIN_DATA + report["GzipData"] = problem_report.CompressedValue( + compressed_value=GZIP_BIN_DATA + ) + report["ZstdData"] = problem_report.CompressedValue( + compressed_value=ZSTD_BIN_DATA + ) + + output = io.BytesIO() + report.write_mime(output) + output.seek(0) + + message = email.message_from_binary_file(output) + remaining_parts = message.walk() + + # first part is the multipart container + part = next(remaining_parts) + self.assertTrue(part.is_multipart()) + + # second part should be an inline text/plain attachment with all short + # fields + part = next(remaining_parts) + self.assertFalse(part.is_multipart()) + 
self.assertEqual(part.get_content_type(), "text/plain") + self.assertEqual(part.get_content_charset(), "utf-8") + self.assertIsNone(part.get_filename()) + self.assertEqual( + part.get_payload(decode=True), b"ProblemType: Crash\nDate: now!\n" + ) + + # third part should be the Data.gz as attachment + part = next(remaining_parts) + self.assertEqual(part.get_filename(), "Data.gz") + self.assertFalse(part.is_multipart()) + self.assertEqual(part.get_content_type(), "application/gzip") + self.assertIsNone(part.get_content_charset()) + self.assertEqual(part.get_payload(decode=True), GZIP_BIN_DATA) + + # fourth part should be the GzipData as attachment + part = next(remaining_parts) + self.assertEqual(part.get_filename(), "GzipData.gz") + self.assertFalse(part.is_multipart()) + self.assertEqual(part.get_content_type(), "application/gzip") + self.assertIsNone(part.get_content_charset()) + self.assertEqual(part.get_payload(decode=True), GZIP_BIN_DATA) + + # fifth part should be the ZstdData as attachment + part = next(remaining_parts) + self.assertEqual(part.get_filename(), "ZstdData.zst") + self.assertFalse(part.is_multipart()) + self.assertEqual(part.get_content_type(), "application/zstd") + self.assertIsNone(part.get_content_charset()) + self.assertEqual(part.get_payload(decode=True), ZSTD_BIN_DATA) + + with self.assertRaises(StopIteration): + next(remaining_parts) + + def test_write_mime_invalid_compressed_binary(self) -> None: + """write_mime() for invalid compressed binary values.""" + report = problem_report.ProblemReport() + report["InvalidData.gz"] = b"\0X" + + output = io.BytesIO() + with self.assertRaisesRegex( + ValueError, "^Unknown compression for InvalidData.gz$" + ): + report.write_mime(output) + def test_write_mime_text(self) -> None: """write_mime() for text values.""" pr = problem_report.ProblemReport(date="now!")