1+ import copy
12import json
23import logging
34import os
4- import shutil
55import sys
6- import tempfile
76import unittest
7+ from typing import Dict , Any
8+ from datetime import datetime
89
910from tuf import exceptions
10- from tuf .api .metadata import Metadata
11- from tuf .ngclient ._internal .trusted_metadata_set import TrustedMetadataSet
11+ from tuf .api .metadata import Metadata , MetaFile
12+ from tuf .ngclient ._internal .trusted_metadata_set import (
13+ TrustedMetadataSet ,
14+ verify_with_threshold
15+ )
16+ from securesystemslib import hash as sslib_hash
17+ from securesystemslib .signer import SSlibSigner
18+ from securesystemslib .interface import import_ed25519_privatekey_from_file
1219
1320from tests import utils
1421
@@ -26,6 +33,13 @@ def setUpClass(cls):
2633 with open (os .path .join (cls .repo_dir , f"{ md } .json" ), "rb" ) as f :
2734 cls .metadata [md ] = f .read ()
2835
36+ keystore_dir = os .path .join (os .getcwd (), 'repository_data' , 'keystore' )
37+ cls .keystore = {}
38+ for role in ['delegation' , 'snapshot' , 'targets' , 'timestamp' ]:
39+ cls .keystore [role ] = import_ed25519_privatekey_from_file (
40+ os .path .join (keystore_dir , role + '_key' ),
41+ password = "password"
42+ )
2943
3044 def test_update (self ):
3145 trusted_set = TrustedMetadataSet (self .metadata ["root" ])
@@ -41,6 +55,9 @@ def test_update(self):
4155 self .metadata ["role2" ], "role2" , "role1"
4256 )
4357
58+ # the 4 top level metadata objects + 2 additional delegated targets
59+ self .assertTrue (len (trusted_set ), 6 )
60+
4461 def test_out_of_order_ops (self ):
4562 trusted_set = TrustedMetadataSet (self .metadata ["root" ])
4663
@@ -118,9 +135,247 @@ def test_update_with_invalid_json(self):
118135
119136 update_func (metadata )
120137
138+ def test_verify_with_threshold (self ):
139+ # Call verify_with_threshold with non root or targets delegator.
140+ delegated_role = Metadata .from_bytes (self .metadata ["role1" ])
141+ timestamp = Metadata .from_bytes (self .metadata ["timestamp" ])
142+ with self .assertRaises (ValueError ):
143+ verify_with_threshold (timestamp , "role1" , delegated_role )
144+
145+ # Call verify_with_threshold with non existent role_name.
146+ targets = Metadata .from_bytes (self .metadata ["targets" ])
147+ with self .assertRaises (ValueError ):
148+ verify_with_threshold (targets , "foo" , delegated_role )
149+
150+ def test_invalid_update_root (self ):
151+ trusted_set = TrustedMetadataSet (self .metadata ["root" ])
152+ # new_root data with invalid snapshot type
153+ invalid_type_data = json .loads (self .metadata ["root" ])
154+ invalid_type_data ["signed" ]["_type" ] = "snapshot"
155+ invalid_type_data ["signed" ]["meta" ] = {"file1.txt" : {"version" : 1 }}
156+ invalid_type_data = json .dumps (invalid_type_data ).encode ()
157+ # RepositoryError is thrown during new_root deserialization.
158+ # It's not thrown when checking new_root.signed.type != "root"
159+ with self .assertRaises (exceptions .RepositoryError ):
160+ trusted_set .update_root (invalid_type_data )
161+
162+ # new_root data with threshold which cannot be verified.
163+ modified_threshold_data = copy .deepcopy (
164+ json .loads (self .metadata ["root" ])
165+ )
166+ modified_threshold_data ["signed" ]["roles" ]["root" ]["threshold" ] = 2
167+ modified_threshold_data = json .dumps (modified_threshold_data ).encode ()
168+ with self .assertRaises (exceptions .UnsignedMetadataError ):
169+ trusted_set .update_root (modified_threshold_data )
170+
171+ # new_root.signed.version has the same version as old root
172+ with self .assertRaises (exceptions .ReplayedMetadataError ):
173+ trusted_set .update_root (self .metadata ["root" ])
174+
175+ # if _root_update_finished, then fail when calling update_root
176+ trusted_set .root_update_finished ()
177+ with self .assertRaises (RuntimeError ):
178+ trusted_set .update_root (self .metadata ["root" ])
179+
180+ def test_root_update_finished_expired (self ):
181+ trusted_set = TrustedMetadataSet (self .metadata ["root" ])
182+ # call root_update_finished when trusted root has expired
183+ expired_datetime = datetime .strptime (
184+ "1970-01-01T00:00:00Z" , "%Y-%m-%dT%H:%M:%SZ"
185+ )
186+ trusted_set .root .signed .expires = expired_datetime
187+ with self .assertRaises (exceptions .ExpiredMetadataError ):
188+ trusted_set .root_update_finished ()
189+
190+ def _sign_modified_obj (
191+ self ,
192+ role :str ,
193+ metadata_obj : Metadata
194+ ) -> Dict [str , Any ]:
195+ key_dict = self .keystore [role ]
196+ sslib_signer = SSlibSigner (key_dict )
197+ signature = metadata_obj .sign (sslib_signer )
198+ return signature .to_dict ()
199+
200+ def test_update_timestamp (self ):
201+ trusted_set = TrustedMetadataSet (self .metadata ["root" ])
202+ trusted_set .root_update_finished ()
203+ trusted_set .update_timestamp (self .metadata ["timestamp" ])
204+ # new_timestamp.version < trusted_timestamp.version
205+ trusted_set .timestamp .signed .version = 2
206+ with self .assertRaises (exceptions .ReplayedMetadataError ):
207+ trusted_set .update_timestamp (self .metadata ["timestamp" ])
208+ trusted_set .timestamp .signed .version = 1
209+
210+ # new_timestamp.snapshot.version < trusted_timestamp.snapshot.version
211+ trusted_set .timestamp .signed .meta ["snapshot.json" ].version = 2
212+ with self .assertRaises (exceptions .ReplayedMetadataError ):
213+ trusted_set .update_timestamp (self .metadata ["timestamp" ])
214+ trusted_set .timestamp .signed .meta ["snapshot.json" ].version = 1
215+
216+ # new_timestamp has expired
217+ timestamp = Metadata .from_bytes (self .metadata ["timestamp" ])
218+ timestamp .signed .expires = datetime .strptime (
219+ "1970-01-01T00:00:00Z" , "%Y-%m-%dT%H:%M:%SZ"
220+ )
221+ self ._sign_modified_obj ("timestamp" , timestamp )
222+ new_timestamp_byte_data = json .dumps (timestamp .to_dict ()).encode ()
223+ with self .assertRaises (exceptions .ExpiredMetadataError ):
224+ trusted_set .update_timestamp (new_timestamp_byte_data )
225+
226+ def _calculate_modified_hashes (
227+ self , true_hashes ,
228+ data : bytes
229+ ) -> Dict [str , str ]:
230+ modified_hashes = {}
231+ # Calculate hashes on modified data in order to pass hashes verification.
232+ for algo in true_hashes .keys ():
233+ digest_object = sslib_hash .digest (algo )
234+ digest_object .update (data )
235+ observed_hash = digest_object .hexdigest ()
236+ modified_hashes [algo ] = observed_hash
237+ return modified_hashes
238+
239+ def test_update_snapshot (self ):
240+ trusted_set = TrustedMetadataSet (self .metadata ["root" ])
241+ trusted_set .root_update_finished ()
242+ trusted_set .update_timestamp (self .metadata ["timestamp" ])
243+ # new_snapshot data with invalid targets type
244+ invalid_type_data = json .loads (self .metadata ["snapshot" ])
245+ invalid_type_data ["signed" ]["_type" ] = "targets"
246+ invalid_type_data ["signed" ]["targets" ] = {}
247+ invalid_type_data = json .dumps (invalid_type_data ).encode ()
248+ timestamp_meta = trusted_set .timestamp .signed .meta ["snapshot.json" ]
249+ true_hashes = timestamp_meta .hashes or {}
250+ modified_hashes = self ._calculate_modified_hashes (
251+ true_hashes , invalid_type_data
252+ )
253+ trusted_set .timestamp .signed .meta ["snapshot.json" ].hashes = modified_hashes
254+
255+ with self .assertRaises (exceptions .RepositoryError ):
256+ trusted_set .update_snapshot (invalid_type_data )
257+ trusted_set .timestamp .signed .meta ["snapshot.json" ].hashes = true_hashes
258+ # cannot update snapshot after targets update completes or targets != None
259+ targets_obj = Metadata .from_bytes (self .metadata ["targets" ])
260+ trusted_set ._trusted_set ["targets" ] = targets_obj
261+ with self .assertRaises (RuntimeError ):
262+ trusted_set .update_snapshot (self .metadata ["snapshot" ])
263+ del trusted_set ._trusted_set ["targets" ]
264+
265+ # Deserialization error - failed to decode the new_snapshot JSON.
266+ timestamp_meta = trusted_set .timestamp .signed .meta ["snapshot.json" ]
267+ true_hashes = timestamp_meta .hashes or {}
268+
269+ modified_hashes = self ._calculate_modified_hashes (
270+ true_hashes , b'{""sig": }'
271+ )
272+ trusted_set .timestamp .signed .meta ["snapshot.json" ].hashes = modified_hashes
273+ with self .assertRaises (exceptions .RepositoryError ):
274+ trusted_set .update_snapshot (b'{""sig": }' )
275+ trusted_set .timestamp .signed .meta ["snapshot.json" ].hashes = true_hashes
276+
277+ # root data with threshold which cannot be verified for new_snapshot
278+ trusted_set .root .signed .roles ["snapshot" ].threshold = 2
279+ with self .assertRaises (exceptions .UnsignedMetadataError ):
280+ trusted_set .update_snapshot (self .metadata ["snapshot" ])
281+ trusted_set .root .signed .roles ["snapshot" ].threshold = 1
282+
283+ # new_snapshot.version != trusted timestamp.meta["snapshot"].version
284+ trusted_set .timestamp .signed .meta ["snapshot.json" ].version = 2
285+ with self .assertRaises (exceptions .BadVersionNumberError ):
286+ trusted_set .update_snapshot (self .metadata ["snapshot" ])
287+ trusted_set .timestamp .signed .meta ["snapshot.json" ].version = 1
288+
289+
290+ def test_update_snapshot_after_succesfull_update (self ):
291+ trusted_set = TrustedMetadataSet (self .metadata ["root" ])
292+ trusted_set .root_update_finished ()
293+ trusted_set .update_timestamp (self .metadata ["timestamp" ])
294+ trusted_set .update_snapshot (self .metadata ["snapshot" ])
295+
296+ # Test removing a meta_file in new_snapshot compared to the old snapshot
297+ snapshot_obj = Metadata .from_bytes (self .metadata ["snapshot" ])
298+ snapshot_obj .signed .meta = {}
299+ # prepare timestamp.meta["snapshot"].hashes
300+ self ._sign_modified_obj ("snapshot" , snapshot_obj )
301+ timestamp_meta = trusted_set .timestamp .signed .meta ["snapshot.json" ]
302+ true_hashes = timestamp_meta .hashes or {}
303+ modified_snapshot_data = json .dumps (snapshot_obj .to_dict ()).encode ()
304+ modified_hashes = self ._calculate_modified_hashes (
305+ true_hashes , modified_snapshot_data
306+ )
307+ trusted_set .timestamp .signed .meta ["snapshot.json" ].hashes = modified_hashes
308+
309+ with self .assertRaises (exceptions .RepositoryError ):
310+ trusted_set .update_snapshot (modified_snapshot_data )
311+ trusted_set .timestamp .signed .meta ["snapshot.json" ].hashes = true_hashes
312+
313+ # snapshot.meta["project1"].version != new_snapshot.meta["project1"].version
314+ for meta_file_path in trusted_set .snapshot .signed .meta .keys ():
315+ trusted_set .snapshot .signed .meta [meta_file_path ].version = 2
316+ with self .assertRaises (exceptions .BadVersionNumberError ):
317+ trusted_set .update_snapshot (self .metadata ["snapshot" ])
318+ for meta_file_path in trusted_set .snapshot .signed .meta .keys ():
319+ trusted_set .snapshot .signed .meta [meta_file_path ].version = 1
320+
321+ # new_snapshot has expired
322+ snapshot_obj = Metadata .from_bytes (self .metadata ["snapshot" ])
323+ snapshot_obj .signed .expires = datetime .strptime (
324+ "1970-01-01T00:00:00Z" , "%Y-%m-%dT%H:%M:%SZ"
325+ )
326+ self ._sign_modified_obj ("snapshot" , snapshot_obj )
327+ modified_snapshot_data = json .dumps (snapshot_obj .to_dict ()).encode ()
328+ modified_hashes = self ._calculate_modified_hashes (
329+ true_hashes , modified_snapshot_data
330+ )
331+ trusted_set .timestamp .signed .meta ["snapshot.json" ].hashes = modified_hashes
332+ with self .assertRaises (exceptions .ExpiredMetadataError ):
333+ trusted_set .update_snapshot (modified_snapshot_data )
334+ trusted_set .timestamp .signed .meta ["snapshot.json" ].hashes = true_hashes
335+
336+ def test_update_targets (self ):
337+ trusted_set = TrustedMetadataSet (self .metadata ["root" ])
338+ trusted_set .root_update_finished ()
339+ trusted_set .update_timestamp (self .metadata ["timestamp" ])
340+ trusted_set .update_snapshot (self .metadata ["snapshot" ])
341+
342+ # remove meta information with information about targets from snapshot
343+ trusted_set .snapshot .signed .meta = {}
344+ with self .assertRaises (exceptions .RepositoryError ):
345+ trusted_set .update_targets (self .metadata ["targets" ])
346+ snapshot = Metadata .from_bytes (self .metadata ["snapshot" ])
347+ trusted_set .snapshot .signed .meta = snapshot .signed .meta
348+
349+ # observed_hash != stored hash in snapshot meta for targets
350+ true_hashes = {}
351+ for target_path , meta_file in trusted_set .snapshot .signed .meta .items ():
352+ true_hashes [target_path ] = meta_file .hashes
353+ trusted_set .snapshot .signed .meta [target_path ].hashes = {"sha256" : "b" }
354+ with self .assertRaises (exceptions .BadHashError ):
355+ trusted_set .update_targets (self .metadata ["targets" ])
356+ # Return to the original hash values
357+ for target_path in true_hashes .keys ():
358+ trusted_set .snapshot .signed .meta [target_path ].hashes = \
359+ true_hashes [target_path ]
360+
361+ # new_delegate.signed.version != meta.version stored in snapshot
362+ for target_path in trusted_set .snapshot .signed .meta .keys ():
363+ trusted_set .snapshot .signed .meta [target_path ].version = 2
364+ with self .assertRaises (exceptions .BadVersionNumberError ):
365+ trusted_set .update_targets (self .metadata ["targets" ])
366+ trusted_set .snapshot .signed .meta [target_path ].version = 1
367+
368+ # new_delegate has expired
369+ targets_obj = Metadata .from_bytes (self .metadata ["targets" ])
370+ targets_obj .signed .expires = datetime .strptime (
371+ "1970-01-01T00:00:00Z" , "%Y-%m-%dT%H:%M:%SZ"
372+ )
373+ self ._sign_modified_obj ("targets" , targets_obj )
374+ modified_targets_data = json .dumps (targets_obj .to_dict ()).encode ()
375+ with self .assertRaises (exceptions .ExpiredMetadataError ):
376+ trusted_set .update_targets (modified_targets_data )
121377
122378 # TODO test updating over initial metadata (new keys, newer timestamp, etc)
123- # TODO test the actual specification checks
124379
125380
126381if __name__ == '__main__' :
0 commit comments