Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 3 additions & 8 deletions bin/apport-unpack
Original file line number Diff line number Diff line change
Expand Up @@ -56,22 +56,17 @@ def open_report(report_filename: str) -> Iterator[(BinaryIO | gzip.GzipFile)]:

def unpack_report_to_directory(
report: problem_report.ProblemReport, target_directory: str
) -> list[str]:
) -> None:
"""Write each report entry into a separate file.

Return a list of keys that were not loaded.
"""
missing_keys = []
for key, value in report.items():
if value is None:
missing_keys.append(key)
continue
with open(os.path.join(target_directory, key), "wb") as key_file:
if isinstance(value, str):
key_file.write(value.encode("UTF-8"))
else:
key_file.write(value)
return missing_keys


# pylint: disable-next=missing-function-docstring
Expand All @@ -95,10 +90,10 @@ def main() -> None:
# In case of passing the report to stdin,
# the report needs to be loaded in one go.
# The current implementation loads the whole report into memory.
report.load(report_file, binary=args.report == "-")
bin_keys = report.load(report_file, binary=args.report == "-")
except (OSError, problem_report.MalformedProblemReport) as error:
fatal("%s", str(error))
bin_keys = unpack_report_to_directory(report, args.target_directory)
unpack_report_to_directory(report, args.target_directory)
if bin_keys:
try:
with open_report(args.report) as report_file:
Expand Down
28 changes: 12 additions & 16 deletions problem_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -358,11 +358,10 @@ def load(
file: gzip.GzipFile | typing.IO[bytes],
binary: bool | typing.Literal["compressed"] = True,
key_filter: Iterable[str] | None = None,
) -> None:
) -> list[str]:
"""Initialize problem report from a file-like object.

If binary is False, binary data is not loaded; the dictionary key is
created, but its value will be an empty string. If it is True, it is
If binary is False, binary data is not loaded. If it is True, it is
transparently uncompressed and available as dictionary byte array
values. If binary is 'compressed', the compressed value is retained,
and the dictionary value will be a CompressedValue object. This is
Expand All @@ -371,9 +370,13 @@ def load(

file needs to be opened in binary mode.

If key_filter is given, only those keys will be loaded.
If key_filter is given, only those keys will be loaded. If all
specified keys were loaded, the remaining file will be skipped.

Files are in RFC822 format, but with case sensitive keys.

Return a list of dictionary keys that were skipped. Note: This list
might not be exhaustive in case key_filter is given.
"""
self._assert_bin_mode(file)
self.data.clear()
Expand All @@ -382,14 +385,16 @@ def load(
else:
remaining_keys = None

skipped_keys = []
for entry in _EntryParser(file):
key, iterator, base64_encoded = _parse_entry(entry)
if remaining_keys is not None and key not in remaining_keys:
skipped_keys.append(key)
continue

if base64_encoded:
if binary is False:
self.data[key] = None
skipped_keys.append(key)
elif binary == "compressed":
self.data[key] = CompressedValue(
name=key, compressed_value=b"".join(iterator)
Expand All @@ -408,6 +413,7 @@ def load(
break

self.old_keys = set(self.data.keys())
return skipped_keys

def extract_keys(
self,
Expand Down Expand Up @@ -475,13 +481,6 @@ def get_timestamp(self) -> int | None:
except locale.Error:
return None

def has_removed_fields(self) -> bool:
"""Check if the report has any keys which were not loaded.

This could happen when using binary=False in load().
"""
return None in self.values()

@staticmethod
def is_binary(string: bytes | str) -> bool:
"""Check if the given strings contains binary data."""
Expand All @@ -508,8 +507,7 @@ def sorted_items(

The most interesting fields will be returned first. The remaining
items will be returned in alphabetical order. Keys starting with
an underscore are considered internal and are skipped. Also values
that are None will be skipped.
an underscore are considered internal and are skipped.

If keys is provided, only those keys will be iterated over.
"""
Expand All @@ -534,8 +532,6 @@ def sorted_items(
if key.startswith("_"):
continue
value = self[key]
if value is None:
continue
yield key, value

def write(self, file: typing.IO[bytes], only_new: bool = False) -> None:
Expand Down
26 changes: 12 additions & 14 deletions tests/unit/test_problem_report.py
Original file line number Diff line number Diff line change
Expand Up @@ -426,19 +426,18 @@ def test_read_file(self) -> None:

# test with reading everything
pr = problem_report.ProblemReport()
pr.load(io.BytesIO(bin_report))
skipped_keys = pr.load(io.BytesIO(bin_report))
self.assertEqual(pr["File"], BIN_DATA)
self.assertEqual(pr.has_removed_fields(), False)
self.assertEqual(skipped_keys, [])

# test with skipping binary data
pr.load(io.BytesIO(bin_report), binary=False)
self.assertIsNone(pr["File"])
self.assertEqual(pr.has_removed_fields(), True)
skipped_keys = pr.load(io.BytesIO(bin_report), binary=False)
self.assertEqual(skipped_keys, ["File"])

# test with keeping compressed binary data
pr.load(io.BytesIO(bin_report), binary="compressed")
skipped_keys = pr.load(io.BytesIO(bin_report), binary="compressed")
self.assertEqual(pr["Foo"], "Bar")
self.assertEqual(pr.has_removed_fields(), False)
self.assertEqual(skipped_keys, [])
self.assertTrue(isinstance(pr["File"], problem_report.CompressedValue))
self.assertEqual(len(pr["File"]), len(BIN_DATA))
self.assertEqual(pr["File"].get_value(), BIN_DATA)
Expand All @@ -458,18 +457,17 @@ def test_read_file_legacy(self) -> None:

# test with reading everything
pr = problem_report.ProblemReport()
pr.load(io.BytesIO(bin_report))
skipped_keys = pr.load(io.BytesIO(bin_report))
self.assertEqual(pr["File"], b"AB" * 10 + b"\0" * 10 + b"Z")
self.assertEqual(pr.has_removed_fields(), False)
self.assertEqual(skipped_keys, [])

# test with skipping binary data
pr.load(io.BytesIO(bin_report), binary=False)
self.assertIsNone(pr["File"])
self.assertEqual(pr.has_removed_fields(), True)
skipped_keys = pr.load(io.BytesIO(bin_report), binary=False)
self.assertEqual(skipped_keys, ["File"])

# test with keeping CompressedValues
pr.load(io.BytesIO(bin_report), binary="compressed")
self.assertEqual(pr.has_removed_fields(), False)
skipped_keys = pr.load(io.BytesIO(bin_report), binary="compressed")
self.assertEqual(skipped_keys, [])
self.assertEqual(len(pr["File"]), 31)
self.assertEqual(pr["File"].get_value(), b"AB" * 10 + b"\0" * 10 + b"Z")
self.assertEqual(pr["File"].name, "File")
Expand Down
Loading