diff --git a/README.md b/README.md
index 79c6c20..69ec73d 100644
--- a/README.md
+++ b/README.md
@@ -9,8 +9,8 @@ Simple streaming JSON parser and encoder.
 
 When [reading](#reading) JSON data, `json-stream` can decode JSON data in a
 streaming manner, providing a pythonic dict/list-like interface, or a
-[visitor-based interfeace](#visitor). Can stream from files, [URLs](#urls)
-or [iterators](#iterators).
+[visitor-based interface](#visitor). It can stream from files, [URLs](#urls)
+or [iterators](#iterators). It can process [multiple JSON documents](#multiple) in a single stream.
 
 When [writing](#writing) JSON data, `json-stream` can stream JSON objects as
 you generate them.
@@ -226,6 +226,66 @@ z at path ('xxxx', 3)
 [] at path ('xxxx', 5)
 ```
 
+### Multiple JSON documents: `load_many()` and `visit_many()`
+
+Sometimes JSON data arrives as a sequence of top-level JSON texts rather than a single array/object. json-stream supports this pattern with:
+- `json_stream.load_many(...)`: yields each top-level JSON value as it is parsed.
+- `json_stream.visit_many(...)`: visits each top-level JSON value and yields control after each one.
+
+These functions are useful for common streaming formats:
+- NDJSON (Newline-Delimited JSON, also known as JSON Lines, content-type often application/x-ndjson): one JSON value per line, separated by a single "\n".
+- Concatenated or sequential JSON documents: complete JSON texts written back-to-back without delimiters.
+
+Note about concatenated JSON without explicit delimiters:
+
+When multiple top-level JSON documents are simply concatenated with
+no delimiters (no newlines or spaces), json-stream can unambiguously
+detect the document boundaries for the following top-level values:
+- objects: `{ ... }`
+- arrays: `[ ... ]`
+- literals: `true`, `false`, `null`
+
+However, numbers and strings require a delimiter between consecutive
+documents so the tokenizer can tell where one ends and the next begins.
+A delimiter can be as simple as a single space or newline.
+
+Examples:
+- Valid without delimiters (each parses as 2 documents):
+  - `{"a":1}{"b":2}` → `{...}`, `{...}`
+  - `[1][2]` → `[1]`, `[2]`
+  - `truefalse` → `true`, `false`
+  - `null[]` → `null`, `[]`
+- These will error without a delimiter between documents (e.g. a space or newline):
+  - `3true`
+  - `0"x"`
+  - `"hi""there"`
+
+#### Examples
+
+Read many from a file containing NDJSON or concatenated JSON:
+```python
+import json_stream
+
+with open("events.ndjson", "rb") as f:  # NDJSON: one JSON object per line
+    for item in json_stream.load_many(f):
+        # each item is the top-level value of one JSON text
+        handle(item)
+```
+
+Visit the values in each of many documents:
+```python
+import json_stream
+
+def visitor(value, path):
+    # called depth-first for each value within the current top-level JSON text
+    ...
+
+with open("events.ndjson", "rb") as f:
+    for _ in json_stream.visit_many(f, visitor):
+        # the loop advances one top-level JSON text at a time
+        pass
+```
+
 ### Stream a URL
 
 `json_stream` knows how to stream directly from a URL using a variety of packages.
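Editor's note: the following sketch is illustrative and not part of the patch. It exercises the boundary rules described in the README section above, using only the `load_many()` and `to_standard_types()` names exposed by this change; the behaviour shown matches the new `test_load_many.py` tests further down.

```python
import io

import json_stream
from json_stream import to_standard_types

# Objects, arrays and literals are self-delimiting, so concatenated
# documents parse without any separator between them.
docs = json_stream.load_many(io.StringIO('{"a":1}[1,2]truenull'), persistent=True)
print([to_standard_types(d) for d in docs])  # [{'a': 1}, [1, 2], True, None]

# Numbers and strings are not self-delimiting: the tokenizer cannot tell
# where the number in '3true' ends, so parsing raises ValueError.
try:
    list(json_stream.load_many(io.StringIO('3true')))
except ValueError as e:
    print("needs a delimiter:", e)

# A single whitespace character resolves the ambiguity; one value per
# newline is exactly the NDJSON framing.
docs = json_stream.load_many(io.StringIO('3\ntrue'), persistent=True)
print([to_standard_types(d) for d in docs])  # [3, True]
```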
diff --git a/pyproject.toml b/pyproject.toml
index f609d36..6dea1ab 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "json-stream"
-version = "2.3.4"
+version = "2.4.0"
 authors = [
     {name = "Jamie Cockburn", email="jamie_cockburn@hotmail.co.uk"},
 ]
diff --git a/src/json_stream/__init__.py b/src/json_stream/__init__.py
index 758995a..c5bcd82 100644
--- a/src/json_stream/__init__.py
+++ b/src/json_stream/__init__.py
@@ -1,4 +1,4 @@
-from json_stream.loader import load  # noqa: F401
-from json_stream.visitor import visit  # noqa: F401
+from json_stream.loader import load, load_many  # noqa: F401
+from json_stream.visitor import visit, visit_many  # noqa: F401
 from json_stream.writer import streamable_list, streamable_dict  # noqa: F401
 from json_stream.util import to_standard_types
diff --git a/src/json_stream/httpx/__init__.py b/src/json_stream/httpx/__init__.py
index c41141a..fb26678 100644
--- a/src/json_stream/httpx/__init__.py
+++ b/src/json_stream/httpx/__init__.py
@@ -13,5 +13,13 @@ def load(response, persistent=False, tokenizer=default_tokenizer, chunk_size=CON
     return json_stream.load(_to_iterable(response, chunk_size), persistent=persistent, tokenizer=tokenizer)
 
 
+def load_many(response, persistent=False, tokenizer=default_tokenizer, chunk_size=CONTENT_CHUNK_SIZE):
+    return json_stream.load_many(_to_iterable(response, chunk_size), persistent=persistent, tokenizer=tokenizer)
+
+
 def visit(response, visitor, tokenizer=default_tokenizer, chunk_size=CONTENT_CHUNK_SIZE):
     return json_stream.visit(_to_iterable(response, chunk_size), visitor, tokenizer=tokenizer)
+
+
+def visit_many(response, visitor, tokenizer=default_tokenizer, chunk_size=CONTENT_CHUNK_SIZE):
+    return json_stream.visit_many(_to_iterable(response, chunk_size), visitor, tokenizer=tokenizer)
diff --git a/src/json_stream/httpx/tests/test_httpx.py b/src/json_stream/httpx/tests/test_httpx.py
index 49c8c75..3b498a4 100644
--- a/src/json_stream/httpx/tests/test_httpx.py
+++ b/src/json_stream/httpx/tests/test_httpx.py
@@ -4,7 +4,8 @@
 from unittest import TestCase
 from unittest.mock import Mock
 
-from json_stream.httpx import load, visit
+from json_stream import to_standard_types
+from json_stream.httpx import load, visit, load_many, visit_many
 
 
 class TestLoad(TestCase):
@@ -16,13 +17,14 @@ def grouper(iterable, n):
         args = [iter(iterable)] * n
         return zip_longest(*args, fillvalue="")
 
-    def _create_mock_response(self):
+    def _create_mock_response(self, data=None):
         # httpx iter_bytes returns an iterable of bytes
         response = Mock()
-        data = json.dumps({
-            "a": "a" * io.DEFAULT_BUFFER_SIZE,
-            "b": "b",
-        })
+        if data is None:
+            data = json.dumps({
+                "a": "a" * io.DEFAULT_BUFFER_SIZE,
+                "b": "b",
+            })
         content = ("".join(chunk).encode() for chunk in self.grouper(data, 1024))
         response.iter_bytes.return_value = content
         return response
@@ -69,3 +71,37 @@ def test_visitor(self):
             ('a' * io.DEFAULT_BUFFER_SIZE, ('a',)),
             ('b', ('b',)),
         ], visited)
+
+    def test_load_many(self):
+        expected = [{
+            "a": "a" * io.DEFAULT_BUFFER_SIZE,
+        }, {
+            "b": "b" * io.DEFAULT_BUFFER_SIZE,
+        }]
+        data = "".join(json.dumps(i) for i in expected)
+        response = self._create_mock_response(data=data)
+        count = 0
+        for item, exp in zip(load_many(response), expected):
+            self.assertEqual(exp, to_standard_types(item))
+            count += 1
+        self.assertEqual(count, len(expected))
+
+    def test_visit_many(self):
+        items = [{
+            "a": "a" * io.DEFAULT_BUFFER_SIZE,
+        }, {
+            "b": "b" * io.DEFAULT_BUFFER_SIZE,
+        }]
+        data = "".join(json.dumps(e) for e in items)
"".join(json.dumps(e) for e in items) + response = self._create_mock_response(data=data) + current_visit = [] + visits = [] + + def visitor(item, path): + current_visit.append((item, path)) + for _ in visit_many(response, visitor): + visits.append(current_visit) + current_visit = [] + self.assertEqual(len(visits), len(items)) + self.assertEqual(visits[0], [('a' * io.DEFAULT_BUFFER_SIZE, ('a',))]) + self.assertEqual(visits[1], [('b' * io.DEFAULT_BUFFER_SIZE, ('b',))]) diff --git a/src/json_stream/loader.py b/src/json_stream/loader.py index 680e801..a1493bf 100644 --- a/src/json_stream/loader.py +++ b/src/json_stream/loader.py @@ -4,9 +4,16 @@ def load(fp_or_iterable, persistent=False, tokenizer=default_tokenizer): + return next(load_many(fp_or_iterable, persistent, tokenizer)) + + +def load_many(fp_or_iterable, persistent=False, tokenizer=default_tokenizer): fp = ensure_file(fp_or_iterable) token_stream = tokenizer(fp) - token_type, token = next(token_stream) - if token_type == TokenType.OPERATOR: - return StreamingJSONBase.factory(token, token_stream, persistent) - return token + for token_type, token in token_stream: + if token_type == TokenType.OPERATOR: + data = StreamingJSONBase.factory(token, token_stream, persistent) + yield data + data.read_all() + else: + yield token diff --git a/src/json_stream/requests/__init__.py b/src/json_stream/requests/__init__.py index acbcb0f..405efa2 100644 --- a/src/json_stream/requests/__init__.py +++ b/src/json_stream/requests/__init__.py @@ -13,5 +13,13 @@ def load(response, persistent=False, tokenizer=default_tokenizer, chunk_size=CON return json_stream.load(_to_iterable(response, chunk_size), persistent=persistent, tokenizer=tokenizer) +def load_many(response, persistent=False, tokenizer=default_tokenizer, chunk_size=CONTENT_CHUNK_SIZE): + return json_stream.load_many(_to_iterable(response, chunk_size), persistent=persistent, tokenizer=tokenizer) + + def visit(response, visitor, tokenizer=default_tokenizer, chunk_size=CONTENT_CHUNK_SIZE): return json_stream.visit(_to_iterable(response, chunk_size), visitor, tokenizer=tokenizer) + + +def visit_many(response, visitor, tokenizer=default_tokenizer, chunk_size=CONTENT_CHUNK_SIZE): + return json_stream.visit_many(_to_iterable(response, chunk_size), visitor, tokenizer=tokenizer) diff --git a/src/json_stream/requests/tests/test_requests.py b/src/json_stream/requests/tests/test_requests.py index d627f53..d6da65e 100644 --- a/src/json_stream/requests/tests/test_requests.py +++ b/src/json_stream/requests/tests/test_requests.py @@ -4,7 +4,8 @@ from unittest import TestCase from unittest.mock import Mock -from json_stream.requests import load, visit +from json_stream import to_standard_types +from json_stream.requests import load, visit, load_many, visit_many class TestLoad(TestCase): @@ -17,13 +18,14 @@ def grouper(iterable, n): args = [iter(iterable)] * n return zip_longest(*args, fillvalue="") - def _create_mock_response(self): + def _create_mock_response(self, data=None): # requests iter_content returns an iterable of bytes response = Mock() - data = json.dumps({ - "a": "a" * io.DEFAULT_BUFFER_SIZE, - "b": "b", - }) + if data is None: + data = json.dumps({ + "a": "a" * io.DEFAULT_BUFFER_SIZE, + "b": "b", + }) content = ("".join(chunk).encode() for chunk in self.grouper(data, 1024)) response.iter_content.return_value = content return response @@ -70,3 +72,37 @@ def test_visitor(self): ('a' * io.DEFAULT_BUFFER_SIZE, ('a',)), ('b', ('b',)), ], visited) + + def test_load_many(self): + expected = [{ + "a": "a" * 
+        }, {
+            "b": "b" * io.DEFAULT_BUFFER_SIZE,
+        }]
+        data = "".join(json.dumps(i) for i in expected)
+        response = self._create_mock_response(data=data)
+        count = 0
+        for item, exp in zip(load_many(response), expected):
+            self.assertEqual(exp, to_standard_types(item))
+            count += 1
+        self.assertEqual(count, len(expected))
+
+    def test_visit_many(self):
+        items = [{
+            "a": "a" * io.DEFAULT_BUFFER_SIZE,
+        }, {
+            "b": "b" * io.DEFAULT_BUFFER_SIZE,
+        }]
+        data = "".join(json.dumps(e) for e in items)
+        response = self._create_mock_response(data=data)
+        current_visit = []
+        visits = []
+
+        def visitor(item, path):
+            current_visit.append((item, path))
+        for _ in visit_many(response, visitor):
+            visits.append(current_visit)
+            current_visit = []
+        self.assertEqual(len(visits), len(items))
+        self.assertEqual(visits[0], [('a' * io.DEFAULT_BUFFER_SIZE, ('a',))])
+        self.assertEqual(visits[1], [('b' * io.DEFAULT_BUFFER_SIZE, ('b',))])
diff --git a/src/json_stream/select_tokenizer.py b/src/json_stream/select_tokenizer.py
index 0612770..78c32ea 100644
--- a/src/json_stream/select_tokenizer.py
+++ b/src/json_stream/select_tokenizer.py
@@ -5,7 +5,7 @@
 
 try:
     default_tokenizer = rust_tokenizer_or_raise()
-except ExtensionException as e:
+except ExtensionException as e:  # pragma: no cover
     warn(str(e), category=ImportWarning)  # ImportWarnings are ignored by default
     default_tokenizer = tokenize
 
diff --git a/src/json_stream/tests/test_load_many.py b/src/json_stream/tests/test_load_many.py
new file mode 100644
index 0000000..f32e704
--- /dev/null
+++ b/src/json_stream/tests/test_load_many.py
@@ -0,0 +1,102 @@
+import unittest
+from io import StringIO, BytesIO
+from unittest import TestCase
+
+import json_stream
+from json_stream import to_standard_types
+
+
+class TestLoadMany(TestCase):
+    def test_load_many(self):
+        # NDJSON-like: one JSON value per line
+        payload = "\n".join([
+            '{"a": 1}',
+            '[1, 2]',
+            '3',
+            'true',
+            'null',
+            '"x"',
+            '{}',
+            '[]',
+        ])
+        stream = StringIO(payload)
+        items = [
+            to_standard_types(v) for v in json_stream.load_many(stream, persistent=True)
+        ]
+        self.assertListEqual(
+            items,
+            [
+                {"a": 1},
+                [1, 2],
+                3,
+                True,
+                None,
+                "x",
+                {},
+                [],
+            ],
+        )
+
+    def test_load_many_concatenated(self):
+        payload = b'{"a": 1}[1,2]truenull"x"{}[]'
+        stream = BytesIO(payload)
+        items = [
+            to_standard_types(v) for v in json_stream.load_many(stream, persistent=False)
+        ]
+        # Even when persistent=False, materialization should consume correctly per item
+        self.assertListEqual(
+            items,
+            [
+                {"a": 1},
+                [1, 2],
+                True,
+                None,
+                "x",
+                {},
+                [],
+            ],
+        )
+
+    @unittest.expectedFailure
+    def test_load_many_incompatible_concatenations(self):
+        # this fails because the tokeniser wants certain primitives to be followed by a delimiter
+        payloads = [
+            [b'{"a": 1}[]', [{"a": 1}, []]],  # this one works
+            [b'3true', [3, True]],
+            [b'3.0true', [3.0, True]],
+            [b'1.2e11true', [120000000000.0, True]],
+            [b'0true', [0, True]],
+            [b'""true', ["", True]],
+        ]
+        errors = []
+        for payload, expected in payloads:
+            try:
+                stream = BytesIO(payload)
+                items = [
+                    to_standard_types(v) for v in json_stream.load_many(stream, persistent=False)
+                ]
+                self.assertListEqual(expected, items)
+            except ValueError as e:
+                errors.append(f"{payload}: {e}")
+        self.assertListEqual([], errors)
+
+    def test_load_many_skips_after_item_partially_consumed(self):
+        # Ensure that after yielding an object/array, the generator resumes and continues
+        # to the next top-level JSON text correctly.
+ payload = '{"first": [1, 2, 3]} {"second": {"x": 1}, "unconsumed": 7} 4' + stream = StringIO(payload) + + gen = json_stream.load_many(stream, persistent=True) + + first = next(gen) + # consume partially then fully + self.assertEqual(list(first["first"]), [1, 2, 3]) + + second = next(gen) + self.assertEqual(dict(second["second"].items()), {"x": 1}) + + third = next(gen) + self.assertEqual(third, 4) + + with self.assertRaises(StopIteration): + next(gen) diff --git a/src/json_stream/tests/test_visit_many.py b/src/json_stream/tests/test_visit_many.py new file mode 100644 index 0000000..ab63b26 --- /dev/null +++ b/src/json_stream/tests/test_visit_many.py @@ -0,0 +1,42 @@ +from io import StringIO +from unittest import TestCase + +import json_stream + + +class TestVisitMany(TestCase): + def test_visit_many(self): + payload = "\n".join([ + '{"a": 1}', + '[1, 2]', + '3', + 'true', + 'null', + '"x"', + '{}', + '[]', + ]) + stream = StringIO(payload) + + visited_batches = [] + current = [] + + def visitor(item, path): + current.append((item, path)) + + for _ in json_stream.visit_many(stream, visitor): + visited_batches.append(current) + current = [] + + # We only verify that per-top-level JSON text we received at least one visit + # and that the first visited path matches the expected top-level structure. + # Detailed traversal behavior is covered by other visitor tests. + self.assertEqual(len(visited_batches), 8) + self.assertEqual(visited_batches[0][0], (1, ('a',))) # {"a": 1} + self.assertEqual(visited_batches[1][0], (1, (0,))) # [1, 2] + self.assertEqual(visited_batches[2][0], (3, ())) # 3 + self.assertEqual(visited_batches[3][0], (True, ())) # true + self.assertEqual(visited_batches[4][0], (None, ())) # null + self.assertEqual(visited_batches[5][0], ("x", ())) # "x" + self.assertEqual(visited_batches[6][0], ({}, ())) # {} + self.assertEqual(visited_batches[7][0], ([], ())) # [] diff --git a/src/json_stream/tests/test_writer.py b/src/json_stream/tests/test_writer.py index 5e15275..31caf5e 100644 --- a/src/json_stream/tests/test_writer.py +++ b/src/json_stream/tests/test_writer.py @@ -7,7 +7,7 @@ class BaseTestWriter: def dump(self, o): - raise NotImplementedError() + raise NotImplementedError() # pragma: no cover def test_writer_wrapper(self): def dict_content(n): @@ -35,7 +35,7 @@ def test_writer_empty(self): @streamable_list def empty_list(): for i in range(0): # never yields - yield i + yield i # pragma: no cover o = empty_list() result = self.dump(o) diff --git a/src/json_stream/visitor.py b/src/json_stream/visitor.py index 5d691bd..b321670 100644 --- a/src/json_stream/visitor.py +++ b/src/json_stream/visitor.py @@ -20,12 +20,18 @@ def _visit(obj, visitor, path): visitor(obj, path) -def visit(fp_or_iterator, visitor, tokenizer=default_tokenizer): +def visit_many(fp_or_iterator, visitor, tokenizer=default_tokenizer): fp = ensure_file(fp_or_iterator) token_stream = tokenizer(fp) - token_type, token = next(token_stream) - if token_type == TokenType.OPERATOR: - obj = StreamingJSONBase.factory(token, token_stream, persistent=False) - else: - obj = token - _visit(obj, visitor, ()) + for token_type, token in token_stream: + if token_type == TokenType.OPERATOR: + obj = StreamingJSONBase.factory(token, token_stream, persistent=False) + _visit(obj, visitor, ()) + obj.read_all() + else: + _visit(token, visitor, ()) + yield + + +def visit(fp_or_iterator, visitor, tokenizer=default_tokenizer): + next(visit_many(fp_or_iterator, visitor, tokenizer))