Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 26 additions & 12 deletions problem_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ def __next__(self) -> Iterator[bytes]:
return self.entry_iterator()


def _add_extension_if_missing(filename: str, extension: str) -> str:
if filename.endswith(extension):
return filename
return f"{filename}{extension}"


def _base64_decoder(entry: Iterable[bytes]) -> Iterator[bytes]:
for line in entry:
try:
Expand All @@ -95,6 +101,24 @@ def _base64_decoder(entry: Iterable[bytes]) -> Iterator[bytes]:
raise MalformedProblemReport(str(error)) from None


def _create_compressed_attachment(name: str, value: bytes) -> email.mime.base.MIMEBase:
mime_subtype, extension = _derive_compression(name, value)
filename = _add_extension_if_missing(name, extension)
attachment = email.mime.base.MIMEBase("application", mime_subtype)
attachment.add_header("Content-Disposition", "attachment", filename=filename)
attachment.set_payload(value)
email.encoders.encode_base64(attachment)
return attachment


def _derive_compression(name: str, value: bytes) -> tuple[str, str]:
if value.startswith(GZIP_HEADER_START):
return ("gzip", ".gz")
if value.startswith(ZSTANDARD_MAGIC_NUMBER):
return ("zstd", ".zst")
raise ValueError(f"Unknown compression for {name}")


def _strip_gzip_header(line: bytes) -> bytes:
"""Strip gzip header from line and return the rest."""
flags = line[3]
Expand Down Expand Up @@ -829,7 +853,6 @@ def write_mime(
# if it's a tuple, we have a file reference; read the contents
# and gzip it
elif not isinstance(v, bytes | str):
attach_value = ""
if hasattr(v[0], "read"):
f = v[0] # file-like object
else:
Expand All @@ -850,24 +873,15 @@ def write_mime(
f.close()

# binary value
elif self.is_binary(v):
elif isinstance(v, bytes) and self.is_binary(v):
if k.endswith(".gz"):
attach_value = v
else:
attach_value = CompressedValue(v, k).compressed_value

# if we have an attachment value, create an attachment
if attach_value:
att = email.mime.base.MIMEBase("application", "gzip")
if k.endswith(".gz"):
att.add_header("Content-Disposition", "attachment", filename=k)
else:
att.add_header(
"Content-Disposition", "attachment", filename=k + ".gz"
)
att.set_payload(attach_value)
email.encoders.encode_base64(att)
attachments.append(att)
attachments.append(_create_compressed_attachment(k, attach_value))
else:
# plain text value
size = len(v)
Expand Down
79 changes: 79 additions & 0 deletions tests/unit/test_problem_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@
import problem_report

BIN_DATA = b"ABABABABAB\0\0\0Z\x01\x02"
GZIP_BIN_DATA = (
b"\x1f\x8b\x08\x00\x00\x00\x00\x00\x00\x03str\x84B\x06"
b"\x06\x86(F&\x003\x95\xd4\x0b\x10\x00\x00\x00"
)
ZSTD_BIN_DATA = (
b"(\xb5/\xfd$\x10\x81\x00\x00ABABABABAB\x00\x00\x00Z\x01\x02\xbc\xdf\xdd\xfd"
)


class T(unittest.TestCase): # pylint: disable=too-many-public-methods
Expand Down Expand Up @@ -578,6 +585,78 @@ def test_sorted_items(self) -> None:
],
)

@unittest.skipUnless(zstandard, "zstandard Python module not available")
def test_write_mime_binary_values(self) -> None:
"""write_mine() for binary values (gzip and zstd compressed)."""
report = problem_report.ProblemReport(date="now!")
report["Data.gz"] = GZIP_BIN_DATA
report["GzipData"] = problem_report.CompressedValue(
compressed_value=GZIP_BIN_DATA
)
report["ZstdData"] = problem_report.CompressedValue(
compressed_value=ZSTD_BIN_DATA
)

output = io.BytesIO()
report.write_mime(output)
output.seek(0)

message = email.message_from_binary_file(output)
remaining_parts = message.walk()

# first part is the multipart container
part = next(remaining_parts)
self.assertTrue(part.is_multipart())

# second part should be an inline text/plain attachments with all short
# fields
part = next(remaining_parts)
self.assertFalse(part.is_multipart())
self.assertEqual(part.get_content_type(), "text/plain")
self.assertEqual(part.get_content_charset(), "utf-8")
self.assertIsNone(part.get_filename())
self.assertEqual(
part.get_payload(decode=True), b"ProblemType: Crash\nDate: now!\n"
)

# third part should be the Data.gz as attachment
part = next(remaining_parts)
self.assertEqual(part.get_filename(), "Data.gz")
self.assertFalse(part.is_multipart())
self.assertEqual(part.get_content_type(), "application/gzip")
self.assertIsNone(part.get_content_charset())
self.assertEqual(part.get_payload(decode=True), GZIP_BIN_DATA)

# fourth part should be the GzipData as attachment
part = next(remaining_parts)
self.assertEqual(part.get_filename(), "GzipData.gz")
self.assertFalse(part.is_multipart())
self.assertEqual(part.get_content_type(), "application/gzip")
self.assertIsNone(part.get_content_charset())
self.assertEqual(part.get_payload(decode=True), GZIP_BIN_DATA)

# fifth part should be the ZstdData as attachment
part = next(remaining_parts)
self.assertEqual(part.get_filename(), "ZstdData.zst")
self.assertFalse(part.is_multipart())
self.assertEqual(part.get_content_type(), "application/zstd")
self.assertIsNone(part.get_content_charset())
self.assertEqual(part.get_payload(decode=True), ZSTD_BIN_DATA)

with self.assertRaises(StopIteration):
next(remaining_parts)

def test_write_mime_invalid_compressed_binary(self) -> None:
"""write_mine() for invalid compressed binary values."""
report = problem_report.ProblemReport()
report["InvalidData.gz"] = b"\0X"

output = io.BytesIO()
with self.assertRaisesRegex(
ValueError, "^Unknown compression for InvalidData.gz$"
):
report.write_mime(output)

def test_write_mime_text(self) -> None:
"""write_mime() for text values."""
pr = problem_report.ProblemReport(date="now!")
Expand Down
Loading