diff --git a/src/jsonid/export.py b/src/jsonid/export.py index c062aa8..f325bae 100644 --- a/src/jsonid/export.py +++ b/src/jsonid/export.py @@ -6,20 +6,21 @@ from datetime import timezone try: + import pronom import registry_data import version except ModuleNotFoundError: try: - from src.jsonid import registry_data, version + from src.jsonid import pronom, registry_data, version except ModuleNotFoundError: - from jsonid import registry_data, version + from jsonid import pronom, registry_data, version logger = logging.getLogger(__name__) def exportJSON() -> None: # pylint: disable=C0103 """Export to JSON.""" - logger.debug("exporting registry ad JSON") + logger.debug("exporting registry as JSON") data = registry_data.registry() json_obj = [] id_ = { @@ -35,3 +36,62 @@ def exportJSON() -> None: # pylint: disable=C0103 for datum in data: json_obj.append(datum.json()) print(json.dumps(json_obj, indent=2)) + + +def exportPRONOM() -> None: + """Export a PRONOM compatible set of signatures.""" + logger.debug("exporting registry as PRONOM") + data = registry_data.registry() + all_sequences = [] + + formats = [] + + for datum in data: + id_ = datum.json()["identifier"] + name_ = datum.json()["name"][0]["@en"] + markers = datum.json()["markers"] + + format_sequences = [] + + try: + sequences = pronom.process_markers(markers.copy()) + all_sequences.append((id_, name_, sequences)) + format_sequences.append(sequences) + except pronom.UnprocessableEntity as err: + logger.error( + "%s %s: cannot handle: %s", + id_, + name_, + err, + ) + for marker in markers: + logger.debug("--- START ---") + logger.debug("marker: %s", marker) + logger.debug("--- END ---") + continue + + format = pronom.Format( + id=0, + name=name_, + version="", + puid=id_, + mime="TODO", + classification="structured text", # TODO: magic + external_signatures=[ + pronom.ExternalSignature( + id=0, + signature="JSON", + type="TODO", + ) + ], + internal_signatures=format_sequences[0], + priorities=[], + ) + + 
formats.append(format) + + pronom.process_formats_and_save(formats, "abc.xml") + + +def exportPRONOMXML() -> None: + """Export a PRONOM compatible set of signatures.""" diff --git a/src/jsonid/export_helpers.py b/src/jsonid/export_helpers.py new file mode 100644 index 0000000..0dfec2f --- /dev/null +++ b/src/jsonid/export_helpers.py @@ -0,0 +1,30 @@ +"""Helpers for the export functions.""" + +import datetime +from datetime import timezone +from typing import Final +from xml.dom.minidom import parseString + +UTC_TIME_FORMAT: Final[str] = "%Y-%m-%dT%H:%M:%SZ" + + +def get_utc_timestamp_now(): + """Get a formatted UTC timestamp for 'now' that can be used when + a timestamp is needed. + """ + return datetime.datetime.now(timezone.utc).strftime(UTC_TIME_FORMAT) + + +def new_prettify(c): + """Remove excess newlines from DOM output. + + via: https://stackoverflow.com/a/14493981 + """ + reparsed = parseString(c) + return "\n".join( + [ + line + for line in reparsed.toprettyxml(indent=" " * 2).split("\n") + if line.strip() + ] + ) diff --git a/src/jsonid/helpers.py b/src/jsonid/helpers.py index ca28bc1..3a0ccac 100644 --- a/src/jsonid/helpers.py +++ b/src/jsonid/helpers.py @@ -2,7 +2,7 @@ import logging import time -from typing import Union +from typing import Final, Union try: import htm_template @@ -75,6 +75,15 @@ def format_marker(marker_text: str, marker: dict) -> str: return f"{marker_text}{marker_formatted}\n" +TYPE_BOOL: Final[str] = "bool" +TYPE_FLOAT: Final[str] = "float" +TYPE_INTEGER: Final[str] = "integer" +TYPE_LIST: Final[str] = "list" +TYPE_NONE: Final[str] = "NoneType" +TYPE_MAP: Final[str] = "map" +TYPE_STRING: Final[str] = "string" + + def substitute_type_text(replace_me: Union[str, type]): """Output a text substitution for a type that will otherwise not pretty-print. 
@@ -83,19 +92,19 @@ def substitute_type_text(replace_me: Union[str, type]): # pylint: disable=R0911 if replace_me.__name__ == "dict": - return "map" + return TYPE_MAP if replace_me.__name__ == "int": - return "integer" + return TYPE_INTEGER if replace_me.__name__ == "list": - return "list" + return TYPE_LIST if replace_me.__name__ == "str": - return "string" + return TYPE_STRING if replace_me.__name__ == "float": - return "float" + return TYPE_FLOAT if replace_me.__name__ == "bool": - return "bool" + return TYPE_BOOL if replace_me.__name__ == "NoneType": - return "NoneType" + return TYPE_NONE if not isinstance(replace_me, type): pass return replace_me diff --git a/src/jsonid/jsonid.py b/src/jsonid/jsonid.py index 0dfa6b8..aae413d 100644 --- a/src/jsonid/jsonid.py +++ b/src/jsonid/jsonid.py @@ -160,6 +160,7 @@ def main() -> None: "--pronom", help="return a PRONOM-centric view of the results", required=False, + action="store_true", ) parser.add_argument( "--export", @@ -220,7 +221,8 @@ def main() -> None: if args.registry: raise NotImplementedError("custom registry is not yet available") if args.pronom: - raise NotImplementedError("pronom view is not yet implemented") + export.exportPRONOM() + sys.exit() if args.language: raise NotImplementedError("multiple languages are not yet implemented") if args.export: diff --git a/src/jsonid/pronom.py b/src/jsonid/pronom.py new file mode 100644 index 0000000..ae8c0de --- /dev/null +++ b/src/jsonid/pronom.py @@ -0,0 +1,565 @@ +"""PRONOM export routines. 
+ +XML tooling: https://xmllint.com/ +""" + +import binascii +import codecs +import logging +import xml.dom.minidom +from dataclasses import dataclass +from functools import lru_cache +from typing import Any, Final + +try: + import export_helpers + import helpers + import registry_matchers +except ModuleNotFoundError: + try: + from src.jsonid import export_helpers, helpers, registry_matchers + except ModuleNotFoundError: + from jsonid import export_helpers, helpers, registry_matchers + + +logger = logging.getLogger(__name__) + + +DISK_SECTOR_SIZE: Final[int] = 4095 + +# Common PRONOM characters. +COLON: Final[str] = "3A" +CURLY_OPEN: Final[str] = "7B" +CURLY_CLOSE: Final[str] = "7D" +SQUARE_OPEN: Final[str] = "5B" +SQUARE_CLOSE: Final[str] = "5D" +DOUBLE_QUOTE: Final[str] = "22" +WS_REGEX: Final[str] = "(0-10)" + + +class UnprocessableEntity(Exception): + """Provide a way to give complete feedback to the caller to allow + it to exit.""" + + +@dataclass +class ExternalSignature: + id: str + signature: str + type: str + + +@dataclass +class ByteSequence: + id: str + pos: str + min_off: str + max_off: str + endian: str + value: str + + +@dataclass +class InternalSignature: + id: str + name: str + byte_sequences: list[ByteSequence] + + +@dataclass +class Priority: + type: str + id: str + + +@dataclass +class Identifier: + type: str + value: str + + +@dataclass +class Format: + id: str + name: str + version: str + puid: str + mime: str + classification: str + external_signatures: list[ExternalSignature] + internal_signatures: list[InternalSignature] + priorities: list[int] + + +def create_many_to_one_byte_sequence(internal_signatures: list[InternalSignature]): + """Create a many to one byte sequence, i.e. a format with multiple + Internal Signatures. 
+ """ + internal_signature = "" + for internal in internal_signatures: + id_ = internal.id + bs = create_one_to_many_byte_sequence(internal.byte_sequences) + internal_signature = f""" +{internal_signature} + {bs} + + """ + return internal_signature.strip() + + +def calculate_variable_off_bof(item: ByteSequence): + """Given variable offsets, calculate the correct syntax.""" + seq = item.value + if ( + item.min_off != "" + and int(item.min_off) > 0 + and item.max_off != "" + and int(item.max_off) > 0 + ): + seq = f"{{{item.min_off}-{int(item.min_off)+int(item.max_off)}}}{seq}" + elif item.max_off != "" and int(item.max_off) > 0: + seq = f"{{0-{item.max_off}}}{seq}" + elif item.min_off != "" and int(item.min_off) > 0: + seq = f"{{{item.min_off}}}{seq}" + return seq + + +def calculate_variable_off_eof(item: ByteSequence): + """Given variable offsets, calculate the correct syntax.""" + seq = item.value + if ( + item.min_off != "" + and int(item.min_off) > 0 + and item.max_off != "" + and int(item.max_off) > 0 + ): + seq = f"{seq}{{{item.min_off}-{int(item.min_off)+int(item.max_off)}}}" + elif item.max_off != "" and int(item.max_off) > 0: + seq = f"{seq}{{0-{item.max_off}}}" + elif item.min_off != "" and int(item.min_off) > 0: + seq = f"{seq}{{{item.min_off}}}" + return seq + + +def create_one_to_many_byte_sequence(byte_sequences: list[ByteSequence]): + """Create a byte sequence object.""" + byte_sequence = "" + for item in byte_sequences: + seq = item.value + if item.pos.startswith("EOF"): + seq = calculate_variable_off_eof(item) + elif item.pos.startswith("BOF"): + seq = calculate_variable_off_bof(item) + byte_sequence = f""" +{byte_sequence.strip()} + + """ + return byte_sequence.strip() + + +def create_file_format_collection(fmt: list[Format]): + """Create the FileFormatCollection object. 
+ + ``` + + 1 + ext + + + + 880 + 881 + ai + 86 + 331 + 332 + 771 + 773 + + ``` + + """ + EXT: Final[str] = "File extension" + internal_sigs = [ + f"{sig.id}" + for sig in fmt.internal_signatures + ] + external_sigs = [ + f"{sig.signature}" + for sig in fmt.external_signatures + if sig.type == EXT + ] + priorities = [ + f"{priority.id}" + for priority in fmt.priorities + ] + ff = f""" + + {"".join(internal_sigs).strip()} + {"".join(external_sigs).strip()} + {"".join(priorities).strip()} + + """ + return ff.strip() + + +def process_formats_and_save(formats: list[Format], filename: str): + """Process the collected formats and output a signature file. + + NB. Given our dataclasses here, we have the opportunity to rework + this data into many new structures. We output XML because DROID + expects XML. + """ + isc = [] + ffc = [] + for fmt in formats: + ffc.append(create_file_format_collection(fmt)) + if fmt.internal_signatures: + isc.append(create_many_to_one_byte_sequence(fmt.internal_signatures)) + droid_template = f""" + + + + {"".join(isc).strip()} + + + {"".join(ffc).strip()} + + + """ + dom = None + signature_file = droid_template.strip().replace("\n", "") + try: + dom = xml.dom.minidom.parseString(signature_file) + except xml.parsers.expat.ExpatError as err: + logger.error("cannot process xml: %s", err) + return + pretty_xml = dom.toprettyxml(indent=" ", encoding="utf-8") + prettier_xml = export_helpers.new_prettify(pretty_xml) + logger.info("outputting to: %s", filename) + with open(filename, "w", encoding="utf-8") as output_file: + output_file.write(prettier_xml) + + +def encode_roundtrip(val: str, encoding: str) -> str: + """We want to get a plain-text byte-sequence into a new + encoding. It takes a few hops and skips. 
+ """ + val = val.strip() + try: + re_encoded = binascii.unhexlify(val).decode("utf-8").encode(encoding) + except (binascii.Error, UnicodeDecodeError) as err: + logger.error("cannot convert: %s len: %s ('%s')", val, len(val), err) + return val + return binascii.hexlify(re_encoded).decode().upper() + + +def _type_to_str(t: type, encoding: str) -> str: + """todo...""" + + colon_encoded = encode_roundtrip(COLON, encoding) + curly_open_encoded = encode_roundtrip(CURLY_OPEN, encoding) + curly_close_encoded: Final[str] = encode_roundtrip(CURLY_CLOSE, encoding) + square_open_encoded: Final[str] = encode_roundtrip(SQUARE_OPEN, encoding) + square_close_encoded: Final[str] = encode_roundtrip(SQUARE_CLOSE, encoding) + double_quote_encoded: Final[str] = encode_roundtrip(DOUBLE_QUOTE, encoding) + + if t == helpers.TYPE_INTEGER or t == helpers.TYPE_FLOAT: + # how do we represent larger numbers? and do we need to? + return "[30:39]" + if t == helpers.TYPE_BOOL: + # true | false + return f"{double_quote_encoded}({encode_roundtrip('74727565', encoding)}|{encode_roundtrip('66616C7365', encoding)}){double_quote_encoded}" + if t == helpers.TYPE_STRING: + # string begins with a double quote and ends in a double quote. + return f"{double_quote_encoded}*{double_quote_encoded}" + if t == helpers.TYPE_MAP: + # { == 7B; } == 7D + return f"{curly_open_encoded}*{curly_close_encoded}" + if t == helpers.TYPE_LIST: + # [ == 5B; ] == 5D + return f"{square_open_encoded}*{square_close_encoded}" + if t == helpers.TYPE_NONE: + # null + return f"{encode_roundtrip('6e756c6c', encoding)}" + # This should only trigger for incorrect values at this point.. 
+ raise UnprocessableEntity(f"type_to_str: {t}") + + +def _complex_is_type(marker: Any) -> str: + """todo...""" + raise UnprocessableEntity(f"complex IS type: '{marker}' (WIP)") + + +@lru_cache() +def _get_bom(ttl_hash=None) -> list: + """Todo...""" + replaces = [ + codecs.BOM, + codecs.BOM_BE, + codecs.BOM_LE, + codecs.BOM_UTF8, + codecs.BOM_UTF16, + codecs.BOM_UTF16_BE, + codecs.BOM_UTF16_LE, + codecs.BOM_UTF32, + codecs.BOM_UTF32_BE, + codecs.BOM_UTF32_LE, + ] + + res = [] + + for bom in replaces: + hex_bom = "" + for marker in bom: + char = hex(marker) + hex_bom = f"{hex_bom}{char.replace("0x", "")}".upper() + res.append(hex_bom) + + return res + + +def _str_to_hex_str_(s: str, encoding: str) -> str: + """todo...""" + encoded_s = s.encode(encoding) + bytes = [] + replaces = _get_bom() + for byte_ in encoded_s: + bytes.append(hex(byte_).replace("0x", "")) + hex_str = "".join(bytes).upper() + for bom in replaces: + if not hex_str.startswith(bom): + continue + hex_str = hex_str.replace(bom, "", 1) + break + return hex_str + + +def _str_to_hex_str(s: str) -> str: + """todo...""" + bytes = [] + for byte_ in s.encode(): + bytes.append(hex(byte_).replace("0x", "")) + hex_str = "".join(bytes).upper() + return hex_str + + +def process_markers(markers: list, encoding: str = "") -> tuple[list | bool]: + """todo... + + returns a tuple describing the processed value and a flag to + highlight the result is potentially lossless, e.g. in the case + of matching types, e.g. strings. + + dict_keys(['CONTAINS']) + dict_keys(['ENDSWITH']) + dict_keys(['IS'] + dict_keys(['ISTYPE']) + dict_keys(['STARTSWITH']) + + + key(0-n):(0-n)value + + Need to return something like: + + + + + Different encodings need to be accounted for, e.g. (with added + whitespace below) + + UTF-32-LE: + + 00000000: 2000 0000 2000 0000 2000 0000 2000 0000 ... ... ... ... + 00000010: 2000 0000 2000 0000 0a00 0000 0a00 0000 ... ........... + 00000020: 0a00 0000 0a00 0000 7b00 0000 2200 0000 ........{..."... 
+ 00000030: 6100 0000 2200 0000 3a00 0000 2000 0000 a..."...:... ... + 00000040: 2200 0000 6200 0000 2200 0000 7d00 0000 "...b..."...}... + 00000050: 0a00 0000 .... + + UTF-32-BE: + + 00000000: 0000 0020 0000 0020 0000 0020 0000 0020 ... ... ... ... + 00000010: 0000 0020 0000 0020 0000 000a 0000 000a ... ... ........ + 00000020: 0000 000a 0000 000a 0000 007b 0000 0022 ...........{..." + 00000030: 0000 0061 0000 0022 0000 003a 0000 0020 ...a..."...:... + 00000040: 0000 0022 0000 0062 0000 0022 0000 007d ..."...b..."...} + 00000050: 0000 000a .... + + + UTF-16-LE: + + 00000000: 2000 2000 2000 2000 2000 2000 0a00 0a00 . . . . . ..... + 00000010: 0a00 0a00 7b00 2200 6100 2200 3a00 2000 ....{.".a.".:. . + 00000020: 2200 6200 2200 7d00 0a00 ".b.".}... + + UTF-16-BE: + + 00000000: 0020 0020 0020 0020 0020 0020 000a 000a . . . . . . .... + 00000010: 000a 000a 007b 0022 0061 0022 003a 0020 .....{.".a.".:. + 00000020: 0022 0062 0022 007d 000a .".b.".}.. + + + """ + + encoding = "utf-16" + + colon_encoded = encode_roundtrip(COLON, encoding) + curly_open_encoded = encode_roundtrip(CURLY_OPEN, encoding) + curly_close_encoded: Final[str] = encode_roundtrip(CURLY_CLOSE, encoding) + square_open_encoded: Final[str] = encode_roundtrip(SQUARE_OPEN, encoding) + square_close_encoded: Final[str] = encode_roundtrip(SQUARE_CLOSE, encoding) + double_quote_encoded: Final[str] = encode_roundtrip(DOUBLE_QUOTE, encoding) + + res = [] + + for idx, marker in enumerate(markers, 2): + + logger.debug("marker: %s", marker) + + if registry_matchers.MARKER_GOTO in marker.keys(): + # first key exists like regular key, then we have to + # search for the next key... 
+ k0 = _str_to_hex_str(marker["GOTO"]) + k1 = _str_to_hex_str(marker["KEY"]) + k0 = f"{double_quote_encoded}{encode_roundtrip(k0, encoding)}{double_quote_encoded}" + k1 = f"{double_quote_encoded}{encode_roundtrip(k1, encoding)}{double_quote_encoded}" + k1 = ( + f"{k0}{WS_REGEX}{colon_encoded}*{WS_REGEX}{k1}{WS_REGEX}{colon_encoded}" + ) + marker.pop("GOTO") + marker.pop("KEY") + if registry_matchers.MARKER_INDEX in marker.keys(): + # we want to match a list so first we have a square bracket + # that then needs a search parameter for the next object + # (curly bracket) and then key... + k1 = _str_to_hex_str(marker["KEY"]) + k1 = f"{WS_REGEX}{square_open_encoded}*{curly_close_encoded}*{double_quote_encoded}{encode_roundtrip(k1, encoding)}{double_quote_encoded}" + marker.pop("INDEX") + marker.pop("KEY") + if "KEY" in marker.keys(): + k1 = _str_to_hex_str(marker["KEY"]) + k1 = f"{double_quote_encoded}{encode_roundtrip(k1, encoding)}{double_quote_encoded}" + marker.pop("KEY") + # Given a key, each of the remaining rule parts must result in + # exiting early. 
+ if registry_matchers.MARKER_KEY_EXISTS in marker.keys(): + res.append(f"{k1}{WS_REGEX}{colon_encoded}".upper()) + continue + if registry_matchers.MARKER_IS_TYPE in marker.keys(): + is_type = _type_to_str(marker["ISTYPE"], encoding=encoding) + k1 = f"{k1}{WS_REGEX}{colon_encoded}{WS_REGEX}{is_type}" + res.append(k1.upper()) + continue + if registry_matchers.MARKER_IS in marker.keys(): + marker_is = marker["IS"] + if not isinstance(marker_is, str): + _complex_is_type(marker_is) + is_val = _str_to_hex_str(marker_is) + k1 = f"{k1}{WS_REGEX}{colon_encoded}{WS_REGEX}{encode_roundtrip(is_val, encoding)}" + res.append(k1.upper()) + continue + if registry_matchers.MARKER_STARTSWITH in marker.keys(): + starts_with = _str_to_hex_str(marker["STARTSWITH"]) + k1 = f"{k1}{WS_REGEX}{colon_encoded}{WS_REGEX}{double_quote_encoded}{starts_with}" + res.append(k1.upper()) + continue + if registry_matchers.MARKER_ENDSWITH in marker.keys(): + ends_with = _str_to_hex_str(marker["ENDSWITH"]) + k1 = f"{k1}{WS_REGEX}{colon_encoded}{WS_REGEX}*{ends_with}{double_quote_encoded}" + res.append(k1.upper()) + continue + if registry_matchers.MARKER_CONTAINS in marker.keys(): + contains = _str_to_hex_str(marker["CONTAINS"]) + k1 = f"{k1}{WS_REGEX}{colon_encoded}{WS_REGEX}{double_quote_encoded}*{contains}*{double_quote_encoded}" + res.append(k1.upper()) + continue + if registry_matchers.MARKER_REGEX in marker.keys(): + raise UnprocessableEntity("REGEX not yet implemented") + if registry_matchers.MARKER_KEY_NO_EXIST in marker.keys(): + raise UnprocessableEntity("KEY NO EXIST not yet implemented") + + BOF = f"{curly_open_encoded}" + EOF = f"{curly_close_encoded}" + + bs_res = [] + + bs_res.append( + ByteSequence( + id=1, + pos="BOF", + min_off="0", + max_off=f"{DISK_SECTOR_SIZE}", + endian="", + value=BOF, + ) + ) + + # Debug logging to demonstrate output. + for idx, item in enumerate(res, 2): + logger.debug("%s. 
%s", idx, item) + + bs = ByteSequence( + id=idx, + pos="VAR", + min_off="", + max_off="", + endian="", + value=item, + ) + bs_res.append(bs) + + bs_res.append( + ByteSequence( + id=1, + pos="EOF", + min_off="0", + max_off=f"{DISK_SECTOR_SIZE}", + endian="", + value=EOF, + ) + ) + + """ + internal_sigs = [] + for internal in internal_signatures: + sig_id = _get_node_value("SignatureID", internal) + sig_name = _get_node_value("SignatureName", internal) + _ = _get_node_value("SignatureNote", internal) + try: + byte_sequences = internal.getElementsByTagName("ByteSequence") + sequences = get_bytes(byte_sequences) + except IndexError: + continue + internal_sigs.append( + InternalSignature( + id=sig_id, + name=sig_name, + byte_sequences=sequences, + ) + ) + """ + + iss = InternalSignature( + id=0, + name="", + byte_sequences=bs_res, + ) + + return [iss] + + +def create_xml(all: list[InternalSignature]) -> None: + """todo...""" + for x in all: + print("============") + print(x) + + +def create_baseline_json_sequences(): + """Create baseline JSON sequences that match map and list types + with various different encodings. + """ + + # TODO...