-
Notifications
You must be signed in to change notification settings - Fork 23
Fix ECDSA timing attack vulnerability #73
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -3,11 +3,15 @@ | |||||||||||
|
|
||||||||||||
| from __future__ import annotations | ||||||||||||
|
|
||||||||||||
| import hashlib | ||||||||||||
| import unittest | ||||||||||||
| from typing import cast | ||||||||||||
|
|
||||||||||||
| from ecdsa import SECP256k1, SigningKey, VerifyingKey, util | ||||||||||||
| from cryptography.hazmat.primitives import hashes | ||||||||||||
| from cryptography.hazmat.primitives.asymmetric import ec | ||||||||||||
| from cryptography.hazmat.primitives.asymmetric.utils import ( | ||||||||||||
| decode_dss_signature, | ||||||||||||
| encode_dss_signature, | ||||||||||||
| ) | ||||||||||||
|
|
||||||||||||
| from . import asymmetric_crypto | ||||||||||||
| from .bcs import Deserializer, Serializer | ||||||||||||
|
|
@@ -16,19 +20,35 @@ | |||||||||||
| class PrivateKey(asymmetric_crypto.PrivateKey): | ||||||||||||
| LENGTH: int = 32 | ||||||||||||
|
|
||||||||||||
| key: SigningKey | ||||||||||||
| key: ec.EllipticCurvePrivateKey | ||||||||||||
|
|
||||||||||||
| def __init__(self, key: SigningKey): | ||||||||||||
| def __init__(self, key: ec.EllipticCurvePrivateKey): | ||||||||||||
| self.key = key | ||||||||||||
|
|
||||||||||||
| def __eq__(self, other: object): | ||||||||||||
| if not isinstance(other, PrivateKey): | ||||||||||||
| return NotImplemented | ||||||||||||
| return self.key == other.key | ||||||||||||
| # Compare private values | ||||||||||||
| return self._to_bytes() == other._to_bytes() | ||||||||||||
|
|
||||||||||||
| def __str__(self): | ||||||||||||
| return self.aip80() | ||||||||||||
|
|
||||||||||||
| def _to_bytes(self) -> bytes: | ||||||||||||
| """Convert private key to raw 32-byte representation.""" | ||||||||||||
| private_numbers = self.key.private_numbers() | ||||||||||||
| return private_numbers.private_value.to_bytes( | ||||||||||||
| PrivateKey.LENGTH, byteorder="big" | ||||||||||||
| ) | ||||||||||||
|
|
||||||||||||
| @staticmethod | ||||||||||||
| def _from_bytes(key_bytes: bytes) -> ec.EllipticCurvePrivateKey: | ||||||||||||
| """Create private key from raw 32-byte representation.""" | ||||||||||||
| if len(key_bytes) != PrivateKey.LENGTH: | ||||||||||||
| raise Exception("Length mismatch") | ||||||||||||
| private_value = int.from_bytes(key_bytes, byteorder="big") | ||||||||||||
| return ec.derive_private_key(private_value, ec.SECP256K1()) | ||||||||||||
|
|
||||||||||||
| @staticmethod | ||||||||||||
| def from_hex(value: str | bytes, strict: bool | None = None) -> PrivateKey: | ||||||||||||
| """ | ||||||||||||
|
|
@@ -43,9 +63,7 @@ def from_hex(value: str | bytes, strict: bool | None = None) -> PrivateKey: | |||||||||||
| ) | ||||||||||||
| if len(parsed_value.hex()) != PrivateKey.LENGTH * 2: | ||||||||||||
| raise Exception("Length mismatch") | ||||||||||||
| return PrivateKey( | ||||||||||||
| SigningKey.from_string(parsed_value, SECP256k1, hashlib.sha3_256) | ||||||||||||
| ) | ||||||||||||
| return PrivateKey(PrivateKey._from_bytes(parsed_value)) | ||||||||||||
|
|
||||||||||||
| @staticmethod | ||||||||||||
| def from_str(value: str, strict: bool | None = None) -> PrivateKey: | ||||||||||||
|
|
@@ -59,61 +77,93 @@ def from_str(value: str, strict: bool | None = None) -> PrivateKey: | |||||||||||
| return PrivateKey.from_hex(value, strict) | ||||||||||||
|
|
||||||||||||
| def hex(self) -> str: | ||||||||||||
| return f"0x{self.key.to_string().hex()}" | ||||||||||||
| return f"0x{self._to_bytes().hex()}" | ||||||||||||
|
|
||||||||||||
| def aip80(self) -> str: | ||||||||||||
| return PrivateKey.format_private_key( | ||||||||||||
| self.hex(), asymmetric_crypto.PrivateKeyVariant.Secp256k1 | ||||||||||||
| ) | ||||||||||||
|
|
||||||||||||
| def public_key(self) -> PublicKey: | ||||||||||||
| return PublicKey(self.key.verifying_key) | ||||||||||||
| return PublicKey(self.key.public_key()) | ||||||||||||
|
|
||||||||||||
| @staticmethod | ||||||||||||
| def random() -> PrivateKey: | ||||||||||||
| return PrivateKey( | ||||||||||||
| SigningKey.generate(curve=SECP256k1, hashfunc=hashlib.sha3_256) | ||||||||||||
| ) | ||||||||||||
| return PrivateKey(ec.generate_private_key(ec.SECP256K1())) | ||||||||||||
|
|
||||||||||||
| def sign(self, data: bytes) -> Signature: | ||||||||||||
| sig = self.key.sign_deterministic(data, hashfunc=hashlib.sha3_256) | ||||||||||||
| n = SECP256k1.generator.order() | ||||||||||||
| r, s = util.sigdecode_string(sig, n) | ||||||||||||
| # Use deterministic ECDSA (RFC 6979) with SHA3-256 | ||||||||||||
| signature_der = self.key.sign(data, ec.ECDSA(hashes.SHA3_256())) | ||||||||||||
| # Decode DER signature to get r and s | ||||||||||||
| r, s = decode_dss_signature(signature_der) | ||||||||||||
|
|
||||||||||||
| # SECP256K1 curve order | ||||||||||||
| n = 0xFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFEBAAEDCE6AF48A03BBFD25E8CD0364141 | ||||||||||||
|
|
||||||||||||
| # The signature is valid for both s and -s, normalization ensures that only s < n // 2 is valid | ||||||||||||
| if s > (n // 2): | ||||||||||||
| mod_s = (s * -1) % n | ||||||||||||
| sig = util.sigencode_string(r, mod_s, n) | ||||||||||||
| return Signature(sig) | ||||||||||||
| s = n - s | ||||||||||||
|
|
||||||||||||
| # Encode r and s as raw bytes (32 bytes each) | ||||||||||||
| sig_bytes = r.to_bytes(32, byteorder="big") + s.to_bytes(32, byteorder="big") | ||||||||||||
| return Signature(sig_bytes) | ||||||||||||
|
|
||||||||||||
| @staticmethod | ||||||||||||
| def deserialize(deserializer: Deserializer) -> PrivateKey: | ||||||||||||
| key = deserializer.to_bytes() | ||||||||||||
| if len(key) != PrivateKey.LENGTH: | ||||||||||||
| raise Exception("Length mismatch") | ||||||||||||
|
|
||||||||||||
| return PrivateKey(SigningKey.from_string(key, SECP256k1, hashlib.sha3_256)) | ||||||||||||
| return PrivateKey(PrivateKey._from_bytes(key)) | ||||||||||||
|
|
||||||||||||
| def serialize(self, serializer: Serializer): | ||||||||||||
| serializer.to_bytes(self.key.to_string()) | ||||||||||||
| serializer.to_bytes(self._to_bytes()) | ||||||||||||
|
|
||||||||||||
|
|
||||||||||||
| class PublicKey(asymmetric_crypto.PublicKey): | ||||||||||||
| LENGTH: int = 64 | ||||||||||||
| LENGTH_WITH_PREFIX_LENGTH: int = 65 | ||||||||||||
|
|
||||||||||||
| key: VerifyingKey | ||||||||||||
| key: ec.EllipticCurvePublicKey | ||||||||||||
|
|
||||||||||||
| def __init__(self, key: VerifyingKey): | ||||||||||||
| def __init__(self, key: ec.EllipticCurvePublicKey): | ||||||||||||
| self.key = key | ||||||||||||
|
|
||||||||||||
| def __eq__(self, other: object): | ||||||||||||
| if not isinstance(other, PublicKey): | ||||||||||||
| return NotImplemented | ||||||||||||
| return self.key == other.key | ||||||||||||
| # Compare public key bytes | ||||||||||||
| return self.to_crypto_bytes() == other.to_crypto_bytes() | ||||||||||||
|
|
||||||||||||
| def __str__(self) -> str: | ||||||||||||
| return self.hex() | ||||||||||||
|
|
||||||||||||
| @staticmethod | ||||||||||||
| def _from_uncompressed_bytes(key_bytes: bytes) -> ec.EllipticCurvePublicKey: | ||||||||||||
| """Create public key from uncompressed format (64 or 65 bytes).""" | ||||||||||||
| # Handle optional 0x04 prefix | ||||||||||||
| if len(key_bytes) == PublicKey.LENGTH_WITH_PREFIX_LENGTH: | ||||||||||||
| if key_bytes[0] != 0x04: | ||||||||||||
| raise Exception("Invalid public key format") | ||||||||||||
| key_bytes = key_bytes[1:] | ||||||||||||
| elif len(key_bytes) != PublicKey.LENGTH: | ||||||||||||
| raise Exception("Length mismatch") | ||||||||||||
|
|
||||||||||||
| # Split into x and y coordinates | ||||||||||||
| x = int.from_bytes(key_bytes[:32], byteorder="big") | ||||||||||||
| y = int.from_bytes(key_bytes[32:], byteorder="big") | ||||||||||||
|
|
||||||||||||
| # Create public key from numbers | ||||||||||||
| public_numbers = ec.EllipticCurvePublicNumbers(x, y, ec.SECP256K1()) | ||||||||||||
| return public_numbers.public_key() | ||||||||||||
|
|
||||||||||||
| def _to_uncompressed_bytes(self) -> bytes: | ||||||||||||
| """Convert public key to uncompressed format (64 bytes, no prefix).""" | ||||||||||||
| public_numbers = self.key.public_numbers() | ||||||||||||
| x_bytes = public_numbers.x.to_bytes(32, byteorder="big") | ||||||||||||
| y_bytes = public_numbers.y.to_bytes(32, byteorder="big") | ||||||||||||
| return x_bytes + y_bytes | ||||||||||||
|
|
||||||||||||
| @staticmethod | ||||||||||||
| def from_str(value: str) -> PublicKey: | ||||||||||||
| if value[0:2] == "0x": | ||||||||||||
|
|
@@ -124,23 +174,30 @@ def from_str(value: str) -> PublicKey: | |||||||||||
| and len(value) != PublicKey.LENGTH_WITH_PREFIX_LENGTH * 2 | ||||||||||||
| ): | ||||||||||||
| raise Exception("Length mismatch") | ||||||||||||
| return PublicKey( | ||||||||||||
| VerifyingKey.from_string(bytes.fromhex(value), SECP256k1, hashlib.sha3_256) | ||||||||||||
| ) | ||||||||||||
| return PublicKey(PublicKey._from_uncompressed_bytes(bytes.fromhex(value))) | ||||||||||||
|
|
||||||||||||
| def hex(self) -> str: | ||||||||||||
| return f"0x04{self.key.to_string().hex()}" | ||||||||||||
| return f"0x04{self._to_uncompressed_bytes().hex()}" | ||||||||||||
|
|
||||||||||||
| def verify(self, data: bytes, signature: asymmetric_crypto.Signature) -> bool: | ||||||||||||
| try: | ||||||||||||
| signature = cast(Signature, signature) | ||||||||||||
| self.key.verify(signature.data(), data) | ||||||||||||
| sig_bytes = signature.data() | ||||||||||||
|
|
||||||||||||
|
||||||||||||
| # Ensure the raw signature is exactly 64 bytes (32 bytes for r, 32 bytes for s) | |
| if len(sig_bytes) != Signature.LENGTH: | |
| raise ValueError("Signature length mismatch") |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,7 +1,4 @@ | ||
| [mypy] | ||
|
|
||
| [mypy-ecdsa.*] | ||
| ignore_missing_imports = True | ||
|
|
||
| [mypy-python_graphql_client] | ||
| ignore_missing_imports = True |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The comment on line 95 states that deterministic ECDSA (RFC 6979) is being used, but this should be verified. In the cryptography library, the default
ec.ECDSA()algorithm uses RFC 6979 for deterministic signatures, which is correct. However, it would be good to add a test that verifies the signature is deterministic by checking that signing the same message multiple times produces the same signature.