diff --git a/pycardano/serialization.py b/pycardano/serialization.py index 22f51a92..b2e7abe3 100644 --- a/pycardano/serialization.py +++ b/pycardano/serialization.py @@ -52,6 +52,7 @@ __all__ = [ "default_encoder", "IndefiniteList", + "IndefiniteFrozenList", "Primitive", "CBORBase", "CBORSerializable", @@ -199,7 +200,9 @@ def decode_array(self, subtype: int) -> Sequence[Any]: length = self._decode_length(subtype, allow_indefinite=True) if length is None: - return IndefiniteList(cast(Primitive, self.decode_array(subtype=subtype))) + ret = IndefiniteFrozenList(list(self.decode_array(subtype=subtype))) + ret.freeze() + return ret else: return self.decode_array(subtype=subtype) @@ -323,20 +326,27 @@ def _dfs(value, freeze=False): return _set elif isinstance(value, tuple): return tuple(_dfs(v, freeze) for v in value) - elif isinstance(value, list): + elif isinstance( + value, (IndefiniteFrozenList, FrozenList, IndefiniteList, list) + ): _list = [_dfs(v, freeze) for v in value] - if freeze: - fl = FrozenList(_list) - fl.freeze() - return fl - return _list - elif isinstance(value, IndefiniteList): - _list = [_dfs(v, freeze) for v in value] - if freeze: - fl = IndefiniteFrozenList(_list) - fl.freeze() - return fl - return IndefiniteList(_list) + + already_frozen = isinstance(value, (IndefiniteFrozenList, FrozenList)) + should_freeze = already_frozen or freeze + + if not should_freeze: + return ( + IndefiniteList(_list) + if isinstance(value, IndefiniteList) + else _list + ) + + is_indefinite = isinstance( + value, (IndefiniteFrozenList, IndefiniteList) + ) + fl = IndefiniteFrozenList(_list) if is_indefinite else FrozenList(_list) + fl.freeze() + return fl elif isinstance(value, CBORTag): return CBORTag(value.tag, _dfs(value.value, freeze)) else: @@ -1186,13 +1196,21 @@ def __eq__(self, other: object) -> bool: def __repr__(self) -> str: return f"{self.__class__.__name__}({list(self)})" - def to_shallow_primitive(self) -> Union[CBORTag, Union[List[T], IndefiniteList]]: + def to_shallow_primitive( + self, + ) -> Union[CBORTag, List[T], IndefiniteList, FrozenList, IndefiniteFrozenList]: + fields: Union[IndefiniteFrozenList, FrozenList] + if self._is_indefinite_list: + fields = IndefiniteFrozenList(list(self)) + else: + fields = FrozenList(self) + fields.freeze() if self._use_tag: return CBORTag( 258, - IndefiniteList(list(self)) if self._is_indefinite_list else list(self), + fields, ) - return IndefiniteList(list(self)) if self._is_indefinite_list else list(self) + return fields @classmethod def from_primitive( @@ -1223,6 +1241,9 @@ def from_primitive( def __deepcopy__(self, memo): return self.__class__(deepcopy(list(self), memo), use_tag=self._use_tag) + def __hash__(self): + return hash(self.to_shallow_primitive()) + class NonEmptyOrderedSet(OrderedSet[T]): def __init__( diff --git a/test/pycardano/test_serialization.py b/test/pycardano/test_serialization.py index 65b9d1ef..29f38689 100644 --- a/test/pycardano/test_serialization.py +++ b/test/pycardano/test_serialization.py @@ -21,10 +21,12 @@ import cbor2 import pytest from cbor2 import CBORTag +from frozenlist import FrozenList from pycardano import ( CBORBase, Datum, + IndefiniteFrozenList, MultiAsset, Primitive, RawPlutusData, @@ -618,8 +620,8 @@ def test_ordered_set(): # Test serialization without tag s = OrderedSet([1, 2, 3], use_tag=False) primitive = s.to_primitive() - assert isinstance(primitive, list) - assert primitive == [1, 2, 3] + assert isinstance(primitive, (list, FrozenList)) + assert list(primitive) == [1, 2, 3] # Test serialization with tag s = OrderedSet([1, 2, 3], use_tag=True) @@ -695,8 +697,10 @@ def test_non_empty_ordered_set(): # Test serialization without tag s = NonEmptyOrderedSet([1, 2, 3], use_tag=False) primitive = s.to_primitive() - assert isinstance(primitive, list) - assert primitive == [1, 2, 3] + from frozenlist import FrozenList + + assert isinstance(primitive, (list, FrozenList)) + assert list(primitive) == [1, 2, 3] # Test serialization with tag s = NonEmptyOrderedSet([1, 2, 3], use_tag=True) @@ -1050,3 +1054,69 @@ def to_shallow_primitive(self) -> Union[Primitive, CBORSerializable]: with pytest.raises(IOError): test1.save(f.name) + + +def test_ordered_set_as_key_in_dict(): + a = NonEmptyOrderedSet([1, 2, 3]) + + class MyTest(DictCBORSerializable): + KEY_TYPE = NonEmptyOrderedSet + VALUE_TYPE = int + + d = MyTest() + d[a] = 1 + + check_two_way_cbor(d) + + +def test_indefinite_list_highjacking_does_not_break_cbor2(): + ls = IndefiniteFrozenList(["hello"]) + ls.freeze() + a = {ls: 1} + encoded = cbor2.dumps(a, default=default_encoder) + decoded = cbor2.loads(encoded) + assert isinstance(list(decoded.keys())[0], IndefiniteList) + + +def test_definite_list_highjacking_does_not_break_cbor2(): + ls = FrozenList(["hello"]) + ls.freeze() + a = {ls: 1} + encoded = cbor2.dumps(a, default=default_encoder) + decoded = cbor2.loads(encoded) + assert isinstance(list(decoded.keys())[0], (list, tuple)) + + +def test_indefinite_list_highjacking_does_not_break_cbor2_datum(): + ls = IndefiniteFrozenList(["hello"]) + ls.freeze() + datum = CBORTag(251, ls) + a = {datum: 1} + encoded = cbor2.dumps(a, default=default_encoder) + decoded = cbor2.loads(encoded) + assert isinstance(list(decoded.keys())[0], CBORTag) + assert isinstance(list(decoded.keys())[0].value, IndefiniteList) + + +def test_definite_list_highjacking_does_not_break_cbor2_datum(): + ls = FrozenList(["hello"]) + ls.freeze() + datum = CBORTag(251, ls) + a = {datum: 1} + encoded = cbor2.dumps(a, default=default_encoder) + decoded = cbor2.loads(encoded) + assert isinstance(list(decoded.keys())[0], CBORTag) + assert isinstance(list(decoded.keys())[0].value, (list, tuple)) + + +def test_ordered_set_as_key_in_dict_indefinite_list(): + a = NonEmptyOrderedSet(IndefiniteList([1, 2, 3])) + + class MyTest(DictCBORSerializable): + KEY_TYPE = NonEmptyOrderedSet + VALUE_TYPE = int + + d = MyTest() + d[a] = 1 + + check_two_way_cbor(d) diff --git a/test/pycardano/test_txbuilder.py b/test/pycardano/test_txbuilder.py index 13f183c2..67beb342 100644 --- a/test/pycardano/test_txbuilder.py +++ b/test/pycardano/test_txbuilder.py @@ -8,6 +8,7 @@ import pytest from cbor2 import CBORTag +from frozenlist import FrozenList from pycardano import ( AssetName, @@ -69,6 +70,13 @@ from pycardano.witness import TransactionWitnessSet, VerificationKeyWitness +def frozen_list(items): + """Helper function to create a frozen list from items.""" + fl = FrozenList(items) + fl.freeze() + return fl + + def test_tx_builder(chain_context): tx_builder = TransactionBuilder(chain_context, [RandomImproveMultiAsset([0, 0])]) sender = "addr_test1vrm9x2zsux7va6w892g38tvchnzahvcd9tykqf3ygnmwtaqyfg52x" @@ -82,7 +90,7 @@ def test_tx_builder(chain_context): tx_body = tx_builder.build(change_address=sender_address) expected = { - 0: CBORTag(258, [[b"11111111111111111111111111111111", 0]]), + 0: CBORTag(258, frozen_list([[b"11111111111111111111111111111111", 0]])), 1: [ # First output [sender_address.to_primitive(), 500000], @@ -126,7 +134,7 @@ def test_tx_builder_with_certain_input(chain_context): tx_body = tx_builder.build(change_address=sender_address) expected = { - 0: CBORTag(258, [[b"2" * 32, 1]]), + 0: CBORTag(258, frozen_list([[b"2" * 32, 1]])), 1: [ # First output [sender_address.to_primitive(), 500000], @@ -296,7 +304,7 @@ def test_tx_builder_with_potential_inputs(chain_context): tx_body = tx_builder.build(change_address=sender_address) expect = { - 0: CBORTag(258, [[b"11111111111111111111111111111111", 3]]), + 0: CBORTag(258, frozen_list([[b"11111111111111111111111111111111", 3]])), 1: [ # First output [sender_address.to_primitive(), 2500000], @@ -397,10 +405,12 @@ def test_tx_builder_mint_multi_asset(chain_context): expected = { 0: CBORTag( 258, - [ - [b"11111111111111111111111111111111", 0], - [b"22222222222222222222222222222222", 1], - ], + frozen_list( + [ + [b"11111111111111111111111111111111", 0], + [b"22222222222222222222222222222222", 1], + ] + ), ), 1: [ # First output @@ -420,7 +430,7 @@ def test_tx_builder_mint_multi_asset(chain_context): 3: 123456789, 8: 1000, 9: mint, - 14: CBORTag(258, [sender_address.payment_part.to_primitive()]), + 14: CBORTag(258, frozen_list([sender_address.payment_part.to_primitive()])), } assert expected == tx_body.to_primitive() @@ -1310,7 +1320,7 @@ def test_excluded_input(chain_context): tx_body = tx_builder.build(change_address=sender_address) expected = { - 0: CBORTag(258, [[b"22222222222222222222222222222222", 1]]), + 0: CBORTag(258, frozen_list([[b"22222222222222222222222222222222", 1]])), 1: [ # First output [sender_address.to_primitive(), 500000], @@ -1448,7 +1458,7 @@ def test_tx_builder_exact_fee_no_change(chain_context): tx = tx_builder.build_and_sign([SK]) expected = { - 0: CBORTag(258, [[b"11111111111111111111111111111111", 3]]), + 0: CBORTag(258, frozen_list([[b"11111111111111111111111111111111", 3]])), 1: [ [sender_address.to_primitive(), 9835951], ], @@ -1484,7 +1494,7 @@ def test_tx_builder_certificates(chain_context): tx_body = tx_builder.build(change_address=sender_address) expected = { - 0: CBORTag(258, [[b"11111111111111111111111111111111", 0]]), + 0: CBORTag(258, frozen_list([[b"11111111111111111111111111111111", 0]])), 1: [ # First output [sender_address.to_primitive(), 500000], @@ -1603,7 +1613,7 @@ def test_tx_builder_stake_pool_registration(chain_context, pool_params): tx_body = tx_builder.build(change_address=sender_address) expected = { - 0: CBORTag(258, [[b"22222222222222222222222222222222", 2]]), + 0: CBORTag(258, frozen_list([[b"22222222222222222222222222222222", 2]])), 1: [ [ b"`\xf6S(P\xe1\xbc\xce\xe9\xc7*\x91\x13\xad\x98\xbc\xc5\xdb\xb3\r*\xc9`&$D\xf6\xe5\xf4", @@ -1659,7 +1669,7 @@ def test_tx_builder_withdrawal(chain_context): tx_body = tx_builder.build(change_address=sender_address) expected = { - 0: CBORTag(258, [[b"11111111111111111111111111111111", 0]]), + 0: CBORTag(258, frozen_list([[b"11111111111111111111111111111111", 0]])), 1: [ # First output [sender_address.to_primitive(), 500000], @@ -1694,7 +1704,7 @@ def test_tx_builder_no_output(chain_context): ) expected = { - 0: CBORTag(258, [[b"11111111111111111111111111111111", 3]]), + 0: CBORTag(258, frozen_list([[b"11111111111111111111111111111111", 3]])), 1: [ [sender_address.to_primitive(), 9835951], ], @@ -1724,7 +1734,7 @@ def test_tx_builder_merge_change_to_output(chain_context): ) expected = { - 0: CBORTag(258, [[b"11111111111111111111111111111111", 3]]), + 0: CBORTag(258, frozen_list([[b"11111111111111111111111111111111", 3]])), 1: [ [sender_address.to_primitive(), 9835951], ], @@ -1758,7 +1768,7 @@ def test_tx_builder_merge_change_to_output_2(chain_context): ) expected = { - 0: CBORTag(258, [[b"11111111111111111111111111111111", 3]]), + 0: CBORTag(258, frozen_list([[b"11111111111111111111111111111111", 3]])), 1: [ [sender_address.to_primitive(), 10000], [receiver_address.to_primitive(), 10000], @@ -1790,7 +1800,7 @@ def test_tx_builder_merge_change_to_zero_amount_output(chain_context): ) expected = { - 0: CBORTag(258, [[b"11111111111111111111111111111111", 3]]), + 0: CBORTag(258, frozen_list([[b"11111111111111111111111111111111", 3]])), 1: [ [sender_address.to_primitive(), 9835951], ], @@ -1820,7 +1830,7 @@ def test_tx_builder_merge_change_smaller_than_min_utxo(chain_context): ) expected = { - 0: CBORTag(258, [[b"11111111111111111111111111111111", 3]]), + 0: CBORTag(258, frozen_list([[b"11111111111111111111111111111111", 3]])), 1: [ [sender_address.to_primitive(), 9835951], ],