63 changes: 61 additions & 2 deletions README.md
@@ -9,8 +9,8 @@ Simple streaming JSON parser and encoder.

When [reading](#reading) JSON data, `json-stream` can decode JSON data in
a streaming manner, providing a pythonic dict/list-like interface, or a
[visitor-based interfeace](#visitor). Can stream from files, [URLs](#urls)
or [iterators](#iterators).
[visitor-based interface](#visitor). It can stream from files, [URLs](#urls)
or [iterators](#iterators). It can process [multiple JSON documents](#multiple) in a single stream.

When [writing](#writing) JSON data, `json-stream` can stream JSON objects
as you generate them.
@@ -226,6 +226,65 @@ z at path ('xxxx', 3)
[] at path ('xxxx', 5)
```

### <a id="multiple"></a> Multiple JSON documents: `load_many()` and `visit_many()`

Sometimes JSON data arrives as a sequence of top-level JSON texts rather than as a single array or object. json-stream supports this pattern with:
- `json_stream.load_many(...)`: yields each top-level JSON value as it is parsed.
- `json_stream.visit_many(...)`: visits each top-level JSON value and yields control after each one.

These functions are useful for common streaming formats (see the sketch after this list):
- NDJSON (newline-delimited JSON, also known as JSON Lines; content type often `application/x-ndjson`): one JSON value per line, separated by a single `\n`.
- Concatenated or sequential JSON documents: complete JSON texts written back-to-back without delimiters.
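
Both functions accept the same sources as `load()` and `visit()`: file-like objects or any iterable of chunks. A minimal sketch feeding NDJSON from a generator of bytes (the event payloads here are illustrative):

```python
import json_stream

def chunks():
    # any iterable of bytes works, e.g. chunks read from a socket
    yield b'{"event": "start"}\n'
    yield b'{"event": "stop"}\n'

for doc in json_stream.load_many(chunks()):
    # each doc is a transient streaming object; copy out what you need
    print(dict(doc.items()))
```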

Note about concatenated JSON without explicit delimiters:

When multiple top-level JSON documents are simply concatenated with
no delimiters (no newlines or spaces), json-stream can unambiguously
detect the document boundaries for the following top-level values:
- objects: `{ ... }`
- arrays: `[ ... ]`
- literals: `true`, `false`, `null`

However, numbers and strings require a delimiter between consecutive
documents so that the tokenizer can tell where one ends and the next begins.
A delimiter can be as simple as a single space or newline.

Examples:
- Valid without delimiters (each parses as two documents):
  - `{"a":1}{"b":2}` → `{...}`, `{...}`
  - `[1][2]` → `[1]`, `[2]`
  - `truefalse` → `true`, `false`
  - `null[]` → `null`, `[]`
- These will error without a delimiter (e.g. a space or newline) between documents, as the sketch below shows:
  - `3true`
  - `0"x"`
  - `"hi""there"`
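
A minimal sketch of the boundary rules (mirroring the cases exercised in the new `test_load_many.py`):

```python
from io import StringIO

import json_stream
from json_stream import to_standard_types

# objects, arrays and literals carry their own boundaries
docs = [
    to_standard_types(doc)
    for doc in json_stream.load_many(StringIO('{"a":1}{"b":2}'), persistent=True)
]
print(docs)  # [{'a': 1}, {'b': 2}]

# numbers and strings need a trailing delimiter; without one the tokenizer errors
try:
    list(json_stream.load_many(StringIO('3true')))
except ValueError:
    print("needs a delimiter between documents")

# a single space is enough
print(list(json_stream.load_many(StringIO('3 true'))))  # [3, True]
```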

#### Examples

Read multiple documents from a file containing NDJSON or concatenated JSON:
```python
import json_stream

with open("events.ndjson", "rb") as f:  # NDJSON: one JSON object per line
    for item in json_stream.load_many(f):
        # each item is the top-level value of one JSON text
        handle(item)  # handle() is a placeholder for your own processing
```

Visit every value in each of many documents:
```python
import json_stream

def visitor(value, path):
    # called depth-first for each value within the current top-level JSON text
    ...

with open("events.ndjson", "rb") as f:
    for _ in json_stream.visit_many(f, visitor):
        # each iteration advances past one top-level JSON text
        pass
```
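
The `json_stream.requests` and `json_stream.httpx` integrations gain matching `load_many()` and `visit_many()` wrappers (see the diffs below). A sketch of the `requests` variant, assuming a hypothetical endpoint that emits NDJSON:

```python
import requests

import json_stream.requests

# hypothetical NDJSON endpoint
with requests.get("https://example.com/events.ndjson", stream=True) as response:
    for item in json_stream.requests.load_many(response):
        ...  # process each top-level JSON text as it arrives
```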

### <a id="urls"></a> Stream a URL

`json_stream` knows how to stream directly from a URL using a variety of packages.
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -5,7 +5,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "json-stream"
version = "2.3.4"
version = "2.4.0"
authors = [
{name = "Jamie Cockburn", email="jamie_cockburn@hotmail.co.uk"},
]
4 changes: 2 additions & 2 deletions src/json_stream/__init__.py
@@ -1,4 +1,4 @@
from json_stream.loader import load # noqa: F401
from json_stream.visitor import visit # noqa: F401
from json_stream.loader import load, load_many # noqa: F401
from json_stream.visitor import visit, visit_many # noqa: F401
from json_stream.writer import streamable_list, streamable_dict # noqa: F401
from json_stream.util import to_standard_types
8 changes: 8 additions & 0 deletions src/json_stream/httpx/__init__.py
@@ -13,5 +13,13 @@ def load(response, persistent=False, tokenizer=default_tokenizer, chunk_size=CONTENT_CHUNK_SIZE):
    return json_stream.load(_to_iterable(response, chunk_size), persistent=persistent, tokenizer=tokenizer)


def load_many(response, persistent=False, tokenizer=default_tokenizer, chunk_size=CONTENT_CHUNK_SIZE):
    return json_stream.load_many(_to_iterable(response, chunk_size), persistent=persistent, tokenizer=tokenizer)


def visit(response, visitor, tokenizer=default_tokenizer, chunk_size=CONTENT_CHUNK_SIZE):
    return json_stream.visit(_to_iterable(response, chunk_size), visitor, tokenizer=tokenizer)


def visit_many(response, visitor, tokenizer=default_tokenizer, chunk_size=CONTENT_CHUNK_SIZE):
    return json_stream.visit_many(_to_iterable(response, chunk_size), visitor, tokenizer=tokenizer)
48 changes: 42 additions & 6 deletions src/json_stream/httpx/tests/test_httpx.py
@@ -4,7 +4,8 @@
from unittest import TestCase
from unittest.mock import Mock

from json_stream.httpx import load, visit
from json_stream import to_standard_types
from json_stream.httpx import load, visit, load_many, visit_many


class TestLoad(TestCase):
@@ -16,13 +17,14 @@ def grouper(iterable, n):
        args = [iter(iterable)] * n
        return zip_longest(*args, fillvalue="")

    def _create_mock_response(self):
    def _create_mock_response(self, data=None):
        # httpx iter_bytes returns an iterable of bytes
        response = Mock()
        data = json.dumps({
            "a": "a" * io.DEFAULT_BUFFER_SIZE,
            "b": "b",
        })
        if data is None:
            data = json.dumps({
                "a": "a" * io.DEFAULT_BUFFER_SIZE,
                "b": "b",
            })
        content = ("".join(chunk).encode() for chunk in self.grouper(data, 1024))
        response.iter_bytes.return_value = content
        return response
@@ -69,3 +71,37 @@ def test_visitor(self):
            ('a' * io.DEFAULT_BUFFER_SIZE, ('a',)),
            ('b', ('b',)),
        ], visited)

    def test_load_many(self):
        expected = [{
            "a": "a" * io.DEFAULT_BUFFER_SIZE,
        }, {
            "b": "b" * io.DEFAULT_BUFFER_SIZE,
        }]
        data = "".join(json.dumps(i) for i in expected)
        response = self._create_mock_response(data=data)
        count = 0
        for item, exp in zip(load_many(response), expected):
            self.assertEqual(exp, to_standard_types(item))
            count += 1
        self.assertEqual(count, len(expected))

    def test_visit_many(self):
        items = [{
            "a": "a" * io.DEFAULT_BUFFER_SIZE,
        }, {
            "b": "b" * io.DEFAULT_BUFFER_SIZE,
        }]
        data = "".join(json.dumps(e) for e in items)
        response = self._create_mock_response(data=data)
        current_visit = []
        visits = []

        def visitor(item, path):
            current_visit.append((item, path))

        for _ in visit_many(response, visitor):
            visits.append(current_visit)
            current_visit = []
        self.assertEqual(len(visits), len(items))
        self.assertEqual(visits[0], [('a' * io.DEFAULT_BUFFER_SIZE, ('a',))])
        self.assertEqual(visits[1], [('b' * io.DEFAULT_BUFFER_SIZE, ('b',))])
15 changes: 11 additions & 4 deletions src/json_stream/loader.py
@@ -4,9 +4,16 @@


def load(fp_or_iterable, persistent=False, tokenizer=default_tokenizer):
    return next(load_many(fp_or_iterable, persistent, tokenizer))


def load_many(fp_or_iterable, persistent=False, tokenizer=default_tokenizer):
    fp = ensure_file(fp_or_iterable)
    token_stream = tokenizer(fp)
    token_type, token = next(token_stream)
    if token_type == TokenType.OPERATOR:
        return StreamingJSONBase.factory(token, token_stream, persistent)
    return token
    for token_type, token in token_stream:
        if token_type == TokenType.OPERATOR:
            data = StreamingJSONBase.factory(token, token_stream, persistent)
            yield data
            # drain anything the caller left unconsumed so the stream is
            # positioned at the start of the next document
            data.read_all()
        else:
            yield token
8 changes: 8 additions & 0 deletions src/json_stream/requests/__init__.py
@@ -13,5 +13,13 @@ def load(response, persistent=False, tokenizer=default_tokenizer, chunk_size=CONTENT_CHUNK_SIZE):
    return json_stream.load(_to_iterable(response, chunk_size), persistent=persistent, tokenizer=tokenizer)


def load_many(response, persistent=False, tokenizer=default_tokenizer, chunk_size=CONTENT_CHUNK_SIZE):
    return json_stream.load_many(_to_iterable(response, chunk_size), persistent=persistent, tokenizer=tokenizer)


def visit(response, visitor, tokenizer=default_tokenizer, chunk_size=CONTENT_CHUNK_SIZE):
    return json_stream.visit(_to_iterable(response, chunk_size), visitor, tokenizer=tokenizer)


def visit_many(response, visitor, tokenizer=default_tokenizer, chunk_size=CONTENT_CHUNK_SIZE):
    return json_stream.visit_many(_to_iterable(response, chunk_size), visitor, tokenizer=tokenizer)
48 changes: 42 additions & 6 deletions src/json_stream/requests/tests/test_requests.py
@@ -4,7 +4,8 @@
from unittest import TestCase
from unittest.mock import Mock

from json_stream.requests import load, visit
from json_stream import to_standard_types
from json_stream.requests import load, visit, load_many, visit_many


class TestLoad(TestCase):
@@ -17,13 +18,14 @@ def grouper(iterable, n):
        args = [iter(iterable)] * n
        return zip_longest(*args, fillvalue="")

    def _create_mock_response(self):
    def _create_mock_response(self, data=None):
        # requests iter_content returns an iterable of bytes
        response = Mock()
        data = json.dumps({
            "a": "a" * io.DEFAULT_BUFFER_SIZE,
            "b": "b",
        })
        if data is None:
            data = json.dumps({
                "a": "a" * io.DEFAULT_BUFFER_SIZE,
                "b": "b",
            })
        content = ("".join(chunk).encode() for chunk in self.grouper(data, 1024))
        response.iter_content.return_value = content
        return response
@@ -70,3 +72,37 @@ def test_visitor(self):
            ('a' * io.DEFAULT_BUFFER_SIZE, ('a',)),
            ('b', ('b',)),
        ], visited)

    def test_load_many(self):
        expected = [{
            "a": "a" * io.DEFAULT_BUFFER_SIZE,
        }, {
            "b": "b" * io.DEFAULT_BUFFER_SIZE,
        }]
        data = "".join(json.dumps(i) for i in expected)
        response = self._create_mock_response(data=data)
        count = 0
        for item, exp in zip(load_many(response), expected):
            self.assertEqual(exp, to_standard_types(item))
            count += 1
        self.assertEqual(count, len(expected))

    def test_visit_many(self):
        items = [{
            "a": "a" * io.DEFAULT_BUFFER_SIZE,
        }, {
            "b": "b" * io.DEFAULT_BUFFER_SIZE,
        }]
        data = "".join(json.dumps(e) for e in items)
        response = self._create_mock_response(data=data)
        current_visit = []
        visits = []

        def visitor(item, path):
            current_visit.append((item, path))

        for _ in visit_many(response, visitor):
            visits.append(current_visit)
            current_visit = []
        self.assertEqual(len(visits), len(items))
        self.assertEqual(visits[0], [('a' * io.DEFAULT_BUFFER_SIZE, ('a',))])
        self.assertEqual(visits[1], [('b' * io.DEFAULT_BUFFER_SIZE, ('b',))])
2 changes: 1 addition & 1 deletion src/json_stream/select_tokenizer.py
@@ -5,7 +5,7 @@

try:
    default_tokenizer = rust_tokenizer_or_raise()
except ExtensionException as e:
except ExtensionException as e:  # pragma: no cover
    warn(str(e), category=ImportWarning)  # ImportWarnings are ignored by default
    default_tokenizer = tokenize

102 changes: 102 additions & 0 deletions src/json_stream/tests/test_load_many.py
@@ -0,0 +1,102 @@
import unittest
from io import StringIO, BytesIO
from unittest import TestCase

import json_stream
from json_stream import to_standard_types


class TestLoadMany(TestCase):
    def test_load_many(self):
        # NDJSON-like: one JSON value per line
        payload = "\n".join([
            '{"a": 1}',
            '[1, 2]',
            '3',
            'true',
            'null',
            '"x"',
            '{}',
            '[]',
        ])
        stream = StringIO(payload)
        items = [
            to_standard_types(v) for v in json_stream.load_many(stream, persistent=True)
        ]
        self.assertListEqual(
            items,
            [
                {"a": 1},
                [1, 2],
                3,
                True,
                None,
                "x",
                {},
                [],
            ],
        )

    def test_load_many_concatenated(self):
        payload = b'{"a": 1}[1,2]truenull"x"{}[]'
        stream = BytesIO(payload)
        items = [
            to_standard_types(v) for v in json_stream.load_many(stream, persistent=False)
        ]
        # Even when persistent=False, materialization should consume correctly per item
        self.assertListEqual(
            items,
            [
                {"a": 1},
                [1, 2],
                True,
                None,
                "x",
                {},
                [],
            ],
        )

    @unittest.expectedFailure
    def test_load_many_incompatible_concatenations(self):
        # this fails because the tokeniser wants certain primitives to be followed by a delimiter
        payloads = [
            [b'{"a": 1}[]', [{"a": 1}, []]],  # this one works
            [b'3true', [3, True]],
            [b'3.0true', [3.0, True]],
            [b'1.2e11true', [120000000000.0, True]],
            [b'0true', [0, True]],
            [b'""true', ["", True]],
        ]
        errors = []
        for payload, expected in payloads:
            try:
                stream = BytesIO(payload)
                items = [
                    to_standard_types(v) for v in json_stream.load_many(stream, persistent=False)
                ]
                self.assertListEqual(expected, items)
            except ValueError as e:
                errors.append(f"{payload}: {e}")
        self.assertListEqual([], errors)

    def test_load_many_skips_after_item_partially_consumed(self):
        # Ensure that after yielding an object/array, the generator resumes and continues
        # to the next top-level JSON text correctly.
        payload = '{"first": [1, 2, 3]} {"second": {"x": 1}, "unconsumed": 7} 4'
        stream = StringIO(payload)

        gen = json_stream.load_many(stream, persistent=True)

        first = next(gen)
        # consume partially then fully
        self.assertEqual(list(first["first"]), [1, 2, 3])

        second = next(gen)
        self.assertEqual(dict(second["second"].items()), {"x": 1})

        third = next(gen)
        self.assertEqual(third, 4)

        with self.assertRaises(StopIteration):
            next(gen)