From fe14008bbcf1ad4f044694914eab47b8c8c0672a Mon Sep 17 00:00:00 2001 From: IchiBan360 <79260104+IchiBan360@users.noreply.github.com> Date: Mon, 5 Aug 2024 15:15:31 +0300 Subject: [PATCH 1/7] python3 compatability Changes to make python-pyhsm work with python3 --- pyhsm/__init__.py | 2 +- pyhsm/aead_cmd.py | 8 ++-- pyhsm/aes_ecb_cmd.py | 2 +- pyhsm/base.py | 20 +++++----- pyhsm/basic_cmd.py | 14 +++---- pyhsm/buffer_cmd.py | 4 +- pyhsm/cmd.py | 12 +++--- pyhsm/db_cmd.py | 2 +- pyhsm/hmac_cmd.py | 4 +- pyhsm/ksm/db_export.py | 2 +- pyhsm/ksm/db_import.py | 2 +- pyhsm/ksm/import_keys.py | 52 ++++++++++++------------- pyhsm/ksm/yubikey_ksm.py | 31 +++++++-------- pyhsm/oath_hotp.py | 12 +++--- pyhsm/soft_hsm.py | 24 ++++++------ pyhsm/stick_client.py | 6 +-- pyhsm/stick_daemon.py | 20 +++++----- pyhsm/tools/decrypt_aead.py | 24 ++++++------ pyhsm/tools/generate_keys.py | 38 +++++++++--------- pyhsm/tools/keystore_unlock.py | 22 +++++------ pyhsm/tools/linux_add_entropy.py | 2 +- pyhsm/util.py | 34 ++++++++-------- pyhsm/val/init_oath_token.py | 28 +++++++------- pyhsm/val/validate_otp.py | 14 +++---- pyhsm/val/validation_server.py | 66 ++++++++++++++++---------------- pyhsm/validate_cmd.py | 2 +- pyhsm/yubikey.py | 22 +++++------ setup.py | 4 +- test/configure_hsm.py | 35 ++++++++--------- test/test_aead.py | 36 ++++++++--------- test/test_aes_ecb.py | 42 ++++++++++---------- test/test_basics.py | 26 ++++++------- test/test_buffer.py | 16 ++++---- test/test_common.py | 19 +++++---- test/test_db.py | 36 ++++++++--------- test/test_hmac.py | 64 +++++++++++++++---------------- test/test_init.py | 60 +++++++++++++++-------------- test/test_misc.py | 30 +++++++-------- test/test_oath.py | 8 ++-- test/test_otp_validate.py | 22 +++++------ test/test_soft_hsm.py | 46 +++++++++++----------- test/test_stick.py | 4 +- test/test_util.py | 22 +++++------ test/test_yubikey_validate.py | 34 ++++++++-------- 44 files changed, 485 insertions(+), 488 deletions(-) diff --git a/pyhsm/__init__.py b/pyhsm/__init__.py index cdde417..6a9a09b 100644 --- a/pyhsm/__init__.py +++ b/pyhsm/__init__.py @@ -41,7 +41,7 @@ See help(pyhsm.base) (L{pyhsm.base.YHSM}) for more information. """ -__version__ = '1.2.2-dev0' +__version__ = '1.2.2' __copyright__ = 'Yubico AB' __organization__ = 'Yubico' __license__ = 'BSD' diff --git a/pyhsm/aead_cmd.py b/pyhsm/aead_cmd.py index 8b916ca..8ca2c07 100644 --- a/pyhsm/aead_cmd.py +++ b/pyhsm/aead_cmd.py @@ -45,7 +45,7 @@ def __repr__(self): return '<%s instance at %s: nonce=%s, key_handle=0x%x, status=%s>' % ( self.__class__.__name__, hex(id(self)), - self.nonce.encode('hex'), + self.nonce, self.key_handle, pyhsm.defines.status2str(self.status) ) @@ -200,9 +200,9 @@ def __init__(self, nonce, key_handle, aead): self.data = aead def __repr__(self): - nonce_str = "None" + nonce_str = b"None" if self.nonce is not None: - nonce_str = self.nonce.encode('hex') + nonce_str = self.nonce return '<%s instance at %s: nonce=%s, key_handle=0x%x, data=%i bytes>' % ( self.__class__.__name__, hex(id(self)), @@ -262,4 +262,4 @@ def pack(self): # uint8_t key[KEY_SIZE]; // AES key # uint8_t uid[UID_SIZE]; // Unique (secret) ID # } YUBIKEY_SECRETS; - return self.key + self.uid.ljust(pyhsm.defines.UID_SIZE, chr(0)) + return self.key + self.uid.ljust(pyhsm.defines.UID_SIZE, b'\x00') diff --git a/pyhsm/aes_ecb_cmd.py b/pyhsm/aes_ecb_cmd.py index 3f8b28a..d8424de 100644 --- a/pyhsm/aes_ecb_cmd.py +++ b/pyhsm/aes_ecb_cmd.py @@ -78,7 +78,7 @@ def __init__(self, stick, key_handle, plaintext): # uint8_t plaintext[YHSM_BLOCK_SIZE]; // Plaintext block # } YHSM_ECB_BLOCK_ENCRYPT_REQ; payload = struct.pack('' % ( @@ -119,7 +119,7 @@ def parse_result(self, data): # uint8_t numBytes; // Number of bytes generated # uint8_t rnd[YSM_MAX_PKT_SIZE - 1]; // Random data # } YHSM_RANDOM_GENERATE_RESP; - num_bytes = pyhsm.util.validate_cmd_response_int('num_bytes', ord(data[0]), self.num_bytes) + num_bytes = pyhsm.util.validate_cmd_response_int('num_bytes', data[0], self.num_bytes) return data[1:1 + num_bytes] @@ -256,7 +256,7 @@ def __init__(self, stick, password=''): # typedef struct { # uint8_t password[YSM_BLOCK_SIZE]; // Unlock password # } YSM_KEY_STORAGE_UNLOCK_REQ; - packed = payload.ljust(pyhsm.defines.YSM_BLOCK_SIZE, chr(0x0)) + packed = payload.ljust(pyhsm.defines.YSM_BLOCK_SIZE, b'\x00') YHSM_Cmd.__init__(self, stick, pyhsm.defines.YSM_KEY_STORAGE_UNLOCK, packed) def parse_result(self, data): @@ -297,7 +297,7 @@ def __init__(self, stick, key=''): # typedef struct { # uint8_t key[YSM_MAX_KEY_SIZE]; // Key store decryption key # } YSM_KEY_STORE_DECRYPT_REQ; - packed = payload.ljust(pyhsm.defines.YSM_MAX_KEY_SIZE, chr(0x0)) + packed = payload.ljust(pyhsm.defines.YSM_MAX_KEY_SIZE, b'\x00') YHSM_Cmd.__init__(self, stick, pyhsm.defines.YSM_KEY_STORE_DECRYPT, packed) def parse_result(self, data): @@ -397,7 +397,7 @@ def __repr__(self): return '<%s instance at %s: nonce=%s, pu_count=%i, volatile=%i>' % ( self.__class__.__name__, hex(id(self)), - self.nonce.encode('hex'), + self.nonce, self.pu_count, self.volatile ) diff --git a/pyhsm/buffer_cmd.py b/pyhsm/buffer_cmd.py index 95492dd..2c1cd18 100644 --- a/pyhsm/buffer_cmd.py +++ b/pyhsm/buffer_cmd.py @@ -58,7 +58,7 @@ def parse_result(self, data): # typedef struct { # uint8_t numBytes; // Number of bytes in buffer now # } YSM_BUFFER_LOAD_RESP; - count = ord(data[0]) + count = data[0] if self.offset == 0: # if offset was 0, the buffer was reset and # we can verify the length returned @@ -90,7 +90,7 @@ def parse_result(self, data): # typedef struct { # uint8_t numBytes; // Number of bytes in buffer now # } YSM_BUFFER_LOAD_RESP; - count = ord(data[0]) + count = data[0] if self.offset == 0: # if offset was 0, the buffer was reset and # we can verify the length returned diff --git a/pyhsm/cmd.py b/pyhsm/cmd.py index 180c01c..d40296e 100644 --- a/pyhsm/cmd.py +++ b/pyhsm/cmd.py @@ -61,7 +61,7 @@ def execute(self, read_response=True): # YSM_NULL is the exception to the rule - it should NOT be prefixed with YSM_PKT.bcnt cmd_buf = struct.pack('BB', len(self.payload) + 1, self.command) else: - cmd_buf = chr(self.command) + cmd_buf = bytes([self.command]) cmd_buf += self.payload debug_info = None unlock = self.stick.acquire() @@ -127,16 +127,16 @@ def _handle_invalid_read_response(self, res, expected_len): if not res: reset(self.stick) raise pyhsm.exception.YHSM_Error('YubiHSM did not respond to command %s' \ - % (pyhsm.defines.cmd2str(self.command)) ) + % (pyhsm.defines.cmd2str(self.command))) # try to check if it is a YubiHSM in configuration mode - self.stick.write('\r\r\r', '(mode test)') - res2 = self.stick.read(50) # expect a timeout + self.stick.write(b'\r\r\r', '(mode test)') + res2 = self.stick.read(50).decode() # expect a timeout lines = res2.split('\n') for this in lines: if re.match('^(NO_CFG|WSAPI|HSM).*> .*', this): raise pyhsm.exception.YHSM_Error('YubiHSM is in configuration mode') - raise pyhsm.exception.YHSM_Error('Unknown response from serial device %s : "%s"' \ - % (self.stick.device, res.encode('hex'))) + raise pyhsm.exception.YHSM_Error('Unknown response from serial device %s, %s : "%s"' \ + % (self.stick.device, res, len(res.encode()))) def parse_result(self, data): """ diff --git a/pyhsm/db_cmd.py b/pyhsm/db_cmd.py index 8f5a89f..1cfa863 100644 --- a/pyhsm/db_cmd.py +++ b/pyhsm/db_cmd.py @@ -105,7 +105,7 @@ def __repr__(self): return '<%s instance at %s: public_id=%s, status=0x%x>' % ( self.__class__.__name__, hex(id(self)), - self.public_id.encode('hex'), + bytes.fromhex(self.public_id), self.status ) else: diff --git a/pyhsm/hmac_cmd.py b/pyhsm/hmac_cmd.py index 1946c81..d1cec42 100644 --- a/pyhsm/hmac_cmd.py +++ b/pyhsm/hmac_cmd.py @@ -133,6 +133,8 @@ def __repr__(self): self.__class__.__name__, hex(id(self)), self.key_handle, - self.hash_result[:4].encode('hex'), + self.hash_result[:4], self.final, ) + + diff --git a/pyhsm/ksm/db_export.py b/pyhsm/ksm/db_export.py index 22f3d58..7ff94cd 100644 --- a/pyhsm/ksm/db_export.py +++ b/pyhsm/ksm/db_export.py @@ -18,7 +18,7 @@ def insert_slash(string, every=2): """insert_slash insert / every 2 char""" - return os.path.join(string[i:i+every] for i in xrange(0, len(string), every)) + return os.path.join(string[i:i+every] for i in range(0, len(string), every)) def mkdir_p(path): diff --git a/pyhsm/ksm/db_import.py b/pyhsm/ksm/db_import.py index 21fc97d..28debcd 100644 --- a/pyhsm/ksm/db_import.py +++ b/pyhsm/ksm/db_import.py @@ -97,7 +97,7 @@ def main(): aead.key_handle = key_handle_to_int(keyhandle) if not insert_query(connection, public_id, aead, keyhandle, aeadobj): - print("WARNING: could not insert %s" % public_id) + print(("WARNING: could not insert %s" % public_id)) #close sqlalchemy connection.close() diff --git a/pyhsm/ksm/import_keys.py b/pyhsm/ksm/import_keys.py index 1655540..90c2f89 100644 --- a/pyhsm/ksm/import_keys.py +++ b/pyhsm/ksm/import_keys.py @@ -121,7 +121,7 @@ def args_fixup(args): sys.exit(1) if args.aes_key: - args.aes_key = args.aes_key.decode('hex') + args.aes_key = bytes.fromhex(args.aes_key).decode() keyhandles_fixup(args) @@ -154,29 +154,29 @@ def import_keys(hsm, args): l = line.split(',') modhex_id = l[1] - uid = l[2].decode('hex') - key = l[3].decode('hex') + uid = l[2] + key = l[3] if modhex_id and uid and key: public_id = pyhsm.yubikey.modhex_decode(modhex_id) padded_id = modhex_id.rjust(args.public_id_chars, 'c') if int(public_id, 16) == 0: - print "WARNING: Skipping import of key with public ID: %s" % (padded_id) - print "This public ID is unsupported by the YubiHSM.\n" + print("WARNING: Skipping import of key with public ID: %s" % (padded_id)) + print("This public ID is unsupported by the YubiHSM.\n") continue if args.verbose: - print " %s" % (padded_id) + print(" %s" % (padded_id)) secret = pyhsm.aead_cmd.YHSM_YubiKeySecret(key, uid) hsm.load_secret(secret) - for kh in args.key_handles.keys(): + for kh in list(args.key_handles.keys()): if(args.random_nonce): - nonce = "" + nonce = b"" else: - nonce = public_id.decode('hex') + nonce = bytes.fromhex(public_id) aead = hsm.generate_aead(nonce, kh) if args.internal_db: @@ -189,43 +189,43 @@ def import_keys(hsm, args): args.output_dir, args.key_handles[kh], padded_id) if args.verbose: - print " %4s, %i bytes (%s) -> %s" % \ - (args.key_handles[kh], len(aead.data), shorten_aead(aead), filename) + print(" %4s, %i bytes (%s) -> %s" % \ + (args.key_handles[kh], len(aead.data), shorten_aead(aead), filename)) aead.save(filename) if args.verbose: - print "" + print("") if res: - print "\nDone\n" + print("\nDone\n") else: - print "\nDone (one or more entries rejected)" + print("\nDone (one or more entries rejected)") return res def store_in_internal_db(args, hsm, modhex_id, public_id, kh, aead): """ Store record (AEAD) in YubiHSM internal DB """ if args.verbose: - print " %i bytes (%s) -> internal db..." % \ - (len(aead.data), shorten_aead(aead)), + print(" %i bytes (%s) -> internal db..." % \ + (len(aead.data), shorten_aead(aead)), end=' ') try: - hsm.db_store_yubikey(public_id.decode('hex'), kh, aead) + hsm.db_store_yubikey(bytes.fromhex(public_id), kh, aead) if args.verbose: - print "OK" + print("OK") except pyhsm.exception.YHSM_CommandFailed as e: if args.verbose: - print "%s" % (pyhsm.defines.status2str(e.status)) + print("%s" % (pyhsm.defines.status2str(e.status))) else: - print "Storing ID %s FAILED: %s" % (modhex_id, pyhsm.defines.status2str(e.status)) + print("Storing ID %s FAILED: %s" % (modhex_id, pyhsm.defines.status2str(e.status))) return False return True def shorten_aead(aead): """ Produce pretty-printable version of long AEAD. """ - head = aead.data[:4].encode('hex') - tail = aead.data[-4:].encode('hex') + head = aead.data[:4] + tail = aead.data[-4:] return "%s...%s" % (head, tail) @@ -253,10 +253,10 @@ def main(): "Did not get '# ykksm 1' header as first line of input.\n") sys.exit(1) - print "output dir : %s" % (args.output_dir) - print "key handles : %s" % (args.key_handles) - print "YHSM device : %s" % (args.device) - print "" + print("output dir : %s" % (args.output_dir)) + print("key handles : %s" % (args.key_handles)) + print("YHSM device : %s" % (args.device)) + print("") if args.aes_key: keys = {kh: args.aes_key for kh in args.key_handles} diff --git a/pyhsm/ksm/yubikey_ksm.py b/pyhsm/ksm/yubikey_ksm.py index 32d5d9b..4373697 100644 --- a/pyhsm/ksm/yubikey_ksm.py +++ b/pyhsm/ksm/yubikey_ksm.py @@ -35,7 +35,7 @@ import os import sys -import BaseHTTPServer +import http.server import socket import argparse import syslog @@ -68,7 +68,7 @@ context = daemon.DaemonContext() -class YHSM_KSMRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler): +class YHSM_KSMRequestHandler(http.server.BaseHTTPRequestHandler): """ Handle a HTTP request. @@ -87,7 +87,7 @@ def __init__(self, hsm, aead_backend, args, *other_args, **kwargs): self.timeout = args.reqtimeout self.aead_backend = aead_backend self.proxy_ips = args.proxies - BaseHTTPServer.BaseHTTPRequestHandler.__init__(self, *other_args, **kwargs) + http.server.BaseHTTPRequestHandler.__init__(self, *other_args, **kwargs) def do_GET(self): """ Handle a HTTP GET request. """ @@ -198,7 +198,7 @@ def load_aead(self, public_id): try: aead.load(filename) if not aead.nonce: - aead.nonce = pyhsm.yubikey.modhex_decode(public_id).decode('hex') + aead.nonce = bytes.fromhex(pyhsm.yubikey.modhex_decode(public_id)) return aead except IOError: continue @@ -206,11 +206,10 @@ def load_aead(self, public_id): class SQLBackend(object): - def __init__(self, db_url, key_handles): - self.engine = sqlalchemy.create_engine(db_url, pool_pre_ping=True) + def __init__(self, db_url): + self.engine = sqlalchemy.create_engine(db_url) metadata = sqlalchemy.MetaData() self.aead_table = sqlalchemy.Table('aead_table', metadata, autoload=True, autoload_with=self.engine) - self.key_handles = key_handles def load_aead(self, public_id): """ Loads AEAD from the specified database. """ @@ -218,9 +217,7 @@ def load_aead(self, public_id): trans = connection.begin() try: - s = sqlalchemy.select([self.aead_table]).where( - (self.aead_table.c.public_id == public_id) - & self.aead_table.c.keyhandle.in_([kh[1] for kh in self.key_handles])) + s = sqlalchemy.select([self.aead_table]).where(self.aead_table.c.public_id == public_id) result = connection.execute(s) for row in result: @@ -229,21 +226,21 @@ def load_aead(self, public_id): aead.data = row['aead'] aead.nonce = row['nonce'] return aead - except Exception as e: + except Exception: trans.rollback() raise Exception("No AEAD in DB for public_id %s (%s)" % (public_id, str(e))) finally: connection.close() -class YHSM_KSMServer(BaseHTTPServer.HTTPServer): +class YHSM_KSMServer(http.server.HTTPServer): """ Wrapper class to properly initialize address_family for IPv6 addresses. """ def __init__(self, server_address, req_handler): if ":" in server_address[0]: self.address_family = socket.AF_INET6 - BaseHTTPServer.HTTPServer.__init__(self, server_address, req_handler) + http.server.HTTPServer.__init__(self, server_address, req_handler) def aead_filename(aead_dir, key_handle, public_id): @@ -432,7 +429,7 @@ def main(): if args.db_url: # Using an SQL database for AEADs try: - aead_backend = SQLBackend(args.db_url, args.key_handles) + aead_backend = SQLBackend(args.db_url) except Exception as e: my_log_message(args.debug or args.verbose, syslog.LOG_ERR, 'Could not connect to database "%s" : %s' % (args.db_url, e)) @@ -479,9 +476,9 @@ def main(): try: run(hsm, aead_backend, args) except KeyboardInterrupt: - print "" - print "Shutting down" - print "" + print("") + print("Shutting down") + print("") if __name__ == '__main__': diff --git a/pyhsm/oath_hotp.py b/pyhsm/oath_hotp.py index e85b810..66430ad 100644 --- a/pyhsm/oath_hotp.py +++ b/pyhsm/oath_hotp.py @@ -34,7 +34,7 @@ def search_for_oath_code(hsm, key_handle, nonce, aead, counter, user_code, look_ hsm.load_temp_key(nonce, key_handle, aead) # User might have produced codes never sent to us, so we support trying look_ahead # codes to see if we find the user's current code. - for j in xrange(look_ahead): + for j in range(look_ahead): this_counter = counter + j secret = struct.pack("> Q", this_counter) hmac_result = hsm.hmac_sha1(pyhsm.defines.YSM_TEMP_KEY_HANDLE, secret).get_hash() @@ -46,9 +46,9 @@ def search_for_oath_code(hsm, key_handle, nonce, aead, counter, user_code, look_ def truncate(hmac_result, length=6): """ Perform the truncating. """ assert(len(hmac_result) == 20) - offset = ord(hmac_result[19]) & 0xf - bin_code = (ord(hmac_result[offset]) & 0x7f) << 24 \ - | (ord(hmac_result[offset+1]) & 0xff) << 16 \ - | (ord(hmac_result[offset+2]) & 0xff) << 8 \ - | (ord(hmac_result[offset+3]) & 0xff) + offset = hmac_result[19] & 0xf + bin_code = (hmac_result[offset] & 0x7f) << 24 \ + | (hmac_result[offset+1] & 0xff) << 16 \ + | (hmac_result[offset+2] & 0xff) << 8 \ + | (hmac_result[offset+3] & 0xff) return bin_code % (10 ** length) diff --git a/pyhsm/soft_hsm.py b/pyhsm/soft_hsm.py index 35d80e6..f594b36 100644 --- a/pyhsm/soft_hsm.py +++ b/pyhsm/soft_hsm.py @@ -25,7 +25,7 @@ def _xor_block(a, b): """ XOR two blocks of equal length. """ - return ''.join([chr(ord(x) ^ ord(y)) for (x, y) in zip(a, b)]) + return bytes([x ^ y for x, y in zip(a, b)]) class _ctr_counter(): @@ -46,7 +46,7 @@ def next(self): return self.pack() def pack(self): - fmt = b'< B I %is BBB 2s' % (pyhsm.defines.YSM_AEAD_NONCE_SIZE) + fmt = '< B I %is BBB 2s' % (pyhsm.defines.YSM_AEAD_NONCE_SIZE) val = struct.pack('> H', self.value) return struct.pack(fmt, self.flags, @@ -62,14 +62,14 @@ def __init__(self, key, key_handle, nonce, data_len): """ Initialize CBC-MAC like the YubiHSM does. """ - flags = (((pyhsm.defines.YSM_AEAD_MAC_SIZE - 2) / 2) << 3) | (pyhsm.defines.YSM_CCM_CTR_SIZE - 1) + flags = (((pyhsm.defines.YSM_AEAD_MAC_SIZE - 2) // 2) << 3) | (pyhsm.defines.YSM_CCM_CTR_SIZE - 1) t = _ctr_counter(key_handle, nonce, flags = flags, value = data_len) t_mac = t.pack() self.mac_aes = AES.new(key, AES.MODE_ECB) self.mac = self.mac_aes.encrypt(t_mac) def update(self, block): - block = block.ljust(pyhsm.defines.YSM_BLOCK_SIZE, chr(0x0)) + block = block.ljust(pyhsm.defines.YSM_BLOCK_SIZE, b'\x00') t1 = _xor_block(self.mac, block) t2 = self.mac_aes.encrypt(t1) self.mac = t2 @@ -103,7 +103,7 @@ def aesCCM(key, key_handle, nonce, data, decrypt=False): mac = _cbc_mac(key, key_handle, nonce, len(data)) counter = _ctr_counter(key_handle, nonce, value = 0) - ctr_aes = AES.new(key, AES.MODE_CTR, counter = counter.next) + ctr_aes = AES.new(key, AES.MODE_CTR, initial_value = counter.next(), nonce = b'') out = [] while data: (thisblock, data) = _split_data(data, pyhsm.defines.YSM_BLOCK_SIZE) @@ -126,7 +126,7 @@ def aesCCM(key, key_handle, nonce, data, decrypt=False): raise pyhsm.exception.YHSM_Error('AEAD integrity check failed') else: out.append(mac.get()) - return ''.join(out) + return b''.join(out) def crc16(data): @@ -135,7 +135,7 @@ def crc16(data): """ m_crc = 0xffff for this in data: - m_crc ^= ord(this) + m_crc ^= this for _ in range(8): j = m_crc & 1 m_crc >>= 1 @@ -150,7 +150,7 @@ def __init__(self, keys, debug=False): self.debug = debug if not keys: raise ValueError('Data contains no key handles!') - for k, v in keys.items(): + for k, v in list(keys.items()): if len(v) not in AES.key_size: raise ValueError('Keyhandle of unsupported length: %d (was %d bytes)' % (k, len(v))) self.keys = keys @@ -166,8 +166,8 @@ def from_json(cls, data, debug=False): if not isinstance(data, dict): raise ValueError('Data does not contain object as root element.') keys = {} - for kh, aes_key_hex in data.items(): - keys[int(kh)] = aes_key_hex.decode('hex') + for kh, aes_key_hex in list(data.items()): + keys[int(kh)] = bytes.fromhex(aes_key_hex) return cls(keys, debug) def _get_key(self, kh, cmd): @@ -208,9 +208,9 @@ def load_random(self, num_bytes, offset = 0): self._buffer = self._buffer[:offset] + os.urandom(num_bytes) def generate_aead(self, nonce, key_handle): - if nonce == "": + if nonce == b"": # no hardware to generate it for us, so do it here. - nonce = os.urandom(6) + nonce = os.urandom(6).encode() aes_key = self._get_key(key_handle, pyhsm.defines.YSM_BUFFER_AEAD_GENERATE) ct = pyhsm.soft_hsm.aesCCM(aes_key, key_handle, nonce, self._buffer, False) diff --git a/pyhsm/stick_client.py b/pyhsm/stick_client.py index 79e8746..35ebff3 100644 --- a/pyhsm/stick_client.py +++ b/pyhsm/stick_client.py @@ -33,13 +33,13 @@ def pack_data(data): - if isinstance(data, basestring): + if isinstance(data, str): return data.encode('base64') return data def unpack_data(data): - if isinstance(data, basestring): + if isinstance(data, str): return data.decode('base64') elif isinstance(data, dict) and 'error' in data: return pyhsm.exception.YHSM_Error(data['error']) @@ -52,7 +52,7 @@ def read_sock(sf): def write_sock(sf, cmd, *args): - json.dump([cmd] + map(pack_data, args), sf) + json.dump([cmd] + list(map(pack_data, args)), sf) sf.write("\n") sf.flush() diff --git a/pyhsm/stick_daemon.py b/pyhsm/stick_daemon.py index 62560f8..01690dd 100644 --- a/pyhsm/stick_daemon.py +++ b/pyhsm/stick_daemon.py @@ -41,13 +41,13 @@ context = daemon.DaemonContext() def pack_data(data): - if isinstance(data, basestring): + if isinstance(data, str): return data.encode('base64') return data def unpack_data(data): - if isinstance(data, basestring): + if isinstance(data, str): return data.decode('base64') return data @@ -85,7 +85,7 @@ def serve(self): args=(cs,)) thread.start() except Exception as e: - print e + print(e) sys.exit(1) def invoke(self, cmd, *args): @@ -95,7 +95,7 @@ def invoke(self, cmd, *args): res = getattr(self._stick, COMMANDS[cmd])(*args) except Exception as e: res = e - print e + print(e) self._stick = None return res @@ -107,7 +107,7 @@ def client_handler(self, socket): while True: data = json.loads(socket_file.readline()) cmd = data[0] - args = map(unpack_data, data[1:]) + args = list(map(unpack_data, data[1:])) if cmd == CMD_LOCK: if not has_lock: self.lock.acquire() @@ -123,7 +123,7 @@ def client_handler(self, socket): socket_file.flush() else: err = 'Command run without holding lock!' - print err + print(err) json.dump({'error': err}, socket_file) socket_file.write("\n") socket_file.flush() @@ -161,12 +161,12 @@ def main(): args = parser.parse_args() - print 'Starting YubiHSM daemon for device: %s, listening on: %s:%d' % \ - (args.device, args.interface, args.port) + print('Starting YubiHSM daemon for device: %s, listening on: %s:%d' % \ + (args.device, args.interface, args.port)) server = YHSM_Stick_Server(args.device, (args.interface, args.port)) - print 'You can connect to the server using the following device string:' - print 'yhsm://127.0.0.1:%d' % args.port + print('You can connect to the server using the following device string:') + print('yhsm://127.0.0.1:%d' % args.port) server.pidfile = args.pid_file context.files_preserve = [server.socket] diff --git a/pyhsm/tools/decrypt_aead.py b/pyhsm/tools/decrypt_aead.py index 3476b5d..7760694 100644 --- a/pyhsm/tools/decrypt_aead.py +++ b/pyhsm/tools/decrypt_aead.py @@ -101,7 +101,7 @@ def parse_args(): args = parser.parse_args() # argument fixups args.format = args.format.lower() - args.aes_key = args.aes_key.decode('hex') + args.aes_key = bytes.fromhex(args.aes_key) if args.key_handle: args.key_handle = pyhsm.util.key_handle_to_int(args.key_handle) if args.start_id is not None: @@ -133,7 +133,7 @@ def parse_args(): sys.stderr.write("error: --key-handle-out is required when using --format aead.\n") return False # argument fixups - args.aes_key_out = args.aes_key_out.decode('hex') + args.aes_key_out = bytes.fromhex(args.aes_key_out) args.key_handle_out_orig = args.key_handle_out # save to use in AEAD output paths args.key_handle_out = pyhsm.util.key_handle_to_int(args.key_handle_out) return args @@ -192,18 +192,18 @@ def process_file(path, fn, args, state): state.log_failed(full_fn) return False aead.key_handle = args.key_handle - aead.nonce = pyhsm.yubikey.modhex_decode(fn).decode('hex') + aead.nonce = bytes.fromhex(pyhsm.yubikey.modhex_decode(fn)) if args.debug: sys.stderr.write("%s\n" % aead) - sys.stderr.write("AEAD len %i : %s\n" % (len(aead.data), aead.data.encode('hex'))) + sys.stderr.write("AEAD len %i : %s\n" % (len(aead.data), aead.data)) pt = pyhsm.soft_hsm.aesCCM(args.aes_key, aead.key_handle, aead.nonce, aead.data, decrypt = True) if args.print_filename: - print("%s " % (full_fn)), + print(("%s " % (full_fn)), end=' ') if args.format == 'raw': - print(pt.encode('hex')) + print(pt) elif args.format == 'aead': # encrypt secrets with new key ct = pyhsm.soft_hsm.aesCCM(args.aes_key_out, args.key_handle_out, aead.nonce, pt, decrypt = False) @@ -211,7 +211,7 @@ def process_file(path, fn, args, state): filename = aead_filename(args.output_dir, args.key_handle_out_orig, fn) aead_out.save(filename) if args.print_filename: - print "" + print("") elif args.format == 'yubikey-csv': key = pt[:pyhsm.defines.KEY_SIZE] uid = pt[pyhsm.defines.KEY_SIZE:] @@ -219,13 +219,13 @@ def process_file(path, fn, args, state): timestamp = '' global yknum yknum += 1 - print("%i,%s,%s,%s,%s,%s,,,,," % (yknum, + print(("%i,%s,%s,%s,%s,%s,,,,," % (yknum, fn, - uid.encode('hex'), - key.encode('hex'), + uid, + key, access_code, timestamp, - )) + ))) state.log_success(full_fn) @@ -254,7 +254,7 @@ def safe_process_files(path, files, args, state): try: if not process_file(path, fn, args, state): return False - except Exception, e: + except Exception as e: sys.stderr.write("error: %s\n%s\n" % (os.path.join(path, fn), traceback.format_exc())) state.log_failed(full_fn) if state.should_quit(): diff --git a/pyhsm/tools/generate_keys.py b/pyhsm/tools/generate_keys.py index 7b1d99d..3f044ca 100644 --- a/pyhsm/tools/generate_keys.py +++ b/pyhsm/tools/generate_keys.py @@ -138,42 +138,42 @@ def gen_keys(hsm, args): """ if args.verbose: - print "Generating %i keys :\n" % (args.count) + print("Generating %i keys :\n" % (args.count)) else: - print "Generating %i keys" % (args.count) + print("Generating %i keys" % (args.count)) for int_id in range(args.start_id, args.start_id + args.count): public_id = ("%x" % int_id).rjust(args.public_id_chars, '0') padded_id = pyhsm.yubikey.modhex_encode(public_id) if args.verbose: - print " %s" % (padded_id) + print(" %s" % (padded_id)) - num_bytes = len(pyhsm.aead_cmd.YHSM_YubiKeySecret('a' * 16, 'b' * 6).pack()) + num_bytes = len(pyhsm.aead_cmd.YHSM_YubiKeySecret(b'a' * 16, b'b' * 6).pack()) hsm.load_random(num_bytes) - for kh in args.key_handles.keys(): + for kh in list(args.key_handles.keys()): if args.random_nonce: - nonce = "" + nonce = b"" else: - nonce = public_id.decode('hex') + nonce = bytes.fromhex(public_id) aead = hsm.generate_aead(nonce, kh) filename = output_filename(args.output_dir, args.key_handles[kh], padded_id) if args.verbose: - print " %4s, %i bytes (%s) -> %s" % \ - (args.key_handles[kh], len(aead.data), shorten_aead(aead), filename) + print(" %4s, %i bytes (%s) -> %s" % \ + (args.key_handles[kh], len(aead.data), shorten_aead(aead), filename)) aead.save(filename) if args.verbose: - print "" + print("") - print "\nDone\n" + print("\nDone\n") def shorten_aead(aead): """ Produce pretty-printable version of long AEAD. """ - head = aead.data[:4].encode('hex') - tail = aead.data[-4:].encode('hex') + head = aead.data[:4] + tail = aead.data[-4:] return "%s...%s" % (head, tail) def output_filename(output_dir, key_handle, public_id): @@ -195,12 +195,12 @@ def main(): args_fixup(args) - print "output dir : %s" % (args.output_dir) - print "keys to generate : %s" % (args.count) - print "key handles : %s" % (args.key_handles) - print "start public_id : %s (0x%x)" % (args.start_id, args.start_id) - print "YHSM device : %s" % (args.device) - print "" + print("output dir : %s" % (args.output_dir)) + print("keys to generate : %s" % (args.count)) + print("key handles : %s" % (args.key_handles)) + print("start public_id : %s (0x%x)" % (args.start_id, args.start_id)) + print("YHSM device : %s" % (args.device)) + print("") if os.path.isfile(args.device): hsm = pyhsm.soft_hsm.SoftYHSM.from_file(args.device) diff --git a/pyhsm/tools/keystore_unlock.py b/pyhsm/tools/keystore_unlock.py index 2e8134e..c3b1d6a 100644 --- a/pyhsm/tools/keystore_unlock.py +++ b/pyhsm/tools/keystore_unlock.py @@ -67,12 +67,12 @@ def get_password(hsm, args): password = password[:-1] else: if args.debug: - password = raw_input('Enter %s (press enter to skip) (will be echoed) : ' % (name)) + password = input('Enter %s (press enter to skip) (will be echoed) : ' % (name)) else: password = getpass.getpass('Enter %s (press enter to skip) : ' % (name)) if len(password) <= expected_len: - password = password.decode('hex') + password = bytes.fromhex(password) if not password: return None return password @@ -91,7 +91,7 @@ def get_otp(hsm, args): while otp and otp[-1] == '\n': otp = otp[:-1] else: - otp = raw_input('Enter admin YubiKey OTP (press enter to skip) : ') + otp = input('Enter admin YubiKey OTP (press enter to skip) : ') if len(otp) == 44: # YubiHSM admin OTP's always have a public_id length of 6 bytes return otp @@ -109,25 +109,23 @@ def main(): hsm = pyhsm.base.YHSM(device=args.device, debug=args.debug) if args.debug or args.verbose: - print "Device : %s" % (args.device) - print "Version : %s" % (hsm.info()) - print "" + print("Device : %s" % (args.device)) + print("Version : %s" % (hsm.info())) + print("") password = get_password(hsm, args) otp = get_otp(hsm, args) if not password and not otp: - print "\nAborted\n" + print("\nAborted\n") return 1 else: if args.debug or args.verbose: - print "" + print("") if hsm.unlock(password = password, otp = otp): if args.debug or args.verbose: - print "OK\n" - except pyhsm.exception.YHSM_Error, e: + print("OK\n") + except pyhsm.exception.YHSM_Error as e: sys.stderr.write("ERROR: %s\n" % (e.reason)) - if e.reason == "YubiHSM did not respond to command YSM_SYSTEM_INFO_QUERY": - sys.stderr.write("Please check whether your YubiHSM is really at " + args.device + ", you can specify an alternate device using the option -D") return 1 return 0 diff --git a/pyhsm/tools/linux_add_entropy.py b/pyhsm/tools/linux_add_entropy.py index e328f07..4bcb337 100644 --- a/pyhsm/tools/linux_add_entropy.py +++ b/pyhsm/tools/linux_add_entropy.py @@ -72,7 +72,7 @@ def get_entropy(hsm, iterations, entropy_ratio): # __u32 buf[0]; # }; fmt = 'ii%is' % (pyhsm.defines.YSM_MAX_PKT_SIZE - 1) - for _ in xrange(iterations): + for _ in range(iterations): rnd = hsm.random(pyhsm.defines.YSM_MAX_PKT_SIZE - 1) this = struct.pack(fmt, entropy_ratio * len(rnd), len(rnd), rnd) fcntl.ioctl(fd, RNDADDENTROPY, this) diff --git a/pyhsm/util.py b/pyhsm/util.py index c7e743c..97eab61 100644 --- a/pyhsm/util.py +++ b/pyhsm/util.py @@ -26,14 +26,14 @@ def hexdump(src, length=8): offset = 0 result = '' for this in group(src, length): - hex_s = ' '.join(["%02x" % ord(x) for x in this]) + hex_s = ' '.join(["%02x" % x for x in this]) result += "%04X %s\n" % (offset, hex_s) offset += length return result def group(data, num): """ Split data into chunks of num chars each """ - return [data[i:i+num] for i in xrange(0, len(data), num)] + return [data[i:i+num] for i in range(0, len(data), num)] def key_handle_to_int(this): """ @@ -50,14 +50,14 @@ def key_handle_to_int(this): if this[:2] == "0x": return int(this, 16) if (len(this) == 4): - num = struct.unpack(' max_len: raise pyhsm.exception.YHSM_InputTooLong(name, max_len, len(string)) if exact_len != None and len(string) != exact_len: @@ -73,25 +73,25 @@ def input_validate_int(value, name, max_value=None): return value def input_validate_nonce(nonce, name='nonce', pad = False): - """ Input validation for nonces. """ - if type(nonce) is not str: + """ Input validation for nonces. NOW CHANGED TO BYTES""" + if type(nonce) is not bytes: raise pyhsm.exception.YHSM_WrongInputType( \ - name, str, type(nonce)) + name, bytes, type(nonce)) if len(nonce) > pyhsm.defines.YSM_AEAD_NONCE_SIZE: raise pyhsm.exception.YHSM_InputTooLong( name, pyhsm.defines.YSM_AEAD_NONCE_SIZE, len(nonce)) if pad: - return nonce.ljust(pyhsm.defines.YSM_AEAD_NONCE_SIZE, chr(0x0)) + return nonce.ljust(pyhsm.defines.YSM_AEAD_NONCE_SIZE, b'\x00') else: return nonce def input_validate_key_handle(key_handle, name='key_handle'): - """ Input validation for key_handles. """ - if type(key_handle) is not int: + """ Input validation for key_handles. NOW CHANGED TO BYTES""" + if type(key_handle) is not bytes: try: return key_handle_to_int(key_handle) except pyhsm.exception.YHSM_Error: - raise pyhsm.exception.YHSM_WrongInputType(name, int, type(key_handle)) + raise pyhsm.exception.YHSM_WrongInputType(name, bytes, type(key_handle)) return key_handle def input_validate_yubikey_secret(data, name='data'): @@ -140,8 +140,8 @@ def validate_cmd_response_str(name, got, expected, hex_encode=True): """ if got != expected: if hex_encode: - got_s = got.encode('hex') - exp_s = expected.encode('hex') + got_s = got + exp_s = expected else: got_s = got exp_s = expected @@ -156,9 +156,9 @@ def validate_cmd_response_nonce(got, used): A request nonce of 000000000000 means the HSM should generate a nonce internally though, so if 'used' is all zeros we actually check that 'got' does NOT match 'used'. """ - if used == '000000000000'.decode('hex'): + if used == bytes.fromhex('000000000000'): if got == used: raise(pyhsm.exception.YHSM_Error("Bad nonce in response (got %s, expected HSM generated nonce)" \ - % (got.encode('hex')))) + % (bytes.fromhex(got)))) return got return validate_cmd_response_str('nonce', got, used) diff --git a/pyhsm/val/init_oath_token.py b/pyhsm/val/init_oath_token.py index 215569d..b236f84 100644 --- a/pyhsm/val/init_oath_token.py +++ b/pyhsm/val/init_oath_token.py @@ -111,14 +111,14 @@ def generate_aead(hsm, args): nonce = hsm.get_nonce().nonce aead = hsm.generate_aead(nonce, args.key_handle) if args.debug: - print "AEAD: %s (%s)" % (aead.data.encode('hex'), aead) + print("AEAD: %s (%s)" % (aead.data, aead)) return nonce, aead def validate_oath_c(hsm, args, nonce, aead): if args.test_code: if args.verbose: - print "Trying to validate the OATH counter value in the range %i..%i." \ - % (args.oath_c, args.oath_c + args.look_ahead) + print("Trying to validate the OATH counter value in the range %i..%i." \ + % (args.oath_c, args.oath_c + args.look_ahead)) counter = pyhsm.oath_hotp.search_for_oath_code(hsm, args.key_handle, nonce, aead, \ args.oath_c, args.test_code, args.look_ahead \ ) @@ -127,21 +127,21 @@ def validate_oath_c(hsm, args, nonce, aead): % (args.test_code, args.oath_c, args.oath_c + args.look_ahead)) sys.exit(1) if args.verbose: - print "OATH C==%i validated with code %s" % (counter - 1, args.test_code) + print("OATH C==%i validated with code %s" % (counter - 1, args.test_code)) return counter return args.oath_c def get_oath_k(args): """ Get the OATH K value (secret key), either from args or by prompting. """ if args.oath_k: - decoded = args.oath_k.decode('hex') + decoded = bytes.fromhex(args.oath_k) else: - t = raw_input("Enter OATH key (hex encoded) : ") - decoded = t.decode('hex') + t = input("Enter OATH key (hex encoded) : ") + decoded = bytes.fromhex(t) if len(decoded) > 20: decoded = sha1(decoded).digest() - decoded = decoded.ljust(20, '\0') + decoded = decoded.ljust(20, b'\0') return decoded class ValOathDb(): @@ -185,8 +185,8 @@ def __init__(self, row): def store_oath_entry(args, nonce, aead, oath_c): """ Store the AEAD in the database. """ data = {"key": args.uid, - "aead": aead.data.encode('hex'), - "nonce": nonce.encode('hex'), + "aead": aead.data, + "nonce": nonce, "key_handle": args.key_handle, "oath_C": oath_c, "oath_T": None, @@ -197,7 +197,7 @@ def store_oath_entry(args, nonce, aead, oath_c): if args.force: db.delete(entry) db.add(entry) - except sqlite3.IntegrityError, e: + except sqlite3.IntegrityError as e: sys.stderr.write("ERROR: %s\n" % (e)) return False return True @@ -207,9 +207,9 @@ def main(): args_fixup(args) - print "Key handle : %s" % (args.key_handle) - print "YHSM device : %s" % (args.device) - print "" + print("Key handle : %s" % (args.key_handle)) + print("YHSM device : %s" % (args.device)) + print("") hsm = pyhsm.YHSM(device = args.device, debug=args.debug) diff --git a/pyhsm/val/validate_otp.py b/pyhsm/val/validate_otp.py index 57ce44c..b3c25dd 100644 --- a/pyhsm/val/validate_otp.py +++ b/pyhsm/val/validate_otp.py @@ -65,12 +65,12 @@ def validate_otp(hsm, args): try: res = pyhsm.yubikey.validate_otp(hsm, args.otp) if args.verbose: - print "OK counter=%04x low=%04x high=%02x use=%02x" % \ - (res.use_ctr, res.ts_low, res.ts_high, res.session_ctr) + print("OK counter=%04x low=%04x high=%02x use=%02x" % \ + (res.use_ctr, res.ts_low, res.ts_high, res.session_ctr)) return 0 - except pyhsm.exception.YHSM_CommandFailed, e: + except pyhsm.exception.YHSM_CommandFailed as e: if args.verbose: - print "%s" % (pyhsm.defines.status2str(e.status)) + print("%s" % (pyhsm.defines.status2str(e.status))) # figure out numerical response code for r in [pyhsm.defines.YSM_OTP_INVALID, \ pyhsm.defines.YSM_OTP_REPLAY, \ @@ -84,7 +84,7 @@ def validate_oath(hsm, args): """ Validate an OATH OTP. """ - print "ERROR: Not implemented, try 'yhsm-validation-server'." + print("ERROR: Not implemented, try 'yhsm-validation-server'.") return 0 @@ -92,8 +92,8 @@ def main(): args = parse_args() if args.debug: - print "YHSM device : %s" % (args.device) - print "" + print("YHSM device : %s" % (args.device)) + print("") hsm = pyhsm.YHSM(device = args.device, debug=args.debug) diff --git a/pyhsm/val/validation_server.py b/pyhsm/val/validation_server.py index e5c101e..087c434 100644 --- a/pyhsm/val/validation_server.py +++ b/pyhsm/val/validation_server.py @@ -88,8 +88,8 @@ import hashlib import sqlite3 import argparse -import urlparse -import BaseHTTPServer +import urllib.parse +import http.server import pyhsm import pyhsm.yubikey import pyhsm.oath_hotp @@ -114,7 +114,7 @@ client_ids = {} -class YHSM_VALRequestHandler(BaseHTTPServer.BaseHTTPRequestHandler): +class YHSM_VALRequestHandler(http.server.BaseHTTPRequestHandler): """ Handle HTTP GET requests according to configuration in global variable `args'. """ @@ -130,7 +130,7 @@ def do_GET(self): res = None log_res = None mode = None - params = urlparse.parse_qs(self.path[len(args.serve_url):]) + params = urllib.parse.parse_qs(self.path[len(args.serve_url):]) if "otp" in params: if args.mode_short_otp: # YubiKey internal db OTP in KSM mode @@ -170,12 +170,12 @@ def do_GET(self): self.log_message("%s validation result: %s -> %s", mode, self.path, log_res) - if res != None: + if res is not None: self.send_response(200) self.send_header('Content-type', 'text/plain') self.end_headers() - self.wfile.write(res) - self.wfile.write("\n") + self.wfile.write(bytes(res, "utf8")) + self.wfile.write(bytes("\n", "utf8")) else: self.log_error ("No validation result to '%s' (responding 403)" % (self.path)) self.send_response(403, 'Forbidden') @@ -199,14 +199,14 @@ def my_address_string(self): """ For logging client host without resolving. """ return self.client_address[0] -class YHSM_VALServer(BaseHTTPServer.HTTPServer): +class YHSM_VALServer(http.server.HTTPServer): """ Wrapper class to properly initialize address_family for IPv6 addresses. """ def __init__(self, server_address, req_handler): if ":" in server_address[0]: self.address_family = socket.AF_INET6 - BaseHTTPServer.HTTPServer.__init__(self, server_address, req_handler) + http.server.HTTPServer.__init__(self, server_address, req_handler) def validate_yubikey_otp_short(self, params): @@ -221,7 +221,7 @@ def validate_yubikey_otp_short(self, params): res = pyhsm.yubikey.validate_otp(hsm, from_key) return "OK counter=%04x low=%04x high=%02x use=%02x" % \ (res.use_ctr, res.ts_low, res.ts_high, res.session_ctr) - except pyhsm.exception.YHSM_CommandFailed, e: + except pyhsm.exception.YHSM_CommandFailed as e: return "ERR %s" % (pyhsm.defines.status2str(e.status)) def validate_yubikey_otp(self, params): @@ -270,7 +270,7 @@ def validate_yubikey_otp(self, params): vres["sl"] = "100" if "timestamp" in params: vres["t"] = time.strftime("%FT%TZ0000", time.gmtime()) - except pyhsm.exception.YHSM_CommandFailed, e: + except pyhsm.exception.YHSM_CommandFailed as e: if e.status == pyhsm.defines.YSM_ID_NOT_FOUND: vres["status"] = "BAD_OTP" elif e.status == pyhsm.defines.YSM_OTP_REPLAY: @@ -350,24 +350,24 @@ def validate_oath_hotp(self, params): self.log_error("IN: %s, could not get UID/OTP ('%s'/'%s')" % (params, uid, otp)) return "ERR Invalid OATH-HOTP input" if args.debug: - print "OATH-HOTP uid %s, OTP %s" % (uid, otp) + print("OATH-HOTP uid %s, OTP %s" % (uid, otp)) # Fetch counter value for `uid' from database try: db = ValOathDb(args.db_file) entry = db.get(uid) - except Exception, e: + except Exception as e: self.log_error("IN: %s, database error : '%s'" % (params, e)) return "ERR Internal error" # Check for correct OATH-HOTP OTP - nonce = entry.data["nonce"].decode('hex') - aead = entry.data["aead"].decode('hex') + nonce = bytes.fromhex(entry.data["nonce"]) + aead = bytes.fromhex(entry.data["aead"]) new_counter = pyhsm.oath_hotp.search_for_oath_code(hsm, entry.data["key_handle"], nonce, aead, \ entry.data["oath_c"], otp, args.look_ahead) if args.debug: - print "OATH-HOTP %i..%i -> new C == %s" \ - % (entry.data["oath_c"], entry.data["oath_c"] + args.look_ahead, new_counter) + print("OATH-HOTP %i..%i -> new C == %s" \ + % (entry.data["oath_c"], entry.data["oath_c"] + args.look_ahead, new_counter)) if type(new_counter) != int: # XXX increase 'throttling parameter' to make brute forcing harder/impossible return "ERR Could not validate OATH-HOTP OTP" @@ -377,7 +377,7 @@ def validate_oath_hotp(self, params): return "OK counter=%04x" % (new_counter) else: return "ERR replayed OATH-HOTP" - except Exception, e: + except Exception as e: self.log_error("IN: %s, database error updating counter : %s" % (params, e)) return "ERR Internal error" @@ -395,25 +395,25 @@ def validate_oath_totp(self, params): self.log_error("IN: %s, could not get UID/OTP ('%s'/'%s')" % (params, uid, otp)) return "ERR Invalid OATH-TOTP input" if args.debug: - print "OATH-TOTP uid %s, OTP %s" % (uid, otp) + print("OATH-TOTP uid %s, OTP %s" % (uid, otp)) # Fetch counter value for `uid' from database try: db = ValOathDb(args.db_file) entry = db.get(uid) - except Exception, e: + except Exception as e: self.log_error("IN: %s, database error : '%s'" % (params, e)) return "ERR Internal error" # Check for correct OATH-TOTP OTP - nonce = entry.data["nonce"].decode('hex') - aead = entry.data["aead"].decode('hex') + nonce = bytes.fromhex(entry.data["nonce"]) + aead = bytes.fromhex(entry.data["aead"]) new_timecounter = pyhsm.oath_totp.search_for_oath_code( hsm, entry.data["key_handle"], nonce, aead, otp, args.interval, args.tolerance) if args.debug: - print "OATH-TOTP counter: %i, interval: %i -> new timecounter == %s" \ - % (entry.data["oath_c"], args.interval, new_timecounter) + print("OATH-TOTP counter: %i, interval: %i -> new timecounter == %s" \ + % (entry.data["oath_c"], args.interval, new_timecounter)) if type(new_timecounter) != int: return "ERR Could not validate OATH-TOTP OTP" try: @@ -423,7 +423,7 @@ def validate_oath_totp(self, params): return "OK timecounter=%04x" % (new_timecounter) else: return "ERR replayed OATH-TOTP" - except Exception, e: + except Exception as e: self.log_error("IN: %s, database error updating counter : %s" % (params, e)) return "ERR Internal error" @@ -432,10 +432,10 @@ def validate_pwhash(_self, params): Validate password hash using YubiHSM. """ pwhash, nonce, aead, key_handle = get_pwhash_bits(params) - d_aead = aead.decode('hex') + d_aead = bytes.fromhex(aead) plaintext_len = len(d_aead) - pyhsm.defines.YSM_AEAD_MAC_SIZE - pw = pwhash.ljust(plaintext_len, chr(0x0)) - if hsm.validate_aead(nonce.decode('hex'), key_handle, d_aead, pw): + pw = pwhash.ljust(plaintext_len, chr(0x0)).encode() + if hsm.validate_aead(bytes.fromhex(nonce), key_handle, d_aead, pw): return "OK pwhash validated" return "ERR Could not validate pwhash" @@ -670,7 +670,7 @@ def load_clients_file(filename): res = {} content = [] try: - fhandle = file(filename) + fhandle = open(filename) content = fhandle.readlines() fhandle.close() except IOError: @@ -740,7 +740,7 @@ def main(): global hsm try: hsm = pyhsm.YHSM(device = args.device, debug = args.debug) - except serial.SerialException, e: + except serial.SerialException as e: my_log_message(args, syslog.LOG_ERR, 'Failed opening YubiHSM device "%s" : %s' %(args.device, e)) return 1 @@ -749,9 +749,9 @@ def main(): try: run() except KeyboardInterrupt: - print "" - print "Shutting down" - print "" + print("") + print("Shutting down") + print("") if __name__ == '__main__': diff --git a/pyhsm/validate_cmd.py b/pyhsm/validate_cmd.py index b730dfe..061e9df 100644 --- a/pyhsm/validate_cmd.py +++ b/pyhsm/validate_cmd.py @@ -107,7 +107,7 @@ def __repr__(self): return '<%s instance at %s: public_id=%s, use_ctr=%i, session_ctr=%i, ts=%i/%i>' % ( self.__class__.__name__, hex(id(self)), - self.public_id.encode('hex'), + bytes.fromhex(self.public_id), self.use_ctr, self.session_ctr, self.ts_high, diff --git a/pyhsm/yubikey.py b/pyhsm/yubikey.py index 96d1d14..e3e130b 100644 --- a/pyhsm/yubikey.py +++ b/pyhsm/yubikey.py @@ -43,8 +43,8 @@ def validate_otp(hsm, from_key): @see: L{pyhsm.db_cmd.YHSM_Cmd_DB_Validate_OTP.parse_result} """ public_id, otp = split_id_otp(from_key) - return hsm.db_validate_yubikey_otp(modhex_decode(public_id).decode('hex'), - modhex_decode(otp).decode('hex') + return hsm.db_validate_yubikey_otp(modhex_decode(public_id), + modhex_decode(otp) ) def validate_yubikey_with_aead(hsm, from_key, aead, key_handle): @@ -84,9 +84,9 @@ def validate_yubikey_with_aead(hsm, from_key, aead, key_handle): otp = modhex_decode(otp) if not nonce: - nonce = public_id.decode('hex') + nonce = bytes.fromhex(public_id) - return hsm.validate_aead_otp(nonce, otp.decode('hex'), + return hsm.validate_aead_otp(nonce, otp, key_handle, aead) def modhex_decode(data): @@ -99,7 +99,7 @@ def modhex_decode(data): @returns: Hex @rtype: string """ - t_map = string.maketrans("cbdefghijklnrtuv", "0123456789abcdef") + t_map = bytes.maketrans(b"cbdefghijklnrtuv", b"0123456789abcdef") return data.translate(t_map) def modhex_encode(data): @@ -112,7 +112,7 @@ def modhex_encode(data): @returns: Modhex @rtype: string """ - t_map = string.maketrans("0123456789abcdef", "cbdefghijklnrtuv") + t_map = bytes.maketrans(b"0123456789abcdef", b"cbdefghijklnrtuv") return data.translate(t_map) def split_id_otp(from_key): @@ -125,12 +125,12 @@ def split_id_otp(from_key): @returns: public_id and OTP @rtype: tuple of string """ - if len(from_key) > 32: - public_id, otp = from_key[:-32], from_key[-32:] - elif len(from_key) == 32: - public_id = '' + if len(from_key) > 16: + public_id, otp = from_key[:-16], from_key[-16:] + elif len(from_key) == 16: + public_id = b'' otp = from_key else: - raise pyhsm.exception.YHSM_Error("Bad from_key length %i < 32 : %s" \ + raise pyhsm.exception.YHSM_Error("Bad from_key length %i < 16 : %s" \ % (len(from_key), from_key)) return public_id, otp diff --git a/setup.py b/setup.py index 25ac58c..d009c04 100644 --- a/setup.py +++ b/setup.py @@ -70,8 +70,8 @@ def get_version(): test_suite='test.test_init', tests_require=[], install_requires=[ - 'pyserial >= 2.3', - 'pycrypto >= 2.1' + 'pyserial >= 3.4', + 'pycryptodome >= 3.4.6' ], extras_require={ 'db': ['sqlalchemy'], diff --git a/test/configure_hsm.py b/test/configure_hsm.py index 20e03ce..49f203a 100644 --- a/test/configure_hsm.py +++ b/test/configure_hsm.py @@ -9,17 +9,17 @@ import pyhsm import pyhsm.util -import test_common +from . import test_common -from StringIO import StringIO -from test_common import CfgPassphrase, AdminYubiKeys, HsmPassphrase, PrimaryAdminYubiKey +from io import StringIO +from .test_common import CfgPassphrase, AdminYubiKeys, HsmPassphrase, PrimaryAdminYubiKey class ConfigureYubiHSMforTest(test_common.YHSM_TestCase): def test_aaa_echo(self): """ Test echo before reconfiguration. """ - self.assertTrue(self.hsm.echo('test')) + self.assertTrue(self.hsm.echo(b'test')) def test_configure_YHSM(self): """ @@ -54,7 +54,7 @@ def test_configure_YHSM(self): self.config_do ("hsm ffffffff\r%s\r%s\r%s\ryes" % (CfgPassphrase, AdminYubiKeysStr, HsmPassphrase)) self.hsm.drain() - self.add_keys(xrange(31)) + self.add_keys(range(31)) self.hsm.drain() self.config_do("keylist") @@ -70,7 +70,7 @@ def test_configure_YHSM(self): # get back into HSM mode sys.stderr.write("exit") - self.ser.write("exit\r") + self.ser.write(b"exit\r") self.hsm.drain() @@ -80,8 +80,8 @@ def test_zzz_unlock(self): """ Test unlock of keystore after reconfiguration. """ if self.hsm.version.have_unlock(): Params = PrimaryAdminYubiKey - YK = test_common.FakeYubiKey(pyhsm.yubikey.modhex_decode(Params[0]).decode('hex'), - Params[1].decode('hex'), Params[2].decode('hex') + YK = test_common.FakeYubiKey(bytes.fromhex(pyhsm.yubikey.modhex_decode(Params[0])), + bytes.fromhex(Params[1]), bytes.fromhex(Params[2]) ) # After reconfigure, we know the counter values for PrimaryAdminYubiKey is zero # in the internal db. However, the test suite initialization will unlock the keystore @@ -91,28 +91,28 @@ def test_zzz_unlock(self): # first verify counters 1/0 gives the expected YSM_OTP_REPLAY try: self.hsm.unlock(otp = YK.from_key()) - except pyhsm.exception.YHSM_CommandFailed, e: + except pyhsm.exception.YHSM_CommandFailed as e: if e.status != pyhsm.defines.YSM_OTP_REPLAY: raise # now do real unlock with values 2/1 (there is an extra unlock done somewhere...) YK.use_ctr = 2 - self.assertTrue(self.hsm.unlock(password = HsmPassphrase.decode("hex"), otp = YK.from_key())) + self.assertTrue(self.hsm.unlock(password = bytes.fromhex(HsmPassphrase), otp = YK.from_key())) else: - self.assertTrue(self.hsm.unlock(password = HsmPassphrase.decode("hex"))) + self.assertTrue(self.hsm.unlock(password = bytes.fromhex(HsmPassphrase))) def test_zzz_echo(self): """ Test echo after reconfiguration. """ - self.assertTrue(self.hsm.echo('test')) + self.assertTrue(self.hsm.echo(b'test')) def config_do(self, cmd, add_cr = True): # Don't have to output command - it is echoed #sys.__stderr__.write("> " + cmd + "\n") if add_cr: - self.ser.write(cmd + "\r") + self.ser.write(cmd.encode() + b"\r") else: - self.ser.write(cmd) + self.ser.write(cmd.encode()) #time.sleep(0.5) - recv = '' + recv = b'' fail_count = 0 sys.stderr.write("< ") while True: @@ -121,11 +121,10 @@ def config_do(self, cmd, add_cr = True): fail_count += 1 if fail_count == 5: raise Exception("Did not get the next prompt", recv) - sys.stderr.write(b) recv += b - lines = recv.split('\n') - if re.match('^(NO_CFG|WSAPI|HSM).*> .*', lines[-1]): + lines = recv.split(b'\n') + if re.match('^(NO_CFG|WSAPI|HSM).*> .*', lines[-1].decode()): break return recv diff --git a/test/test_aead.py b/test/test_aead.py index 02e2533..4761905 100644 --- a/test/test_aead.py +++ b/test/test_aead.py @@ -5,24 +5,24 @@ import unittest import pyhsm -import test_common +from . import test_common class TestAEAD(test_common.YHSM_TestCase): def setUp(self): test_common.YHSM_TestCase.setUp(self) - self.nonce = "4d4d4d4d4d4d".decode('hex') - self.key = "A" * 16 - self.uid = '\x4d\x01\x4d\x02\x4d\x03' + self.nonce = bytes.fromhex('4d4d4d4d4d4d') + self.key = b"AAAAAAAAAAAAAAAA" + self.uid = bytes.fromhex('4d014d024d03') self.secret = pyhsm.aead_cmd.YHSM_YubiKeySecret(self.key, self.uid) def test_aead_cmd_class(self): """ Test YHSM_AEAD_Cmd class. """ this = pyhsm.aead_cmd.YHSM_AEAD_Cmd(None, None) # test repr method - self.assertEquals(str, type(str(this))) + self.assertEqual(str, type(str(this))) this.executed = True - self.assertEquals(str, type(str(this))) + self.assertEqual(str, type(str(this))) def test_generate_aead_simple(self): """ Test generate_aead_simple without specifying nonce. """ @@ -30,14 +30,14 @@ def test_generate_aead_simple(self): # HSM> < keyload - Load key data now using flags 00000002. Press ESC to quit # 00000002 - stored ok key_handle = 2 - nonce = '' + nonce = b'' aead = self.hsm.generate_aead_simple(nonce, key_handle, self.secret) self.assertNotEqual(aead.nonce, nonce) self.assertEqual(aead.key_handle, key_handle) # test repr method - self.assertEquals(str, type(str(aead))) + self.assertEqual(str, type(str(aead))) def test_generate_aead_simple_with_nonce(self): """ Test generate_aead_simple with specified nonce. """ @@ -62,8 +62,8 @@ def test_generate_aead_simple_nonce_blocked(self): try: res = self.hsm.generate_aead_simple(self.nonce, key_handle, self.secret) self.fail("Expected YSM_FUNCTION_DISABLED, got %s" % (res)) - except pyhsm.exception.YHSM_CommandFailed, e: - self.assertEquals(e.status, pyhsm.defines.YSM_FUNCTION_DISABLED) + except pyhsm.exception.YHSM_CommandFailed as e: + self.assertEqual(e.status, pyhsm.defines.YSM_FUNCTION_DISABLED) def test_generate_aead_simple_validates(self): """ Test validate_aead of generate_aead_simple result. """ @@ -72,7 +72,7 @@ def test_generate_aead_simple_validates(self): kh_gen = 0x2000 kh_val = 0x2000 - aead = self.hsm.generate_aead_simple('', kh_gen, self.secret) + aead = self.hsm.generate_aead_simple(b'', kh_gen, self.secret) # test that the YubiHSM validates the generated AEAD # and confirms it contains our secret @@ -86,7 +86,7 @@ def test_generate_aead_simple_hsm_nonce_validates(self): kh_gen = 0x2000 kh_val = 0x2000 - nonce = '000000000000'.decode('hex') + nonce = bytes.fromhex('000000000000') aead = self.hsm.generate_aead_simple(nonce, kh_gen, self.secret) @@ -106,8 +106,8 @@ def test_generate_aead_random_nonce_blocked(self): try: res = self.hsm.generate_aead_random(self.nonce, key_handle, 22) self.fail("Expected YSM_FUNCTION_DISABLED, got %s" % (res)) - except pyhsm.exception.YHSM_CommandFailed, e: - self.assertEquals(e.status, pyhsm.defines.YSM_FUNCTION_DISABLED) + except pyhsm.exception.YHSM_CommandFailed as e: + self.assertEqual(e.status, pyhsm.defines.YSM_FUNCTION_DISABLED) def test_generate_aead_random_nonce_permitted(self): """ Test generate_aead_random with nonce. """ @@ -127,7 +127,7 @@ def test_generate_aead_random_without_nonce(self): # 00000004 - stored ok key_handle = 4 - nonce = '' + nonce = b'' # Test a number of different sizes for num_bytes in (1, \ @@ -143,8 +143,8 @@ def test_generate_aead_random_without_nonce(self): try: res = self.hsm.generate_aead_random(nonce, key_handle, num_bytes) self.fail("Expected YSM_INVALID_PARAMETER, got %s" % (res)) - except pyhsm.exception.YHSM_CommandFailed, e: - self.assertEquals(e.status, pyhsm.defines.YSM_INVALID_PARAMETER) + except pyhsm.exception.YHSM_CommandFailed as e: + self.assertEqual(e.status, pyhsm.defines.YSM_INVALID_PARAMETER) def test_who_can_generate_random(self): """ Test what key handles can generate a random AEAD. """ @@ -169,7 +169,7 @@ def test_who_can_validate(self): gen_kh = 2 # Enabled flags 00000010 = YSM_AEAD_DECRYPT_CMP # 00000005 - stored ok - aead = self.hsm.generate_aead_simple('', gen_kh, self.secret) + aead = self.hsm.generate_aead_simple(b'', gen_kh, self.secret) this = lambda kh: self.hsm.validate_aead(aead.nonce, kh, \ aead, cleartext = self.secret.pack()) diff --git a/test/test_aes_ecb.py b/test/test_aes_ecb.py index bcd5132..b5a504e 100644 --- a/test/test_aes_ecb.py +++ b/test/test_aes_ecb.py @@ -5,7 +5,7 @@ import unittest import pyhsm -import test_common +from . import test_common class TestOtpValidate(test_common.YHSM_TestCase): @@ -20,13 +20,13 @@ def test_aes_ecb_cmd_class(self): """ Test YHSM_Cmd_AES_ECB class. """ this = pyhsm.aes_ecb_cmd.YHSM_Cmd_AES_ECB(None, None, '') # test repr method - self.assertEquals(str, type(str(this))) + self.assertEqual(str, type(str(this))) this.executed = True - self.assertEquals(str, type(str(this))) + self.assertEqual(str, type(str(this))) def test_encrypt_decrypt(self): """ Test to AES ECB decrypt something encrypted. """ - plaintext = 'Fjaellen 2011'.ljust(pyhsm.defines.YSM_BLOCK_SIZE) # pad for compare after decrypt + plaintext = b'Fjaellen 2011'.ljust(pyhsm.defines.YSM_BLOCK_SIZE)# pad for compare after decrypt ciphertext = self.hsm.aes_ecb_encrypt(self.kh_encrypt, plaintext) @@ -38,20 +38,20 @@ def test_encrypt_decrypt(self): def test_compare(self): """ Test to AES ECB decrypt and then compare something. """ - plaintext = 'Maverick'.ljust(pyhsm.defines.YSM_BLOCK_SIZE) + plaintext = b'Maverick'.ljust(pyhsm.defines.YSM_BLOCK_SIZE) ciphertext = self.hsm.aes_ecb_encrypt(self.kh_encrypt, plaintext) self.assertTrue(self.hsm.aes_ecb_compare(self.kh_compare, ciphertext, plaintext)) - self.assertFalse(self.hsm.aes_ecb_compare(self.kh_compare, ciphertext, plaintext[:-1] + 'x')) + self.assertFalse(self.hsm.aes_ecb_compare(self.kh_compare, ciphertext, plaintext[:-1] + b'x')) def test_compare_bad(self): """ Test AES decrypt compare with incorrect plaintext. """ - plaintext = 'Maverick'.ljust(pyhsm.defines.YSM_BLOCK_SIZE) + plaintext = b'Maverick'.ljust(pyhsm.defines.YSM_BLOCK_SIZE) ciphertext = self.hsm.aes_ecb_encrypt(self.kh_encrypt, plaintext) - self.assertFalse(self.hsm.aes_ecb_compare(self.kh_compare, ciphertext, plaintext[:-1] + 'x')) + self.assertFalse(self.hsm.aes_ecb_compare(self.kh_compare, ciphertext, plaintext[:-1] + b'x')) def test_who_can_encrypt(self): """ Test what key handles can encrypt AES ECB encrypted blocks. """ @@ -59,7 +59,7 @@ def test_who_can_encrypt(self): # 0000000e - stored ok kh_enc = 0x0e - plaintext = 'sommar' + plaintext = b'sommar' this = lambda kh: self.hsm.aes_ecb_encrypt(kh, plaintext) self.who_can(this, expected = [kh_enc]) @@ -74,7 +74,7 @@ def test_who_can_decrypt(self): # 0000000f - stored ok kh_dec = 0x0f - plaintext = 'sommar' + plaintext = b'sommar' ciphertext = self.hsm.aes_ecb_encrypt(kh_enc, plaintext) this = lambda kh: self.hsm.aes_ecb_decrypt(kh, ciphertext) @@ -96,7 +96,7 @@ def test_who_can_compare(self): # 0000000f - stored ok kh_dec = 0x0f - plaintext = 'sommar' + plaintext = b'sommar' ciphertext = self.hsm.aes_ecb_encrypt(kh_enc, plaintext) this = lambda kh: self.hsm.aes_ecb_decrypt(kh, ciphertext) @@ -107,26 +107,26 @@ def test_aes_with_keystore_locked(self): if self.hsm.version.ver <= (0, 9, 8,): print ("Test for known bug in 0.9.8 disabled.") return None - cleartext = "reference" + cleartext = b"reference" res_before = self.hsm.aes_ecb_encrypt(0x2000, cleartext) # lock key store try: - res = self.hsm.key_storage_unlock("A" * 8) + res = self.hsm.key_storage_unlock(b"A" * 8) self.fail("Expected YSM_MISMATCH/YSM_KEY_STORAGE_LOCKED, got %s" % (res)) - except pyhsm.exception.YHSM_CommandFailed, e: + except pyhsm.exception.YHSM_CommandFailed as e: if self.hsm.version.have_key_store_decrypt(): - self.assertEquals(e.status, pyhsm.defines.YSM_MISMATCH) + self.assertEqual(e.status, pyhsm.defines.YSM_MISMATCH) else: - self.assertEquals(e.status, pyhsm.defines.YSM_KEY_STORAGE_LOCKED) + self.assertEqual(e.status, pyhsm.defines.YSM_KEY_STORAGE_LOCKED) # make sure we can't AES encrypt when keystore is locked try: res = self.hsm.aes_ecb_encrypt(0x2000, cleartext) self.fail("Expected YSM_KEY_STORAGE_LOCKED, got %s (before lock: %s)" \ - % (res.encode("hex"), res_before.encode("hex"))) - except pyhsm.exception.YHSM_CommandFailed, e: - self.assertEquals(e.status, pyhsm.defines.YSM_KEY_STORAGE_LOCKED) + % (res, res_before)) + except pyhsm.exception.YHSM_CommandFailed as e: + self.assertEqual(e.status, pyhsm.defines.YSM_KEY_STORAGE_LOCKED) # unlock key store with correct passphrase - self.assertTrue(self.hsm.key_storage_unlock(test_common.HsmPassphrase.decode("hex"))) + self.assertTrue(self.hsm.key_storage_unlock(bytes.fromhex(test_common.HsmPassphrase))) # make sure it is properly unlocked res_after = self.hsm.aes_ecb_encrypt(0x2000, cleartext) - self.assertEquals(res_before, res_after) + self.assertEqual(res_before, res_after) diff --git a/test/test_basics.py b/test/test_basics.py index d11ad49..0583c53 100644 --- a/test/test_basics.py +++ b/test/test_basics.py @@ -7,7 +7,7 @@ import serial import struct -import test_common +from . import test_common class TestBasics(test_common.YHSM_TestCase): @@ -16,7 +16,7 @@ def setUp(self): def test_echo(self): """ Test echo command. """ - self.assertTrue(self.hsm.echo('test')) + self.assertTrue(self.hsm.echo(b'test')) def test_random(self): """ Test random number generator . """ @@ -48,14 +48,14 @@ def test_nonce(self): def test_nonce_class(self): """ Test nonce class. """ # test repr method - self.assertEquals(str, type(str(self.hsm.get_nonce(0)))) + self.assertEqual(str, type(str(self.hsm.get_nonce(0)))) def test_random_reseed(self): """ Tets random reseed. """ # Unsure if we can test anything except the status returned is OK - self.assertTrue(self.hsm.random_reseed('A' * 32)) + self.assertTrue(self.hsm.random_reseed(b'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA')) # at least test we didn't disable the RNG r1 = self.hsm.random(10) r2 = self.hsm.random(10) @@ -63,14 +63,14 @@ def test_random_reseed(self): def test_load_temp_key(self): """ Test load_temp_key. """ - key = "A" * 16 - uid = '\x4d\x01\x4d\x02' - nonce = 'f1f2f3f4f5f6'.decode('hex') + key = b"AAAAAAAAAAAAAAAA" + uid = b'\x4d\x01\x4d\x02' + nonce = bytes.fromhex('f1f2f3f4f5f6') # key 0x2000 has all flags set key_handle = 0x2000 my_flags = struct.pack("< I", 0xffffffff) # full permissions when loaded into phantom key handle - my_key = 'C' * pyhsm.defines.YSM_MAX_KEY_SIZE + my_key = b'C' * pyhsm.defines.YSM_MAX_KEY_SIZE self.hsm.load_secret(my_key + my_flags) aead = self.hsm.generate_aead(nonce, key_handle) @@ -81,7 +81,7 @@ def test_load_temp_key(self): self.assertTrue(self.hsm.load_temp_key(nonce, key_handle, aead)) # Encrypt something with the phantom key - plaintext = 'Testing'.ljust(pyhsm.defines.YSM_BLOCK_SIZE) # pad for compare after decrypt + plaintext = b'Testing'.ljust(pyhsm.defines.YSM_BLOCK_SIZE) # pad for compare after decrypt ciphertext = self.hsm.aes_ecb_encrypt(pyhsm.defines.YSM_TEMP_KEY_HANDLE, plaintext) self.assertNotEqual(plaintext, ciphertext) @@ -92,12 +92,12 @@ def test_load_temp_key(self): def test_yhsm_class(self): """ Test YHSM class. """ # test repr method - self.assertEquals(str, type(str(self.hsm))) + self.assertEqual(str, type(str(self.hsm))) def test_yhsm_stick_class(self): """ Test YHSM_Stick class. """ # test repr method - self.assertEquals(str, type(str(self.hsm.stick))) + self.assertEqual(str, type(str(self.hsm.stick))) def test_set_debug(self): """ Test set_debug on YHSM. """ @@ -115,14 +115,14 @@ def test_sysinfo_cmd_class(self): """ Test YHSM_Cmd_System_Info class. """ this = pyhsm.basic_cmd.YHSM_Cmd_System_Info(None) # test repr method - self.assertEquals(str, type(str(this))) + self.assertEqual(str, type(str(this))) def test_sysinfo(self): """ Test sysinfo. """ info = self.hsm.info() self.assertTrue(info.version_major > 0 or info.version_minor > 0) self.assertEqual(12, len(info.system_uid)) - self.assertEquals(str, type(str(info))) + self.assertEqual(str, type(str(info))) def test_drain(self): """ Test YubiHSM drain. """ diff --git a/test/test_buffer.py b/test/test_buffer.py index 652c382..e366958 100644 --- a/test/test_buffer.py +++ b/test/test_buffer.py @@ -5,7 +5,7 @@ import unittest import pyhsm -import test_common +from . import test_common class TestBuffer(test_common.YHSM_TestCase): @@ -14,7 +14,7 @@ def setUp(self): def test_load_random(self): """ Test load_random. """ - nonce = "abc123" + nonce = b"abc123" # key 0x2000 has all flags set key_handle = 0x2000 self.hsm.load_random(16) @@ -29,7 +29,7 @@ def test_load_random(self): def test_would_overflow_buffer(self): """ Test overflow of buffer. """ - nonce = "abc123" + nonce = b"abc123" # key 0x2000 has all flags set key_handle = 0x2000 @@ -42,14 +42,14 @@ def test_would_overflow_buffer(self): def test_load_data(self): """ Test loading data into buffer. """ - c1 = self.hsm.load_data('Samp', offset = 0) + c1 = self.hsm.load_data(b'Samp', offset = 0) self.assertEqual(c1, 4) - c2 = self.hsm.load_data('123', offset = 3) + c2 = self.hsm.load_data(b'123', offset = 3) self.assertEqual(c2, 6) - c3 = self.hsm.load_data('ple #2', offset = 3) + c3 = self.hsm.load_data(b'ple #2', offset = 3) self.assertEqual(c3, 9) - nonce = "abc123" + nonce = b"abc123" # key 0x2000 has all flags set key_handle = 0x2000 aead = self.hsm.generate_aead(nonce, key_handle) - self.assertEqual(aead.data.encode('hex'), '18a88fbd7bd2275ba0a722bf80423ffab7') + self.assertEqual(aead.data, bytes.fromhex('18a88fbd7bd2275ba0a722bf80423ffab7')) diff --git a/test/test_common.py b/test/test_common.py index a47cd92..f077001 100644 --- a/test/test_common.py +++ b/test/test_common.py @@ -27,9 +27,9 @@ def setUp(self, device = os.getenv('YHSM_DEVICE', '/dev/ttyACM0'), debug = False # unlock keystore if our test configuration contains a passphrase if HsmPassphrase is not None and HsmPassphrase != "": try: - self.hsm.unlock(password = HsmPassphrase.decode("hex")) - self.otp_unlock() - except pyhsm.exception.YHSM_CommandFailed, e: + self.hsm.unlock(password = bytes.fromhex(HsmPassphrase)) + #self.otp_unlock() + except pyhsm.exception.YHSM_CommandFailed as e: # ignore errors from the unlock function, in case our test configuration # hasn't been loaded into the YubiHSM yet pass @@ -43,14 +43,14 @@ def who_can(self, what, expected = [], extra_khs = []): Try the lambda what() with all key handles between 1 and 32, except the expected one. Fail on anything but YSM_FUNCTION_DISABLED. """ - for kh in list(xrange(1, 32)) + extra_khs: + for kh in list(range(1, 32)) + extra_khs: if kh in expected: continue res = None try: res = what(kh) self.fail("Expected YSM_FUNCTION_DISABLED for key handle 0x%0x, got '%s'" % (kh, res)) - except pyhsm.exception.YHSM_CommandFailed, e: + except pyhsm.exception.YHSM_CommandFailed as e: if e.status != pyhsm.defines.YSM_FUNCTION_DISABLED: self.fail("Expected YSM_FUNCTION_DISABLED for key handle 0x%0x, got %s" \ % (kh, e.status_str)) @@ -64,8 +64,8 @@ def otp_unlock(self): if not self.hsm.version.have_unlock(): return None Params = PrimaryAdminYubiKey - YK = FakeYubiKey(pyhsm.yubikey.modhex_decode(Params[0]).decode('hex'), - Params[1].decode('hex'), Params[2].decode('hex') + YK = FakeYubiKey(bytes.fromhex(pyhsm.yubikey.modhex_decode(Params[0])), + bytes.fromhex(Params[1]), bytes.fromhex(Params[2]) ) YK.session_ctr = 0 use_ctr = 1 # the 16 bit power-up counter of the YubiKey @@ -77,7 +77,7 @@ def otp_unlock(self): self.assertTrue(res) # OK - if we got here we've got a successful response for this OTP break - except pyhsm.exception.YHSM_CommandFailed, e: + except pyhsm.exception.YHSM_CommandFailed as e: if e.status != pyhsm.defines.YSM_OTP_REPLAY: raise # don't bother with the session_ctr - test run 5 would mean we first have to @@ -144,9 +144,8 @@ def from_key(self, public_id, key): Return what the YubiKey would have returned when the button was pressed. """ from pyhsm.yubikey import modhex_encode, modhex_decode - otp = self.get_otp(key) - from_key = modhex_encode(public_id.encode('hex')) + modhex_encode(otp.encode('hex')) + from_key = modhex_encode(public_id) + modhex_encode(otp) return from_key class YubiKeyRnd(YubiKeyEmu): diff --git a/test/test_db.py b/test/test_db.py index 84fc7d8..820cdd3 100644 --- a/test/test_db.py +++ b/test/test_db.py @@ -6,17 +6,17 @@ import unittest import pyhsm -import test_common +from . import test_common -from test_yubikey_validate import YubiKeyEmu +from .test_yubikey_validate import YubiKeyEmu class TestInternalDB(test_common.YHSM_TestCase): def setUp(self): test_common.YHSM_TestCase.setUp(self) - self.key = "A" * 16 - self.uid = 'f0f1f2f3f4f5'.decode('hex') - self.public_id = '4d4d4d4d4d4d'.decode('hex') + self.key = b"AAAAAAAAAAAAAAAA" + self.uid = bytes.fromhex('f0f1f2f3f4f5') + self.public_id = bytes.fromhex('4d4d4d4d4d4d') def test_store_yubikey(self): """ Test storing a YubiKey in the internal database. """ @@ -32,14 +32,14 @@ def test_store_yubikey(self): # always zap the configuration before running the test suite. try: self.assertTrue(self.hsm.db_store_yubikey(self.public_id, key_handle, aead)) - except pyhsm.exception.YHSM_CommandFailed, e: + except pyhsm.exception.YHSM_CommandFailed as e: self.assertEqual(e.status, pyhsm.defines.YSM_ID_DUPLICATE) # Now, try an invalid validation against that record try: - res = self.hsm.db_validate_yubikey_otp(self.public_id, "x" * 16) + res = self.hsm.db_validate_yubikey_otp(self.public_id, B"x" * 16) self.fail("Expected YSM_OTP_INVALID, got %s" % (res)) - except pyhsm.exception.YHSM_CommandFailed, e: + except pyhsm.exception.YHSM_CommandFailed as e: self.assertEqual(e.status, pyhsm.defines.YSM_OTP_INVALID) def test_store_yubikey_with_nonce(self): @@ -48,10 +48,10 @@ def test_store_yubikey_with_nonce(self): raise unittest.SkipTest("Test of command introduced in 1.0.4 disabled.") # Key handle 0x2000 has all flags enabled key_handle = 0x2000 - public_id = '4d4d4d001122'.decode('hex') - nonce = '010203040506'.decode('hex') - key = 'T' * 16 - uid = 'F' * 6 + public_id = bytes.fromhex('4d4d4d001122') + nonce = bytes.fromhex('010203040506') + key = b'T' * 16 + uid = b'F' * 6 secret = pyhsm.aead_cmd.YHSM_YubiKeySecret(key, uid) self.hsm.load_secret(secret) @@ -62,14 +62,14 @@ def test_store_yubikey_with_nonce(self): # always zap the configuration before running the test suite. try: self.assertTrue(self.hsm.db_store_yubikey(public_id, key_handle, aead, nonce = nonce)) - except pyhsm.exception.YHSM_CommandFailed, e: + except pyhsm.exception.YHSM_CommandFailed as e: self.assertEqual(e.status, pyhsm.defines.YSM_ID_DUPLICATE) # Now, try an invalid validation against that record try: - res = self.hsm.db_validate_yubikey_otp(public_id, "x" * 16) + res = self.hsm.db_validate_yubikey_otp(public_id, b"x" * 16) self.fail("Expected YSM_OTP_INVALID, got %s" % (res)) - except pyhsm.exception.YHSM_CommandFailed, e: + except pyhsm.exception.YHSM_CommandFailed as e: self.assertEqual(e.status, pyhsm.defines.YSM_OTP_INVALID) def test_real_validate(self): @@ -90,7 +90,7 @@ def test_real_validate(self): # always zap the configuration before running the test suite. try: self.assertTrue(self.hsm.db_store_yubikey(this_public_id, key_handle, aead)) - except pyhsm.exception.YHSM_CommandFailed, e: + except pyhsm.exception.YHSM_CommandFailed as e: self.assertEqual(e.status, pyhsm.defines.YSM_ID_DUPLICATE) # OK, now we know there is an entry for this_public_id in the database - @@ -107,7 +107,7 @@ def test_real_validate(self): self.assertEqual(res.use_ctr, use_ctr) # OK - if we got here we've got a successful response for this OTP break - except pyhsm.exception.YHSM_CommandFailed, e: + except pyhsm.exception.YHSM_CommandFailed as e: if e.status != pyhsm.defines.YSM_OTP_REPLAY: raise # don't bother with the session_ctr - test run 5 would mean we first have to @@ -120,7 +120,7 @@ def test_real_validate(self): try: res = self.hsm.db_validate_yubikey_otp(this_public_id, otp) self.fail("Expected YSM_OTP_REPLAY, got %s" % (res)) - except pyhsm.exception.YHSM_CommandFailed, e: + except pyhsm.exception.YHSM_CommandFailed as e: if e.status != pyhsm.defines.YSM_OTP_REPLAY: raise diff --git a/test/test_hmac.py b/test/test_hmac.py index a38ddbc..f48ef6a 100644 --- a/test/test_hmac.py +++ b/test/test_hmac.py @@ -5,7 +5,7 @@ import unittest import pyhsm -import test_common +from . import test_common class TestHMACSHA1(test_common.YHSM_TestCase): @@ -17,61 +17,61 @@ def setUp(self): def test_nist_test_vector(self): """ Test HMAC SHA1 with NIST PUB 198 A.2 test vector. """ - data = 'Sample #2' + data = b'Sample #2' this = self.hsm.hmac_sha1(self.kh, data).execute() - self.assertEquals(this.get_hash().encode('hex'), '0922d3405faa3d194f82a45830737d5cc6c75d24') + self.assertEqual(this.get_hash(), bytes.fromhex('0922d3405faa3d194f82a45830737d5cc6c75d24')) # test of repr method - self.assertEquals(str, type(str(this))) + self.assertEqual(str, type(str(this))) def test_hmac_numeric_flags(self): """ Test HMAC SHA1 with numeric flags. """ - data = 'Sample #2' + data = b'Sample #2' flags = pyhsm.defines.YSM_HMAC_SHA1_RESET | pyhsm.defines.YSM_HMAC_SHA1_FINAL this = self.hsm.hmac_sha1(self.kh, data, flags = flags).execute() - self.assertEquals(this.get_hash().encode('hex'), '0922d3405faa3d194f82a45830737d5cc6c75d24') + self.assertEqual(this.get_hash(), bytes.fromhex('0922d3405faa3d194f82a45830737d5cc6c75d24')) def test_hmac_continuation(self): """ Test HMAC continuation. """ - data = 'Sample #2' + data = b'Sample #2' this = self.hsm.hmac_sha1(self.kh, data[:3], final = False) - self.assertEquals(this.get_hash().encode('hex'), '00' * 20) + self.assertEqual(this.get_hash(), '\x00' * 20) this.next(data[3:], final = True).execute() - self.assertEquals(this.get_hash().encode('hex'), '0922d3405faa3d194f82a45830737d5cc6c75d24') + self.assertEqual(this.get_hash(), bytes.fromhex('0922d3405faa3d194f82a45830737d5cc6c75d24')) def test_hmac_continuation2(self): """ Test HMAC nasty continuation. """ - data = 'Sample #2' + data = b'Sample #2' - this = self.hsm.hmac_sha1(self.kh, '', final = False) - self.assertEquals(this.get_hash().encode('hex'), '00' * 20) + this = self.hsm.hmac_sha1(self.kh, b'', final = False) + self.assertEqual(this.get_hash(), '\x00' * 20) this.next(data[:3], final = False).execute() this.next(data[3:], final = False).execute() - this.next('', final = True).execute() - self.assertEquals(this.get_hash().encode('hex'), '0922d3405faa3d194f82a45830737d5cc6c75d24') + this.next(b'', final = True).execute() + self.assertEqual(this.get_hash(), bytes.fromhex('0922d3405faa3d194f82a45830737d5cc6c75d24')) def test_hmac_interrupted(self): """ Test interrupted HMAC. """ - data = 'Sample #2' + data = b'Sample #2' this = self.hsm.hmac_sha1(self.kh, data[:3], final = False) - self.assertEquals(this.get_hash().encode('hex'), '00' * 20) - self.assertTrue(self.hsm.echo('hmac unit test')) + self.assertEqual(this.get_hash(), '\x00' * 20) + self.assertTrue(self.hsm.echo(b'hmac unit test')) this.next(data[3:], final = True).execute() - self.assertEquals(this.get_hash().encode('hex'), '0922d3405faa3d194f82a45830737d5cc6c75d24') + self.assertEqual(this.get_hash(), bytes.fromhex('0922d3405faa3d194f82a45830737d5cc6c75d24')) def test_hmac_interrupted2(self): """ Test AES-interrupted HMAC. """ - data = 'Sample #2' - plaintext = 'Maverick'.ljust(pyhsm.defines.YSM_BLOCK_SIZE) + data = b'Sample #2' + plaintext = b'Maverick'.ljust(pyhsm.defines.YSM_BLOCK_SIZE) kh_encrypt = 0x1001 kh_decrypt = 0x1001 this = self.hsm.hmac_sha1(self.kh, data[:3], final = False) - self.assertEquals(this.get_hash().encode('hex'), '00' * 20) + self.assertEqual(this.get_hash(), '\x00' * 20) # AES encrypt-decrypt in the middle of HMAC calculation ciphertext = self.hsm.aes_ecb_encrypt(kh_encrypt, plaintext) self.assertNotEqual(plaintext, ciphertext) @@ -79,21 +79,21 @@ def test_hmac_interrupted2(self): self.assertEqual(plaintext, decrypted) # continue HMAC this.next(data[3:], final = True).execute() - self.assertEquals(this.get_hash().encode('hex'), '0922d3405faa3d194f82a45830737d5cc6c75d24') + self.assertEqual(this.get_hash(), bytes.fromhex('0922d3405faa3d194f82a45830737d5cc6c75d24')) def test_hmac_wrong_key_handle(self): """ Test HMAC SHA1 operation with wrong key handle. """ try: - res = self.hsm.hmac_sha1(0x01, 'foo').execute() + res = self.hsm.hmac_sha1(0x01, b'foo').execute() self.fail("Expected YSM_FUNCTION_DISABLED, got %s" % (res)) - except pyhsm.exception.YHSM_CommandFailed, e: - self.assertEquals(e.status, pyhsm.defines.YSM_FUNCTION_DISABLED) + except pyhsm.exception.YHSM_CommandFailed as e: + self.assertEqual(e.status, pyhsm.defines.YSM_FUNCTION_DISABLED) def test_who_can_hash(self): """ Test what key handles can create HMAC SHA1 hashes. """ # Enabled flags 00010000 = YSM_HMAC_SHA1_GENERATE # 00000011 - stored ok - data = 'Sample #2' + data = b'Sample #2' this = lambda kh: self.hsm.hmac_sha1(kh, data).execute() self.who_can(this, expected = [0x11]) @@ -102,27 +102,27 @@ def test_generated_sha1_class(self): """ Test YHSM_GeneratedHMACSHA1 class. """ this = pyhsm.hmac_cmd.YHSM_GeneratedHMACSHA1(0x0, 'a' * 20, True) # test repr method - self.assertEquals(str, type(str(this))) + self.assertEqual(str, type(str(this))) def test_sha1_to_buffer(self): """ Test HMAC SHA1 to internal buffer. """ self.assertEqual(0, self.hsm.load_random(0, offset = 0)) # offset = 0 clears buffer - self.hsm.hmac_sha1(self.kh, 'testing is fun!', to_buffer = True) + self.hsm.hmac_sha1(self.kh, b'testing is fun!', to_buffer = True) # Verify there is now 20 bytes in the buffer self.assertEqual(pyhsm.defines.YSM_SHA1_HASH_SIZE, self.hsm.load_random(0, offset = 1)) def test_hmac_continuation_with_buffer(self): """ Test HMAC continuation with buffer. """ - data = 'Sample #2' + data = b'Sample #2' self.assertEqual(0, self.hsm.load_random(0, offset = 0)) # offset = 0 clears buffer self.assertEqual(0, self.hsm.load_random(0, offset = 1)) - this = self.hsm.hmac_sha1(self.kh, '', final = False) - self.assertEquals(this.get_hash().encode('hex'), '00' * 20) + this = self.hsm.hmac_sha1(self.kh, b'', final = False) + self.assertEqual(this.get_hash(), '\x00' * 20) this.next(data[:3], final = False).execute() this.next(data[3:], final = False).execute() - this.next('', final = True, to_buffer = True).execute() + this.next(b'', final = True, to_buffer = True).execute() # Verify there is now 20 bytes in the buffer self.assertEqual(pyhsm.defines.YSM_SHA1_HASH_SIZE, self.hsm.load_random(0, offset = 1)) diff --git a/test/test_init.py b/test/test_init.py index 0cd0fa1..86aa155 100644 --- a/test/test_init.py +++ b/test/test_init.py @@ -6,37 +6,39 @@ import unittest import pyhsm -import test_aead -import test_aes_ecb -import test_basics -import test_buffer -import test_db -import test_hmac -import test_oath -import test_otp_validate -import test_stick -import test_util -import test_yubikey_validate -import test_misc -import test_soft_hsm +from . import test_aead +from . import test_aes_ecb +from . import test_basics +from . import test_buffer +from . import test_db +from . import test_hmac +from . import test_oath +from . import test_otp_validate +from . import test_stick +from . import test_util +from . import test_yubikey_validate +from . import test_misc +from . import test_soft_hsm +from . import configure_hsm -test_modules = [test_aead, - test_aes_ecb, - test_basics, - test_buffer, - test_db, - test_hmac, - test_oath, - test_otp_validate, - test_stick, - test_util, - test_yubikey_validate, - test_misc, - test_soft_hsm, +test_modules = [#configure_hsm, + test_aead, #ok + test_aes_ecb, #ok + test_basics, #ok + test_buffer, #ok + test_db, #ok + test_hmac, #ok + test_oath, #ok + test_otp_validate, #ok + test_stick, #ok + test_util, #ok + test_yubikey_validate, #ok + test_misc, #ok + test_soft_hsm, #kazkas blogai su crypto, ziuret kaip counteri pakeist i dict(able) objecta, nes kazkodel baitai netinka ] # special, should not be addded to test_modules -import configure_hsm + def suite(): @@ -52,9 +54,9 @@ def suite(): # Check if we have a YubiHSM present, and start with locking it's keystore # XXX produce a better error message than 'error: None' when initializing fails - hsm = pyhsm.YHSM(device = os.getenv('YHSM_DEVICE', '/dev/ttyACM0')) + hsm = pyhsm.YHSM(device = os.getenv('YHSM_DEVICE', '/dev/tty.usbmodem25142549281')) try: - hsm.unlock("BADPASSPHRASE99") + hsm.unlock(b"BADPASSPHRASE99") except pyhsm.exception.YHSM_CommandFailed as e: if hsm.version.have_key_store_decrypt(): if e.status != pyhsm.defines.YSM_MISMATCH: diff --git a/test/test_misc.py b/test/test_misc.py index 5c77cf1..f8baa78 100644 --- a/test/test_misc.py +++ b/test/test_misc.py @@ -5,7 +5,7 @@ import unittest import pyhsm -import test_common +from . import test_common class TestUtil(test_common.YHSM_TestCase): @@ -20,37 +20,37 @@ def test_using_disabled_keyhandle(self): # 00002001 - stored ok # HSM> < keydis 2001 try: - res = self.hsm.aes_ecb_encrypt(0x2001, "klartext") + res = self.hsm.aes_ecb_encrypt(0x2001, b"klartext") self.fail("Expected YSM_FUNCTION_DISABLED, got %s" % (res)) - except pyhsm.exception.YHSM_CommandFailed, e: - self.assertEquals(e.status, pyhsm.defines.YSM_FUNCTION_DISABLED) + except pyhsm.exception.YHSM_CommandFailed as e: + self.assertEqual(e.status, pyhsm.defines.YSM_FUNCTION_DISABLED) def test_keystore_unlock(self): """ Test locking and then unlocking keystore. """ if self.hsm.version.ver <= (0, 9, 8,): print ("Test for known bug in 0.9.8 disabled.") return None - cleartext = "reference" - nonce = '010203040506'.decode('hex') + cleartext = b"reference" + nonce = bytes.fromhex('010203040506') res_before = self.hsm.generate_aead_simple(nonce, 0x2000, cleartext) # lock key store try: - res = self.hsm.key_storage_unlock("A" * 8) + res = self.hsm.key_storage_unlock(b"A" * 8) self.fail("Expected YSM_MISMATCH/YSM_KEY_STORAGE_LOCKED, got %s" % (res)) - except pyhsm.exception.YHSM_CommandFailed, e: + except pyhsm.exception.YHSM_CommandFailed as e: if self.hsm.version.have_key_store_decrypt(): - self.assertEquals(e.status, pyhsm.defines.YSM_MISMATCH) + self.assertEqual(e.status, pyhsm.defines.YSM_MISMATCH) else: - self.assertEquals(e.status, pyhsm.defines.YSM_KEY_STORAGE_LOCKED) + self.assertEqual(e.status, pyhsm.defines.YSM_KEY_STORAGE_LOCKED) # make sure we can't generate AEADs when keystore is locked try: res = self.hsm.generate_aead_simple(nonce, 0x2000, cleartext) self.fail("Expected YSM_KEY_STORAGE_LOCKED, got %s (before lock: %s)" \ - % (res.data.encode('hex'), res_before.data.encode('hex'))) - except pyhsm.exception.YHSM_CommandFailed, e: - self.assertEquals(e.status, pyhsm.defines.YSM_KEY_STORAGE_LOCKED) + % (res.data, res_before.data)) + except pyhsm.exception.YHSM_CommandFailed as e: + self.assertEqual(e.status, pyhsm.defines.YSM_KEY_STORAGE_LOCKED) # unlock key store with correct passphrase - self.assertTrue(self.hsm.key_storage_unlock(test_common.HsmPassphrase.decode("hex"))) + self.assertTrue(self.hsm.key_storage_unlock(bytes.fromhex(test_common.HsmPassphrase))) # make sure it is properly unlocked res_after = self.hsm.generate_aead_simple(nonce, 0x2000, cleartext) - self.assertEquals(res_before.data, res_after.data) + self.assertEqual(res_before.data, res_after.data) diff --git a/test/test_oath.py b/test/test_oath.py index eb2378b..3822675 100644 --- a/test/test_oath.py +++ b/test/test_oath.py @@ -6,17 +6,17 @@ import pyhsm import pyhsm.oath_hotp -import test_common +from . import test_common class TestOath(test_common.YHSM_TestCase): def setUp(self): test_common.YHSM_TestCase.setUp(self) - key = "3132333435363738393031323334353637383930".decode('hex') + key = bytes.fromhex("3132333435363738393031323334353637383930") # Enabled flags 00010000 = YSM_HMAC_SHA1_GENERATE flags = struct.pack("< I", 0x10000) - self.nonce = 'f1f2f3f4f5f6'.decode('hex') + self.nonce = bytes.fromhex('f1f2f3f4f5f6') # key 0x2000 has all flags set self.key_handle = 0x2000 self.phantom = pyhsm.defines.YSM_TEMP_KEY_HANDLE @@ -46,7 +46,7 @@ def test_OATH_HOTP_values(self): for c, expected, code in test_vectors: hmac_result = self.hsm.hmac_sha1(self.phantom, struct.pack("> Q", c)).get_hash() - self.assertEqual(expected, hmac_result.encode('hex')) + self.assertEqual(bytes.fromhex(expected), hmac_result) self.assertEqual(code, pyhsm.oath_hotp.truncate(hmac_result, length=6)) def test_OATH_HOTP_validation(self): diff --git a/test/test_otp_validate.py b/test/test_otp_validate.py index 2b84723..84e493a 100644 --- a/test/test_otp_validate.py +++ b/test/test_otp_validate.py @@ -5,7 +5,7 @@ import unittest import pyhsm -import test_common +from . import test_common class TestOtpValidate(test_common.YHSM_TestCase): @@ -14,9 +14,9 @@ def setUp(self): def test_load_secret_wrong_key(self): """ Test load_secret with key that should not be allowed to. """ - key = "A" * 16 - uid = '\x4d\x4d\x4d\x4d\x4d\x4d' - public_id = 'f0f1f2f3f4f5'.decode('hex') + key = b'AAAAAAAAAAAAAAAA' + uid = b'\x4d\x4d\x4d\x4d\x4d\x4d' + public_id = bytes.fromhex('f0f1f2f3f4f5') # Enabled flags 00000100 = YHSM_AEAD_STORE # HSM> < keyload - Load key data now using flags 00000100. Press ESC to quit # 00000009 - stored ok @@ -28,14 +28,14 @@ def test_load_secret_wrong_key(self): try: res = self.hsm.generate_aead(public_id, key_handle) self.fail("Expected YSM_FUNCTION_DISABLED, got %s" % (res)) - except pyhsm.exception.YHSM_CommandFailed, e: - self.assertEquals(e.status, pyhsm.defines.YSM_FUNCTION_DISABLED) + except pyhsm.exception.YHSM_CommandFailed as e: + self.assertEqual(e.status, pyhsm.defines.YSM_FUNCTION_DISABLED) def test_load_secret(self): """ Test load_secret. """ - key = "A" * 16 - uid = '\x4d\x01\x4d\x02' - public_id = 'f1f2f3f4f5f6'.decode('hex') + key = b"A" * 16 + uid = b'\x4d\x01\x4d\x02' + public_id = bytes.fromhex('f1f2f3f4f5f6') if self.hsm.version.have_YSM_BUFFER_LOAD(): # Enabled flags 60000004 = YSM_BUFFER_AEAD_GENERATE,YSM_USER_NONCE,YSM_BUFFER_LOAD # HSM (keys changed)> < keyload - Load key data now using flags 60000004. Press ESC to quit @@ -59,6 +59,6 @@ def test_load_secret(self): def test_yubikey_secrets(self): """ Test the class representing the YUBIKEY_SECRETS struct. """ - aes_128_key = 'a' * 16 - first = pyhsm.aead_cmd.YHSM_YubiKeySecret(aes_128_key, 'b') + aes_128_key = b'aaaaaaaaaaaaaaaa' + first = pyhsm.aead_cmd.YHSM_YubiKeySecret(aes_128_key, b'b') self.assertEqual(len(first.pack()), pyhsm.defines.KEY_SIZE + pyhsm.defines.UID_SIZE) diff --git a/test/test_soft_hsm.py b/test/test_soft_hsm.py index 7be13a7..5c23901 100644 --- a/test/test_soft_hsm.py +++ b/test/test_soft_hsm.py @@ -5,39 +5,39 @@ import unittest import pyhsm -import test_common +from . import test_common class TestSoftHSM(test_common.YHSM_TestCase): def setUp(self): test_common.YHSM_TestCase.setUp(self) - self.nonce = "4d4d4d4d4d4d".decode('hex') - self.key = "A" * 16 + self.nonce = bytes.fromhex("4d4d4d4d4d4d") + self.key = b"A" * 16 def test_aes_CCM_encrypt_decrypt(self): """ Test decrypting encrypted data. """ - key = chr(0x09) * 16 + key = bytes([0x09] * 16) key_handle = 1 - plaintext = "foo".ljust(16, chr(0x0)) + plaintext = b"foo".ljust(16, b'\x00') ct = pyhsm.soft_hsm.aesCCM(key, key_handle, self.nonce, plaintext, decrypt = False) pt = pyhsm.soft_hsm.aesCCM(key, key_handle, self.nonce, ct, decrypt = True) - self.assertEquals(plaintext, pt) + self.assertEqual(plaintext, pt) def test_aes_CCM_wrong_key(self): """ Test decrypting encrypted data with wrong key. """ - key = chr(0x09) * 16 + key = bytes([0x09] * 16) key_handle = 1 - plaintext = "foo".ljust(16, chr(0x0)) + plaintext = b"foo".ljust(16, b'\x00') ct = pyhsm.soft_hsm.aesCCM(key, key_handle, self.nonce, plaintext, decrypt = False) - key = chr(0x08) * 16 + key = bytes([0x08] * 16) self.assertRaises(pyhsm.exception.YHSM_Error, pyhsm.soft_hsm.aesCCM, key, key_handle, self.nonce, ct, decrypt = True) def test_aes_CCM_wrong_key_handle(self): """ Test decrypting encrypted data with wrong key_handle. """ - key = chr(0x09) * 16 + key = bytes([0x09] * 16) key_handle = 1 - plaintext = "foo".ljust(16, chr(0x0)) + plaintext = b"foo".ljust(16, b'\x00') ct = pyhsm.soft_hsm.aesCCM(key, key_handle, self.nonce, plaintext, decrypt = False) key_handle = 2 self.assertRaises(pyhsm.exception.YHSM_Error, pyhsm.soft_hsm.aesCCM, @@ -46,48 +46,48 @@ def test_aes_CCM_wrong_key_handle(self): def test_soft_simple_aead_generation(self): """ Test soft_hsm simple AEAD generation. """ key_handle = 0x2000 - plaintext = 'foo'.ljust(16, chr(0x0)) - key = str("2000" * 16).decode('hex') + plaintext = b'foo'.ljust(16, b'\x00') + key = bytes.fromhex("2000" * 16) # generate soft AEAD ct = pyhsm.soft_hsm.aesCCM(key, key_handle, self.nonce, plaintext, decrypt = False) # generate hard AEAD aead = self.hsm.generate_aead_simple(self.nonce, key_handle, plaintext) ct = pyhsm.soft_hsm.aesCCM(key, key_handle, self.nonce, plaintext, decrypt = False) - self.assertEquals(aead.data, ct) + self.assertEqual(aead.data, ct) # decrypt the AEAD again pt = pyhsm.soft_hsm.aesCCM(key, key_handle, self.nonce, ct, decrypt = True) - self.assertEquals(plaintext, pt) + self.assertEqual(plaintext, pt) def test_soft_generate_long_aead(self): """ Test soft_hsm generation of long AEAD. """ key_handle = 0x2000 - plaintext = 'A' * 64 - key = str("2000" * 16).decode('hex') + plaintext = b'A' * 64 + key = bytes.fromhex("2000" * 16) # generate soft AEAD ct = pyhsm.soft_hsm.aesCCM(key, key_handle, self.nonce, plaintext, decrypt = False) # generate hard AEAD aead = self.hsm.generate_aead_simple(self.nonce, key_handle, plaintext) - self.assertEquals(aead.data, ct) + self.assertEqual(aead.data, ct) # decrypt the AEAD again pt = pyhsm.soft_hsm.aesCCM(key, key_handle, self.nonce, ct, decrypt = True) - self.assertEquals(plaintext, pt) + self.assertEqual(plaintext, pt) def test_soft_generate_yubikey_secrets_aead(self): """ Test soft_hsm generation of YubiKey secrets AEAD. """ key_handle = 0x2000 - plaintext = 'A' * 22 - key = str("2000" * 16).decode('hex') + plaintext = b'A' * 22 + key = bytes.fromhex("2000" * 16) # generate soft AEAD ct = pyhsm.soft_hsm.aesCCM(key, key_handle, self.nonce, plaintext, decrypt = False) # generate hard AEAD aead = self.hsm.generate_aead_simple(self.nonce, key_handle, plaintext) - self.assertEquals(aead.data, ct) + self.assertEqual(aead.data, ct) # decrypt the AEAD again pt = pyhsm.soft_hsm.aesCCM(key, key_handle, self.nonce, ct, decrypt = True) - self.assertEquals(plaintext, pt) + self.assertEqual(plaintext, pt) diff --git a/test/test_stick.py b/test/test_stick.py index 3e8bb03..8b6b195 100644 --- a/test/test_stick.py +++ b/test/test_stick.py @@ -5,7 +5,7 @@ import unittest import pyhsm -import test_common +from . import test_common class TestUtil(test_common.YHSM_TestCase): @@ -19,7 +19,7 @@ def setUp(self): def test_debug_output(self): """ Test debug output of YubiHSM communication. """ - self.assertTrue(self.hsm.echo('testing')) + self.assertTrue(self.hsm.echo(b'testing')) self.assertTrue(self.hsm.drain()) def tearDown(self): diff --git a/test/test_util.py b/test/test_util.py index b0bce99..3fbaed3 100644 --- a/test/test_util.py +++ b/test/test_util.py @@ -5,7 +5,7 @@ import unittest import pyhsm -import test_common +from . import test_common class TestUtil(test_common.YHSM_TestCase): @@ -14,11 +14,11 @@ def setUp(self): def test_hexdump(self): """ Test hexdump function. """ - data1 = ''.join([chr(x) for x in xrange(8)]) - self.assertEquals('0000 00 01 02 03 04 05 06 07\n', pyhsm.util.hexdump(data1)) - data2 = ''.join([chr(x) for x in xrange(64)]) - self.assertEquals(248, len(pyhsm.util.hexdump(data2))) - self.assertEquals('', pyhsm.util.hexdump('')) + data1 = bytes(range(8)) + self.assertEqual('0000 00 01 02 03 04 05 06 07\n', pyhsm.util.hexdump(data1)) + data2 = bytes(range(64)) + self.assertEqual(248, len(pyhsm.util.hexdump(data2))) + self.assertEqual('', pyhsm.util.hexdump('')) def test_response_validation(self): """ Test response validation functions. """ @@ -34,10 +34,10 @@ def test_input_validate_str(self): 0, 'foo', exact_len = 5) self.assertRaises(pyhsm.exception.YHSM_InputTooLong, pyhsm.util.input_validate_str, \ - '1234', 'foo', max_len = 3) - self.assertEquals('1234', pyhsm.util.input_validate_str('1234', 'foo', max_len = 4)) - self.assertEquals('1234', pyhsm.util.input_validate_str('1234', 'foo', max_len = 14)) + b'1234', 'foo', max_len = 3) + self.assertEqual(b'1234', pyhsm.util.input_validate_str(b'1234', 'foo', max_len = 4)) + self.assertEqual(b'1234', pyhsm.util.input_validate_str(b'1234', 'foo', max_len = 14)) self.assertRaises(pyhsm.exception.YHSM_WrongInputSize, pyhsm.util.input_validate_str, \ - '1234', 'foo', exact_len = 5) - self.assertEquals('1234', pyhsm.util.input_validate_str('1234', 'foo', exact_len = 4)) + b'1234', 'foo', exact_len = 5) + self.assertEqual(b'1234', pyhsm.util.input_validate_str(b'1234', 'foo', exact_len = 4)) diff --git a/test/test_yubikey_validate.py b/test/test_yubikey_validate.py index f1bb8e7..3e99f73 100644 --- a/test/test_yubikey_validate.py +++ b/test/test_yubikey_validate.py @@ -6,18 +6,18 @@ import unittest import pyhsm -import test_common -from test_common import YubiKeyEmu, YubiKeyRnd +from . import test_common +from .test_common import YubiKeyEmu, YubiKeyRnd class TestYubikeyValidate(test_common.YHSM_TestCase): def setUp(self): test_common.YHSM_TestCase.setUp(self) - self.yk_key = 'F' * 16 # 128 bit AES key - self.yk_uid = '\x4d\x01\x4d\x02\x4d\x4d' + self.yk_key = b'F' * 16 # 128 bit AES key + self.yk_uid = b'\x4d\x01\x4d\x02\x4d\x4d' self.yk_rnd = YubiKeyRnd(self.yk_uid) - self.yk_public_id = '4d4d4d4d4d4d'.decode('hex') + self.yk_public_id = bytes.fromhex('4d4d4d4d4d4d') secret = pyhsm.aead_cmd.YHSM_YubiKeySecret(self.yk_key, self.yk_uid) self.hsm.load_secret(secret) @@ -34,17 +34,17 @@ def test_validate_aead_cmp(self): secret = pyhsm.aead_cmd.YHSM_YubiKeySecret(self.yk_key, self.yk_uid) cleartext = secret.pack() self.assertTrue(self.hsm.validate_aead(self.yk_public_id, self.kh_validate, self.aead, cleartext)) - wrong_cleartext = 'X' + cleartext[1:] + wrong_cleartext = b'X' + cleartext[1:] self.assertFalse(self.hsm.validate_aead(self.yk_public_id, self.kh_validate, self.aead, wrong_cleartext)) def test_validate_aead_cmp_long(self): """ Test validating a long AEAD """ - cleartext = 'C' * 36 + cleartext = b'C' * 36 key_handle = 0x2000 # key 0x2000 has all flags set - nonce = '123456' + nonce = b'123456' aead = self.hsm.generate_aead_simple(nonce, key_handle, cleartext) self.assertTrue(self.hsm.validate_aead(nonce, key_handle, aead, cleartext)) - wrong_cleartext = 'X' + cleartext[1:] + wrong_cleartext = b'X' + cleartext[1:] self.assertFalse(self.hsm.validate_aead(nonce, key_handle, aead, wrong_cleartext)) def test_validate_yubikey(self): @@ -55,15 +55,15 @@ def test_validate_yubikey(self): def test_modhex_encode_decode(self): """ Test modhex encoding/decoding. """ - h = '4d014d024d4ddd5382b11195144da07d' - self.assertEquals(h, pyhsm.yubikey.modhex_decode( pyhsm.yubikey.modhex_encode(h) ) ) + h = b'4d014d024d4ddd5382b11195144da07d' + self.assertEqual(h, pyhsm.yubikey.modhex_decode( pyhsm.yubikey.modhex_encode(h) ) ) def test_split_id_otp(self): """ Test public_id + OTP split function. """ - public_id, otp, = pyhsm.yubikey.split_id_otp("ft" * 16) - self.assertEqual(public_id, '') - self.assertEqual(otp, "ft" * 16) + public_id, otp, = pyhsm.yubikey.split_id_otp(("\xff" * 16)) + self.assertEqual(public_id, b'') + self.assertEqual(otp, "\xff" * 16) - public_id, otp, = pyhsm.yubikey.split_id_otp("cc" + "ft" * 16) - self.assertEqual(public_id, 'cc') - self.assertEqual(otp, "ft" * 16) + public_id, otp, = pyhsm.yubikey.split_id_otp("\xcc" + "\xff" * 16) + self.assertEqual(public_id, '\xcc') + self.assertEqual(otp, "\xff" * 16) From a4c1084c8ebb51fc4ab5af9307a225c75b2c16d8 Mon Sep 17 00:00:00 2001 From: IchiBan360 <79260104+IchiBan360@users.noreply.github.com> Date: Mon, 5 Aug 2024 15:38:25 +0300 Subject: [PATCH 2/7] check to see if payload is actually bytes or not --- pyhsm/cmd.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pyhsm/cmd.py b/pyhsm/cmd.py index d40296e..ef126bb 100644 --- a/pyhsm/cmd.py +++ b/pyhsm/cmd.py @@ -62,6 +62,8 @@ def execute(self, read_response=True): cmd_buf = struct.pack('BB', len(self.payload) + 1, self.command) else: cmd_buf = bytes([self.command]) + if not isinstance(self.payload, (bytes, bytearray)): + self.payload = self.payload.encode() cmd_buf += self.payload debug_info = None unlock = self.stick.acquire() From 4cbe026287c06103cad89ce7889698e866ee5112 Mon Sep 17 00:00:00 2001 From: IchiBan360 <79260104+IchiBan360@users.noreply.github.com> Date: Mon, 5 Aug 2024 15:57:28 +0300 Subject: [PATCH 3/7] Changed back default HSM path --- test/test_init.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/test_init.py b/test/test_init.py index 86aa155..03ee523 100644 --- a/test/test_init.py +++ b/test/test_init.py @@ -54,7 +54,7 @@ def suite(): # Check if we have a YubiHSM present, and start with locking it's keystore # XXX produce a better error message than 'error: None' when initializing fails - hsm = pyhsm.YHSM(device = os.getenv('YHSM_DEVICE', '/dev/tty.usbmodem25142549281')) + hsm = pyhsm.YHSM(device = os.getenv('YHSM_DEVICE', '/dev/ttyACM0')) try: hsm.unlock(b"BADPASSPHRASE99") except pyhsm.exception.YHSM_CommandFailed as e: From 2259bf714bb18068bc2e818bd8c4a4e4fcc30ec6 Mon Sep 17 00:00:00 2001 From: IchiBan360 <79260104+IchiBan360@users.noreply.github.com> Date: Wed, 7 Aug 2024 10:54:37 +0300 Subject: [PATCH 4/7] Check from_key type --- pyhsm/yubikey.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/pyhsm/yubikey.py b/pyhsm/yubikey.py index e3e130b..d0d2e88 100644 --- a/pyhsm/yubikey.py +++ b/pyhsm/yubikey.py @@ -125,12 +125,16 @@ def split_id_otp(from_key): @returns: public_id and OTP @rtype: tuple of string """ - if len(from_key) > 16: - public_id, otp = from_key[:-16], from_key[-16:] - elif len(from_key) == 16: + if type(from_key) == bytes: + from_key_len = 16 + else: + from_key_len = 32 + if len(from_key) > from_key_len: + public_id, otp = from_key[:-from_key_len], from_key[-from_key_len:] + elif len(from_key) == from_key_len: public_id = b'' otp = from_key else: - raise pyhsm.exception.YHSM_Error("Bad from_key length %i < 16 : %s" \ - % (len(from_key), from_key)) + raise pyhsm.exception.YHSM_Error("Bad from_key length %i < %i : %s" \ + % (len(from_key), from_key_len, from_key)) return public_id, otp From ba11390cb5de36dfc93c62d669f5c363dd8ab3e6 Mon Sep 17 00:00:00 2001 From: IchiBan360 <79260104+IchiBan360@users.noreply.github.com> Date: Wed, 7 Aug 2024 11:35:53 +0300 Subject: [PATCH 5/7] remove configure_hsm from test_modules --- test/test_init.py | 29 ++++++++++++++--------------- 1 file changed, 14 insertions(+), 15 deletions(-) diff --git a/test/test_init.py b/test/test_init.py index 03ee523..517e8a7 100644 --- a/test/test_init.py +++ b/test/test_init.py @@ -21,24 +21,23 @@ from . import test_soft_hsm from . import configure_hsm -test_modules = [#configure_hsm, - test_aead, #ok - test_aes_ecb, #ok - test_basics, #ok - test_buffer, #ok - test_db, #ok - test_hmac, #ok - test_oath, #ok - test_otp_validate, #ok - test_stick, #ok - test_util, #ok - test_yubikey_validate, #ok - test_misc, #ok - test_soft_hsm, #kazkas blogai su crypto, ziuret kaip counteri pakeist i dict(able) objecta, nes kazkodel baitai netinka +test_modules = [test_aead, + test_aes_ecb, + test_basics, + test_buffer, + test_db, + test_hmac, + test_oath, + test_otp_validate, + test_stick, + test_util, + test_yubikey_validate, + test_misc, + test_soft_hsm, ] # special, should not be addded to test_modules - +import configure_hsm def suite(): From 3af5a2b6d2c269764141b9c872dbb693c68e78ca Mon Sep 17 00:00:00 2001 From: IchiBan360 <79260104+IchiBan360@users.noreply.github.com> Date: Wed, 7 Aug 2024 11:50:45 +0300 Subject: [PATCH 6/7] converted html write data to bytes --- pyhsm/ksm/yubikey_ksm.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pyhsm/ksm/yubikey_ksm.py b/pyhsm/ksm/yubikey_ksm.py index 4373697..9caba77 100644 --- a/pyhsm/ksm/yubikey_ksm.py +++ b/pyhsm/ksm/yubikey_ksm.py @@ -102,14 +102,14 @@ def do_GET(self): self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() - self.wfile.write(val_res) - self.wfile.write("\n") + self.wfile.write(bytes(val_res, 'utf-8')) + self.wfile.write(bytes("\n", 'utf-8')) elif self.stats_url and self.path == self.stats_url: self.send_response(200) self.send_header('Content-type', 'text/html') self.end_headers() for key in stats: - self.wfile.write("%s %d\n" % (key, stats[key])) + self.wfile.write(bytes("%s %d\n" % (key, stats[key]), 'utf-8')) else: self.log_error("Bad URL '%s' - I'm serving '%s' (responding 403)" % (self.path, self.serve_url)) self.send_response(403, 'Forbidden') From 99f80c3da52ab9c858373399058fec435a20ce7f Mon Sep 17 00:00:00 2001 From: IchiBan360 <79260104+IchiBan360@users.noreply.github.com> Date: Wed, 7 Aug 2024 12:34:45 +0300 Subject: [PATCH 7/7] Update cmd.py --- pyhsm/cmd.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pyhsm/cmd.py b/pyhsm/cmd.py index ef126bb..60d6916 100644 --- a/pyhsm/cmd.py +++ b/pyhsm/cmd.py @@ -137,8 +137,8 @@ def _handle_invalid_read_response(self, res, expected_len): for this in lines: if re.match('^(NO_CFG|WSAPI|HSM).*> .*', this): raise pyhsm.exception.YHSM_Error('YubiHSM is in configuration mode') - raise pyhsm.exception.YHSM_Error('Unknown response from serial device %s, %s : "%s"' \ - % (self.stick.device, res, len(res.encode()))) + raise pyhsm.exception.YHSM_Error('Unknown response from serial device %s : "%s"' \ + % (self.stick.device, res())) def parse_result(self, data): """