From 726af73256be9c509c4aa1662c11f72abde7c1be Mon Sep 17 00:00:00 2001 From: Martin Vrachev Date: Tue, 29 Jun 2021 19:14:56 +0300 Subject: [PATCH 1/6] ng client metadata set: organize common test code Move the shared code between tests into the "setupClass" function. Signed-off-by: Martin Vrachev --- tests/test_trusted_metadata_set.py | 90 ++++++++++++++++-------------- 1 file changed, 47 insertions(+), 43 deletions(-) diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index b59e9de78b..397c494ebc 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -16,36 +16,37 @@ class TestTrustedMetadataSet(unittest.TestCase): - def test_update(self): - repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata') + @classmethod + def setUpClass(cls): + cls.repo_dir = os.path.join( + os.getcwd(), 'repository_data', 'repository', 'metadata' + ) + cls.metadata = {} + for md in ["root", "timestamp", "snapshot", "targets", "role1", "role2"]: + with open(os.path.join(cls.repo_dir, f"{md}.json"), "rb") as f: + cls.metadata[md] = f.read() + - with open(os.path.join(repo_dir, "root.json"), "rb") as f: - trusted_set = TrustedMetadataSet(f.read()) + def test_update(self): + trusted_set = TrustedMetadataSet(self.metadata["root"]) trusted_set.root_update_finished() - with open(os.path.join(repo_dir, "timestamp.json"), "rb") as f: - trusted_set.update_timestamp(f.read()) - with open(os.path.join(repo_dir, "snapshot.json"), "rb") as f: - trusted_set.update_snapshot(f.read()) - with open(os.path.join(repo_dir, "targets.json"), "rb") as f: - trusted_set.update_targets(f.read()) - with open(os.path.join(repo_dir, "role1.json"), "rb") as f: - trusted_set.update_delegated_targets(f.read(), "role1", "targets") - with open(os.path.join(repo_dir, "role2.json"), "rb") as f: - trusted_set.update_delegated_targets(f.read(), "role2", "role1") + trusted_set.update_timestamp(self.metadata["timestamp"]) + trusted_set.update_snapshot(self.metadata["snapshot"]) + trusted_set.update_targets(self.metadata["targets"]) + trusted_set.update_delegated_targets( + self.metadata["role1"], "role1", "targets" + ) + trusted_set.update_delegated_targets( + self.metadata["role2"], "role2", "role1" + ) def test_out_of_order_ops(self): - repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata') - data={} - for md in ["root", "timestamp", "snapshot", "targets", "role1"]: - with open(os.path.join(repo_dir, f"{md}.json"), "rb") as f: - data[md] = f.read() - - trusted_set = TrustedMetadataSet(data["root"]) + trusted_set = TrustedMetadataSet(self.metadata["root"]) # Update timestamp before root is finished with self.assertRaises(RuntimeError): - trusted_set.update_timestamp(data["timestamp"]) + trusted_set.update_timestamp(self.metadata["timestamp"]) trusted_set.root_update_finished() with self.assertRaises(RuntimeError): @@ -53,50 +54,53 @@ def test_out_of_order_ops(self): # Update snapshot before timestamp with self.assertRaises(RuntimeError): - trusted_set.update_snapshot(data["snapshot"]) + trusted_set.update_snapshot(self.metadata["snapshot"]) - trusted_set.update_timestamp(data["timestamp"]) + trusted_set.update_timestamp(self.metadata["timestamp"]) # Update targets before snapshot with self.assertRaises(RuntimeError): - trusted_set.update_targets(data["targets"]) + trusted_set.update_targets(self.metadata["targets"]) - trusted_set.update_snapshot(data["snapshot"]) + trusted_set.update_snapshot(self.metadata["snapshot"]) - #update timestamp after snapshot + # update timestamp after snapshot with self.assertRaises(RuntimeError): - trusted_set.update_timestamp(data["timestamp"]) + trusted_set.update_timestamp(self.metadata["timestamp"]) # Update delegated targets before targets with self.assertRaises(RuntimeError): - trusted_set.update_delegated_targets(data["role1"], "role1", "targets") + trusted_set.update_delegated_targets( + self.metadata["role1"], "role1", "targets" + ) - trusted_set.update_targets(data["targets"]) - trusted_set.update_delegated_targets(data["role1"], "role1", "targets") + trusted_set.update_targets(self.metadata["targets"]) + trusted_set.update_delegated_targets( + self.metadata["role1"], "role1", "targets" + ) - def test_update_with_invalid_json(self): - repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata') - data={} - for md in ["root", "timestamp", "snapshot", "targets", "role1"]: - with open(os.path.join(repo_dir, f"{md}.json"), "rb") as f: - data[md] = f.read() + trusted_set.update_targets(self.metadata["targets"]) + trusted_set.update_delegated_targets( + self.metadata["role1"], "role1", "targets" + ) + def test_update_with_invalid_json(self): # root.json not a json file at all with self.assertRaises(exceptions.RepositoryError): TrustedMetadataSet(b"") # root.json is invalid - root = Metadata.from_bytes(data["root"]) + root = Metadata.from_bytes(self.metadata["root"]) root.signed.version += 1 with self.assertRaises(exceptions.RepositoryError): TrustedMetadataSet(json.dumps(root.to_dict()).encode()) - trusted_set = TrustedMetadataSet(data["root"]) + trusted_set = TrustedMetadataSet(self.metadata["root"]) trusted_set.root_update_finished() top_level_md = [ - (data["timestamp"], trusted_set.update_timestamp), - (data["snapshot"], trusted_set.update_snapshot), - (data["targets"], trusted_set.update_targets), + (self.metadata["timestamp"], trusted_set.update_timestamp), + (self.metadata["snapshot"], trusted_set.update_snapshot), + (self.metadata["targets"], trusted_set.update_targets), ] for metadata, update_func in top_level_md: # metadata is not json @@ -110,7 +114,7 @@ def test_update_with_invalid_json(self): # metadata is of wrong type with self.assertRaises(exceptions.RepositoryError): - update_func(data["root"]) + update_func(self.metadata["root"]) update_func(metadata) From 6a942889ffcf2a8785ee74e0c4dcc80ef940b2fc Mon Sep 17 00:00:00 2001 From: Martin Vrachev Date: Thu, 1 Jul 2021 20:14:13 +0300 Subject: [PATCH 2/6] ngclient TrustedMetadataSet: improve unit testing The current situation with the TrustedMetadataSet testing is that we don't have a mnimimal amount of unit tests testing the different branches in the various API functionality in the class. This commit proposes simple unit tests covering almost all of the branches in the API functions and increasing the unit test coverage (as reported from the "coverage" tool) from 74 % to 97 %. The code could be complicated at places, because the different branches in the update_* functions depend on other metadata classes as well. Still, I hope we can find a way and simplify the code. Signed-off-by: Martin Vrachev --- tests/test_trusted_metadata_set.py | 244 ++++++++++++++++++++++++----- 1 file changed, 209 insertions(+), 35 deletions(-) diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index 397c494ebc..ba022f2865 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -1,14 +1,22 @@ +import copy import json import logging import os -import shutil import sys -import tempfile import unittest +from typing import Dict, Any +from datetime import datetime from tuf import exceptions from tuf.api.metadata import Metadata -from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet +from tuf.ngclient._internal.trusted_metadata_set import( + TrustedMetadataSet +) +from securesystemslib.signer import SSlibSigner +from securesystemslib.interface import( + import_ed25519_privatekey_from_file, + import_rsa_privatekey_from_file +) from tests import utils @@ -26,64 +34,92 @@ def setUpClass(cls): with open(os.path.join(cls.repo_dir, f"{md}.json"), "rb") as f: cls.metadata[md] = f.read() + keystore_dir = os.path.join(os.getcwd(), 'repository_data', 'keystore') + cls.keystore = {} + root_key_dict = import_rsa_privatekey_from_file( + os.path.join(keystore_dir, "root" + '_key'), + password="password" + ) + cls.keystore["root"] = SSlibSigner(root_key_dict) + for role in ["delegation", "snapshot", "targets", "timestamp"]: + key_dict = import_ed25519_privatekey_from_file( + os.path.join(keystore_dir, role + '_key'), + password="password" + ) + cls.keystore[role] = SSlibSigner(key_dict) - def test_update(self): - trusted_set = TrustedMetadataSet(self.metadata["root"]) - trusted_set.root_update_finished() + def setUp(self) -> None: + self.trusted_set = TrustedMetadataSet(self.metadata["root"]) - trusted_set.update_timestamp(self.metadata["timestamp"]) - trusted_set.update_snapshot(self.metadata["snapshot"]) - trusted_set.update_targets(self.metadata["targets"]) - trusted_set.update_delegated_targets( + def _root_update_finished_and_update_timestamp(self): + self.trusted_set.root_update_finished() + self.trusted_set.update_timestamp(self.metadata["timestamp"]) + + def _update_all_besides_targets(self): + self.trusted_set.root_update_finished() + self.trusted_set.update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_snapshot(self.metadata["snapshot"]) + + def test_update(self): + self.trusted_set.root_update_finished() + self.trusted_set.update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_delegated_targets( self.metadata["role1"], "role1", "targets" ) - trusted_set.update_delegated_targets( + self.trusted_set.update_delegated_targets( self.metadata["role2"], "role2", "role1" ) + # the 4 top level metadata objects + 2 additional delegated targets + self.assertTrue(len(self.trusted_set), 6) def test_out_of_order_ops(self): - trusted_set = TrustedMetadataSet(self.metadata["root"]) - # Update timestamp before root is finished with self.assertRaises(RuntimeError): - trusted_set.update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_timestamp(self.metadata["timestamp"]) + + self.trusted_set.root_update_finished() + with self.assertRaises(RuntimeError): + self.trusted_set.root_update_finished() - trusted_set.root_update_finished() + # Update root after a previous successful root update with self.assertRaises(RuntimeError): - trusted_set.root_update_finished() + self.trusted_set.update_root(self.metadata["root"]) # Update snapshot before timestamp with self.assertRaises(RuntimeError): - trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(self.metadata["snapshot"]) - trusted_set.update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_timestamp(self.metadata["timestamp"]) # Update targets before snapshot with self.assertRaises(RuntimeError): - trusted_set.update_targets(self.metadata["targets"]) + self.trusted_set.update_targets(self.metadata["targets"]) - trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(self.metadata["snapshot"]) # update timestamp after snapshot with self.assertRaises(RuntimeError): - trusted_set.update_timestamp(self.metadata["timestamp"]) + self.trusted_set.update_timestamp(self.metadata["timestamp"]) # Update delegated targets before targets with self.assertRaises(RuntimeError): - trusted_set.update_delegated_targets( + self.trusted_set.update_delegated_targets( self.metadata["role1"], "role1", "targets" ) - trusted_set.update_targets(self.metadata["targets"]) - trusted_set.update_delegated_targets( - self.metadata["role1"], "role1", "targets" - ) + self.trusted_set.update_targets(self.metadata["targets"]) - trusted_set.update_targets(self.metadata["targets"]) - trusted_set.update_delegated_targets( + # Update snapshot after sucessful targets update + with self.assertRaises(RuntimeError): + self.trusted_set.update_snapshot(self.metadata["snapshot"]) + + self.trusted_set.update_delegated_targets( self.metadata["role1"], "role1", "targets" ) + def test_update_with_invalid_json(self): # root.json not a json file at all with self.assertRaises(exceptions.RepositoryError): @@ -94,20 +130,27 @@ def test_update_with_invalid_json(self): with self.assertRaises(exceptions.RepositoryError): TrustedMetadataSet(json.dumps(root.to_dict()).encode()) - trusted_set = TrustedMetadataSet(self.metadata["root"]) - trusted_set.root_update_finished() + # update_root called with the wrong metadata type + with self.assertRaises(exceptions.RepositoryError): + self.trusted_set.update_root(self.metadata["snapshot"]) + + self.trusted_set.root_update_finished() top_level_md = [ - (self.metadata["timestamp"], trusted_set.update_timestamp), - (self.metadata["snapshot"], trusted_set.update_snapshot), - (self.metadata["targets"], trusted_set.update_targets), + (self.metadata["timestamp"], self.trusted_set.update_timestamp), + (self.metadata["snapshot"], self.trusted_set.update_snapshot), + (self.metadata["targets"], self.trusted_set.update_targets), ] for metadata, update_func in top_level_md: + md = Metadata.from_bytes(metadata) + if md.signed.type == "snapshot": + # timestamp hashes and length intervene when testing snapshot + self.trusted_set.timestamp.signed.meta["snapshot.json"].hashes = None + self.trusted_set.timestamp.signed.meta["snapshot.json"].length = None # metadata is not json with self.assertRaises(exceptions.RepositoryError): update_func(b"") # metadata is invalid - md = Metadata.from_bytes(metadata) md.signed.version += 1 with self.assertRaises(exceptions.RepositoryError): update_func(json.dumps(md.to_dict()).encode()) @@ -119,8 +162,139 @@ def test_update_with_invalid_json(self): update_func(metadata) + def test_update_root_new_root_cannot_be_verified_with_threshold(self): + # new_root data with threshold which cannot be verified. + modified_threshold_data = copy.deepcopy( + json.loads(self.metadata["root"]) + ) + # change something in root so signature doesn't match the content. + modified_threshold_data["signed"]["roles"]["root"]["version"] = 2 + modified_threshold_data = json.dumps(modified_threshold_data).encode() + with self.assertRaises(exceptions.UnsignedMetadataError): + self.trusted_set.update_root(modified_threshold_data) + + def test_update_root_new_root_ver_same_as_trusted_root_ver(self): + with self.assertRaises(exceptions.ReplayedMetadataError): + self.trusted_set.update_root(self.metadata["root"]) + + + def test_root_update_finished_expired(self): + root = Metadata.from_bytes(self.metadata["root"]) + root.signed.expires = datetime(1970, 1, 1) + root.sign(self.keystore["root"]) + modified_root_data = json.dumps(root.to_dict()).encode() + tmp_trusted_set = TrustedMetadataSet(modified_root_data) + # call root_update_finished when trusted root has expired + with self.assertRaises(exceptions.ExpiredMetadataError): + tmp_trusted_set.root_update_finished() + + + def test_update_timestamp_new_timestamp_ver_below_trusted_ver(self): + self._root_update_finished_and_update_timestamp() + # new_timestamp.version < trusted_timestamp.version + self.trusted_set.timestamp.signed.version = 2 + with self.assertRaises(exceptions.ReplayedMetadataError): + self.trusted_set.update_timestamp(self.metadata["timestamp"]) + + def test_update_timestamp_snapshot_ver_below_trusted_snapshot_ver(self): + self._root_update_finished_and_update_timestamp() + # new_timestamp.snapshot.version < trusted_timestamp.snapshot.version + self.trusted_set.timestamp.signed.meta["snapshot.json"].version = 2 + with self.assertRaises(exceptions.ReplayedMetadataError): + self.trusted_set.update_timestamp(self.metadata["timestamp"]) + + def test_update_timestamp_expired(self): + self._root_update_finished_and_update_timestamp() + # new_timestamp has expired + timestamp = Metadata.from_bytes(self.metadata["timestamp"]) + timestamp.signed.expires = datetime(1970, 1, 1) + timestamp.sign(self.keystore["timestamp"]) + new_timestamp_byte_data = json.dumps(timestamp.to_dict()).encode() + with self.assertRaises(exceptions.ExpiredMetadataError): + self.trusted_set.update_timestamp(new_timestamp_byte_data) + + + def test_update_snapshot_cannot_verify_snapshot_with_threshold(self): + self._root_update_finished_and_update_timestamp() + # remove keyids representing snapshot signatures from root data + self.trusted_set.root.signed.roles["snapshot"].keyids = [] + with self.assertRaises(exceptions.UnsignedMetadataError): + self.trusted_set.update_snapshot(self.metadata["snapshot"]) + + def test_update_snapshot_version_different_timestamp_snapshot_version(self): + self._root_update_finished_and_update_timestamp() + # new_snapshot.version != trusted timestamp.meta["snapshot"].version + self.trusted_set.timestamp.signed.meta["snapshot.json"].version = 2 + with self.assertRaises(exceptions.BadVersionNumberError): + self.trusted_set.update_snapshot(self.metadata["snapshot"]) + + def test_update_snapshot_after_successful_update_new_snapshot_no_meta(self): + self._update_all_besides_targets() + # Test removing a meta_file in new_snapshot compared to the old snapshot + snapshot = Metadata.from_bytes(self.metadata["snapshot"]) + snapshot.signed.meta = {} + snapshot.sign(self.keystore["snapshot"]) + self.trusted_set.timestamp.signed.meta["snapshot.json"].hashes = None + self.trusted_set.timestamp.signed.meta["snapshot.json"].length = None + modified_snapshot_data = json.dumps(snapshot.to_dict()).encode() + with self.assertRaises(exceptions.RepositoryError): + self.trusted_set.update_snapshot(modified_snapshot_data) + + def test_update_snapshot_after_succesfull_update_new_snapshot_meta_version_different(self): + self._update_all_besides_targets() + # snapshot.meta["project1"].version != new_snapshot.meta["project1"].version + for metafile in self.trusted_set.snapshot.signed.meta.values(): + metafile.version += 1 + with self.assertRaises(exceptions.BadVersionNumberError): + self.trusted_set.update_snapshot(self.metadata["snapshot"]) + + def test_update_snapshot_after_succesfull_expired_new_snapshot(self): + self._update_all_besides_targets() + # new_snapshot has expired + snapshot = Metadata.from_bytes(self.metadata["snapshot"]) + snapshot.signed.expires = datetime(1970, 1, 1) + snapshot.sign(self.keystore["snapshot"]) + self.trusted_set.timestamp.signed.meta["snapshot.json"].hashes = None + self.trusted_set.timestamp.signed.meta["snapshot.json"].length = None + modified_snapshot_data = json.dumps(snapshot.to_dict()).encode() + with self.assertRaises(exceptions.ExpiredMetadataError): + self.trusted_set.update_snapshot(modified_snapshot_data) + + + def test_update_targets_no_meta_in_snapshot(self): + self._update_all_besides_targets() + # remove meta information with information about targets from snapshot + self.trusted_set.snapshot.signed.meta = {} + with self.assertRaises(exceptions.RepositoryError): + self.trusted_set.update_targets(self.metadata["targets"]) + + def test_update_targets_hash_different_than_snapshot_meta_hash(self): + self._update_all_besides_targets() + # observed_hash != stored hash in snapshot meta for targets + for target_path in self.trusted_set.snapshot.signed.meta.keys(): + self.trusted_set.snapshot.signed.meta[target_path].hashes = {"sha256": "b"} + with self.assertRaises(exceptions.RepositoryError): + self.trusted_set.update_targets(self.metadata["targets"]) + + def test_update_targets_version_different_snapshot_meta_version(self): + self._update_all_besides_targets() + # new_delegate.signed.version != meta.version stored in snapshot + for target_path in self.trusted_set.snapshot.signed.meta.keys(): + self.trusted_set.snapshot.signed.meta[target_path].version = 2 + with self.assertRaises(exceptions.BadVersionNumberError): + self.trusted_set.update_targets(self.metadata["targets"]) + + def test_update_targets_expired_new_target(self): + self._update_all_besides_targets() + # new_delegated_target has expired + targets = Metadata.from_bytes(self.metadata["targets"]) + targets.signed.expires = datetime(1970, 1, 1) + targets.sign(self.keystore["targets"]) + modified_targets_data = json.dumps(targets.to_dict()).encode() + with self.assertRaises(exceptions.ExpiredMetadataError): + self.trusted_set.update_targets(modified_targets_data) + # TODO test updating over initial metadata (new keys, newer timestamp, etc) - # TODO test the actual specification checks if __name__ == '__main__': From c7c9d0f3212e72b02302d2e8297ca47b88802db0 Mon Sep 17 00:00:00 2001 From: Martin Vrachev Date: Fri, 16 Jul 2021 16:18:32 +0300 Subject: [PATCH 3/6] TrustedMetadataSet testing: use Metadata.to_bytes Signed-off-by: Martin Vrachev --- tests/test_trusted_metadata_set.py | 33 ++++++++++-------------------- 1 file changed, 11 insertions(+), 22 deletions(-) diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index ba022f2865..30ce7d8248 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -1,10 +1,7 @@ -import copy -import json import logging import os import sys import unittest -from typing import Dict, Any from datetime import datetime from tuf import exceptions @@ -128,7 +125,7 @@ def test_update_with_invalid_json(self): root = Metadata.from_bytes(self.metadata["root"]) root.signed.version += 1 with self.assertRaises(exceptions.RepositoryError): - TrustedMetadataSet(json.dumps(root.to_dict()).encode()) + TrustedMetadataSet(root.to_bytes()) # update_root called with the wrong metadata type with self.assertRaises(exceptions.RepositoryError): @@ -153,7 +150,7 @@ def test_update_with_invalid_json(self): # metadata is invalid md.signed.version += 1 with self.assertRaises(exceptions.RepositoryError): - update_func(json.dumps(md.to_dict()).encode()) + update_func(md.to_bytes()) # metadata is of wrong type with self.assertRaises(exceptions.RepositoryError): @@ -164,14 +161,11 @@ def test_update_with_invalid_json(self): def test_update_root_new_root_cannot_be_verified_with_threshold(self): # new_root data with threshold which cannot be verified. - modified_threshold_data = copy.deepcopy( - json.loads(self.metadata["root"]) - ) - # change something in root so signature doesn't match the content. - modified_threshold_data["signed"]["roles"]["root"]["version"] = 2 - modified_threshold_data = json.dumps(modified_threshold_data).encode() + root = Metadata.from_bytes(self.metadata["root"]) + # remove root role keyids representing root signatures + root.signed.roles["root"].keyids = [] with self.assertRaises(exceptions.UnsignedMetadataError): - self.trusted_set.update_root(modified_threshold_data) + self.trusted_set.update_root(root.to_bytes()) def test_update_root_new_root_ver_same_as_trusted_root_ver(self): with self.assertRaises(exceptions.ReplayedMetadataError): @@ -182,8 +176,7 @@ def test_root_update_finished_expired(self): root = Metadata.from_bytes(self.metadata["root"]) root.signed.expires = datetime(1970, 1, 1) root.sign(self.keystore["root"]) - modified_root_data = json.dumps(root.to_dict()).encode() - tmp_trusted_set = TrustedMetadataSet(modified_root_data) + tmp_trusted_set = TrustedMetadataSet(root.to_bytes()) # call root_update_finished when trusted root has expired with self.assertRaises(exceptions.ExpiredMetadataError): tmp_trusted_set.root_update_finished() @@ -209,9 +202,8 @@ def test_update_timestamp_expired(self): timestamp = Metadata.from_bytes(self.metadata["timestamp"]) timestamp.signed.expires = datetime(1970, 1, 1) timestamp.sign(self.keystore["timestamp"]) - new_timestamp_byte_data = json.dumps(timestamp.to_dict()).encode() with self.assertRaises(exceptions.ExpiredMetadataError): - self.trusted_set.update_timestamp(new_timestamp_byte_data) + self.trusted_set.update_timestamp(timestamp.to_bytes()) def test_update_snapshot_cannot_verify_snapshot_with_threshold(self): @@ -236,9 +228,8 @@ def test_update_snapshot_after_successful_update_new_snapshot_no_meta(self): snapshot.sign(self.keystore["snapshot"]) self.trusted_set.timestamp.signed.meta["snapshot.json"].hashes = None self.trusted_set.timestamp.signed.meta["snapshot.json"].length = None - modified_snapshot_data = json.dumps(snapshot.to_dict()).encode() with self.assertRaises(exceptions.RepositoryError): - self.trusted_set.update_snapshot(modified_snapshot_data) + self.trusted_set.update_snapshot(snapshot.to_bytes()) def test_update_snapshot_after_succesfull_update_new_snapshot_meta_version_different(self): self._update_all_besides_targets() @@ -256,9 +247,8 @@ def test_update_snapshot_after_succesfull_expired_new_snapshot(self): snapshot.sign(self.keystore["snapshot"]) self.trusted_set.timestamp.signed.meta["snapshot.json"].hashes = None self.trusted_set.timestamp.signed.meta["snapshot.json"].length = None - modified_snapshot_data = json.dumps(snapshot.to_dict()).encode() with self.assertRaises(exceptions.ExpiredMetadataError): - self.trusted_set.update_snapshot(modified_snapshot_data) + self.trusted_set.update_snapshot(snapshot.to_bytes()) def test_update_targets_no_meta_in_snapshot(self): @@ -290,9 +280,8 @@ def test_update_targets_expired_new_target(self): targets = Metadata.from_bytes(self.metadata["targets"]) targets.signed.expires = datetime(1970, 1, 1) targets.sign(self.keystore["targets"]) - modified_targets_data = json.dumps(targets.to_dict()).encode() with self.assertRaises(exceptions.ExpiredMetadataError): - self.trusted_set.update_targets(modified_targets_data) + self.trusted_set.update_targets(targets.to_bytes()) # TODO test updating over initial metadata (new keys, newer timestamp, etc) From 71838562dc63068e0cb70a3c6d2383bd1593fb0b Mon Sep 17 00:00:00 2001 From: Martin Vrachev Date: Mon, 19 Jul 2021 14:49:03 +0300 Subject: [PATCH 4/6] ngcl. tests: remove modification on internal state Signed-off-by: Martin Vrachev --- tests/test_trusted_metadata_set.py | 161 ++++++++++++++++++++++------- 1 file changed, 121 insertions(+), 40 deletions(-) diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index 30ce7d8248..219b7c724d 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -1,14 +1,14 @@ import logging +from typing import Optional, Union import os import sys import unittest from datetime import datetime from tuf import exceptions -from tuf.api.metadata import Metadata -from tuf.ngclient._internal.trusted_metadata_set import( - TrustedMetadataSet -) +from tuf.api.metadata import Metadata, MetaFile +from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet + from securesystemslib.signer import SSlibSigner from securesystemslib.interface import( import_ed25519_privatekey_from_file, @@ -48,14 +48,77 @@ def setUpClass(cls): def setUp(self) -> None: self.trusted_set = TrustedMetadataSet(self.metadata["root"]) - def _root_update_finished_and_update_timestamp(self): - self.trusted_set.root_update_finished() - self.trusted_set.update_timestamp(self.metadata["timestamp"]) + def _root_updated_and_update_timestamp( + self, timestamp_bytes: Optional[bytes] = None + ): + """Finsh root update and update timestamp with passed timestamp_bytes. + + Args: + timestamp_bytes: + Bytes used when calling trusted_set.update_timestamp(). + Default self.metadata["timestamp"]. - def _update_all_besides_targets(self): + """ + timestamp_bytes = timestamp_bytes or self.metadata["timestamp"] self.trusted_set.root_update_finished() - self.trusted_set.update_timestamp(self.metadata["timestamp"]) - self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_timestamp(timestamp_bytes) + + + def _update_all_besides_targets( + self, + timestamp_bytes: Optional[bytes] = None, + snapshot_bytes: Optional[bytes] = None, + ): + """Update all metadata roles besides targets. + + Args: + timestamp_bytes: + Bytes used when calling trusted_set.update_timestamp(). + Default self.metadata["timestamp"]. + snapshot_bytes: + Bytes used when calling trusted_set.update_snapshot(). + Default self.metadata["snapshot"]. + + """ + self._root_updated_and_update_timestamp(timestamp_bytes) + snapshot_bytes = snapshot_bytes or self.metadata["snapshot"] + self.trusted_set.update_snapshot(snapshot_bytes) + + def _modify_timestamp_meta(self, version: Optional[int] = 1): + """Remove hashes and length from timestamp.meta["snapshot.json"]. + Create a timestamp.meta["snapshot.json"] containing only version. + + Args: + version: + Version used when instantiating MetaFile for timestamp.meta. + + """ + timestamp = Metadata.from_bytes(self.metadata["timestamp"]) + timestamp.signed.meta["snapshot.json"] = MetaFile(version) + timestamp.sign(self.keystore["timestamp"]) + return timestamp.to_bytes() + + def _modify_snapshot_meta( + self, version: Union[int, None] = 1, length: Optional[int]= None + ): + """Modify hashes and length from snapshot_meta.meta["snapshot.json"]. + If version and length is None, then snapshot meta will be an empty dictionary. + + Args: + version: + Version used when instantiating MetaFile for snapshot.meta. + length: + Length used when instantiating MetaFile for snapshot.meta. + + """ + snapshot = Metadata.from_bytes(self.metadata["snapshot"]) + if version is None and length is None: + snapshot.signed.meta = {} + else: + for metafile_path in snapshot.signed.meta: + snapshot.signed.meta[metafile_path] = MetaFile(version, length) + snapshot.sign(self.keystore["snapshot"]) + return snapshot.to_bytes() def test_update(self): self.trusted_set.root_update_finished() @@ -183,21 +246,23 @@ def test_root_update_finished_expired(self): def test_update_timestamp_new_timestamp_ver_below_trusted_ver(self): - self._root_update_finished_and_update_timestamp() # new_timestamp.version < trusted_timestamp.version - self.trusted_set.timestamp.signed.version = 2 + timestamp = Metadata.from_bytes(self.metadata["timestamp"]) + timestamp.signed.version = 3 + timestamp.sign(self.keystore["timestamp"]) + self._root_updated_and_update_timestamp(timestamp.to_bytes()) with self.assertRaises(exceptions.ReplayedMetadataError): self.trusted_set.update_timestamp(self.metadata["timestamp"]) def test_update_timestamp_snapshot_ver_below_trusted_snapshot_ver(self): - self._root_update_finished_and_update_timestamp() + modified_timestamp = self._modify_timestamp_meta(version=3) + self._root_updated_and_update_timestamp(modified_timestamp) # new_timestamp.snapshot.version < trusted_timestamp.snapshot.version - self.trusted_set.timestamp.signed.meta["snapshot.json"].version = 2 with self.assertRaises(exceptions.ReplayedMetadataError): self.trusted_set.update_timestamp(self.metadata["timestamp"]) def test_update_timestamp_expired(self): - self._root_update_finished_and_update_timestamp() + self.trusted_set.root_update_finished() # new_timestamp has expired timestamp = Metadata.from_bytes(self.metadata["timestamp"]) timestamp.signed.expires = datetime(1970, 1, 1) @@ -207,70 +272,86 @@ def test_update_timestamp_expired(self): def test_update_snapshot_cannot_verify_snapshot_with_threshold(self): - self._root_update_finished_and_update_timestamp() - # remove keyids representing snapshot signatures from root data - self.trusted_set.root.signed.roles["snapshot"].keyids = [] + modified_timestamp = self._modify_timestamp_meta() + self._root_updated_and_update_timestamp(modified_timestamp) + snapshot = Metadata.from_bytes(self.metadata["snapshot"]) + snapshot.signatures.clear() with self.assertRaises(exceptions.UnsignedMetadataError): - self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(snapshot.to_bytes()) def test_update_snapshot_version_different_timestamp_snapshot_version(self): - self._root_update_finished_and_update_timestamp() + modified_timestamp = self._modify_timestamp_meta(version=2) + self._root_updated_and_update_timestamp(modified_timestamp) # new_snapshot.version != trusted timestamp.meta["snapshot"].version - self.trusted_set.timestamp.signed.meta["snapshot.json"].version = 2 + snapshot = Metadata.from_bytes(self.metadata["snapshot"]) + snapshot.signed.version = 3 + snapshot.sign(self.keystore["snapshot"]) with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_snapshot(self.metadata["snapshot"]) + self.trusted_set.update_snapshot(snapshot.to_bytes()) def test_update_snapshot_after_successful_update_new_snapshot_no_meta(self): - self._update_all_besides_targets() + modified_timestamp = self._modify_timestamp_meta() + self._update_all_besides_targets(modified_timestamp) # Test removing a meta_file in new_snapshot compared to the old snapshot snapshot = Metadata.from_bytes(self.metadata["snapshot"]) snapshot.signed.meta = {} snapshot.sign(self.keystore["snapshot"]) - self.trusted_set.timestamp.signed.meta["snapshot.json"].hashes = None - self.trusted_set.timestamp.signed.meta["snapshot.json"].length = None with self.assertRaises(exceptions.RepositoryError): self.trusted_set.update_snapshot(snapshot.to_bytes()) def test_update_snapshot_after_succesfull_update_new_snapshot_meta_version_different(self): - self._update_all_besides_targets() + modified_timestamp = self._modify_timestamp_meta() + self._root_updated_and_update_timestamp(modified_timestamp) # snapshot.meta["project1"].version != new_snapshot.meta["project1"].version - for metafile in self.trusted_set.snapshot.signed.meta.values(): - metafile.version += 1 + snapshot = Metadata.from_bytes(self.metadata["snapshot"]) + for metafile_path in snapshot.signed.meta.keys(): + snapshot.signed.meta[metafile_path].version += 1 + snapshot.sign(self.keystore["snapshot"]) + self.trusted_set.update_snapshot(snapshot.to_bytes()) with self.assertRaises(exceptions.BadVersionNumberError): self.trusted_set.update_snapshot(self.metadata["snapshot"]) - def test_update_snapshot_after_succesfull_expired_new_snapshot(self): - self._update_all_besides_targets() + def test_update_snapshot_expired_new_snapshot(self): + modified_timestamp = self._modify_timestamp_meta() + self._root_updated_and_update_timestamp(modified_timestamp) # new_snapshot has expired snapshot = Metadata.from_bytes(self.metadata["snapshot"]) snapshot.signed.expires = datetime(1970, 1, 1) snapshot.sign(self.keystore["snapshot"]) - self.trusted_set.timestamp.signed.meta["snapshot.json"].hashes = None - self.trusted_set.timestamp.signed.meta["snapshot.json"].length = None with self.assertRaises(exceptions.ExpiredMetadataError): self.trusted_set.update_snapshot(snapshot.to_bytes()) def test_update_targets_no_meta_in_snapshot(self): - self._update_all_besides_targets() + modified_timestamp = self._modify_timestamp_meta() + modified_snapshot = self._modify_snapshot_meta(version=None) + self._update_all_besides_targets( + timestamp_bytes = modified_timestamp, + snapshot_bytes= modified_snapshot + ) # remove meta information with information about targets from snapshot - self.trusted_set.snapshot.signed.meta = {} with self.assertRaises(exceptions.RepositoryError): self.trusted_set.update_targets(self.metadata["targets"]) def test_update_targets_hash_different_than_snapshot_meta_hash(self): - self._update_all_besides_targets() + modified_timestamp = self._modify_timestamp_meta() + modified_snapshot = self._modify_snapshot_meta(version=1, length=1) + self._update_all_besides_targets( + timestamp_bytes = modified_timestamp, + snapshot_bytes= modified_snapshot + ) # observed_hash != stored hash in snapshot meta for targets - for target_path in self.trusted_set.snapshot.signed.meta.keys(): - self.trusted_set.snapshot.signed.meta[target_path].hashes = {"sha256": "b"} with self.assertRaises(exceptions.RepositoryError): self.trusted_set.update_targets(self.metadata["targets"]) def test_update_targets_version_different_snapshot_meta_version(self): - self._update_all_besides_targets() + modified_timestamp = self._modify_timestamp_meta() + modified_snapshot = self._modify_snapshot_meta(version=2) + self._update_all_besides_targets( + timestamp_bytes = modified_timestamp, + snapshot_bytes= modified_snapshot + ) # new_delegate.signed.version != meta.version stored in snapshot - for target_path in self.trusted_set.snapshot.signed.meta.keys(): - self.trusted_set.snapshot.signed.meta[target_path].version = 2 with self.assertRaises(exceptions.BadVersionNumberError): self.trusted_set.update_targets(self.metadata["targets"]) From 11531caf42fefef67d8a7d011235552c6a3de71b Mon Sep 17 00:00:00 2001 From: Martin Vrachev Date: Wed, 21 Jul 2021 17:34:23 +0300 Subject: [PATCH 5/6] Embed modification function helpers Instead of using general abstract modification functions embed smaller modification functions inside each test where it's needed and create modify_metadata function that does all of the common stuff like: - instantiating a metadata object - calling the modification function - signing the modified object - serializing back to bytes. Signed-off-by: Martin Vrachev --- tests/test_trusted_metadata_set.py | 146 ++++++++++++++++------------- 1 file changed, 80 insertions(+), 66 deletions(-) diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index 219b7c724d..a75f09990a 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -1,12 +1,12 @@ import logging -from typing import Optional, Union +from typing import Optional, Union, Callable import os import sys import unittest from datetime import datetime from tuf import exceptions -from tuf.api.metadata import Metadata, MetaFile +from tuf.api.metadata import Metadata, Signed, Timestamp, Snapshot, MetaFile from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet from securesystemslib.signer import SSlibSigner @@ -84,41 +84,15 @@ def _update_all_besides_targets( snapshot_bytes = snapshot_bytes or self.metadata["snapshot"] self.trusted_set.update_snapshot(snapshot_bytes) - def _modify_timestamp_meta(self, version: Optional[int] = 1): - """Remove hashes and length from timestamp.meta["snapshot.json"]. - Create a timestamp.meta["snapshot.json"] containing only version. - - Args: - version: - Version used when instantiating MetaFile for timestamp.meta. - - """ - timestamp = Metadata.from_bytes(self.metadata["timestamp"]) - timestamp.signed.meta["snapshot.json"] = MetaFile(version) - timestamp.sign(self.keystore["timestamp"]) - return timestamp.to_bytes() - - def _modify_snapshot_meta( - self, version: Union[int, None] = 1, length: Optional[int]= None - ): - """Modify hashes and length from snapshot_meta.meta["snapshot.json"]. - If version and length is None, then snapshot meta will be an empty dictionary. - - Args: - version: - Version used when instantiating MetaFile for snapshot.meta. - length: - Length used when instantiating MetaFile for snapshot.meta. - - """ - snapshot = Metadata.from_bytes(self.metadata["snapshot"]) - if version is None and length is None: - snapshot.signed.meta = {} - else: - for metafile_path in snapshot.signed.meta: - snapshot.signed.meta[metafile_path] = MetaFile(version, length) - snapshot.sign(self.keystore["snapshot"]) - return snapshot.to_bytes() + def modify_metadata( + self, rolename: str, modification_func: Callable[["Signed"], None] + ): + """Instantiate metadata from rolename type, call modification_func and + sign it again with self.keystore[rolename] signer.""" + metadata = Metadata.from_bytes(self.metadata[rolename]) + modification_func(metadata.signed) + metadata.sign(self.keystore[rolename]) + return metadata.to_bytes() def test_update(self): self.trusted_set.root_update_finished() @@ -255,7 +229,10 @@ def test_update_timestamp_new_timestamp_ver_below_trusted_ver(self): self.trusted_set.update_timestamp(self.metadata["timestamp"]) def test_update_timestamp_snapshot_ver_below_trusted_snapshot_ver(self): - modified_timestamp = self._modify_timestamp_meta(version=3) + def version_modifier(timestamp: Timestamp): + timestamp.version = 3 + + modified_timestamp = self.modify_metadata("timestamp", version_modifier) self._root_updated_and_update_timestamp(modified_timestamp) # new_timestamp.snapshot.version < trusted_timestamp.snapshot.version with self.assertRaises(exceptions.ReplayedMetadataError): @@ -272,16 +249,27 @@ def test_update_timestamp_expired(self): def test_update_snapshot_cannot_verify_snapshot_with_threshold(self): - modified_timestamp = self._modify_timestamp_meta() - self._root_updated_and_update_timestamp(modified_timestamp) + def hashes_length_modifier(timestamp: Timestamp): + timestamp.meta["snapshot.json"].hashes = None + timestamp.meta["snapshot.json"].length = None + + timestamp = self.modify_metadata("timestamp", hashes_length_modifier) + self._root_updated_and_update_timestamp(timestamp) snapshot = Metadata.from_bytes(self.metadata["snapshot"]) snapshot.signatures.clear() with self.assertRaises(exceptions.UnsignedMetadataError): self.trusted_set.update_snapshot(snapshot.to_bytes()) def test_update_snapshot_version_different_timestamp_snapshot_version(self): - modified_timestamp = self._modify_timestamp_meta(version=2) - self._root_updated_and_update_timestamp(modified_timestamp) + def hashes_length_version_modifier(timestamp: Timestamp): + timestamp.meta["snapshot.json"].hashes = None + timestamp.meta["snapshot.json"].length = None + timestamp.meta["snapshot.json"].version = 2 + + timestamp = self.modify_metadata( + "timestamp", hashes_length_version_modifier + ) + self._root_updated_and_update_timestamp(timestamp) # new_snapshot.version != trusted timestamp.meta["snapshot"].version snapshot = Metadata.from_bytes(self.metadata["snapshot"]) snapshot.signed.version = 3 @@ -290,8 +278,12 @@ def test_update_snapshot_version_different_timestamp_snapshot_version(self): self.trusted_set.update_snapshot(snapshot.to_bytes()) def test_update_snapshot_after_successful_update_new_snapshot_no_meta(self): - modified_timestamp = self._modify_timestamp_meta() - self._update_all_besides_targets(modified_timestamp) + def hashes_length_modifier(timestamp: Timestamp): + timestamp.meta["snapshot.json"].hashes = None + timestamp.meta["snapshot.json"].length = None + + timestamp = self.modify_metadata("timestamp", hashes_length_modifier) + self._update_all_besides_targets(timestamp) # Test removing a meta_file in new_snapshot compared to the old snapshot snapshot = Metadata.from_bytes(self.metadata["snapshot"]) snapshot.signed.meta = {} @@ -300,8 +292,12 @@ def test_update_snapshot_after_successful_update_new_snapshot_no_meta(self): self.trusted_set.update_snapshot(snapshot.to_bytes()) def test_update_snapshot_after_succesfull_update_new_snapshot_meta_version_different(self): - modified_timestamp = self._modify_timestamp_meta() - self._root_updated_and_update_timestamp(modified_timestamp) + def hashes_length_modifier(timestamp: Timestamp): + timestamp.meta["snapshot.json"].hashes = None + timestamp.meta["snapshot.json"].length = None + + timestamp = self.modify_metadata("timestamp", hashes_length_modifier) + self._root_updated_and_update_timestamp(timestamp) # snapshot.meta["project1"].version != new_snapshot.meta["project1"].version snapshot = Metadata.from_bytes(self.metadata["snapshot"]) for metafile_path in snapshot.signed.meta.keys(): @@ -312,8 +308,12 @@ def test_update_snapshot_after_succesfull_update_new_snapshot_meta_version_diffe self.trusted_set.update_snapshot(self.metadata["snapshot"]) def test_update_snapshot_expired_new_snapshot(self): - modified_timestamp = self._modify_timestamp_meta() - self._root_updated_and_update_timestamp(modified_timestamp) + def hashes_length_modifier(timestamp: Timestamp): + timestamp.meta["snapshot.json"].hashes = None + timestamp.meta["snapshot.json"].length = None + + timestamp = self.modify_metadata("timestamp", hashes_length_modifier) + self._root_updated_and_update_timestamp(timestamp) # new_snapshot has expired snapshot = Metadata.from_bytes(self.metadata["snapshot"]) snapshot.signed.expires = datetime(1970, 1, 1) @@ -323,34 +323,48 @@ def test_update_snapshot_expired_new_snapshot(self): def test_update_targets_no_meta_in_snapshot(self): - modified_timestamp = self._modify_timestamp_meta() - modified_snapshot = self._modify_snapshot_meta(version=None) - self._update_all_besides_targets( - timestamp_bytes = modified_timestamp, - snapshot_bytes= modified_snapshot - ) + def hashes_length_modifier(timestamp: Timestamp): + timestamp.meta["snapshot.json"].hashes = None + timestamp.meta["snapshot.json"].length = None + + timestamp = self.modify_metadata("timestamp", hashes_length_modifier) + def no_meta_modifier(snapshot: Snapshot): + snapshot.meta = {} + + snapshot = self.modify_metadata("snapshot", no_meta_modifier) + self._update_all_besides_targets(timestamp, snapshot) # remove meta information with information about targets from snapshot with self.assertRaises(exceptions.RepositoryError): self.trusted_set.update_targets(self.metadata["targets"]) def test_update_targets_hash_different_than_snapshot_meta_hash(self): - modified_timestamp = self._modify_timestamp_meta() - modified_snapshot = self._modify_snapshot_meta(version=1, length=1) - self._update_all_besides_targets( - timestamp_bytes = modified_timestamp, - snapshot_bytes= modified_snapshot - ) + def hashes_length_modifier(timestamp: Timestamp): + timestamp.meta["snapshot.json"].hashes = None + timestamp.meta["snapshot.json"].length = None + + timestamp = self.modify_metadata("timestamp", hashes_length_modifier) + def meta_length_modifier(snapshot: Snapshot): + for metafile_path in snapshot.meta: + snapshot.meta[metafile_path] = MetaFile(version=1, length=1) + + snapshot = self.modify_metadata("snapshot", meta_length_modifier) + self._update_all_besides_targets(timestamp, snapshot) # observed_hash != stored hash in snapshot meta for targets with self.assertRaises(exceptions.RepositoryError): self.trusted_set.update_targets(self.metadata["targets"]) def test_update_targets_version_different_snapshot_meta_version(self): - modified_timestamp = self._modify_timestamp_meta() - modified_snapshot = self._modify_snapshot_meta(version=2) - self._update_all_besides_targets( - timestamp_bytes = modified_timestamp, - snapshot_bytes= modified_snapshot - ) + def hashes_length_modifier(timestamp: Timestamp): + timestamp.meta["snapshot.json"].hashes = None + timestamp.meta["snapshot.json"].length = None + + timestamp = self.modify_metadata("timestamp", hashes_length_modifier) + def meta_modifier(snapshot: Snapshot): + for metafile_path in snapshot.meta: + snapshot.meta[metafile_path] = MetaFile(version=2) + + snapshot = self.modify_metadata("snapshot", meta_modifier) + self._update_all_besides_targets(timestamp, snapshot) # new_delegate.signed.version != meta.version stored in snapshot with self.assertRaises(exceptions.BadVersionNumberError): self.trusted_set.update_targets(self.metadata["targets"]) From 617e87eb26df19de5b45416f179d5e525aeb8069 Mon Sep 17 00:00:00 2001 From: Martin Vrachev Date: Mon, 16 Aug 2021 18:13:01 +0300 Subject: [PATCH 6/6] Annotations and use extensively modify_metadata Signed-off-by: Martin Vrachev --- tests/test_trusted_metadata_set.py | 188 ++++++++++++++--------------- 1 file changed, 88 insertions(+), 100 deletions(-) diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index a75f09990a..aa2d39e46d 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -6,7 +6,15 @@ from datetime import datetime from tuf import exceptions -from tuf.api.metadata import Metadata, Signed, Timestamp, Snapshot, MetaFile +from tuf.api.metadata import ( + Metadata, + Signed, + Root, + Timestamp, + Snapshot, + MetaFile, + Targets +) from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet from securesystemslib.signer import SSlibSigner @@ -21,6 +29,22 @@ class TestTrustedMetadataSet(unittest.TestCase): + def modify_metadata( + self, rolename: str, modification_func: Callable[["Signed"], None] + ) -> bytes: + """Instantiate metadata from rolename type, call modification_func and + sign it again with self.keystore[rolename] signer. + + Attributes: + rolename: A denoting the name of the metadata which will be modified. + modification_func: Function that will be called to modify the signed + portion of metadata bytes. + """ + metadata = Metadata.from_bytes(self.metadata[rolename]) + modification_func(metadata.signed) + metadata.sign(self.keystore[rolename]) + return metadata.to_bytes() + @classmethod def setUpClass(cls): cls.repo_dir = os.path.join( @@ -45,12 +69,20 @@ def setUpClass(cls): ) cls.keystore[role] = SSlibSigner(key_dict) + def hashes_length_modifier(timestamp: Timestamp) -> None: + timestamp.meta["snapshot.json"].hashes = None + timestamp.meta["snapshot.json"].length = None + + cls.metadata["timestamp"] = cls.modify_metadata( + cls, "timestamp", hashes_length_modifier + ) + def setUp(self) -> None: self.trusted_set = TrustedMetadataSet(self.metadata["root"]) def _root_updated_and_update_timestamp( self, timestamp_bytes: Optional[bytes] = None - ): + ) -> None: """Finsh root update and update timestamp with passed timestamp_bytes. Args: @@ -84,15 +116,6 @@ def _update_all_besides_targets( snapshot_bytes = snapshot_bytes or self.metadata["snapshot"] self.trusted_set.update_snapshot(snapshot_bytes) - def modify_metadata( - self, rolename: str, modification_func: Callable[["Signed"], None] - ): - """Instantiate metadata from rolename type, call modification_func and - sign it again with self.keystore[rolename] signer.""" - metadata = Metadata.from_bytes(self.metadata[rolename]) - modification_func(metadata.signed) - metadata.sign(self.keystore[rolename]) - return metadata.to_bytes() def test_update(self): self.trusted_set.root_update_finished() @@ -177,10 +200,6 @@ def test_update_with_invalid_json(self): ] for metadata, update_func in top_level_md: md = Metadata.from_bytes(metadata) - if md.signed.type == "snapshot": - # timestamp hashes and length intervene when testing snapshot - self.trusted_set.timestamp.signed.meta["snapshot.json"].hashes = None - self.trusted_set.timestamp.signed.meta["snapshot.json"].length = None # metadata is not json with self.assertRaises(exceptions.RepositoryError): update_func(b"") @@ -210,10 +229,11 @@ def test_update_root_new_root_ver_same_as_trusted_root_ver(self): def test_root_update_finished_expired(self): - root = Metadata.from_bytes(self.metadata["root"]) - root.signed.expires = datetime(1970, 1, 1) - root.sign(self.keystore["root"]) - tmp_trusted_set = TrustedMetadataSet(root.to_bytes()) + def root_expired_modifier(root: Root) -> None: + root.expires = datetime(1970, 1, 1) + + root = self.modify_metadata("root", root_expired_modifier) + tmp_trusted_set = TrustedMetadataSet(root) # call root_update_finished when trusted root has expired with self.assertRaises(exceptions.ExpiredMetadataError): tmp_trusted_set.root_update_finished() @@ -221,15 +241,16 @@ def test_root_update_finished_expired(self): def test_update_timestamp_new_timestamp_ver_below_trusted_ver(self): # new_timestamp.version < trusted_timestamp.version - timestamp = Metadata.from_bytes(self.metadata["timestamp"]) - timestamp.signed.version = 3 - timestamp.sign(self.keystore["timestamp"]) - self._root_updated_and_update_timestamp(timestamp.to_bytes()) + def version_modifier(timestamp: Timestamp) -> None: + timestamp.version = 3 + + timestamp = self.modify_metadata("timestamp", version_modifier) + self._root_updated_and_update_timestamp(timestamp) with self.assertRaises(exceptions.ReplayedMetadataError): self.trusted_set.update_timestamp(self.metadata["timestamp"]) def test_update_timestamp_snapshot_ver_below_trusted_snapshot_ver(self): - def version_modifier(timestamp: Timestamp): + def version_modifier(timestamp: Timestamp) -> None: timestamp.version = 3 modified_timestamp = self.modify_metadata("timestamp", version_modifier) @@ -241,130 +262,96 @@ def version_modifier(timestamp: Timestamp): def test_update_timestamp_expired(self): self.trusted_set.root_update_finished() # new_timestamp has expired - timestamp = Metadata.from_bytes(self.metadata["timestamp"]) - timestamp.signed.expires = datetime(1970, 1, 1) - timestamp.sign(self.keystore["timestamp"]) + def timestamp_expired_modifier(timestamp: Timestamp) -> None: + timestamp.expires = datetime(1970, 1, 1) + + timestamp = self.modify_metadata("timestamp", timestamp_expired_modifier) with self.assertRaises(exceptions.ExpiredMetadataError): - self.trusted_set.update_timestamp(timestamp.to_bytes()) + self.trusted_set.update_timestamp(timestamp) def test_update_snapshot_cannot_verify_snapshot_with_threshold(self): - def hashes_length_modifier(timestamp: Timestamp): - timestamp.meta["snapshot.json"].hashes = None - timestamp.meta["snapshot.json"].length = None - - timestamp = self.modify_metadata("timestamp", hashes_length_modifier) - self._root_updated_and_update_timestamp(timestamp) + self._root_updated_and_update_timestamp(self.metadata["timestamp"]) snapshot = Metadata.from_bytes(self.metadata["snapshot"]) snapshot.signatures.clear() with self.assertRaises(exceptions.UnsignedMetadataError): self.trusted_set.update_snapshot(snapshot.to_bytes()) def test_update_snapshot_version_different_timestamp_snapshot_version(self): - def hashes_length_version_modifier(timestamp: Timestamp): - timestamp.meta["snapshot.json"].hashes = None - timestamp.meta["snapshot.json"].length = None + def timestamp_version_modifier(timestamp: Timestamp) -> None: timestamp.meta["snapshot.json"].version = 2 - timestamp = self.modify_metadata( - "timestamp", hashes_length_version_modifier - ) + timestamp = self.modify_metadata("timestamp", timestamp_version_modifier) self._root_updated_and_update_timestamp(timestamp) # new_snapshot.version != trusted timestamp.meta["snapshot"].version - snapshot = Metadata.from_bytes(self.metadata["snapshot"]) - snapshot.signed.version = 3 - snapshot.sign(self.keystore["snapshot"]) + def snapshot_version_modifier(snapshot: Snapshot) -> None: + snapshot.version = 3 + + snapshot = self.modify_metadata("snapshot", snapshot_version_modifier) with self.assertRaises(exceptions.BadVersionNumberError): - self.trusted_set.update_snapshot(snapshot.to_bytes()) + self.trusted_set.update_snapshot(snapshot) def test_update_snapshot_after_successful_update_new_snapshot_no_meta(self): - def hashes_length_modifier(timestamp: Timestamp): - timestamp.meta["snapshot.json"].hashes = None - timestamp.meta["snapshot.json"].length = None - - timestamp = self.modify_metadata("timestamp", hashes_length_modifier) - self._update_all_besides_targets(timestamp) + self._update_all_besides_targets(self.metadata["timestamp"]) # Test removing a meta_file in new_snapshot compared to the old snapshot - snapshot = Metadata.from_bytes(self.metadata["snapshot"]) - snapshot.signed.meta = {} - snapshot.sign(self.keystore["snapshot"]) + def no_meta_modifier(snapshot: Snapshot) -> None: + snapshot.meta = {} + + snapshot = self.modify_metadata("snapshot", no_meta_modifier) with self.assertRaises(exceptions.RepositoryError): - self.trusted_set.update_snapshot(snapshot.to_bytes()) + self.trusted_set.update_snapshot(snapshot) def test_update_snapshot_after_succesfull_update_new_snapshot_meta_version_different(self): - def hashes_length_modifier(timestamp: Timestamp): - timestamp.meta["snapshot.json"].hashes = None - timestamp.meta["snapshot.json"].length = None - - timestamp = self.modify_metadata("timestamp", hashes_length_modifier) - self._root_updated_and_update_timestamp(timestamp) + self._root_updated_and_update_timestamp(self.metadata["timestamp"]) # snapshot.meta["project1"].version != new_snapshot.meta["project1"].version - snapshot = Metadata.from_bytes(self.metadata["snapshot"]) - for metafile_path in snapshot.signed.meta.keys(): - snapshot.signed.meta[metafile_path].version += 1 - snapshot.sign(self.keystore["snapshot"]) - self.trusted_set.update_snapshot(snapshot.to_bytes()) + def version_meta_modifier(snapshot: Snapshot) -> None: + for metafile_path in snapshot.meta.keys(): + snapshot.meta[metafile_path].version += 1 + + snapshot = self.modify_metadata("snapshot", version_meta_modifier) + self.trusted_set.update_snapshot(snapshot) with self.assertRaises(exceptions.BadVersionNumberError): self.trusted_set.update_snapshot(self.metadata["snapshot"]) def test_update_snapshot_expired_new_snapshot(self): - def hashes_length_modifier(timestamp: Timestamp): - timestamp.meta["snapshot.json"].hashes = None - timestamp.meta["snapshot.json"].length = None - - timestamp = self.modify_metadata("timestamp", hashes_length_modifier) - self._root_updated_and_update_timestamp(timestamp) + self._root_updated_and_update_timestamp(self.metadata["timestamp"]) # new_snapshot has expired - snapshot = Metadata.from_bytes(self.metadata["snapshot"]) - snapshot.signed.expires = datetime(1970, 1, 1) - snapshot.sign(self.keystore["snapshot"]) + def snapshot_expired_modifier(snapshot: Snapshot) -> None: + snapshot.expires = datetime(1970, 1, 1) + + snapshot = self.modify_metadata("snapshot", snapshot_expired_modifier) with self.assertRaises(exceptions.ExpiredMetadataError): - self.trusted_set.update_snapshot(snapshot.to_bytes()) + self.trusted_set.update_snapshot(snapshot) def test_update_targets_no_meta_in_snapshot(self): - def hashes_length_modifier(timestamp: Timestamp): - timestamp.meta["snapshot.json"].hashes = None - timestamp.meta["snapshot.json"].length = None - - timestamp = self.modify_metadata("timestamp", hashes_length_modifier) - def no_meta_modifier(snapshot: Snapshot): + def no_meta_modifier(snapshot: Snapshot) -> None: snapshot.meta = {} snapshot = self.modify_metadata("snapshot", no_meta_modifier) - self._update_all_besides_targets(timestamp, snapshot) + self._update_all_besides_targets(self.metadata["timestamp"], snapshot) # remove meta information with information about targets from snapshot with self.assertRaises(exceptions.RepositoryError): self.trusted_set.update_targets(self.metadata["targets"]) def test_update_targets_hash_different_than_snapshot_meta_hash(self): - def hashes_length_modifier(timestamp: Timestamp): - timestamp.meta["snapshot.json"].hashes = None - timestamp.meta["snapshot.json"].length = None - - timestamp = self.modify_metadata("timestamp", hashes_length_modifier) - def meta_length_modifier(snapshot: Snapshot): + def meta_length_modifier(snapshot: Snapshot) -> None: for metafile_path in snapshot.meta: snapshot.meta[metafile_path] = MetaFile(version=1, length=1) snapshot = self.modify_metadata("snapshot", meta_length_modifier) - self._update_all_besides_targets(timestamp, snapshot) + self._update_all_besides_targets(self.metadata["timestamp"], snapshot) # observed_hash != stored hash in snapshot meta for targets with self.assertRaises(exceptions.RepositoryError): self.trusted_set.update_targets(self.metadata["targets"]) def test_update_targets_version_different_snapshot_meta_version(self): - def hashes_length_modifier(timestamp: Timestamp): - timestamp.meta["snapshot.json"].hashes = None - timestamp.meta["snapshot.json"].length = None - - timestamp = self.modify_metadata("timestamp", hashes_length_modifier) - def meta_modifier(snapshot: Snapshot): + def meta_modifier(snapshot: Snapshot) -> None: for metafile_path in snapshot.meta: snapshot.meta[metafile_path] = MetaFile(version=2) snapshot = self.modify_metadata("snapshot", meta_modifier) - self._update_all_besides_targets(timestamp, snapshot) + self._update_all_besides_targets(self.metadata["timestamp"], snapshot) # new_delegate.signed.version != meta.version stored in snapshot with self.assertRaises(exceptions.BadVersionNumberError): self.trusted_set.update_targets(self.metadata["targets"]) @@ -372,11 +359,12 @@ def meta_modifier(snapshot: Snapshot): def test_update_targets_expired_new_target(self): self._update_all_besides_targets() # new_delegated_target has expired - targets = Metadata.from_bytes(self.metadata["targets"]) - targets.signed.expires = datetime(1970, 1, 1) - targets.sign(self.keystore["targets"]) + def target_expired_modifier(target: Targets) -> None: + target.expires = datetime(1970, 1, 1) + + targets = self.modify_metadata("targets", target_expired_modifier) with self.assertRaises(exceptions.ExpiredMetadataError): - self.trusted_set.update_targets(targets.to_bytes()) + self.trusted_set.update_targets(targets) # TODO test updating over initial metadata (new keys, newer timestamp, etc)