From 524ceafd9c98cf3d1b721a385417f56c8c8bc325 Mon Sep 17 00:00:00 2001
From: Jamie Cockburn
Date: Wed, 7 Jun 2023 20:39:11 +0100
Subject: [PATCH 1/4] added buffered reading to tokenizer

---
 src/json_stream/loader.py               |  4 ++--
 src/json_stream/tests/test_buffering.py |  6 +++---
 src/json_stream/tokenizer.py            | 21 ++++++++++++++-------
 src/json_stream/visitor.py              |  4 ++--
 4 files changed, 21 insertions(+), 14 deletions(-)

diff --git a/src/json_stream/loader.py b/src/json_stream/loader.py
index 680e801..1e4b72e 100644
--- a/src/json_stream/loader.py
+++ b/src/json_stream/loader.py
@@ -3,9 +3,9 @@
 from json_stream.select_tokenizer import default_tokenizer
 
 
-def load(fp_or_iterable, persistent=False, tokenizer=default_tokenizer):
+def load(fp_or_iterable, persistent=False, tokenizer=default_tokenizer, buffering=-1):
     fp = ensure_file(fp_or_iterable)
-    token_stream = tokenizer(fp)
+    token_stream = tokenizer(fp, buffering=buffering)
     token_type, token = next(token_stream)
     if token_type == TokenType.OPERATOR:
         return StreamingJSONBase.factory(token, token_stream, persistent)
diff --git a/src/json_stream/tests/test_buffering.py b/src/json_stream/tests/test_buffering.py
index f60033a..0e969e9 100644
--- a/src/json_stream/tests/test_buffering.py
+++ b/src/json_stream/tests/test_buffering.py
@@ -12,9 +12,9 @@ def test_buffering(self):
         self._test_buffering(tokenizer=rust_tokenizer_or_raise())
 
     def test_buffering_python_tokenizer(self):
-        self._test_buffering(tokenizer=tokenize)
+        self._test_buffering(tokenizer=tokenize, buffering=0)
 
-    def _test_buffering(self, tokenizer):
+    def _test_buffering(self, tokenizer, **load_args):
         happenings = []
 
         def data_in_chunks(data, chunk_size=15):
@@ -24,7 +24,7 @@ def data_in_chunks(data, chunk_size=15):
                 yield part
 
         json_string = b'{"tasks":[{"id":1,"title":"task1"},{"id":2,"title":"task2"},{"id":3,"title":"task3"}]}'
-        stream = json_stream.load(data_in_chunks(json_string), tokenizer=tokenizer)
+        stream = json_stream.load(data_in_chunks(json_string), tokenizer=tokenizer, **load_args)
 
         for task in stream["tasks"]:
             happenings.append(('item', to_standard_types(task)))
diff --git a/src/json_stream/tokenizer.py b/src/json_stream/tokenizer.py
index 21b0bb4..0cca7cc 100644
--- a/src/json_stream/tokenizer.py
+++ b/src/json_stream/tokenizer.py
@@ -78,7 +78,7 @@ def _ensure_text(stream):
     return stream
 
 
-def tokenize(stream):
+def tokenize(stream, *, buffering=-1, **_):
     stream = _ensure_text(stream)
 
     def is_delimiter(char):
@@ -365,9 +365,19 @@ def process_char(char):
         return advance, next_state
 
     state = State.WHITESPACE
-    c = stream.read(1)
-    index = 0
-    while c:
+    if not buffering:
+        buffering = 1
+    elif buffering <= 0:
+        buffering = io.DEFAULT_BUFFER_SIZE
+    buffering = buffering.__index__()
+    buffer = stream.read(buffering)
+    c = None
+    index = -1
+    advance = True
+    while buffer:
+        if advance:
+            c, buffer = buffer[0], buffer[1:] or stream.read(buffering)
+            index += 1
         try:
             advance, state = process_char(c)
         except ValueError as e:
@@ -376,9 +386,6 @@ def process_char(char):
             completed = False
             token = []
             yield now_token
-        if advance:
-            c = stream.read(1)
-            index += 1
     process_char(SpecialChar.EOF)
     if completed:
         yield now_token
diff --git a/src/json_stream/visitor.py b/src/json_stream/visitor.py
index 99edd38..7b434b2 100644
--- a/src/json_stream/visitor.py
+++ b/src/json_stream/visitor.py
@@ -19,9 +19,9 @@ def _visit(obj, visitor, path):
         visitor(obj, path)
 
 
-def visit(fp_or_iterator, visitor, tokenizer=default_tokenizer):
+def visit(fp_or_iterator, visitor, tokenizer=default_tokenizer, buffering=-1):
     fp = ensure_file(fp_or_iterator)
-    token_stream = tokenizer(fp)
+    token_stream = tokenizer(fp, buffering=buffering)
     _, token = next(token_stream)
     obj = StreamingJSONBase.factory(token, token_stream, persistent=False)
     _visit(obj, visitor, ())

From d84e7b85c064267ead20f26f899b5a9343241f45 Mon Sep 17 00:00:00 2001
From: Jamie Cockburn
Date: Mon, 3 Jul 2023 20:23:00 +0100
Subject: [PATCH 2/4] fixed buffered python tokenizer bug causing final state
 to be sometimes missed

---
 src/json_stream/tokenizer.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/json_stream/tokenizer.py b/src/json_stream/tokenizer.py
index 0cca7cc..d5ac3e1 100644
--- a/src/json_stream/tokenizer.py
+++ b/src/json_stream/tokenizer.py
@@ -374,7 +374,7 @@ def process_char(char):
     c = None
     index = -1
     advance = True
-    while buffer:
+    while buffer or not advance:
         if advance:
             c, buffer = buffer[0], buffer[1:] or stream.read(buffering)
             index += 1

From 77fbc99c5d2434840b6a767e3310419a3cc0cb78 Mon Sep 17 00:00:00 2001
From: Jamie Cockburn
Date: Mon, 3 Jul 2023 20:24:28 +0100
Subject: [PATCH 3/4] fixed buffering tests merged from master

---
 src/json_stream/tests/test_buffering.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/json_stream/tests/test_buffering.py b/src/json_stream/tests/test_buffering.py
index 9c932fa..e04c3bd 100644
--- a/src/json_stream/tests/test_buffering.py
+++ b/src/json_stream/tests/test_buffering.py
@@ -15,7 +15,7 @@ def data_in_chunks(data, chunk_size=15):
                 yield part
 
         json_string = b'{"tasks":[{"id":1,"title":"task1"},{"id":2,"title":"task2"},{"id":3,"title":"task3"}]}'
-        stream = json_stream.load(data_in_chunks(json_string))
+        stream = json_stream.load(data_in_chunks(json_string), buffering=0)
 
         for task in stream["tasks"]:
             happenings.append(('item', to_standard_types(task)))

From a995eb5d39c0976fbebbb83fce4d94967489326a Mon Sep 17 00:00:00 2001
From: Jamie Cockburn
Date: Mon, 3 Jul 2023 20:25:53 +0100
Subject: [PATCH 4/4] changed interface for getting default tokenizer

---
 src/json_stream/httpx/__init__.py       | 25 ++++++++++++-----
 src/json_stream/loader.py               |  8 +++---
 src/json_stream/requests/__init__.py    | 27 ++++++++++++++-----
 src/json_stream/select_tokenizer.py     | 20 +++++++++-----
 src/json_stream/tests/__init__.py       |  5 ++--
 .../tests/test_tokenizer_integration.py |  6 ++---
 src/json_stream/visitor.py              |  8 +++---
 7 files changed, 63 insertions(+), 36 deletions(-)

diff --git a/src/json_stream/httpx/__init__.py b/src/json_stream/httpx/__init__.py
index c41141a..179c5f6 100644
--- a/src/json_stream/httpx/__init__.py
+++ b/src/json_stream/httpx/__init__.py
@@ -1,5 +1,4 @@
 import json_stream
-from json_stream.select_tokenizer import default_tokenizer
 
 CONTENT_CHUNK_SIZE = 10 * 1024
 
@@ -9,9 +8,21 @@ def _to_iterable(response, chunk_size):
     return response.iter_bytes(chunk_size=chunk_size)
 
 
-def load(response, persistent=False, tokenizer=default_tokenizer, chunk_size=CONTENT_CHUNK_SIZE):
-    return json_stream.load(_to_iterable(response, chunk_size), persistent=persistent, tokenizer=tokenizer)
-
-
-def visit(response, visitor, tokenizer=default_tokenizer, chunk_size=CONTENT_CHUNK_SIZE):
-    return json_stream.visit(_to_iterable(response, chunk_size), visitor, tokenizer=tokenizer)
+def load(response, persistent=False, tokenizer=None, chunk_size=CONTENT_CHUNK_SIZE, buffering=0, **kwargs):
+    return json_stream.load(
+        _to_iterable(response, chunk_size),
+        persistent=persistent,
+        tokenizer=tokenizer,
+        buffering=buffering,
+        **kwargs
+    )
+
+
+def visit(response, visitor, tokenizer=None, chunk_size=CONTENT_CHUNK_SIZE, buffering=0, **kwargs):
+    return json_stream.visit(
+        _to_iterable(response, chunk_size),
+        visitor,
+        tokenizer=tokenizer,
+        buffering=buffering,
+        **kwargs
+    )
diff --git a/src/json_stream/loader.py b/src/json_stream/loader.py
index 1e4b72e..970f1b4 100644
--- a/src/json_stream/loader.py
+++ b/src/json_stream/loader.py
@@ -1,11 +1,9 @@
 from json_stream.base import StreamingJSONBase, TokenType
-from json_stream.iterators import ensure_file
-from json_stream.select_tokenizer import default_tokenizer
+from json_stream.select_tokenizer import get_token_stream
 
 
-def load(fp_or_iterable, persistent=False, tokenizer=default_tokenizer, buffering=-1):
-    fp = ensure_file(fp_or_iterable)
-    token_stream = tokenizer(fp, buffering=buffering)
+def load(fp_or_iterable, persistent=False, tokenizer=None, buffering=-1, **kwargs):
+    token_stream = get_token_stream(fp_or_iterable, tokenizer=tokenizer, buffering=buffering, **kwargs)
     token_type, token = next(token_stream)
     if token_type == TokenType.OPERATOR:
         return StreamingJSONBase.factory(token, token_stream, persistent)
diff --git a/src/json_stream/requests/__init__.py b/src/json_stream/requests/__init__.py
index acbcb0f..418bbda 100644
--- a/src/json_stream/requests/__init__.py
+++ b/src/json_stream/requests/__init__.py
@@ -1,5 +1,4 @@
 import json_stream
-from json_stream.select_tokenizer import default_tokenizer
 
 CONTENT_CHUNK_SIZE = 10 * 1024
 
@@ -9,9 +8,23 @@ def _to_iterable(response, chunk_size):
     return response.iter_content(chunk_size=chunk_size)
 
 
-def load(response, persistent=False, tokenizer=default_tokenizer, chunk_size=CONTENT_CHUNK_SIZE):
-    return json_stream.load(_to_iterable(response, chunk_size), persistent=persistent, tokenizer=tokenizer)
-
-
-def visit(response, visitor, tokenizer=default_tokenizer, chunk_size=CONTENT_CHUNK_SIZE):
-    return json_stream.visit(_to_iterable(response, chunk_size), visitor, tokenizer=tokenizer)
+def load(response, persistent=False, tokenizer=None, chunk_size=CONTENT_CHUNK_SIZE,
+         buffering=0, **kwargs):
+    return json_stream.load(
+        _to_iterable(response, chunk_size),
+        persistent=persistent,
+        tokenizer=tokenizer,
+        buffering=buffering,
+        **kwargs
+    )
+
+
+def visit(response, visitor, tokenizer=None, chunk_size=CONTENT_CHUNK_SIZE,
+          buffering=0, **kwargs):
+    return json_stream.visit(
+        _to_iterable(response, chunk_size),
+        visitor,
+        tokenizer=tokenizer,
+        buffering=buffering,
+        **kwargs
+    )
diff --git a/src/json_stream/select_tokenizer.py b/src/json_stream/select_tokenizer.py
index 0612770..92c30d2 100644
--- a/src/json_stream/select_tokenizer.py
+++ b/src/json_stream/select_tokenizer.py
@@ -1,12 +1,20 @@
 from warnings import warn
 
+from json_stream.iterators import ensure_file
 from json_stream.tokenizer import tokenize
 from json_stream_rs_tokenizer import rust_tokenizer_or_raise, ExtensionException
 
-try:
-    default_tokenizer = rust_tokenizer_or_raise()
-except ExtensionException as e:
-    warn(str(e), category=ImportWarning)  # ImportWarnings are ignored by default
-    default_tokenizer = tokenize
 
-__all__ = ['default_tokenizer']
+def get_tokenizer(**kwargs):
+    try:
+        return rust_tokenizer_or_raise(**kwargs)
+    except ExtensionException as e:
+        warn(str(e), category=ImportWarning)  # ImportWarnings are ignored by default
+        return tokenize
+
+
+def get_token_stream(fp_or_iterable, tokenizer, **tokenizer_kwargs):
+    fp = ensure_file(fp_or_iterable)
+    if tokenizer is None:
+        tokenizer = get_tokenizer(**tokenizer_kwargs)
+    return tokenizer(fp, **tokenizer_kwargs)
diff --git a/src/json_stream/tests/__init__.py b/src/json_stream/tests/__init__.py
index 89f7c93..3809f6f 100644
--- a/src/json_stream/tests/__init__.py
+++ b/src/json_stream/tests/__init__.py
@@ -3,14 +3,13 @@
 from itertools import zip_longest
 from unittest import TestCase
 
-from json_stream.select_tokenizer import default_tokenizer
 
 from json_stream import load
 from json_stream.base import TransientAccessException
 
 
 class JSONLoadTestCase(TestCase):
-    def _test_object(self, obj, persistent, binary=False, tokenizer=default_tokenizer):
+    def _test_object(self, obj, persistent, binary=False, tokenizer=None):
         self.assertListEqual(list(self._to_data(obj, persistent, binary, tokenizer)), list(obj))
         self.assertListEqual(list(self._to_data(obj, persistent, binary, tokenizer).keys()), list(obj.keys()))
         self.assertListEqual(list(self._to_data(obj, persistent, binary, tokenizer).values()), list(obj.values()))
@@ -40,7 +39,7 @@ def _test_object(self, obj, persistent, binary=False, tokenizer=default_tokenize
         with self.assertRaises(TransientAccessException):
             data.items()  # can't get keys
 
-    def _test_list(self, obj, persistent, binary=False, tokenizer=default_tokenizer):
+    def _test_list(self, obj, persistent, binary=False, tokenizer=None):
         self.assertListEqual(list(self._to_data(obj, persistent, binary, tokenizer)), list(obj))
         if persistent:
             self.assertEqual(len(self._to_data(obj, persistent, binary, tokenizer)), len(obj))
diff --git a/src/json_stream/tests/test_tokenizer_integration.py b/src/json_stream/tests/test_tokenizer_integration.py
index d9e89d8..8ae2f1e 100644
--- a/src/json_stream/tests/test_tokenizer_integration.py
+++ b/src/json_stream/tests/test_tokenizer_integration.py
@@ -5,7 +5,7 @@
 
 from json_stream import load
 
-from json_stream.select_tokenizer import default_tokenizer
+from json_stream.select_tokenizer import get_tokenizer
 from json_stream.tests import JSONLoadTestCase
 
@@ -13,12 +13,12 @@
 @skipUnless(hasattr(json_stream_rs_tokenizer, 'RustTokenizer'), 'rust tokenizer not available')
 class TestRSTokenizer(JSONLoadTestCase):
     def test_load_object(self):
-        self.assertIs(default_tokenizer, json_stream_rs_tokenizer.RustTokenizer)
+        self.assertIs(get_tokenizer(), json_stream_rs_tokenizer.RustTokenizer)
         obj = {"a": 1, "b": None, "c": True}
         self._test_object(obj, persistent=False)
 
     def test_load_object_binary(self):
-        self.assertIs(default_tokenizer, json_stream_rs_tokenizer.RustTokenizer)
+        self.assertIs(get_tokenizer(), json_stream_rs_tokenizer.RustTokenizer)
         obj = {"a": 1, "b": None, "c": True}
         self._test_object(obj, persistent=False, binary=True)
diff --git a/src/json_stream/visitor.py b/src/json_stream/visitor.py
index 7b434b2..e817bec 100644
--- a/src/json_stream/visitor.py
+++ b/src/json_stream/visitor.py
@@ -1,6 +1,5 @@
 from json_stream.base import StreamingJSONObject, StreamingJSONList, StreamingJSONBase
-from json_stream.iterators import ensure_file
-from json_stream.select_tokenizer import default_tokenizer
+from json_stream.select_tokenizer import get_token_stream
 
 
 def _visit(obj, visitor, path):
@@ -19,9 +18,8 @@ def _visit(obj, visitor, path):
         visitor(obj, path)
 
 
-def visit(fp_or_iterator, visitor, tokenizer=default_tokenizer, buffering=-1):
-    fp = ensure_file(fp_or_iterator)
-    token_stream = tokenizer(fp, buffering=buffering)
+def visit(fp_or_iterable, visitor, tokenizer=None, buffering=-1, **kwargs):
+    token_stream = get_token_stream(fp_or_iterable, tokenizer=tokenizer, buffering=buffering, **kwargs)
     _, token = next(token_stream)
     obj = StreamingJSONBase.factory(token, token_stream, persistent=False)
     _visit(obj, visitor, ())
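
Illustrative usage of the interface after this series; a minimal sketch, not part of the patches. The file name "tasks.json" and its "tasks" key are hypothetical; any file-like object or iterable of bytes works.

import json_stream

with open("tasks.json", "rb") as f:
    # tokenizer=None (the new default) resolves a tokenizer at call time via
    # get_tokenizer(); with the pure-Python tokenizer, buffering=-1 reads
    # io.DEFAULT_BUFFER_SIZE chunks and buffering=0 keeps byte-at-a-time reads.
    data = json_stream.load(f, buffering=-1)
    for task in data["tasks"]:
        # materialise each streamed item as plain dicts/lists
        print(json_stream.to_standard_types(task))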