From 59429146dbd12bac4fa578820e3658b99fd0279e Mon Sep 17 00:00:00 2001 From: ross-spencer Date: Tue, 11 Nov 2025 22:47:21 +0100 Subject: [PATCH 01/13] WIP: add PRONOM export --- src/jsonid/export.py | 21 +++++++++-- src/jsonid/jsonid.py | 4 ++- src/jsonid/pronom.py | 85 ++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 106 insertions(+), 4 deletions(-) create mode 100644 src/jsonid/pronom.py diff --git a/src/jsonid/export.py b/src/jsonid/export.py index c062aa8..f3d55f5 100644 --- a/src/jsonid/export.py +++ b/src/jsonid/export.py @@ -6,20 +6,21 @@ from datetime import timezone try: + import pronom import registry_data import version except ModuleNotFoundError: try: - from src.jsonid import registry_data, version + from src.jsonid import pronom, registry_data, version except ModuleNotFoundError: - from jsonid import registry_data, version + from jsonid import pronom, registry_data, version logger = logging.getLogger(__name__) def exportJSON() -> None: # pylint: disable=C0103 """Export to JSON.""" - logger.debug("exporting registry ad JSON") + logger.debug("exporting registry as JSON") data = registry_data.registry() json_obj = [] id_ = { @@ -35,3 +36,17 @@ def exportJSON() -> None: # pylint: disable=C0103 for datum in data: json_obj.append(datum.json()) print(json.dumps(json_obj, indent=2)) + + +def exportPRONOM() -> None: + """Export a PRONOM compatible set of signatures.""" + logger.debug("exporting registry as PRONOM") + data = registry_data.registry() + for datum in data: + print("---") + pronom.process_markers(datum.json()["markers"]) + # break + + +def exportPRONOMXML() -> None: + """Export a PRONOM compatible set of signatures.""" diff --git a/src/jsonid/jsonid.py b/src/jsonid/jsonid.py index 0dfa6b8..aae413d 100644 --- a/src/jsonid/jsonid.py +++ b/src/jsonid/jsonid.py @@ -160,6 +160,7 @@ def main() -> None: "--pronom", help="return a PRONOM-centric view of the results", required=False, + action="store_true", ) parser.add_argument( "--export", @@ -220,7 +221,8 @@ def main() -> None: if args.registry: raise NotImplementedError("custom registry is not yet available") if args.pronom: - raise NotImplementedError("pronom view is not yet implemented") + export.exportPRONOM() + sys.exit() if args.language: raise NotImplementedError("multiple languages are not yet implemented") if args.export: diff --git a/src/jsonid/pronom.py b/src/jsonid/pronom.py new file mode 100644 index 0000000..1192a0d --- /dev/null +++ b/src/jsonid/pronom.py @@ -0,0 +1,85 @@ +"""PRONOM export routines.""" + +import logging + +logger = logging.getLogger(__name__) + + +def _str_to_hex_str(s: str) -> str: + """todo...""" + + k = "" + for c in s: + b = hex(ord(c)) + k = f"{k}{b}" + return k.replace("0x", "") + + +def process_markers(markers: list): + """todo... + + dict_keys(['CONTAINS']) + dict_keys(['ENDSWITH']) + dict_keys(['IS'] + dict_keys(['ISTYPE']) + dict_keys(['STARTSWITH']) + + """ + + print("1. {0-4095}7B") + + for idx, marker in enumerate(markers, 2): + + if "GOTO" in marker.keys(): + logger.error("GOTO not yet handled") + break + if "INDEX" in marker.keys(): + logger.error("INDEX not yet handled") + break + + k1 = _str_to_hex_str(marker["KEY"]) + + # how to model whitespace? + s = f"22{k1.upper()}22" + + if "EXISTS" in marker.keys(): + print(f"{idx}.", s) + continue + + if "ISTYPE" in marker.keys(): + logger.info("no idea how to handle ISTYPE...") + """ + boolean == true/false + int == lexicographically between 30 and 39? 0 and 65000? + string... length is a problem... + list == begins with [ + dict == begins with { + """ + break + + if "IS" in marker.keys(): + k2 = _str_to_hex_str(marker["KEY"]) + isk = f"{idx}. 22{k2}22" + print(isk) + continue + + if "STARTSWITH" in marker.keys(): + k2 = _str_to_hex_str(marker["KEY"]) + isk = f"{idx}. 22{k2}" + print(isk) + continue + + if "ENDSWITH" in marker.keys(): + k2 = _str_to_hex_str(marker["KEY"]) + isk = f"{idx}. {k2}22" + print(isk) + continue + + if "CONTAINS" in marker.keys(): + k2 = _str_to_hex_str(marker["KEY"]) + isk = f"{idx}. *{k2}*" + print(isk) + continue + + marker.pop("KEY") + print(marker.keys()) From a6c792002a828fc06bfab05b0a00ab3fd6c16856 Mon Sep 17 00:00:00 2001 From: ross-spencer Date: Tue, 11 Nov 2025 23:30:37 +0100 Subject: [PATCH 02/13] WIP: exception improves return --- src/jsonid/export.py | 11 ++++++++--- src/jsonid/pronom.py | 31 ++++++++++++++++++++----------- 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/src/jsonid/export.py b/src/jsonid/export.py index f3d55f5..bb92e61 100644 --- a/src/jsonid/export.py +++ b/src/jsonid/export.py @@ -43,9 +43,14 @@ def exportPRONOM() -> None: logger.debug("exporting registry as PRONOM") data = registry_data.registry() for datum in data: - print("---") - pronom.process_markers(datum.json()["markers"]) - # break + print("--- START ---") + print(f"--- {datum.json()['identifier']} {datum.json()['name']} ---") + try: + res = pronom.process_markers(datum.json()["markers"]) + for r in res: + print(r) + except pronom.UnprocessableEntity as err: + logger.error("can't yet process: %s", err) def exportPRONOMXML() -> None: diff --git a/src/jsonid/pronom.py b/src/jsonid/pronom.py index 1192a0d..915ab4c 100644 --- a/src/jsonid/pronom.py +++ b/src/jsonid/pronom.py @@ -5,6 +5,11 @@ logger = logging.getLogger(__name__) +class UnprocessableEntity(Exception): + """Provide a way to give complete feedback to the caller to allow + it to exit.""" + + def _str_to_hex_str(s: str) -> str: """todo...""" @@ -15,7 +20,7 @@ def _str_to_hex_str(s: str) -> str: return k.replace("0x", "") -def process_markers(markers: list): +def process_markers(markers: list) -> list: """todo... dict_keys(['CONTAINS']) @@ -26,16 +31,19 @@ def process_markers(markers: list): """ - print("1. {0-4095}7B") + res = [] + + res.append("1. {0-4095}7B") for idx, marker in enumerate(markers, 2): if "GOTO" in marker.keys(): logger.error("GOTO not yet handled") - break + raise UnprocessableEntity("GOTO") + if "INDEX" in marker.keys(): logger.error("INDEX not yet handled") - break + raise UnprocessableEntity("INDEX") k1 = _str_to_hex_str(marker["KEY"]) @@ -43,11 +51,10 @@ def process_markers(markers: list): s = f"22{k1.upper()}22" if "EXISTS" in marker.keys(): - print(f"{idx}.", s) + res.append(f"{idx}.{s}") continue if "ISTYPE" in marker.keys(): - logger.info("no idea how to handle ISTYPE...") """ boolean == true/false int == lexicographically between 30 and 39? 0 and 65000? @@ -55,31 +62,33 @@ def process_markers(markers: list): list == begins with [ dict == begins with { """ - break + raise UnprocessableEntity("ISTYPE") if "IS" in marker.keys(): k2 = _str_to_hex_str(marker["KEY"]) isk = f"{idx}. 22{k2}22" - print(isk) + res.append(isk) continue if "STARTSWITH" in marker.keys(): k2 = _str_to_hex_str(marker["KEY"]) isk = f"{idx}. 22{k2}" - print(isk) + res.append(isk) continue if "ENDSWITH" in marker.keys(): k2 = _str_to_hex_str(marker["KEY"]) isk = f"{idx}. {k2}22" - print(isk) + res.append(isk) continue if "CONTAINS" in marker.keys(): k2 = _str_to_hex_str(marker["KEY"]) isk = f"{idx}. *{k2}*" - print(isk) + res.append(isk) continue marker.pop("KEY") print(marker.keys()) + + return res From 1ef376836282d101f1e989a94dd9ae99edd047b3 Mon Sep 17 00:00:00 2001 From: ross-spencer Date: Wed, 12 Nov 2025 09:15:19 +0100 Subject: [PATCH 03/13] WIP: more types --- src/jsonid/pronom.py | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/src/jsonid/pronom.py b/src/jsonid/pronom.py index 915ab4c..c1643da 100644 --- a/src/jsonid/pronom.py +++ b/src/jsonid/pronom.py @@ -10,6 +10,17 @@ class UnprocessableEntity(Exception): it to exit.""" +def _type_to_str(t: type) -> str: + """todo...""" + if t == "integer": + # how do we represent larger numbers? + return "[30:39]" + if t == "bool": + # true | false + return "22(74727565|66616C7365)22" + raise UnprocessableEntity(f"{t}") + + def _str_to_hex_str(s: str) -> str: """todo...""" @@ -29,6 +40,9 @@ def process_markers(markers: list) -> list: dict_keys(['ISTYPE']) dict_keys(['STARTSWITH']) + + key(0-n):(0-n)value + """ res = [] @@ -62,7 +76,10 @@ def process_markers(markers: list) -> list: list == begins with [ dict == begins with { """ - raise UnprocessableEntity("ISTYPE") + t = _type_to_str(marker["ISTYPE"]) + k2 = f"{t}" + res.append(k2) + continue if "IS" in marker.keys(): k2 = _str_to_hex_str(marker["KEY"]) From 179a9cd09ed0a42b15c2b74f48dfea40bb59745e Mon Sep 17 00:00:00 2001 From: ross-spencer Date: Wed, 12 Nov 2025 22:17:35 +0100 Subject: [PATCH 04/13] WIP: annotations --- src/jsonid/pronom.py | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/src/jsonid/pronom.py b/src/jsonid/pronom.py index c1643da..d63e14a 100644 --- a/src/jsonid/pronom.py +++ b/src/jsonid/pronom.py @@ -12,12 +12,19 @@ class UnprocessableEntity(Exception): def _type_to_str(t: type) -> str: """todo...""" - if t == "integer": - # how do we represent larger numbers? + if t == "integer" or t == "float": + # how do we represent larger numbers? and do we need to? return "[30:39]" if t == "bool": # true | false return "22(74727565|66616C7365)22" + if t == "map": + # { == 7B. + return "7B" + if t == "list": + # [ == 5B. + return "5B" + # This should only be string at this point. raise UnprocessableEntity(f"{t}") @@ -31,9 +38,13 @@ def _str_to_hex_str(s: str) -> str: return k.replace("0x", "") -def process_markers(markers: list) -> list: +def process_markers(markers: list) -> tuple[list | bool]: """todo... + returns a tuple describing the processed value and a flag to + highlight the result is potentially lossless, e.g. in the case + of matching types, e.g. strings. + dict_keys(['CONTAINS']) dict_keys(['ENDSWITH']) dict_keys(['IS'] @@ -52,11 +63,15 @@ def process_markers(markers: list) -> list: for idx, marker in enumerate(markers, 2): if "GOTO" in marker.keys(): - logger.error("GOTO not yet handled") + # first key exists like regular key, then we have to + # search for the next key... + logger.error("GOTO not yet handled: %s", marker) raise UnprocessableEntity("GOTO") if "INDEX" in marker.keys(): - logger.error("INDEX not yet handled") + # first we have a square bracket that then needs a search + # parameter for the next key... + logger.error("INDEX not yet handled: %s", marker) raise UnprocessableEntity("INDEX") k1 = _str_to_hex_str(marker["KEY"]) @@ -77,7 +92,7 @@ def process_markers(markers: list) -> list: dict == begins with { """ t = _type_to_str(marker["ISTYPE"]) - k2 = f"{t}" + k2 = f"{idx} {t}" res.append(k2) continue From 51938c981b6bc333cb573ae2cf1752e41f64d90d Mon Sep 17 00:00:00 2001 From: ross-spencer Date: Sun, 16 Nov 2025 14:04:47 +0000 Subject: [PATCH 05/13] WIP: string handling --- src/jsonid/pronom.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/jsonid/pronom.py b/src/jsonid/pronom.py index d63e14a..56f2c13 100644 --- a/src/jsonid/pronom.py +++ b/src/jsonid/pronom.py @@ -18,13 +18,16 @@ def _type_to_str(t: type) -> str: if t == "bool": # true | false return "22(74727565|66616C7365)22" + if t == "string": + # string begins with a double quote and ends in a double quote. + return "22*22" if t == "map": # { == 7B. return "7B" if t == "list": # [ == 5B. return "5B" - # This should only be string at this point. + # This should only trigger for incorrect values at this point.. raise UnprocessableEntity(f"{t}") From d867c40f83386093cbfc41fe3b358ec723b70cb9 Mon Sep 17 00:00:00 2001 From: ross-spencer Date: Mon, 17 Nov 2025 21:18:30 +0000 Subject: [PATCH 06/13] WIP: consts --- src/jsonid/helpers.py | 28 ++++++++++++++++++++-------- src/jsonid/pronom.py | 30 +++++++++++++++++++++--------- 2 files changed, 41 insertions(+), 17 deletions(-) diff --git a/src/jsonid/helpers.py b/src/jsonid/helpers.py index ca28bc1..11ae524 100644 --- a/src/jsonid/helpers.py +++ b/src/jsonid/helpers.py @@ -2,7 +2,7 @@ import logging import time -from typing import Union +from typing import Union, Final try: import htm_template @@ -75,6 +75,18 @@ def format_marker(marker_text: str, marker: dict) -> str: return f"{marker_text}{marker_formatted}\n" +TYPE_BOOL: Final[str] = "bool" +TYPE_FLOAT: Final[str] = "float" +TYPE_INTEGER: Final[str] = "integer" +TYPE_LIST: Final[str] = "list" +TYPE_NONE: Final[str] = "NoneType" +TYPE_MAP: Final[str] = "map" +TYPE_STRING: Final[str] = "string" + + + + + def substitute_type_text(replace_me: Union[str, type]): """Output a text substitution for a type that will otherwise not pretty-print. @@ -83,19 +95,19 @@ def substitute_type_text(replace_me: Union[str, type]): # pylint: disable=R0911 if replace_me.__name__ == "dict": - return "map" + return TYPE_MAP if replace_me.__name__ == "int": - return "integer" + return TYPE_INTEGER if replace_me.__name__ == "list": - return "list" + return TYPE_LIST if replace_me.__name__ == "str": - return "string" + return TYPE_STRING if replace_me.__name__ == "float": - return "float" + return TYPE_NONE if replace_me.__name__ == "bool": - return "bool" + return TYPE_BOOL if replace_me.__name__ == "NoneType": - return "NoneType" + return TYPE_NONE if not isinstance(replace_me, type): pass return replace_me diff --git a/src/jsonid/pronom.py b/src/jsonid/pronom.py index 56f2c13..cd14e64 100644 --- a/src/jsonid/pronom.py +++ b/src/jsonid/pronom.py @@ -2,9 +2,21 @@ import logging +try: + import helpers +except ModuleNotFoundError: + try: + from src.jsonid import helpers + except ModuleNotFoundError: + from jsonid import helpers + + logger = logging.getLogger(__name__) + + + class UnprocessableEntity(Exception): """Provide a way to give complete feedback to the caller to allow it to exit.""" @@ -12,21 +24,21 @@ class UnprocessableEntity(Exception): def _type_to_str(t: type) -> str: """todo...""" - if t == "integer" or t == "float": + if t == helpers.TYPE_INTEGER or t == helpers.TYPE_FLOAT: # how do we represent larger numbers? and do we need to? return "[30:39]" - if t == "bool": + if t == helpers.TYPE_BOOL: # true | false return "22(74727565|66616C7365)22" - if t == "string": + if t == helpers.TYPE_STRING: # string begins with a double quote and ends in a double quote. return "22*22" - if t == "map": - # { == 7B. - return "7B" - if t == "list": - # [ == 5B. - return "5B" + if t == helpers.TYPE_MAP: + # { == 7B; } == 7D + return "7B*7D" + if t == helpers.TYPE_LIST: + # [ == 5B; ] == 5D + return "5B*5D" # This should only trigger for incorrect values at this point.. raise UnprocessableEntity(f"{t}") From 06f9c0baae0fe516f36426866a2345286676e186 Mon Sep 17 00:00:00 2001 From: ross-spencer Date: Mon, 17 Nov 2025 23:56:35 +0100 Subject: [PATCH 07/13] WIP: conversions nearly complete --- src/jsonid/export.py | 18 ++++-- src/jsonid/helpers.py | 5 +- src/jsonid/pronom.py | 132 +++++++++++++++++++++++------------------- 3 files changed, 86 insertions(+), 69 deletions(-) diff --git a/src/jsonid/export.py b/src/jsonid/export.py index bb92e61..20c47e2 100644 --- a/src/jsonid/export.py +++ b/src/jsonid/export.py @@ -43,14 +43,20 @@ def exportPRONOM() -> None: logger.debug("exporting registry as PRONOM") data = registry_data.registry() for datum in data: - print("--- START ---") - print(f"--- {datum.json()['identifier']} {datum.json()['name']} ---") + markers = datum.json()["markers"] try: - res = pronom.process_markers(datum.json()["markers"]) - for r in res: - print(r) + _ = pronom.process_markers(markers.copy()) except pronom.UnprocessableEntity as err: - logger.error("can't yet process: %s", err) + logger.error( + "%s %s: cannot handle: %s", + datum.json()["identifier"], + datum.json()["name"], + err, + ) + for marker in markers: + logger.debug("--- START ---") + logger.debug("marker: %s", marker) + logger.debug("--- END ---") def exportPRONOMXML() -> None: diff --git a/src/jsonid/helpers.py b/src/jsonid/helpers.py index 11ae524..3a0ccac 100644 --- a/src/jsonid/helpers.py +++ b/src/jsonid/helpers.py @@ -2,7 +2,7 @@ import logging import time -from typing import Union, Final +from typing import Final, Union try: import htm_template @@ -84,9 +84,6 @@ def format_marker(marker_text: str, marker: dict) -> str: TYPE_STRING: Final[str] = "string" - - - def substitute_type_text(replace_me: Union[str, type]): """Output a text substitution for a type that will otherwise not pretty-print. diff --git a/src/jsonid/pronom.py b/src/jsonid/pronom.py index cd14e64..fccfd34 100644 --- a/src/jsonid/pronom.py +++ b/src/jsonid/pronom.py @@ -1,22 +1,21 @@ """PRONOM export routines.""" import logging +from typing import Final try: import helpers + import registry_matchers except ModuleNotFoundError: try: - from src.jsonid import helpers + from src.jsonid import helpers, registry_matchers except ModuleNotFoundError: - from jsonid import helpers + from jsonid import helpers, registry_matchers logger = logging.getLogger(__name__) - - - class UnprocessableEntity(Exception): """Provide a way to give complete feedback to the caller to allow it to exit.""" @@ -39,13 +38,20 @@ def _type_to_str(t: type) -> str: if t == helpers.TYPE_LIST: # [ == 5B; ] == 5D return "5B*5D" + if t == helpers.TYPE_NONE: + # null + return "6E756C6C" # This should only trigger for incorrect values at this point.. - raise UnprocessableEntity(f"{t}") + raise UnprocessableEntity(f"type_to_str: {t}") -def _str_to_hex_str(s: str) -> str: +def _complex_is_type() -> str: """todo...""" + raise UnprocessableEntity("complex IS type") + +def _str_to_hex_str(s: str) -> str: + """todo...""" k = "" for c in s: b = hex(ord(c)) @@ -71,71 +77,79 @@ def process_markers(markers: list) -> tuple[list | bool]: """ + COLON: Final[str] = "3A" + CURLY_OPEN: Final[str] = "7B" + SQUARE_OPEN: Final[str] = "5B" + DOUBLE_QUOTE: Final[str] = "22" + WS: Final[str] = "(0-10)" + res = [] - res.append("1. {0-4095}7B") + res.append("{0-4095}7B") for idx, marker in enumerate(markers, 2): - if "GOTO" in marker.keys(): + logger.debug("marker: %s", marker) + + if registry_matchers.MARKER_GOTO in marker.keys(): # first key exists like regular key, then we have to # search for the next key... - logger.error("GOTO not yet handled: %s", marker) - raise UnprocessableEntity("GOTO") - - if "INDEX" in marker.keys(): + k0 = _str_to_hex_str(marker["GOTO"]) + k1 = _str_to_hex_str(marker["KEY"]) + k0 = f"{DOUBLE_QUOTE}{k0}{DOUBLE_QUOTE}" + k1 = f"{DOUBLE_QUOTE}{k1}{DOUBLE_QUOTE}" + k1 = f"{k0}{WS}{COLON}*{WS}{k1}{WS}{COLON}" + marker.pop("KEY") + if registry_matchers.MARKER_INDEX in marker.keys(): # first we have a square bracket that then needs a search - # parameter for the next key... - logger.error("INDEX not yet handled: %s", marker) - raise UnprocessableEntity("INDEX") - - k1 = _str_to_hex_str(marker["KEY"]) - - # how to model whitespace? - s = f"22{k1.upper()}22" - - if "EXISTS" in marker.keys(): - res.append(f"{idx}.{s}") + # parameter for the next object (curly bracket) and then + # key... + k0 = SQUARE_OPEN + k1 = _str_to_hex_str(marker["KEY"]) + k1 = f"{WS}{k0}*{CURLY_OPEN}*{DOUBLE_QUOTE}{k1}{DOUBLE_QUOTE}" + if "KEY" in marker.keys(): + k1 = _str_to_hex_str(marker["KEY"]) + k1 = f"{DOUBLE_QUOTE}{k1}{DOUBLE_QUOTE}" + marker.pop("KEY") + # Given a key, each of the remaining rule parts must result in + # exiting early. + if registry_matchers.MARKER_KEY_EXISTS in marker.keys(): + res.append(f"k.{k1}{WS}{COLON}".upper()) continue - - if "ISTYPE" in marker.keys(): - """ - boolean == true/false - int == lexicographically between 30 and 39? 0 and 65000? - string... length is a problem... - list == begins with [ - dict == begins with { - """ + if registry_matchers.MARKER_IS_TYPE in marker.keys(): t = _type_to_str(marker["ISTYPE"]) - k2 = f"{idx} {t}" - res.append(k2) + k1 = f"k.{k1}{WS}{COLON}{WS} v.{t}" + res.append(k1.upper()) continue - - if "IS" in marker.keys(): - k2 = _str_to_hex_str(marker["KEY"]) - isk = f"{idx}. 22{k2}22" - res.append(isk) + if registry_matchers.MARKER_IS in marker.keys(): + marker_is = marker["IS"] + if not isinstance(marker_is, str): + _complex_is_type() + k2 = _str_to_hex_str(marker_is) + isk = f"k.{k1}{WS}{COLON}{WS} v.{k2}" + res.append(isk.upper()) continue - - if "STARTSWITH" in marker.keys(): - k2 = _str_to_hex_str(marker["KEY"]) - isk = f"{idx}. 22{k2}" - res.append(isk) + if registry_matchers.MARKER_STARTSWITH in marker.keys(): + k2 = _str_to_hex_str(marker["STARTSWITH"]) + isk = f"k.{k1}{WS}{COLON}{WS} v.22{k2}" + res.append(isk.upper()) continue - - if "ENDSWITH" in marker.keys(): - k2 = _str_to_hex_str(marker["KEY"]) - isk = f"{idx}. {k2}22" - res.append(isk) + if registry_matchers.MARKER_ENDSWITH in marker.keys(): + k2 = _str_to_hex_str(marker["ENDSWITH"]) + isk = f"k.{k1}{WS}{COLON}{WS} v.*{k2}22" + res.append(isk.upper()) continue - - if "CONTAINS" in marker.keys(): - k2 = _str_to_hex_str(marker["KEY"]) - isk = f"{idx}. *{k2}*" - res.append(isk) + if registry_matchers.MARKER_CONTAINS in marker.keys(): + k2 = _str_to_hex_str(marker["CONTAINS"]) + isk = f"k.{k1}{WS}{COLON}{WS} v.*{k2}*" + res.append(isk.upper()) continue - - marker.pop("KEY") - print(marker.keys()) - + if registry_matchers.MARKER_REGEX in marker.keys(): + raise UnprocessableEntity("REGEX not yet implemented") + if registry_matchers.MARKER_KEY_NO_EXIST in marker.keys(): + raise UnprocessableEntity("KEY NO EXIST not yet implemented") + res.append("7D{0-4095}") + # Debug logging to demonstrate output. + for idx, item in enumerate(res, 1): + logger.debug("%s. %s", idx, item) return res From cf775550e3fc1744724e45800eae0127a010bf79 Mon Sep 17 00:00:00 2001 From: ross-spencer Date: Wed, 19 Nov 2025 23:54:16 +0100 Subject: [PATCH 08/13] WIP: moving the posts --- src/jsonid/export.py | 26 +++++++++++++++++++++++--- src/jsonid/pronom.py | 20 ++++++++++++-------- 2 files changed, 35 insertions(+), 11 deletions(-) diff --git a/src/jsonid/export.py b/src/jsonid/export.py index 20c47e2..f09c530 100644 --- a/src/jsonid/export.py +++ b/src/jsonid/export.py @@ -42,21 +42,41 @@ def exportPRONOM() -> None: """Export a PRONOM compatible set of signatures.""" logger.debug("exporting registry as PRONOM") data = registry_data.registry() + all_sequences = [] for datum in data: + + id_ = datum.json()["identifier"] + name_ = datum.json()["name"] + markers = datum.json()["markers"] try: - _ = pronom.process_markers(markers.copy()) + sequences = pronom.process_markers(markers.copy()) + all_sequences.append((id_, name_, sequences)) except pronom.UnprocessableEntity as err: logger.error( "%s %s: cannot handle: %s", - datum.json()["identifier"], - datum.json()["name"], + id_, + name_, err, ) for marker in markers: logger.debug("--- START ---") logger.debug("marker: %s", marker) logger.debug("--- END ---") + # Process all the results. + for sequences in all_sequences: + if not isinstance(sequences[2], list): + raise TypeError + print("-----") + print(f"{sequences[0]}: {sequences[1][0]["@en"]}") + print("") + for idx, sequence in enumerate(sequences[2]): + # Need to return a set of internal signatures: + # + # ... bytesequences... + # + # + print(idx, ".", sequence) def exportPRONOMXML() -> None: diff --git a/src/jsonid/pronom.py b/src/jsonid/pronom.py index fccfd34..0e5cc55 100644 --- a/src/jsonid/pronom.py +++ b/src/jsonid/pronom.py @@ -75,6 +75,10 @@ def process_markers(markers: list) -> tuple[list | bool]: key(0-n):(0-n)value + Need to return something like: + + + """ COLON: Final[str] = "3A" @@ -85,7 +89,7 @@ def process_markers(markers: list) -> tuple[list | bool]: res = [] - res.append("{0-4095}7B") + res.append("BOF: {0-4095}7B") for idx, marker in enumerate(markers, 2): @@ -114,11 +118,11 @@ def process_markers(markers: list) -> tuple[list | bool]: # Given a key, each of the remaining rule parts must result in # exiting early. if registry_matchers.MARKER_KEY_EXISTS in marker.keys(): - res.append(f"k.{k1}{WS}{COLON}".upper()) + res.append(f"BOF: k.{k1}{WS}{COLON}".upper()) continue if registry_matchers.MARKER_IS_TYPE in marker.keys(): t = _type_to_str(marker["ISTYPE"]) - k1 = f"k.{k1}{WS}{COLON}{WS} v.{t}" + k1 = f"BOF: k.{k1}{WS}{COLON}{WS} v.{t}" res.append(k1.upper()) continue if registry_matchers.MARKER_IS in marker.keys(): @@ -126,29 +130,29 @@ def process_markers(markers: list) -> tuple[list | bool]: if not isinstance(marker_is, str): _complex_is_type() k2 = _str_to_hex_str(marker_is) - isk = f"k.{k1}{WS}{COLON}{WS} v.{k2}" + isk = f"BOF: k.{k1}{WS}{COLON}{WS} v.{k2}" res.append(isk.upper()) continue if registry_matchers.MARKER_STARTSWITH in marker.keys(): k2 = _str_to_hex_str(marker["STARTSWITH"]) - isk = f"k.{k1}{WS}{COLON}{WS} v.22{k2}" + isk = f"BOF: k.{k1}{WS}{COLON}{WS} v.22{k2}" res.append(isk.upper()) continue if registry_matchers.MARKER_ENDSWITH in marker.keys(): k2 = _str_to_hex_str(marker["ENDSWITH"]) - isk = f"k.{k1}{WS}{COLON}{WS} v.*{k2}22" + isk = f"BOF: k.{k1}{WS}{COLON}{WS} v.*{k2}22" res.append(isk.upper()) continue if registry_matchers.MARKER_CONTAINS in marker.keys(): k2 = _str_to_hex_str(marker["CONTAINS"]) - isk = f"k.{k1}{WS}{COLON}{WS} v.*{k2}*" + isk = f"BOF: k.{k1}{WS}{COLON}{WS} v.*{k2}*" res.append(isk.upper()) continue if registry_matchers.MARKER_REGEX in marker.keys(): raise UnprocessableEntity("REGEX not yet implemented") if registry_matchers.MARKER_KEY_NO_EXIST in marker.keys(): raise UnprocessableEntity("KEY NO EXIST not yet implemented") - res.append("7D{0-4095}") + res.append("EOF: 7D{0-4095}") # Debug logging to demonstrate output. for idx, item in enumerate(res, 1): logger.debug("%s. %s", idx, item) From d3fad9de635add16963429214006dbf00d47ac0f Mon Sep 17 00:00:00 2001 From: ross-spencer Date: Sat, 22 Nov 2025 17:30:37 +0100 Subject: [PATCH 09/13] WIP: notes --- src/jsonid/export.py | 1 + src/jsonid/pronom.py | 26 ++++++++++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/src/jsonid/export.py b/src/jsonid/export.py index f09c530..e1386f5 100644 --- a/src/jsonid/export.py +++ b/src/jsonid/export.py @@ -77,6 +77,7 @@ def exportPRONOM() -> None: # # print(idx, ".", sequence) + return def exportPRONOMXML() -> None: diff --git a/src/jsonid/pronom.py b/src/jsonid/pronom.py index 0e5cc55..07c67d5 100644 --- a/src/jsonid/pronom.py +++ b/src/jsonid/pronom.py @@ -53,6 +53,11 @@ def _complex_is_type() -> str: def _str_to_hex_str(s: str) -> str: """todo...""" k = "" + + x = s.encode("UTF-32") + for g in x: + print(hex(g).replace("0x", "")) + for c in s: b = hex(ord(c)) k = f"{k}{b}" @@ -79,6 +84,24 @@ def process_markers(markers: list) -> tuple[list | bool]: + + Different encodings need to be accounted for, e.g. + + UTF-32: + + 00000000: fffe 0000 2000 0000 2000 0000 2000 0000 .... ... ... ... + 00000010: 2000 0000 2000 0000 2000 0000 0a00 0000 ... ... ....... + 00000020: 0a00 0000 0a00 0000 0a00 0000 7b00 0000 ............{... + 00000030: 2200 0000 6100 0000 2200 0000 3a00 0000 "...a..."...:... + 00000040: 2000 0000 2200 0000 6200 0000 2200 0000 ..."...b..."... + 00000050: 7d00 0000 0a00 0000 }....... + + UTF-16: + + 00000000: fffe 2000 2000 2000 2000 2000 2000 0a00 .. . . . . . ... + 00000010: 0a00 0a00 0a00 7b00 2200 6100 2200 3a00 ......{.".a.".:. + 00000020: 2000 2200 6200 2200 7d00 0a00 .".b.".}... + """ COLON: Final[str] = "3A" @@ -103,6 +126,7 @@ def process_markers(markers: list) -> tuple[list | bool]: k0 = f"{DOUBLE_QUOTE}{k0}{DOUBLE_QUOTE}" k1 = f"{DOUBLE_QUOTE}{k1}{DOUBLE_QUOTE}" k1 = f"{k0}{WS}{COLON}*{WS}{k1}{WS}{COLON}" + marker.pop("GOTO") marker.pop("KEY") if registry_matchers.MARKER_INDEX in marker.keys(): # first we have a square bracket that then needs a search @@ -111,6 +135,8 @@ def process_markers(markers: list) -> tuple[list | bool]: k0 = SQUARE_OPEN k1 = _str_to_hex_str(marker["KEY"]) k1 = f"{WS}{k0}*{CURLY_OPEN}*{DOUBLE_QUOTE}{k1}{DOUBLE_QUOTE}" + marker.pop("INDEX") + marker.pop("KEY") if "KEY" in marker.keys(): k1 = _str_to_hex_str(marker["KEY"]) k1 = f"{DOUBLE_QUOTE}{k1}{DOUBLE_QUOTE}" From 7e670cd6efbccfda30711014d403f829b904a604 Mon Sep 17 00:00:00 2001 From: ross-spencer Date: Sun, 23 Nov 2025 18:16:10 +0100 Subject: [PATCH 10/13] WIP: structures --- src/jsonid/pronom.py | 403 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 356 insertions(+), 47 deletions(-) diff --git a/src/jsonid/pronom.py b/src/jsonid/pronom.py index 07c67d5..3d68228 100644 --- a/src/jsonid/pronom.py +++ b/src/jsonid/pronom.py @@ -1,7 +1,11 @@ """PRONOM export routines.""" +import codecs import logging -from typing import Final +import xml.dom.minidom +from dataclasses import dataclass +from functools import lru_cache +from typing import Final, Any try: import helpers @@ -16,55 +20,286 @@ logger = logging.getLogger(__name__) +UTC_TIME_FORMAT: Final[str] = "%Y-%m-%dT%H:%M:%SZ" +DISK_SECTOR_SIZE: Final[int] = 4095 + + class UnprocessableEntity(Exception): """Provide a way to give complete feedback to the caller to allow it to exit.""" -def _type_to_str(t: type) -> str: +@dataclass +class ExternalSignature: + id: str + signature: str + type: str + + +@dataclass +class ByteSequence: + id: str + pos: str + min_off: str + max_off: str + endian: str + value: str + + +@dataclass +class InternalSignature: + id: str + name: str + byte_sequences: list[ByteSequence] + + +@dataclass +class Priority: + type: str + id: str + + +@dataclass +class Identifier: + type: str + value: str + + +@dataclass +class Format: + id: str + name: str + version: str + puid: str + mime: str + classification: str + external_signatures: list[ExternalSignature] + internal_signatures: list[InternalSignature] + priorities: list[int] + + +def create_many_to_one_byte_sequence(internal_signatures: list[InternalSignature]): + """Create a many to one byte sequence, i.e. a format with multiple + Internal Signatures. + """ + internal_signature = "" + for internal in internal_signatures: + id_ = internal.id + bs = create_one_to_many_byte_sequence(internal.byte_sequences) + internal_signature = f""" +{internal_signature} + {bs} + + """ + return internal_signature.strip() + + +def calculate_variable_off_bof(item: ByteSequence): + """Given variable offsets, calculate the correct syntax.""" + seq = item.value + if ( + item.min_off != "" + and int(item.min_off) > 0 + and item.max_off != "" + and int(item.max_off) > 0 + ): + seq = f"{{{item.min_off}-{int(item.min_off)+int(item.max_off)}}}{seq}" + elif item.max_off != "" and int(item.max_off) > 0: + seq = f"{{0-{item.max_off}}}{seq}" + elif item.min_off != "" and int(item.min_off) > 0: + seq = f"{{{item.min_off}}}{seq}" + return seq + + +def calculate_variable_off_eof(item: ByteSequence): + """Given variable offsets, calculate the correct syntax.""" + seq = item.value + if ( + item.min_off != "" + and int(item.min_off) > 0 + and item.max_off != "" + and int(item.max_off) > 0 + ): + seq = f"{seq}{{{item.min_off}-{int(item.min_off)+int(item.max_off)}}}" + elif item.max_off != "" and int(item.max_off) > 0: + seq = f"{seq}{{0-{item.max_off}}}" + elif item.min_off != "" and int(item.min_off) > 0: + seq = f"{seq}{{{item.min_off}}}" + return seq + + +def create_one_to_many_byte_sequence(byte_sequences: list[ByteSequence]): + """Create a byte sequence object.""" + byte_sequence = "" + for item in byte_sequences: + seq = item.value + if item.pos.startswith("EOF"): + seq = calculate_variable_off_eof(item) + elif item.pos.startswith("BOF"): + seq = calculate_variable_off_bof(item) + byte_sequence = f""" +{byte_sequence.strip()} + + """ + return byte_sequence.strip() + + +def create_file_format_collection(fmt: list[Format]): + """Create the FileFormatCollection object. + + ``` + + 1 + ext + + + + 880 + 881 + ai + 86 + 331 + 332 + 771 + 773 + + ``` + + """ + EXT: Final[str] = "File extension" + internal_sigs = [ + f"{sig.id}" + for sig in fmt.internal_signatures + ] + external_sigs = [ + f"{sig.signature}" + for sig in fmt.external_signatures + if sig.type == EXT + ] + priorities = [ + f"{priority.id}" + for priority in fmt.priorities + ] + ff = f""" + + {"".join(internal_sigs).strip()} + {"".join(external_sigs).strip()} + {"".join(priorities).strip()} + + """ + return ff.strip() + + +def process_formats_and_save(formats: list[Format], filename: str): + """Process the collected formats and output a signature file. + + NB. Given our dataclasses here, we have the opportunity to rework + this data into many new structures. We output XML because DROID + expects XML. + """ + isc = [] + ffc = [] + for fmt in formats: + ffc.append(create_file_format_collection(fmt)) + if fmt.internal_signatures: + isc.append(create_many_to_one_byte_sequence(fmt.internal_signatures)) + droid_template = f""" + + + + {"".join(isc).strip()} + + + {"".join(ffc).strip()} + + + """ + dom = xml.dom.minidom.parseString(droid_template.strip().replace("\n", "")) + pretty_xml = dom.toprettyxml(indent=" ", encoding="utf-8") + prettier_xml = new_prettify(pretty_xml) + logger.info("outputting to: %s", filename) + with open(filename, "w", encoding="utf=8") as output_file: + output_file.write(prettier_xml) + + +def _type_to_str(t: type, encoding: str) -> str: """todo...""" + if t == helpers.TYPE_INTEGER or t == helpers.TYPE_FLOAT: # how do we represent larger numbers? and do we need to? return "[30:39]" if t == helpers.TYPE_BOOL: # true | false - return "22(74727565|66616C7365)22" + return ( + f"{'\x22'.encode(encoding)}(74727565|66616C7365){'\x22'.encode(encoding)}" + ) if t == helpers.TYPE_STRING: # string begins with a double quote and ends in a double quote. - return "22*22" + return f"{'\x22'.encode(encoding)}*{'\x22'.encode(encoding)}" if t == helpers.TYPE_MAP: # { == 7B; } == 7D - return "7B*7D" + return f"{'\x7B'.encode('utf-8')}*{'\x7D'.encode('utf-8')}" if t == helpers.TYPE_LIST: # [ == 5B; ] == 5D - return "5B*5D" + return f"{'\x5b'.encode(encoding)}*{'\x5d'.encode(encoding)}" if t == helpers.TYPE_NONE: # null - return "6E756C6C" + return "\x6e\x75\x6c\x6c".encode(encoding) # This should only trigger for incorrect values at this point.. raise UnprocessableEntity(f"type_to_str: {t}") -def _complex_is_type() -> str: +def _complex_is_type(marker: Any) -> str: """todo...""" - raise UnprocessableEntity("complex IS type") + raise UnprocessableEntity(f"complex IS type: '{marker}' (WIP)") + + +@lru_cache() +def _get_bom(ttl_hash=None) -> list: + """Todo...""" + replaces = [ + codecs.BOM, + codecs.BOM_BE, + codecs.BOM_LE, + codecs.BOM_UTF8, + codecs.BOM_UTF16, + codecs.BOM_UTF16_BE, + codecs.BOM_UTF16_LE, + codecs.BOM_UTF32, + codecs.BOM_UTF32_BE, + codecs.BOM_UTF32_LE, + ] + + res = [] + for bom in replaces: + hex_bom = "" + for marker in bom: + char = hex(marker) + hex_bom = f"{hex_bom}{char.replace("0x", "")}".upper() + res.append(hex_bom) -def _str_to_hex_str(s: str) -> str: - """todo...""" - k = "" + return res - x = s.encode("UTF-32") - for g in x: - print(hex(g).replace("0x", "")) - for c in s: - b = hex(ord(c)) - k = f"{k}{b}" - return k.replace("0x", "") +def _str_to_hex_str(s: str, encoding: str) -> str: + """todo...""" + encoded_s = s.encode(encoding) + bytes = [] + replaces = _get_bom() + for byte_ in encoded_s: + bytes.append(hex(byte_).replace("0x", "")) + hex_str = "".join(bytes).upper() + for bom in replaces: + if not hex_str.startswith(bom): + continue + hex_str = hex_str.replace(bom, "", 1) + break + return hex_str -def process_markers(markers: list) -> tuple[list | bool]: +def process_markers(markers: list, encoding: str = "") -> tuple[list | bool]: """todo... returns a tuple describing the processed value and a flag to @@ -85,25 +320,45 @@ def process_markers(markers: list) -> tuple[list | bool]: - Different encodings need to be accounted for, e.g. + Different encodings need to be accounted for, e.g. (with added + whitespace below) + + UTF-32-LE: + + 00000000: 2000 0000 2000 0000 2000 0000 2000 0000 ... ... ... ... + 00000010: 2000 0000 2000 0000 0a00 0000 0a00 0000 ... ........... + 00000020: 0a00 0000 0a00 0000 7b00 0000 2200 0000 ........{..."... + 00000030: 6100 0000 2200 0000 3a00 0000 2000 0000 a..."...:... ... + 00000040: 2200 0000 6200 0000 2200 0000 7d00 0000 "...b..."...}... + 00000050: 0a00 0000 .... - UTF-32: + UTF-32-BE: - 00000000: fffe 0000 2000 0000 2000 0000 2000 0000 .... ... ... ... - 00000010: 2000 0000 2000 0000 2000 0000 0a00 0000 ... ... ....... - 00000020: 0a00 0000 0a00 0000 0a00 0000 7b00 0000 ............{... - 00000030: 2200 0000 6100 0000 2200 0000 3a00 0000 "...a..."...:... - 00000040: 2000 0000 2200 0000 6200 0000 2200 0000 ..."...b..."... - 00000050: 7d00 0000 0a00 0000 }....... + 00000000: 0000 0020 0000 0020 0000 0020 0000 0020 ... ... ... ... + 00000010: 0000 0020 0000 0020 0000 000a 0000 000a ... ... ........ + 00000020: 0000 000a 0000 000a 0000 007b 0000 0022 ...........{..." + 00000030: 0000 0061 0000 0022 0000 003a 0000 0020 ...a..."...:... + 00000040: 0000 0022 0000 0062 0000 0022 0000 007d ..."...b..."...} + 00000050: 0000 000a .... - UTF-16: - 00000000: fffe 2000 2000 2000 2000 2000 2000 0a00 .. . . . . . ... - 00000010: 0a00 0a00 0a00 7b00 2200 6100 2200 3a00 ......{.".a.".:. - 00000020: 2000 2200 6200 2200 7d00 0a00 .".b.".}... + UTF-16-LE: + + 00000000: 2000 2000 2000 2000 2000 2000 0a00 0a00 . . . . . ..... + 00000010: 0a00 0a00 7b00 2200 6100 2200 3a00 2000 ....{.".a.".:. . + 00000020: 2200 6200 2200 7d00 0a00 ".b.".}... + + UTF-16-BE: + + 00000000: 0020 0020 0020 0020 0020 0020 000a 000a . . . . . . .... + 00000010: 000a 000a 007b 0022 0061 0022 003a 0020 .....{.".a.".:. + 00000020: 0022 0062 0022 007d 000a .".b.".}.. + """ + encoding = "utf-8" + COLON: Final[str] = "3A" CURLY_OPEN: Final[str] = "7B" SQUARE_OPEN: Final[str] = "5B" @@ -112,8 +367,6 @@ def process_markers(markers: list) -> tuple[list | bool]: res = [] - res.append("BOF: {0-4095}7B") - for idx, marker in enumerate(markers, 2): logger.debug("marker: %s", marker) @@ -121,8 +374,8 @@ def process_markers(markers: list) -> tuple[list | bool]: if registry_matchers.MARKER_GOTO in marker.keys(): # first key exists like regular key, then we have to # search for the next key... - k0 = _str_to_hex_str(marker["GOTO"]) - k1 = _str_to_hex_str(marker["KEY"]) + k0 = _str_to_hex_str(marker["GOTO"], encoding=encoding) + k1 = _str_to_hex_str(marker["KEY"], encoding=encoding) k0 = f"{DOUBLE_QUOTE}{k0}{DOUBLE_QUOTE}" k1 = f"{DOUBLE_QUOTE}{k1}{DOUBLE_QUOTE}" k1 = f"{k0}{WS}{COLON}*{WS}{k1}{WS}{COLON}" @@ -133,12 +386,12 @@ def process_markers(markers: list) -> tuple[list | bool]: # parameter for the next object (curly bracket) and then # key... k0 = SQUARE_OPEN - k1 = _str_to_hex_str(marker["KEY"]) + k1 = _str_to_hex_str(marker["KEY"], encoding=encoding) k1 = f"{WS}{k0}*{CURLY_OPEN}*{DOUBLE_QUOTE}{k1}{DOUBLE_QUOTE}" marker.pop("INDEX") marker.pop("KEY") if "KEY" in marker.keys(): - k1 = _str_to_hex_str(marker["KEY"]) + k1 = _str_to_hex_str(marker["KEY"], encoding=encoding) k1 = f"{DOUBLE_QUOTE}{k1}{DOUBLE_QUOTE}" marker.pop("KEY") # Given a key, each of the remaining rule parts must result in @@ -147,30 +400,30 @@ def process_markers(markers: list) -> tuple[list | bool]: res.append(f"BOF: k.{k1}{WS}{COLON}".upper()) continue if registry_matchers.MARKER_IS_TYPE in marker.keys(): - t = _type_to_str(marker["ISTYPE"]) + t = _type_to_str(marker["ISTYPE"], encoding=encoding) # TODO... k1 = f"BOF: k.{k1}{WS}{COLON}{WS} v.{t}" res.append(k1.upper()) continue if registry_matchers.MARKER_IS in marker.keys(): marker_is = marker["IS"] if not isinstance(marker_is, str): - _complex_is_type() - k2 = _str_to_hex_str(marker_is) + _complex_is_type(marker_is) + k2 = _str_to_hex_str(marker_is, encoding=encoding) isk = f"BOF: k.{k1}{WS}{COLON}{WS} v.{k2}" res.append(isk.upper()) continue if registry_matchers.MARKER_STARTSWITH in marker.keys(): - k2 = _str_to_hex_str(marker["STARTSWITH"]) + k2 = _str_to_hex_str(marker["STARTSWITH"], encoding=encoding) isk = f"BOF: k.{k1}{WS}{COLON}{WS} v.22{k2}" res.append(isk.upper()) continue if registry_matchers.MARKER_ENDSWITH in marker.keys(): - k2 = _str_to_hex_str(marker["ENDSWITH"]) + k2 = _str_to_hex_str(marker["ENDSWITH"], encoding=encoding) isk = f"BOF: k.{k1}{WS}{COLON}{WS} v.*{k2}22" res.append(isk.upper()) continue if registry_matchers.MARKER_CONTAINS in marker.keys(): - k2 = _str_to_hex_str(marker["CONTAINS"]) + k2 = _str_to_hex_str(marker["CONTAINS"], encoding=encoding) isk = f"BOF: k.{k1}{WS}{COLON}{WS} v.*{k2}*" res.append(isk.upper()) continue @@ -178,8 +431,64 @@ def process_markers(markers: list) -> tuple[list | bool]: raise UnprocessableEntity("REGEX not yet implemented") if registry_matchers.MARKER_KEY_NO_EXIST in marker.keys(): raise UnprocessableEntity("KEY NO EXIST not yet implemented") - res.append("EOF: 7D{0-4095}") + + BOF = f"{{0-{DISK_SECTOR_SIZE}}}7B" + EOF = f"7D{{0-{DISK_SECTOR_SIZE}}}" + + bs_res = [] + + bs_res.append( + ByteSequence( + id=1, + pos="BOF", + min_off=0, + max_off=f"{DISK_SECTOR_SIZE}", + endian="", + value=BOF, + ) + ) + # Debug logging to demonstrate output. - for idx, item in enumerate(res, 1): + for idx, item in enumerate(res, 2): logger.debug("%s. %s", idx, item) - return res + + bs = ByteSequence( + id=idx, + pos="BOF", + min_off=1, + max_off="", + endian="", + value=item, + ) + bs_res.append(bs) + + bs_res.append( + ByteSequence( + id=1, + pos="EOF", + min_off="0", + max_off=f"{DISK_SECTOR_SIZE}", + endian="", + value=BOF, + ) + ) + + """ + class ByteSequence: + id: str + pos: str + min_off: str + max_off: str + endian: str + value: str + """ + + return bs_res + + +def create_baseline_json_sequences(): + """Create baseline JSON sequences that match map and list types + with various different encodings. + """ + + # TODO... From a627ebcf11efb35e4ae1fcf576f235bbd6d38fd8 Mon Sep 17 00:00:00 2001 From: ross-spencer Date: Sun, 23 Nov 2025 18:52:23 +0100 Subject: [PATCH 11/13] WIP: FIRST XML!!! --- src/jsonid/export.py | 37 ++++++++++++++++-- src/jsonid/export_helpers.py | 30 +++++++++++++++ src/jsonid/pronom.py | 74 +++++++++++++++++++++++++++--------- 3 files changed, 118 insertions(+), 23 deletions(-) create mode 100644 src/jsonid/export_helpers.py diff --git a/src/jsonid/export.py b/src/jsonid/export.py index e1386f5..8343012 100644 --- a/src/jsonid/export.py +++ b/src/jsonid/export.py @@ -43,15 +43,20 @@ def exportPRONOM() -> None: logger.debug("exporting registry as PRONOM") data = registry_data.registry() all_sequences = [] - for datum in data: + formats = [] + + for datum in data: id_ = datum.json()["identifier"] name_ = datum.json()["name"] - markers = datum.json()["markers"] + + format_sequences = [] + try: sequences = pronom.process_markers(markers.copy()) all_sequences.append((id_, name_, sequences)) + format_sequences.append(sequences) except pronom.UnprocessableEntity as err: logger.error( "%s %s: cannot handle: %s", @@ -63,6 +68,28 @@ def exportPRONOM() -> None: logger.debug("--- START ---") logger.debug("marker: %s", marker) logger.debug("--- END ---") + continue + + format = pronom.Format( + id=0, + name=name_, + version="", + puid=id_, + mime="TODO", + classification="structured text", # TODO: magic + external_signatures=[ + pronom.ExternalSignature( + id=0, + signature="JSON", + type="TODO", + ) + ], + internal_signatures=format_sequences[0], + priorities=[], + ) + + formats.append(format) + # Process all the results. for sequences in all_sequences: if not isinstance(sequences[2], list): @@ -76,8 +103,10 @@ def exportPRONOM() -> None: # ... bytesequences... # # - print(idx, ".", sequence) - return + print(idx, ".", sequence.byte_sequences) + # return + + pronom.process_formats_and_save(formats, "abc.xml") def exportPRONOMXML() -> None: diff --git a/src/jsonid/export_helpers.py b/src/jsonid/export_helpers.py new file mode 100644 index 0000000..0dfec2f --- /dev/null +++ b/src/jsonid/export_helpers.py @@ -0,0 +1,30 @@ +"""Helpers for the export functions.""" + +import datetime +from datetime import timezone +from typing import Final +from xml.dom.minidom import parseString + +UTC_TIME_FORMAT: Final[str] = "%Y-%m-%dT%H:%M:%SZ" + + +def get_utc_timestamp_now(): + """Get a formatted UTC timestamp for 'now' that can be used when + a timestamp is needed. + """ + return datetime.datetime.now(timezone.utc).strftime(UTC_TIME_FORMAT) + + +def new_prettify(c): + """Remove excess newlines from DOM output. + + via: https://stackoverflow.com/a/14493981 + """ + reparsed = parseString(c) + return "\n".join( + [ + line + for line in reparsed.toprettyxml(indent=" " * 2).split("\n") + if line.strip() + ] + ) diff --git a/src/jsonid/pronom.py b/src/jsonid/pronom.py index 3d68228..a1157a9 100644 --- a/src/jsonid/pronom.py +++ b/src/jsonid/pronom.py @@ -5,22 +5,22 @@ import xml.dom.minidom from dataclasses import dataclass from functools import lru_cache -from typing import Final, Any +from typing import Any, Final try: + import export_helpers import helpers import registry_matchers except ModuleNotFoundError: try: - from src.jsonid import helpers, registry_matchers + from src.jsonid import export_helpers, helpers, registry_matchers except ModuleNotFoundError: - from jsonid import helpers, registry_matchers + from jsonid import export_helpers, helpers, registry_matchers logger = logging.getLogger(__name__) -UTC_TIME_FORMAT: Final[str] = "%Y-%m-%dT%H:%M:%SZ" DISK_SECTOR_SIZE: Final[int] = 4095 @@ -206,7 +206,7 @@ def process_formats_and_save(formats: list[Format], filename: str): isc.append(create_many_to_one_byte_sequence(fmt.internal_signatures)) droid_template = f""" - + {"".join(isc).strip()} @@ -215,9 +215,18 @@ def process_formats_and_save(formats: list[Format], filename: str): """ - dom = xml.dom.minidom.parseString(droid_template.strip().replace("\n", "")) + dom = None + signature_file = droid_template.strip().replace("\n", "") + try: + dom = xml.dom.minidom.parseString(signature_file) + except xml.parsers.expat.ExpatError as err: + logger.error("cannot process xml: %s", err) + print("xxxxxxxxxxxx") + print(signature_file) + return + pretty_xml = dom.toprettyxml(indent=" ", encoding="utf-8") - prettier_xml = new_prettify(pretty_xml) + prettier_xml = export_helpers.new_prettify(pretty_xml) logger.info("outputting to: %s", filename) with open(filename, "w", encoding="utf=8") as output_file: output_file.write(prettier_xml) @@ -231,18 +240,22 @@ def _type_to_str(t: type, encoding: str) -> str: return "[30:39]" if t == helpers.TYPE_BOOL: # true | false + # TODO: + """ENCODE then hexlify... return ( f"{'\x22'.encode(encoding)}(74727565|66616C7365){'\x22'.encode(encoding)}" ) + """ + return "22(74727565|66616C7365)22" if t == helpers.TYPE_STRING: # string begins with a double quote and ends in a double quote. - return f"{'\x22'.encode(encoding)}*{'\x22'.encode(encoding)}" + return "'22*22" if t == helpers.TYPE_MAP: # { == 7B; } == 7D - return f"{'\x7B'.encode('utf-8')}*{'\x7D'.encode('utf-8')}" + return "7B*7D" if t == helpers.TYPE_LIST: # [ == 5B; ] == 5D - return f"{'\x5b'.encode(encoding)}*{'\x5d'.encode(encoding)}" + return "5b*5d" if t == helpers.TYPE_NONE: # null return "\x6e\x75\x6c\x6c".encode(encoding) @@ -469,21 +482,44 @@ def process_markers(markers: list, encoding: str = "") -> tuple[list | bool]: min_off="0", max_off=f"{DISK_SECTOR_SIZE}", endian="", - value=BOF, + value=EOF, ) ) """ - class ByteSequence: - id: str - pos: str - min_off: str - max_off: str - endian: str - value: str + internal_sigs = [] + for internal in internal_signatures: + sig_id = _get_node_value("SignatureID", internal) + sig_name = _get_node_value("SignatureName", internal) + _ = _get_node_value("SignatureNote", internal) + try: + byte_sequences = internal.getElementsByTagName("ByteSequence") + sequences = get_bytes(byte_sequences) + except IndexError: + continue + internal_sigs.append( + InternalSignature( + id=sig_id, + name=sig_name, + byte_sequences=sequences, + ) + ) """ - return bs_res + iss = InternalSignature( + id=0, + name="", + byte_sequences=bs_res, + ) + + return [iss] + + +def create_xml(all: list[InternalSignature]) -> None: + """todo...""" + for x in all: + print("============") + print(x) def create_baseline_json_sequences(): From eb84ebeefda0f620b25f3f7196153336ff0a5872 Mon Sep 17 00:00:00 2001 From: ross-spencer Date: Sun, 23 Nov 2025 22:58:44 +0100 Subject: [PATCH 12/13] WIP: better handling of sequences Encodings are not yet working smoothly... --- src/jsonid/export.py | 18 +------- src/jsonid/pronom.py | 97 +++++++++++++++++++++++++------------------- 2 files changed, 56 insertions(+), 59 deletions(-) diff --git a/src/jsonid/export.py b/src/jsonid/export.py index 8343012..f325bae 100644 --- a/src/jsonid/export.py +++ b/src/jsonid/export.py @@ -48,7 +48,7 @@ def exportPRONOM() -> None: for datum in data: id_ = datum.json()["identifier"] - name_ = datum.json()["name"] + name_ = datum.json()["name"][0]["@en"] markers = datum.json()["markers"] format_sequences = [] @@ -90,22 +90,6 @@ def exportPRONOM() -> None: formats.append(format) - # Process all the results. - for sequences in all_sequences: - if not isinstance(sequences[2], list): - raise TypeError - print("-----") - print(f"{sequences[0]}: {sequences[1][0]["@en"]}") - print("") - for idx, sequence in enumerate(sequences[2]): - # Need to return a set of internal signatures: - # - # ... bytesequences... - # - # - print(idx, ".", sequence.byte_sequences) - # return - pronom.process_formats_and_save(formats, "abc.xml") diff --git a/src/jsonid/pronom.py b/src/jsonid/pronom.py index a1157a9..6192ff8 100644 --- a/src/jsonid/pronom.py +++ b/src/jsonid/pronom.py @@ -1,5 +1,9 @@ -"""PRONOM export routines.""" +"""PRONOM export routines. +XML tooling: https://xmllint.com/ +""" + +import binascii import codecs import logging import xml.dom.minidom @@ -23,6 +27,15 @@ DISK_SECTOR_SIZE: Final[int] = 4095 +# Common PRONOM characters. +COLON: Final[str] = "3A" +CURLY_OPEN: Final[str] = "7B" +CURLY_CLOSE: Final[str] = "7D" +SQUARE_OPEN: Final[str] = "5B" +SQUARE_CLOSE: Final[str] = "5D" +DOUBLE_QUOTE: Final[str] = "22" +WS_REGEX: Final[str] = "(0-10)" + class UnprocessableEntity(Exception): """Provide a way to give complete feedback to the caller to allow @@ -221,10 +234,7 @@ def process_formats_and_save(formats: list[Format], filename: str): dom = xml.dom.minidom.parseString(signature_file) except xml.parsers.expat.ExpatError as err: logger.error("cannot process xml: %s", err) - print("xxxxxxxxxxxx") - print(signature_file) return - pretty_xml = dom.toprettyxml(indent=" ", encoding="utf-8") prettier_xml = export_helpers.new_prettify(pretty_xml) logger.info("outputting to: %s", filename) @@ -232,6 +242,19 @@ def process_formats_and_save(formats: list[Format], filename: str): output_file.write(prettier_xml) +def encode_roundtrip(val: str, encoding: str) -> str: + """We want to get a plain-text byte-sequence into a new + encoding. It takes a few hops and skips. + """ + val = val.strip() + try: + re_encoded = binascii.unhexlify(val).decode("utf-8").encode(encoding) + except (binascii.Error, UnicodeDecodeError) as err: + logger.error("cannot convert: %s len: %s ('%s')", val, len(val), err) + return val + return binascii.hexlify(re_encoded).decode().upper() + + def _type_to_str(t: type, encoding: str) -> str: """todo...""" @@ -240,25 +263,19 @@ def _type_to_str(t: type, encoding: str) -> str: return "[30:39]" if t == helpers.TYPE_BOOL: # true | false - # TODO: - """ENCODE then hexlify... - return ( - f"{'\x22'.encode(encoding)}(74727565|66616C7365){'\x22'.encode(encoding)}" - ) - """ - return "22(74727565|66616C7365)22" + return f"{encode_roundtrip(DOUBLE_QUOTE, encoding)}({encode_roundtrip('74727565', encoding)}|{encode_roundtrip('66616C7365', encoding)}){encode_roundtrip(DOUBLE_QUOTE , encoding)}" if t == helpers.TYPE_STRING: # string begins with a double quote and ends in a double quote. - return "'22*22" + return f"'{encode_roundtrip(DOUBLE_QUOTE, encoding)}*{encode_roundtrip(DOUBLE_QUOTE, encoding)}" if t == helpers.TYPE_MAP: # { == 7B; } == 7D - return "7B*7D" + return f"{encode_roundtrip(CURLY_OPEN, encoding)}*{encode_roundtrip(CURLY_CLOSE, encoding)}" if t == helpers.TYPE_LIST: # [ == 5B; ] == 5D - return "5b*5d" + return f"{encode_roundtrip(SQUARE_OPEN, encoding)}*{encode_roundtrip(SQUARE_CLOSE, encoding)}" if t == helpers.TYPE_NONE: # null - return "\x6e\x75\x6c\x6c".encode(encoding) + return f"{encode_roundtrip('6e756c6c', encoding)}".encode(encoding) # This should only trigger for incorrect values at this point.. raise UnprocessableEntity(f"type_to_str: {t}") @@ -372,12 +389,6 @@ def process_markers(markers: list, encoding: str = "") -> tuple[list | bool]: encoding = "utf-8" - COLON: Final[str] = "3A" - CURLY_OPEN: Final[str] = "7B" - SQUARE_OPEN: Final[str] = "5B" - DOUBLE_QUOTE: Final[str] = "22" - WS: Final[str] = "(0-10)" - res = [] for idx, marker in enumerate(markers, 2): @@ -391,7 +402,8 @@ def process_markers(markers: list, encoding: str = "") -> tuple[list | bool]: k1 = _str_to_hex_str(marker["KEY"], encoding=encoding) k0 = f"{DOUBLE_QUOTE}{k0}{DOUBLE_QUOTE}" k1 = f"{DOUBLE_QUOTE}{k1}{DOUBLE_QUOTE}" - k1 = f"{k0}{WS}{COLON}*{WS}{k1}{WS}{COLON}" + k1 = f"{encode_roundtrip(k0, encoding)}{WS_REGEX}{COLON}*{WS_REGEX}{encode_roundtrip(k1, encoding)}{WS_REGEX}{COLON}" + # k1 = {encode_roundtrip(k1, encoding)} marker.pop("GOTO") marker.pop("KEY") if registry_matchers.MARKER_INDEX in marker.keys(): @@ -400,53 +412,54 @@ def process_markers(markers: list, encoding: str = "") -> tuple[list | bool]: # key... k0 = SQUARE_OPEN k1 = _str_to_hex_str(marker["KEY"], encoding=encoding) - k1 = f"{WS}{k0}*{CURLY_OPEN}*{DOUBLE_QUOTE}{k1}{DOUBLE_QUOTE}" + k1 = f"{WS_REGEX}{k0}*{CURLY_OPEN}*{DOUBLE_QUOTE}{encode_roundtrip(k1, encoding)}{DOUBLE_QUOTE}" marker.pop("INDEX") marker.pop("KEY") if "KEY" in marker.keys(): k1 = _str_to_hex_str(marker["KEY"], encoding=encoding) k1 = f"{DOUBLE_QUOTE}{k1}{DOUBLE_QUOTE}" + k1 = f"{encode_roundtrip(k1, encoding)}" marker.pop("KEY") # Given a key, each of the remaining rule parts must result in # exiting early. if registry_matchers.MARKER_KEY_EXISTS in marker.keys(): - res.append(f"BOF: k.{k1}{WS}{COLON}".upper()) + res.append(f"{k1}{WS_REGEX}{COLON}".upper()) continue if registry_matchers.MARKER_IS_TYPE in marker.keys(): - t = _type_to_str(marker["ISTYPE"], encoding=encoding) # TODO... - k1 = f"BOF: k.{k1}{WS}{COLON}{WS} v.{t}" + is_type = _type_to_str(marker["ISTYPE"], encoding=encoding) # TODO... + k1 = f"{k1}{WS_REGEX}{COLON}{WS_REGEX}{is_type}" res.append(k1.upper()) continue if registry_matchers.MARKER_IS in marker.keys(): marker_is = marker["IS"] if not isinstance(marker_is, str): _complex_is_type(marker_is) - k2 = _str_to_hex_str(marker_is, encoding=encoding) - isk = f"BOF: k.{k1}{WS}{COLON}{WS} v.{k2}" - res.append(isk.upper()) + is_val = _str_to_hex_str(marker_is, encoding=encoding) + k1 = f"{k1}{WS_REGEX}{COLON}{WS_REGEX}{is_val}" + res.append(k1.upper()) continue if registry_matchers.MARKER_STARTSWITH in marker.keys(): - k2 = _str_to_hex_str(marker["STARTSWITH"], encoding=encoding) - isk = f"BOF: k.{k1}{WS}{COLON}{WS} v.22{k2}" - res.append(isk.upper()) + starts_with = _str_to_hex_str(marker["STARTSWITH"], encoding=encoding) + k1 = f"{k1}{WS_REGEX}{COLON}{WS_REGEX}{encode_roundtrip(DOUBLE_QUOTE, encoding)}{starts_with}" + res.append(k1.upper()) continue if registry_matchers.MARKER_ENDSWITH in marker.keys(): - k2 = _str_to_hex_str(marker["ENDSWITH"], encoding=encoding) - isk = f"BOF: k.{k1}{WS}{COLON}{WS} v.*{k2}22" - res.append(isk.upper()) + ends_with = _str_to_hex_str(marker["ENDSWITH"], encoding=encoding) + k1 = f"{k1}{WS_REGEX}{COLON}{WS_REGEX}*{ends_with}{encode_roundtrip(DOUBLE_QUOTE, encoding)}" + res.append(k1.upper()) continue if registry_matchers.MARKER_CONTAINS in marker.keys(): - k2 = _str_to_hex_str(marker["CONTAINS"], encoding=encoding) - isk = f"BOF: k.{k1}{WS}{COLON}{WS} v.*{k2}*" - res.append(isk.upper()) + contains = _str_to_hex_str(marker["CONTAINS"], encoding=encoding) + k1 = f"{k1}{WS_REGEX}{COLON}{WS_REGEX}{encode_roundtrip(DOUBLE_QUOTE, encoding)}*{contains}*{encode_roundtrip(DOUBLE_QUOTE, encoding)}" + res.append(k1.upper()) continue if registry_matchers.MARKER_REGEX in marker.keys(): raise UnprocessableEntity("REGEX not yet implemented") if registry_matchers.MARKER_KEY_NO_EXIST in marker.keys(): raise UnprocessableEntity("KEY NO EXIST not yet implemented") - BOF = f"{{0-{DISK_SECTOR_SIZE}}}7B" - EOF = f"7D{{0-{DISK_SECTOR_SIZE}}}" + BOF = f"{encode_roundtrip(CURLY_OPEN, encoding)}" + EOF = f"{encode_roundtrip(CURLY_CLOSE, encoding)}" bs_res = [] @@ -467,8 +480,8 @@ def process_markers(markers: list, encoding: str = "") -> tuple[list | bool]: bs = ByteSequence( id=idx, - pos="BOF", - min_off=1, + pos="VAR", + min_off="", max_off="", endian="", value=item, From 1b360da449541e4f9b04b0810aecb925a83ef5fb Mon Sep 17 00:00:00 2001 From: ross-spencer Date: Sun, 23 Nov 2025 23:35:45 +0100 Subject: [PATCH 13/13] WIP: encoding looks to be working --- src/jsonid/pronom.py | 90 +++++++++++++++++++++++++++----------------- 1 file changed, 56 insertions(+), 34 deletions(-) diff --git a/src/jsonid/pronom.py b/src/jsonid/pronom.py index 6192ff8..ae8c0de 100644 --- a/src/jsonid/pronom.py +++ b/src/jsonid/pronom.py @@ -258,21 +258,28 @@ def encode_roundtrip(val: str, encoding: str) -> str: def _type_to_str(t: type, encoding: str) -> str: """todo...""" + colon_encoded = encode_roundtrip(COLON, encoding) + curly_open_encoded = encode_roundtrip(CURLY_OPEN, encoding) + curly_close_encoded: Final[str] = encode_roundtrip(CURLY_CLOSE, encoding) + square_open_encoded: Final[str] = encode_roundtrip(SQUARE_OPEN, encoding) + square_close_encoded: Final[str] = encode_roundtrip(SQUARE_CLOSE, encoding) + double_quote_encoded: Final[str] = encode_roundtrip(DOUBLE_QUOTE, encoding) + if t == helpers.TYPE_INTEGER or t == helpers.TYPE_FLOAT: # how do we represent larger numbers? and do we need to? return "[30:39]" if t == helpers.TYPE_BOOL: # true | false - return f"{encode_roundtrip(DOUBLE_QUOTE, encoding)}({encode_roundtrip('74727565', encoding)}|{encode_roundtrip('66616C7365', encoding)}){encode_roundtrip(DOUBLE_QUOTE , encoding)}" + return f"{double_quote_encoded}({encode_roundtrip('74727565', encoding)}|{encode_roundtrip('66616C7365', encoding)}){double_quote_encoded}" if t == helpers.TYPE_STRING: # string begins with a double quote and ends in a double quote. - return f"'{encode_roundtrip(DOUBLE_QUOTE, encoding)}*{encode_roundtrip(DOUBLE_QUOTE, encoding)}" + return f"'{double_quote_encoded}*{double_quote_encoded}" if t == helpers.TYPE_MAP: # { == 7B; } == 7D - return f"{encode_roundtrip(CURLY_OPEN, encoding)}*{encode_roundtrip(CURLY_CLOSE, encoding)}" + return f"{curly_open_encoded}*{curly_close_encoded}" if t == helpers.TYPE_LIST: # [ == 5B; ] == 5D - return f"{encode_roundtrip(SQUARE_OPEN, encoding)}*{encode_roundtrip(SQUARE_CLOSE, encoding)}" + return f"{square_open_encoded}*{square_close_encoded}" if t == helpers.TYPE_NONE: # null return f"{encode_roundtrip('6e756c6c', encoding)}".encode(encoding) @@ -313,7 +320,7 @@ def _get_bom(ttl_hash=None) -> list: return res -def _str_to_hex_str(s: str, encoding: str) -> str: +def _str_to_hex_str_(s: str, encoding: str) -> str: """todo...""" encoded_s = s.encode(encoding) bytes = [] @@ -329,6 +336,15 @@ def _str_to_hex_str(s: str, encoding: str) -> str: return hex_str +def _str_to_hex_str(s: str) -> str: + """todo...""" + bytes = [] + for byte_ in s.encode(): + bytes.append(hex(byte_).replace("0x", "")) + hex_str = "".join(bytes).upper() + return hex_str + + def process_markers(markers: list, encoding: str = "") -> tuple[list | bool]: """todo... @@ -387,7 +403,14 @@ def process_markers(markers: list, encoding: str = "") -> tuple[list | bool]: """ - encoding = "utf-8" + encoding = "utf-16" + + colon_encoded = encode_roundtrip(COLON, encoding) + curly_open_encoded = encode_roundtrip(CURLY_OPEN, encoding) + curly_close_encoded: Final[str] = encode_roundtrip(CURLY_CLOSE, encoding) + square_open_encoded: Final[str] = encode_roundtrip(SQUARE_OPEN, encoding) + square_close_encoded: Final[str] = encode_roundtrip(SQUARE_CLOSE, encoding) + double_quote_encoded: Final[str] = encode_roundtrip(DOUBLE_QUOTE, encoding) res = [] @@ -398,59 +421,58 @@ def process_markers(markers: list, encoding: str = "") -> tuple[list | bool]: if registry_matchers.MARKER_GOTO in marker.keys(): # first key exists like regular key, then we have to # search for the next key... - k0 = _str_to_hex_str(marker["GOTO"], encoding=encoding) - k1 = _str_to_hex_str(marker["KEY"], encoding=encoding) - k0 = f"{DOUBLE_QUOTE}{k0}{DOUBLE_QUOTE}" - k1 = f"{DOUBLE_QUOTE}{k1}{DOUBLE_QUOTE}" - k1 = f"{encode_roundtrip(k0, encoding)}{WS_REGEX}{COLON}*{WS_REGEX}{encode_roundtrip(k1, encoding)}{WS_REGEX}{COLON}" - # k1 = {encode_roundtrip(k1, encoding)} + k0 = _str_to_hex_str(marker["GOTO"]) + k1 = _str_to_hex_str(marker["KEY"]) + k0 = f"{double_quote_encoded}{encode_roundtrip(k0, encoding)}{double_quote_encoded}" + k1 = f"{double_quote_encoded}{encode_roundtrip(k1, encoding)}{double_quote_encoded}" + k1 = ( + f"{k0}{WS_REGEX}{colon_encoded}*{WS_REGEX}{k1}{WS_REGEX}{colon_encoded}" + ) marker.pop("GOTO") marker.pop("KEY") if registry_matchers.MARKER_INDEX in marker.keys(): - # first we have a square bracket that then needs a search - # parameter for the next object (curly bracket) and then - # key... - k0 = SQUARE_OPEN - k1 = _str_to_hex_str(marker["KEY"], encoding=encoding) - k1 = f"{WS_REGEX}{k0}*{CURLY_OPEN}*{DOUBLE_QUOTE}{encode_roundtrip(k1, encoding)}{DOUBLE_QUOTE}" + # we want to match a list so first we have a square bracket + # that then needs a search parameter for the next object + # (curly bracket) and then key... + k1 = _str_to_hex_str(marker["KEY"]) + k1 = f"{WS_REGEX}{square_open_encoded}*{curly_close_encoded}*{double_quote_encoded}{encode_roundtrip(k1, encoding)}{double_quote_encoded}" marker.pop("INDEX") marker.pop("KEY") if "KEY" in marker.keys(): - k1 = _str_to_hex_str(marker["KEY"], encoding=encoding) - k1 = f"{DOUBLE_QUOTE}{k1}{DOUBLE_QUOTE}" - k1 = f"{encode_roundtrip(k1, encoding)}" + k1 = _str_to_hex_str(marker["KEY"]) + k1 = f"{double_quote_encoded}{encode_roundtrip(k1, encoding)}{double_quote_encoded}" marker.pop("KEY") # Given a key, each of the remaining rule parts must result in # exiting early. if registry_matchers.MARKER_KEY_EXISTS in marker.keys(): - res.append(f"{k1}{WS_REGEX}{COLON}".upper()) + res.append(f"{k1}{WS_REGEX}{colon_encoded}".upper()) continue if registry_matchers.MARKER_IS_TYPE in marker.keys(): - is_type = _type_to_str(marker["ISTYPE"], encoding=encoding) # TODO... - k1 = f"{k1}{WS_REGEX}{COLON}{WS_REGEX}{is_type}" + is_type = _type_to_str(marker["ISTYPE"], encoding=encoding) + k1 = f"{k1}{WS_REGEX}{colon_encoded}{WS_REGEX}{is_type}" res.append(k1.upper()) continue if registry_matchers.MARKER_IS in marker.keys(): marker_is = marker["IS"] if not isinstance(marker_is, str): _complex_is_type(marker_is) - is_val = _str_to_hex_str(marker_is, encoding=encoding) - k1 = f"{k1}{WS_REGEX}{COLON}{WS_REGEX}{is_val}" + is_val = _str_to_hex_str(marker_is) + k1 = f"{k1}{WS_REGEX}{WS_REGEX}{encode_roundtrip(is_val, encoding)}" res.append(k1.upper()) continue if registry_matchers.MARKER_STARTSWITH in marker.keys(): - starts_with = _str_to_hex_str(marker["STARTSWITH"], encoding=encoding) - k1 = f"{k1}{WS_REGEX}{COLON}{WS_REGEX}{encode_roundtrip(DOUBLE_QUOTE, encoding)}{starts_with}" + starts_with = _str_to_hex_str(marker["STARTSWITH"]) + k1 = f"{k1}{WS_REGEX}{colon_encoded}{WS_REGEX}{double_quote_encoded}{starts_with}" res.append(k1.upper()) continue if registry_matchers.MARKER_ENDSWITH in marker.keys(): - ends_with = _str_to_hex_str(marker["ENDSWITH"], encoding=encoding) - k1 = f"{k1}{WS_REGEX}{COLON}{WS_REGEX}*{ends_with}{encode_roundtrip(DOUBLE_QUOTE, encoding)}" + ends_with = _str_to_hex_str(marker["ENDSWITH"]) + k1 = f"{k1}{WS_REGEX}{colon_encoded}{WS_REGEX}*{ends_with}{double_quote_encoded}" res.append(k1.upper()) continue if registry_matchers.MARKER_CONTAINS in marker.keys(): - contains = _str_to_hex_str(marker["CONTAINS"], encoding=encoding) - k1 = f"{k1}{WS_REGEX}{COLON}{WS_REGEX}{encode_roundtrip(DOUBLE_QUOTE, encoding)}*{contains}*{encode_roundtrip(DOUBLE_QUOTE, encoding)}" + contains = _str_to_hex_str(marker["CONTAINS"]) + k1 = f"{k1}{WS_REGEX}{colon_encoded}{WS_REGEX}{double_quote_encoded}*{contains}*{double_quote_encoded}" res.append(k1.upper()) continue if registry_matchers.MARKER_REGEX in marker.keys(): @@ -458,8 +480,8 @@ def process_markers(markers: list, encoding: str = "") -> tuple[list | bool]: if registry_matchers.MARKER_KEY_NO_EXIST in marker.keys(): raise UnprocessableEntity("KEY NO EXIST not yet implemented") - BOF = f"{encode_roundtrip(CURLY_OPEN, encoding)}" - EOF = f"{encode_roundtrip(CURLY_CLOSE, encoding)}" + BOF = f"{curly_open_encoded}" + EOF = f"{curly_close_encoded}" bs_res = []