From 6d963fe176134eef8fb5715fbb681366128b94e9 Mon Sep 17 00:00:00 2001 From: beedi Date: Sun, 29 Jun 2025 15:52:07 +0000 Subject: [PATCH 1/6] feat:added encryptionManager class for and generate && recoverKey methods && test files --- requirements.txt | 3 +- src/lighthouseweb3/__init__.py | 19 +- .../functions/encryptionManager/__init__.py | 0 .../functions/encryptionManager/config.py | 2 + .../functions/encryptionManager/generate.py | 87 +++++++++ .../functions/encryptionManager/recoverKey.py | 171 ++++++++++++++++++ tests/tests_encryptionEngine/__init__.py | 0 tests/tests_encryptionEngine/test_generate.py | 79 ++++++++ .../test_recover_key.py | 146 +++++++++++++++ 9 files changed, 505 insertions(+), 2 deletions(-) create mode 100644 src/lighthouseweb3/functions/encryptionManager/__init__.py create mode 100644 src/lighthouseweb3/functions/encryptionManager/config.py create mode 100644 src/lighthouseweb3/functions/encryptionManager/generate.py create mode 100644 src/lighthouseweb3/functions/encryptionManager/recoverKey.py create mode 100644 tests/tests_encryptionEngine/__init__.py create mode 100644 tests/tests_encryptionEngine/test_generate.py create mode 100644 tests/tests_encryptionEngine/test_recover_key.py diff --git a/requirements.txt b/requirements.txt index 13c4766..97fd17c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,4 +3,5 @@ charset-normalizer==3.1.0 idna==3.4 requests==2.31.0 urllib3==2.0.2 -eth-account==0.13.7 \ No newline at end of file +eth-account==0.13.7 +cryptography \ No newline at end of file diff --git a/src/lighthouseweb3/__init__.py b/src/lighthouseweb3/__init__.py index b1d8d7c..14bca18 100644 --- a/src/lighthouseweb3/__init__.py +++ b/src/lighthouseweb3/__init__.py @@ -2,6 +2,7 @@ import os import io +from typing import List, Dict, Any from .functions import ( upload as d, deal_status, @@ -16,7 +17,7 @@ remove_ipns_record as removeIpnsRecord, create_wallet as createWallet ) - +from .functions.encryptionManager import generate, recoverKey class Lighthouse: def __init__(self, token: str = ""): @@ -224,3 +225,19 @@ def getTagged(self, tag: str): except Exception as e: raise e +class EncryptionManager: + @staticmethod + def generate(threshold: int, keyCount: int): + try: + return generate.generate(threshold, keyCount) + except Exception as e: + raise e + + + @staticmethod + def recoverKey(keyShards: List[Dict[str, Any]]): + try: + return recoverKey.recoverKey(keyShards) + except Exception as e: + raise e + \ No newline at end of file diff --git a/src/lighthouseweb3/functions/encryptionManager/__init__.py b/src/lighthouseweb3/functions/encryptionManager/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/lighthouseweb3/functions/encryptionManager/config.py b/src/lighthouseweb3/functions/encryptionManager/config.py new file mode 100644 index 0000000..bcc79dc --- /dev/null +++ b/src/lighthouseweb3/functions/encryptionManager/config.py @@ -0,0 +1,2 @@ +#A 257-bit prime to accommodate 256-bit secrets +PRIME = 2**256 + 297 \ No newline at end of file diff --git a/src/lighthouseweb3/functions/encryptionManager/generate.py b/src/lighthouseweb3/functions/encryptionManager/generate.py new file mode 100644 index 0000000..c9e6ba4 --- /dev/null +++ b/src/lighthouseweb3/functions/encryptionManager/generate.py @@ -0,0 +1,87 @@ +import secrets +import logging +from typing import Dict, List, Any +from .config import PRIME +logger = logging.getLogger(__name__) + + +def evaluate_polynomial(coefficients: List[int], x: int, prime: int) -> int: + """ + Evaluate a polynomial with given coefficients at point x. + msk[0] is constant term (the secret), msk[1] is x coefficient, etc. + + Args: + coefficients: List of coefficients where coefficients[0] is the constant term + x: Point at which to evaluate the polynomial + prime: Prime number for the finite field + + Returns: + The result of the polynomial evaluation modulo prime + """ + result = 0 + x_power = 1 # x^0 = 1 + + for coefficient in coefficients: + result = (result + coefficient * x_power) % prime + x_power = (x_power * x) % prime + + return result + +async def generate(threshold: int = 3, key_count: int = 5) -> Dict[str, Any]: + """ + Generate threshold cryptography key shards using Shamir's Secret Sharing + + Args: + threshold: Minimum number of shards needed to reconstruct the secret + key_count: Total number of key shards to generate + + Returns: + { + "masterKey": "", + "keyShards": [ + { + "key": "", + "index": "" + } + ] + } + """ + logger.info(f"Generating key shards with threshold={threshold}, key_count={key_count}") + + msk=[] + idVec=[] + secVec=[] + + if threshold > key_count: + raise ValueError("key_count must be greater than or equal to threshold") + if threshold < 1 or key_count < 1: + raise ValueError("threshold and key_count must be positive integers") + + + msk = [secrets.randbits(256) for _ in range(threshold)] + master_key = msk[0] + + used_ids = set() + + for i in range(key_count): + while True: + id_vec = secrets.randbits(32) + if id_vec != 0 and id_vec not in used_ids: + idVec.append(id_vec) + used_ids.add(id_vec) + break + + for i in range(key_count): + y = evaluate_polynomial(msk, idVec[i], PRIME) + secVec.append(y) + + result = { + "masterKey": hex(master_key), + "keyShards": [{"key": hex(secVec[i]), "index": hex(idVec[i])} for i in range(key_count)] + } + return result + +if __name__ == "__main__": + import asyncio + result = asyncio.run(generate(threshold=1, key_count=1)) + print(result) \ No newline at end of file diff --git a/src/lighthouseweb3/functions/encryptionManager/recoverKey.py b/src/lighthouseweb3/functions/encryptionManager/recoverKey.py new file mode 100644 index 0000000..8856723 --- /dev/null +++ b/src/lighthouseweb3/functions/encryptionManager/recoverKey.py @@ -0,0 +1,171 @@ +from typing import List, Dict, Any +import logging +from .config import PRIME + +logger = logging.getLogger(__name__) + +from typing import Tuple + +def extended_gcd(a: int, b: int) -> Tuple[int, int, int]: + """Extended Euclidean algorithm to find modular inverse. + + Args: + a: First integer + b: Second integer + + Returns: + A tuple (g, x, y) such that a*x + b*y = g = gcd(a, b) + """ + if a == 0: + return b, 0, 1 + else: + g, y, x = extended_gcd(b % a, a) + return g, x - (b // a) * y, y + +def modinv(a: int, m: int) -> int: + """Find the modular inverse of a mod m.""" + g, x, y = extended_gcd(a, m) + if g != 1: + raise ValueError('Modular inverse does not exist') + else: + return x % m + +def lagrange_interpolation(shares: List[Dict[str, str]], prime: int) -> int: + """ + Reconstruct the secret using Lagrange interpolation. + + Args: + shares: List of dictionaries with 'key' and 'index' fields + prime: The prime number used in the finite field + + Returns: + The reconstructed secret as integer + + Raises: + ValueError: If there are duplicate indices + """ + + points = [] + seen_indices = set() + + for i, share in enumerate(shares): + try: + key_str, index_str = validate_share(share, i) + x = int(index_str, 16) + + if x in seen_indices: + raise ValueError(f"Duplicate share index found: 0x{x:x}") + seen_indices.add(x) + + y = int(key_str, 16) + points.append((x, y)) + except ValueError as e: + raise ValueError(f"Invalid share at position {i}: {e}") + + + secret = 0 + + for i, (x_i, y_i) in enumerate(points): + # Calculate the Lagrange basis polynomial L_i(0) + # Evaluate at x=0 to get the constant term + numerator = 1 + denominator = 1 + + for j, (x_j, _) in enumerate(points): + if i != j: + numerator = (numerator * (-x_j)) % prime + denominator = (denominator * (x_i - x_j)) % prime + + try: + inv_denominator = modinv(denominator, prime) + except ValueError as e: + raise ValueError(f"Error in modular inverse calculation: {e}") + + term = (y_i * numerator * inv_denominator) % prime + secret = (secret + term) % prime + + return secret + +def validate_share(share: Dict[str, str], index: int) -> Tuple[str, str]: + """Validate and normalize a single share. + + Args: + share: Dictionary containing 'key' and 'index' fields + index: Position of the share in the input list (for error messages) + + Returns: + Tuple of (normalized_key, normalized_index) as strings without '0x' prefix + + Raises: + ValueError: If the share is invalid + """ + if not isinstance(share, dict): + raise ValueError(f"Share at index {index} must be a dictionary") + + if 'key' not in share or 'index' not in share: + raise ValueError(f"Share at index {index} is missing required fields 'key' or 'index'") + + key_str = str(share['key']).strip().lower() + index_str = str(share['index']).strip().lower() + + if key_str.startswith('0x'): + key_str = key_str[2:] + if index_str.startswith('0x'): + index_str = index_str[2:] + + + if not key_str: + raise ValueError(f"Empty key in share at index {index}") + if not all(c in '0123456789abcdef' for c in key_str): + raise ValueError(f"Invalid key format in share at index {index}: must be a valid hex string") + + if len(key_str) % 2 != 0: + key_str = '0' + key_str + + if not index_str: + raise ValueError(f"Empty index in share at index {index}") + if not all(c in '0123456789abcdef' for c in index_str): + raise ValueError(f"Invalid index format in share at index {index}: must be a valid hex string") + + index_int = int(index_str, 16) + if not (0 <= index_int <= 0xFFFFFFFF): + raise ValueError(f"Index out of range in share at index {index}: must be between 0 and 2^32-1") + + return key_str, index_str + + +async def recoverKey(keyShards: List[Dict[str, str]]) -> Dict[str, Any]: + """ + Recover the master key from a subset of key shares using Lagrange interpolation. + + Args: + keyShards: List of dictionaries containing 'key' and 'index' fields + + Returns: + { + "masterKey": "", + "error": "" + } + """ + logger.info(f"Attempting to recover master key from {len(keyShards)} shares") + + try: + for i, share in enumerate(keyShards): + validate_share(share, i) + secret = lagrange_interpolation(keyShards, PRIME) + return { + "masterKey": hex(secret), + "error": None + } + except ValueError as e: + logger.error(f"Validation error during key recovery: {str(e)}") + return { + "masterKey": None, + "error": f"Validation error: {str(e)}" + } + except Exception as e: + logger.error(f"Error during key recovery: {str(e)}") + return { + "masterKey": None, + "error": f"Recovery error: {str(e)}" + } diff --git a/tests/tests_encryptionEngine/__init__.py b/tests/tests_encryptionEngine/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/tests_encryptionEngine/test_generate.py b/tests/tests_encryptionEngine/test_generate.py new file mode 100644 index 0000000..3408ed3 --- /dev/null +++ b/tests/tests_encryptionEngine/test_generate.py @@ -0,0 +1,79 @@ +import unittest +import asyncio +import logging +from src.lighthouseweb3 import EncryptionManager + +logger = logging.getLogger(__name__) + +class TestGenerate(unittest.TestCase): + """Test cases for the generate module.""" + + def test_generate_basic(self): + """Test basic key generation with default parameters.""" + async def run_test(): + result = await EncryptionManager.generate(threshold=2, keyCount=3) + + self.assertIn('masterKey', result) + self.assertIn('keyShards', result) + + # Check master key format (hex string with 0x prefix) + self.assertIsInstance(result['masterKey'], str) + self.assertTrue(result['masterKey'].startswith('0x')) + self.assertTrue(all(c in '0123456789abcdef' for c in result['masterKey'][2:])) + + # Check key shards + self.assertEqual(len(result['keyShards']), 3) + for shard in result['keyShards']: + self.assertIn('key', shard) + self.assertIn('index', shard) + + # Check key format (hex string with 0x prefix) + self.assertTrue(shard['key'].startswith('0x')) + self.assertTrue(all(c in '0123456789abcdef' for c in shard['key'][2:])) + + # Check index format (hex string with 0x prefix) + self.assertTrue(shard['index'].startswith('0x')) + self.assertTrue(all(c in '0123456789abcdef' for c in shard['index'][2:])) + + return result + + return asyncio.run(run_test()) + + def test_generate_custom_parameters(self): + """Test key generation with custom parameters.""" + async def run_test(): + threshold = 3 + key_count = 5 + + result = await EncryptionManager.generate(threshold=threshold, keyCount=key_count) + + self.assertEqual(len(result['keyShards']), key_count) + + # Check all indices are present and unique + indices = [shard['index'] for shard in result['keyShards']] + self.assertEqual(len(set(indices)), key_count) # All unique + + # Verify all indices are valid hex strings with 0x prefix + for index in indices: + self.assertTrue(index.startswith('0x')) + self.assertTrue(all(c in '0123456789abcdef' for c in index[2:])) + + return result + + return asyncio.run(run_test()) + + def test_invalid_threshold(self): + """Test that invalid threshold raises an error.""" + async def run_test(): + with self.assertRaises(ValueError) as context: + await EncryptionManager.generate(threshold=0, keyCount=3) + self.assertIn("must be positive integers", str(context.exception)) + + with self.assertRaises(ValueError) as context: + await EncryptionManager.generate(threshold=4, keyCount=3) + self.assertIn("must be greater than or equal to threshold", str(context.exception)) + + return asyncio.run(run_test()) + +if __name__ == '__main__': + unittest.main(verbosity=2) diff --git a/tests/tests_encryptionEngine/test_recover_key.py b/tests/tests_encryptionEngine/test_recover_key.py new file mode 100644 index 0000000..e159687 --- /dev/null +++ b/tests/tests_encryptionEngine/test_recover_key.py @@ -0,0 +1,146 @@ +import unittest +import asyncio +import logging +from src.lighthouseweb3 import EncryptionManager + +logger = logging.getLogger(__name__) + +class TestRecoverKey(unittest.TestCase): + """Test cases for the recoverKey module.""" + + def test_empty_shares_list(self): + """Test that recovery fails with empty shares list.""" + async def run_test(): + result = await EncryptionManager.recoverKey([]) + self.assertEqual(result['masterKey'], '0x0') + self.assertIsNone(result['error']) + + return asyncio.run(run_test()) + + + def test_recover_key_with_generated_shares(self): + """Test key recovery with dynamically generated shares.""" + async def run_test(): + + threshold = 3 + key_count = 5 + gen_result = await EncryptionManager.generate(threshold=threshold, keyCount=key_count) + master_key = gen_result['masterKey'] + + shares = gen_result['keyShards'][:threshold] + result = await EncryptionManager.recoverKey(shares) + self.assertEqual(result['masterKey'], master_key) + self.assertIsNone(result['error']) + + for i in range(key_count - threshold + 1): + subset = gen_result['keyShards'][i:i+threshold] + result = await EncryptionManager.recoverKey(subset) + self.assertEqual(result['masterKey'], master_key) + self.assertIsNone(result['error']) + + return result + + return asyncio.run(run_test()) + + def test_recover_key_insufficient_shares(self): + """Test with minimum threshold shares""" + async def run_test(): + threshold = 2 + key_count = 5 + gen_result = await EncryptionManager.generate(threshold=threshold, keyCount=key_count) + master_key = gen_result['masterKey'] + shares = gen_result['keyShards'][:threshold] + result = await EncryptionManager.recoverKey(shares) + self.assertEqual(result['masterKey'], master_key) + self.assertIsNone(result['error']) + + result = await EncryptionManager.recoverKey(gen_result['keyShards']) + self.assertEqual(result['masterKey'], master_key) + self.assertIsNone(result['error']) + + return asyncio.run(run_test()) + + def test_insufficient_shares(self): + """Test with insufficient shares for recovery""" + async def run_test(): + threshold = 3 + key_count = 5 + gen_result = await EncryptionManager.generate(threshold=threshold, keyCount=key_count) + + # Test with one less than threshold (should still work as long as we have at least 2 shares) + result = await EncryptionManager.recoverKey(gen_result['keyShards'][:threshold-1]) + self.assertIsNotNone(result['masterKey']) + self.assertIsNone(result['error']) + + # Test with single share (should still work as long as we have at least 1 share) + result = await EncryptionManager.recoverKey(gen_result['keyShards'][:1]) + self.assertIsNotNone(result['masterKey']) + self.assertIsNone(result['error']) + + return asyncio.run(run_test()) + + def test_various_threshold_combinations(self): + """Test recovery with various threshold and share count combinations""" + async def run_test(): + test_cases = [ + (2, 3), + (3, 5), + (4, 7), + (3, 10), + ] + + for threshold, total in test_cases: + with self.subTest(threshold=threshold, total=total): + gen_result = await EncryptionManager.generate( + threshold=threshold, + keyCount=total + ) + master_key = gen_result['masterKey'] + + shares = gen_result['keyShards'][:threshold] + result = await EncryptionManager.recoverKey(shares) + self.assertEqual(result['masterKey'], master_key) + self.assertIsNone(result['error']) + + result = await EncryptionManager.recoverKey(gen_result['keyShards']) + self.assertEqual(result['masterKey'], master_key) + self.assertIsNone(result['error']) + + import random + subset = random.sample(gen_result['keyShards'], threshold + 1) + result = await EncryptionManager.recoverKey(subset) + self.assertEqual(result['masterKey'], master_key) + self.assertIsNone(result['error']) + + return asyncio.run(run_test()) + + + def test_invalid_share_format(self): + """Test that invalid share formats are handled correctly.""" + async def run_test(): + result = await EncryptionManager.recoverKey(["not a dict", "another invalid"]) + self.assertIsNone(result['masterKey']) + self.assertIn("must be a dictionary", result['error']) + + result = await EncryptionManager.recoverKey([{'key': '123'}, {'key': '456'}]) + self.assertIsNone(result['masterKey']) + self.assertIn("missing required fields 'key' or 'index'", result['error'].lower()) + + result = await EncryptionManager.recoverKey([ + {'key': 'invalidhex', 'index': '1'}, + {'key': 'invalidhex2', 'index': '2'} + ]) + self.assertIsNone(result['masterKey']) + self.assertIn("invalid key format", result['error'].lower()) + + result = await EncryptionManager.recoverKey([ + {'key': 'a' * 63, 'index': 'invalidindex'}, + {'key': 'b' * 63, 'index': 'invalidindex2'} + ]) + self.assertIsNone(result['masterKey']) + self.assertIn("invalid index format", result['error'].lower()) + + return asyncio.run(run_test()) + +if __name__ == '__main__': + unittest.main(verbosity=2) From f5cea273e4e42706bf0988f6acc280b7c5f984da Mon Sep 17 00:00:00 2001 From: AnonO6 <21ucs043@gmail.com> Date: Sun, 29 Jun 2025 17:17:43 +0000 Subject: [PATCH 2/6] fix:removed func call from generate.py --- src/lighthouseweb3/functions/encryptionManager/generate.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/lighthouseweb3/functions/encryptionManager/generate.py b/src/lighthouseweb3/functions/encryptionManager/generate.py index c9e6ba4..ab8c2f9 100644 --- a/src/lighthouseweb3/functions/encryptionManager/generate.py +++ b/src/lighthouseweb3/functions/encryptionManager/generate.py @@ -80,8 +80,3 @@ async def generate(threshold: int = 3, key_count: int = 5) -> Dict[str, Any]: "keyShards": [{"key": hex(secVec[i]), "index": hex(idVec[i])} for i in range(key_count)] } return result - -if __name__ == "__main__": - import asyncio - result = asyncio.run(generate(threshold=1, key_count=1)) - print(result) \ No newline at end of file From 45c0e718b0018b47dec0785fbce3130f5b747820 Mon Sep 17 00:00:00 2001 From: AnonO6 <21ucs043@gmail.com> Date: Sun, 29 Jun 2025 18:59:22 +0000 Subject: [PATCH 3/6] fix:removed cryptography from requirements.txt --- requirements.txt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index 97fd17c..13c4766 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,5 +3,4 @@ charset-normalizer==3.1.0 idna==3.4 requests==2.31.0 urllib3==2.0.2 -eth-account==0.13.7 -cryptography \ No newline at end of file +eth-account==0.13.7 \ No newline at end of file From 2b2382919930a68b4e4c9f1f21000158a5425327 Mon Sep 17 00:00:00 2001 From: AnonO6 <21ucs043@gmail.com> Date: Wed, 2 Jul 2025 18:43:59 +0000 Subject: [PATCH 4/6] fix: updated the variables casing --- src/lighthouseweb3/__init__.py | 7 +++++-- .../encryptionManager/{recoverKey.py => recover_key.py} | 2 +- 2 files changed, 6 insertions(+), 3 deletions(-) rename src/lighthouseweb3/functions/encryptionManager/{recoverKey.py => recover_key.py} (98%) diff --git a/src/lighthouseweb3/__init__.py b/src/lighthouseweb3/__init__.py index 14bca18..3a75113 100644 --- a/src/lighthouseweb3/__init__.py +++ b/src/lighthouseweb3/__init__.py @@ -17,7 +17,10 @@ remove_ipns_record as removeIpnsRecord, create_wallet as createWallet ) -from .functions.encryptionManager import generate, recoverKey +from .functions.encryptionManager import ( + generate, + recover_key as recoverKey +) class Lighthouse: def __init__(self, token: str = ""): @@ -237,7 +240,7 @@ def generate(threshold: int, keyCount: int): @staticmethod def recoverKey(keyShards: List[Dict[str, Any]]): try: - return recoverKey.recoverKey(keyShards) + return recoverKey.recover_key(keyShards) except Exception as e: raise e \ No newline at end of file diff --git a/src/lighthouseweb3/functions/encryptionManager/recoverKey.py b/src/lighthouseweb3/functions/encryptionManager/recover_key.py similarity index 98% rename from src/lighthouseweb3/functions/encryptionManager/recoverKey.py rename to src/lighthouseweb3/functions/encryptionManager/recover_key.py index 8856723..f03d54a 100644 --- a/src/lighthouseweb3/functions/encryptionManager/recoverKey.py +++ b/src/lighthouseweb3/functions/encryptionManager/recover_key.py @@ -134,7 +134,7 @@ def validate_share(share: Dict[str, str], index: int) -> Tuple[str, str]: return key_str, index_str -async def recoverKey(keyShards: List[Dict[str, str]]) -> Dict[str, Any]: +async def recover_key(keyShards: List[Dict[str, str]]) -> Dict[str, Any]: """ Recover the master key from a subset of key shares using Lagrange interpolation. From 94a1670eac3aa998439fca6ef565b05f45290a6f Mon Sep 17 00:00:00 2001 From: AnonO6 <21ucs043@gmail.com> Date: Wed, 2 Jul 2025 19:42:38 +0000 Subject: [PATCH 5/6] refactor: updated the hex validation func to use bytes.fromhex --- .../encryptionManager/recover_key.py | 24 ++++++++++++------- 1 file changed, 15 insertions(+), 9 deletions(-) diff --git a/src/lighthouseweb3/functions/encryptionManager/recover_key.py b/src/lighthouseweb3/functions/encryptionManager/recover_key.py index f03d54a..f0a6165 100644 --- a/src/lighthouseweb3/functions/encryptionManager/recover_key.py +++ b/src/lighthouseweb3/functions/encryptionManager/recover_key.py @@ -112,19 +112,26 @@ def validate_share(share: Dict[str, str], index: int) -> Tuple[str, str]: key_str = key_str[2:] if index_str.startswith('0x'): index_str = index_str[2:] - if not key_str: raise ValueError(f"Empty key in share at index {index}") - if not all(c in '0123456789abcdef' for c in key_str): - raise ValueError(f"Invalid key format in share at index {index}: must be a valid hex string") + if not index_str: + raise ValueError(f"Empty index in share at index {index}") if len(key_str) % 2 != 0: key_str = '0' + key_str - - if not index_str: - raise ValueError(f"Empty index in share at index {index}") - if not all(c in '0123456789abcdef' for c in index_str): + + if len(index_str) % 2 != 0: + index_str = '0' + index_str + + try: + bytes.fromhex(key_str) + except ValueError: + raise ValueError(f"Invalid key format in share at index {index}: must be a valid hex string") + + try: + bytes.fromhex(index_str) + except ValueError: raise ValueError(f"Invalid index format in share at index {index}: must be a valid hex string") index_int = int(index_str, 16) @@ -133,7 +140,6 @@ def validate_share(share: Dict[str, str], index: int) -> Tuple[str, str]: return key_str, index_str - async def recover_key(keyShards: List[Dict[str, str]]) -> Dict[str, Any]: """ Recover the master key from a subset of key shares using Lagrange interpolation. @@ -168,4 +174,4 @@ async def recover_key(keyShards: List[Dict[str, str]]) -> Dict[str, Any]: return { "masterKey": None, "error": f"Recovery error: {str(e)}" - } + } \ No newline at end of file From 629bafd961201525763f15d7c79b153e1bd2f60e Mon Sep 17 00:00:00 2001 From: AnonO6 <21ucs043@gmail.com> Date: Thu, 10 Jul 2025 16:49:51 +0000 Subject: [PATCH 6/6] refactor:class rename --- src/lighthouseweb3/__init__.py | 4 +- .../{encryptionManager => kavach}/__init__.py | 0 .../{encryptionManager => kavach}/config.py | 0 .../{encryptionManager => kavach}/generate.py | 0 .../recover_key.py | 0 .../__init__.py | 0 .../test_generate.py | 10 ++--- .../test_recover_key.py | 38 +++++++++---------- 8 files changed, 26 insertions(+), 26 deletions(-) rename src/lighthouseweb3/functions/{encryptionManager => kavach}/__init__.py (100%) rename src/lighthouseweb3/functions/{encryptionManager => kavach}/config.py (100%) rename src/lighthouseweb3/functions/{encryptionManager => kavach}/generate.py (100%) rename src/lighthouseweb3/functions/{encryptionManager => kavach}/recover_key.py (100%) rename tests/{tests_encryptionEngine => tests_kavach}/__init__.py (100%) rename tests/{tests_encryptionEngine => tests_kavach}/test_generate.py (88%) rename tests/{tests_encryptionEngine => tests_kavach}/test_recover_key.py (76%) diff --git a/src/lighthouseweb3/__init__.py b/src/lighthouseweb3/__init__.py index 3a75113..358777f 100644 --- a/src/lighthouseweb3/__init__.py +++ b/src/lighthouseweb3/__init__.py @@ -17,7 +17,7 @@ remove_ipns_record as removeIpnsRecord, create_wallet as createWallet ) -from .functions.encryptionManager import ( +from .functions.kavach import ( generate, recover_key as recoverKey ) @@ -228,7 +228,7 @@ def getTagged(self, tag: str): except Exception as e: raise e -class EncryptionManager: +class Kavach: @staticmethod def generate(threshold: int, keyCount: int): try: diff --git a/src/lighthouseweb3/functions/encryptionManager/__init__.py b/src/lighthouseweb3/functions/kavach/__init__.py similarity index 100% rename from src/lighthouseweb3/functions/encryptionManager/__init__.py rename to src/lighthouseweb3/functions/kavach/__init__.py diff --git a/src/lighthouseweb3/functions/encryptionManager/config.py b/src/lighthouseweb3/functions/kavach/config.py similarity index 100% rename from src/lighthouseweb3/functions/encryptionManager/config.py rename to src/lighthouseweb3/functions/kavach/config.py diff --git a/src/lighthouseweb3/functions/encryptionManager/generate.py b/src/lighthouseweb3/functions/kavach/generate.py similarity index 100% rename from src/lighthouseweb3/functions/encryptionManager/generate.py rename to src/lighthouseweb3/functions/kavach/generate.py diff --git a/src/lighthouseweb3/functions/encryptionManager/recover_key.py b/src/lighthouseweb3/functions/kavach/recover_key.py similarity index 100% rename from src/lighthouseweb3/functions/encryptionManager/recover_key.py rename to src/lighthouseweb3/functions/kavach/recover_key.py diff --git a/tests/tests_encryptionEngine/__init__.py b/tests/tests_kavach/__init__.py similarity index 100% rename from tests/tests_encryptionEngine/__init__.py rename to tests/tests_kavach/__init__.py diff --git a/tests/tests_encryptionEngine/test_generate.py b/tests/tests_kavach/test_generate.py similarity index 88% rename from tests/tests_encryptionEngine/test_generate.py rename to tests/tests_kavach/test_generate.py index 3408ed3..386fe79 100644 --- a/tests/tests_encryptionEngine/test_generate.py +++ b/tests/tests_kavach/test_generate.py @@ -1,7 +1,7 @@ import unittest import asyncio import logging -from src.lighthouseweb3 import EncryptionManager +from src.lighthouseweb3 import Kavach logger = logging.getLogger(__name__) @@ -11,7 +11,7 @@ class TestGenerate(unittest.TestCase): def test_generate_basic(self): """Test basic key generation with default parameters.""" async def run_test(): - result = await EncryptionManager.generate(threshold=2, keyCount=3) + result = await Kavach.generate(threshold=2, keyCount=3) self.assertIn('masterKey', result) self.assertIn('keyShards', result) @@ -45,7 +45,7 @@ async def run_test(): threshold = 3 key_count = 5 - result = await EncryptionManager.generate(threshold=threshold, keyCount=key_count) + result = await Kavach.generate(threshold=threshold, keyCount=key_count) self.assertEqual(len(result['keyShards']), key_count) @@ -66,11 +66,11 @@ def test_invalid_threshold(self): """Test that invalid threshold raises an error.""" async def run_test(): with self.assertRaises(ValueError) as context: - await EncryptionManager.generate(threshold=0, keyCount=3) + await Kavach.generate(threshold=0, keyCount=3) self.assertIn("must be positive integers", str(context.exception)) with self.assertRaises(ValueError) as context: - await EncryptionManager.generate(threshold=4, keyCount=3) + await Kavach.generate(threshold=4, keyCount=3) self.assertIn("must be greater than or equal to threshold", str(context.exception)) return asyncio.run(run_test()) diff --git a/tests/tests_encryptionEngine/test_recover_key.py b/tests/tests_kavach/test_recover_key.py similarity index 76% rename from tests/tests_encryptionEngine/test_recover_key.py rename to tests/tests_kavach/test_recover_key.py index e159687..d0c96e3 100644 --- a/tests/tests_encryptionEngine/test_recover_key.py +++ b/tests/tests_kavach/test_recover_key.py @@ -1,7 +1,7 @@ import unittest import asyncio import logging -from src.lighthouseweb3 import EncryptionManager +from src.lighthouseweb3 import Kavach logger = logging.getLogger(__name__) @@ -11,7 +11,7 @@ class TestRecoverKey(unittest.TestCase): def test_empty_shares_list(self): """Test that recovery fails with empty shares list.""" async def run_test(): - result = await EncryptionManager.recoverKey([]) + result = await Kavach.recoverKey([]) self.assertEqual(result['masterKey'], '0x0') self.assertIsNone(result['error']) @@ -24,17 +24,17 @@ async def run_test(): threshold = 3 key_count = 5 - gen_result = await EncryptionManager.generate(threshold=threshold, keyCount=key_count) + gen_result = await Kavach.generate(threshold=threshold, keyCount=key_count) master_key = gen_result['masterKey'] shares = gen_result['keyShards'][:threshold] - result = await EncryptionManager.recoverKey(shares) + result = await Kavach.recoverKey(shares) self.assertEqual(result['masterKey'], master_key) self.assertIsNone(result['error']) for i in range(key_count - threshold + 1): subset = gen_result['keyShards'][i:i+threshold] - result = await EncryptionManager.recoverKey(subset) + result = await Kavach.recoverKey(subset) self.assertEqual(result['masterKey'], master_key) self.assertIsNone(result['error']) @@ -47,14 +47,14 @@ def test_recover_key_insufficient_shares(self): async def run_test(): threshold = 2 key_count = 5 - gen_result = await EncryptionManager.generate(threshold=threshold, keyCount=key_count) + gen_result = await Kavach.generate(threshold=threshold, keyCount=key_count) master_key = gen_result['masterKey'] shares = gen_result['keyShards'][:threshold] - result = await EncryptionManager.recoverKey(shares) + result = await Kavach.recoverKey(shares) self.assertEqual(result['masterKey'], master_key) self.assertIsNone(result['error']) - result = await EncryptionManager.recoverKey(gen_result['keyShards']) + result = await Kavach.recoverKey(gen_result['keyShards']) self.assertEqual(result['masterKey'], master_key) self.assertIsNone(result['error']) @@ -65,15 +65,15 @@ def test_insufficient_shares(self): async def run_test(): threshold = 3 key_count = 5 - gen_result = await EncryptionManager.generate(threshold=threshold, keyCount=key_count) + gen_result = await Kavach.generate(threshold=threshold, keyCount=key_count) # Test with one less than threshold (should still work as long as we have at least 2 shares) - result = await EncryptionManager.recoverKey(gen_result['keyShards'][:threshold-1]) + result = await Kavach.recoverKey(gen_result['keyShards'][:threshold-1]) self.assertIsNotNone(result['masterKey']) self.assertIsNone(result['error']) # Test with single share (should still work as long as we have at least 1 share) - result = await EncryptionManager.recoverKey(gen_result['keyShards'][:1]) + result = await Kavach.recoverKey(gen_result['keyShards'][:1]) self.assertIsNotNone(result['masterKey']) self.assertIsNone(result['error']) @@ -91,24 +91,24 @@ async def run_test(): for threshold, total in test_cases: with self.subTest(threshold=threshold, total=total): - gen_result = await EncryptionManager.generate( + gen_result = await Kavach.generate( threshold=threshold, keyCount=total ) master_key = gen_result['masterKey'] shares = gen_result['keyShards'][:threshold] - result = await EncryptionManager.recoverKey(shares) + result = await Kavach.recoverKey(shares) self.assertEqual(result['masterKey'], master_key) self.assertIsNone(result['error']) - result = await EncryptionManager.recoverKey(gen_result['keyShards']) + result = await Kavach.recoverKey(gen_result['keyShards']) self.assertEqual(result['masterKey'], master_key) self.assertIsNone(result['error']) import random subset = random.sample(gen_result['keyShards'], threshold + 1) - result = await EncryptionManager.recoverKey(subset) + result = await Kavach.recoverKey(subset) self.assertEqual(result['masterKey'], master_key) self.assertIsNone(result['error']) @@ -118,22 +118,22 @@ async def run_test(): def test_invalid_share_format(self): """Test that invalid share formats are handled correctly.""" async def run_test(): - result = await EncryptionManager.recoverKey(["not a dict", "another invalid"]) + result = await Kavach.recoverKey(["not a dict", "another invalid"]) self.assertIsNone(result['masterKey']) self.assertIn("must be a dictionary", result['error']) - result = await EncryptionManager.recoverKey([{'key': '123'}, {'key': '456'}]) + result = await Kavach.recoverKey([{'key': '123'}, {'key': '456'}]) self.assertIsNone(result['masterKey']) self.assertIn("missing required fields 'key' or 'index'", result['error'].lower()) - result = await EncryptionManager.recoverKey([ + result = await Kavach.recoverKey([ {'key': 'invalidhex', 'index': '1'}, {'key': 'invalidhex2', 'index': '2'} ]) self.assertIsNone(result['masterKey']) self.assertIn("invalid key format", result['error'].lower()) - result = await EncryptionManager.recoverKey([ + result = await Kavach.recoverKey([ {'key': 'a' * 63, 'index': 'invalidindex'}, {'key': 'b' * 63, 'index': 'invalidindex2'} ])