143 changes: 85 additions & 58 deletions src/json_stream/base.py
@@ -1,65 +1,51 @@
import collections
import copy
from abc import ABC
from collections import OrderedDict
from collections import OrderedDict, deque
from itertools import chain
from typing import Optional, Iterator, Any

from json_stream.tokenizer import TokenType
from json_stream.tokenizer import (
OPERATOR,
STRING,
)

COLON = (OPERATOR, ":")


class TransientAccessException(Exception):
pass


class StreamingJSONBase(ABC):
class StreamingJSONBase(object):
INCOMPLETE_ERROR = "Unexpected end of file"

@classmethod
def factory(cls, token, token_stream, persistent):
if persistent:
if token == '{':
return PersistentStreamingJSONObject(token_stream)
if token == '[':
return PersistentStreamingJSONList(token_stream)
else:
if token == '{':
return TransientStreamingJSONObject(token_stream)
if token == '[':
return TransientStreamingJSONList(token_stream)
raise ValueError(f"Unknown operator {token}") # pragma: no cover

_persistent_children: bool
__slots__ = '_persistent_children', '_stream', '_child', 'streaming'

def __init__(self, token_stream):
# this is inlined in subclasses
self.streaming = True
self._stream = token_stream
self._child: Optional[StreamingJSONBase] = None

def _clear_child(self):
if self._child is not None:
self._child.read_all()
self._child = None

def _iter_items(self):
if not self.streaming:
return
load = self._load_item
while True:
if not self.streaming:
return
self._clear_child()
# clear child
if self._child is not None:
# inlined from read_all()
deque(self._child._iter_items(), maxlen=0)
self._child = None

try:
item = self._load_item()
yield load()
except StopIteration:
if self.streaming:
raise ValueError(self.INCOMPLETE_ERROR)
return
yield item

def _done(self):
self.streaming = False
raise StopIteration()

def read_all(self):
collections.deque(self._iter_items(), maxlen=0)
deque(self._iter_items(), maxlen=0)

def _load_item(self):
raise NotImplementedError() # pragma: no cover
@@ -83,9 +69,15 @@ def __deepcopy__(self, memo):
raise copy.Error("Copying json_steam objects leads to a bad time")


class PersistentStreamingJSONBase(StreamingJSONBase, ABC):
class PersistentStreamingJSONBase(StreamingJSONBase):
__slots__ = '_data'

def __init__(self, token_stream):
super().__init__(token_stream)
# inlined from super
self.streaming = True
self._stream = token_stream
self._child: Optional[StreamingJSONBase] = None

self._data = self._init_persistent_data()
self._persistent_children = True

@@ -107,9 +99,15 @@ def __repr__(self): # pragma: no cover
return f"<{type(self).__name__}: {repr(self._data)}, {'STREAMING' if self.streaming else 'DONE'}>"


class TransientStreamingJSONBase(StreamingJSONBase, ABC):
class TransientStreamingJSONBase(StreamingJSONBase):
__slots__ = '_started',

def __init__(self, token_stream):
super().__init__(token_stream)
# inlined from super
self.streaming = True
self._stream = token_stream
self._child: Optional[StreamingJSONBase] = None

self._started = False
self._persistent_children = False

@@ -137,29 +135,35 @@ def __repr__(self): # pragma: no cover
return f"<{type(self).__name__}: TRANSIENT, {'STREAMING' if self.streaming else 'DONE'}>"


class StreamingJSONList(StreamingJSONBase, ABC):
class StreamingJSONList(StreamingJSONBase):
INCOMPLETE_ERROR = "Unterminated list at end of file"

__slots__ = ()

def _load_item(self):
token_type, v = next(self._stream)
if token_type == TokenType.OPERATOR:
stream = self._stream
token_type, v = next(stream)
if token_type == OPERATOR:
if v == ']':
self._done()
self.streaming = False
raise StopIteration()
if v == ',':
token_type, v = next(self._stream)
token_type, v = next(stream)
elif v in '{[':
pass
else: # pragma: no cover
raise ValueError(f"Expecting value, comma or ], got {v}")
if token_type == TokenType.OPERATOR:
self._child = v = self.factory(v, self._stream, self._persistent_children)
if token_type == OPERATOR:
self._child = v = factory[self._persistent_children, v](stream)
return v

def _get__iter__(self):
return self._iter_items()


class PersistentStreamingJSONList(PersistentStreamingJSONBase, StreamingJSONList):
__slots__ = ()

def _init_persistent_data(self):
return []

@@ -185,8 +189,16 @@ def __getitem__(self, k) -> Any:


class TransientStreamingJSONList(TransientStreamingJSONBase, StreamingJSONList):
__slots__ = "_index",

def __init__(self, token_stream):
super().__init__(token_stream)
# inlined from super
self.streaming = True
self._stream = token_stream
self._child: Optional[StreamingJSONBase] = None
self._started = False
self._persistent_children = False

self._index = -1

def _load_item(self):
@@ -203,26 +215,29 @@ def _find_item(self, i):
raise IndexError(f"Index {i} out of range")


class StreamingJSONObject(StreamingJSONBase, ABC):
class StreamingJSONObject(StreamingJSONBase):
INCOMPLETE_ERROR = "Unterminated object at end of file"

__slots__ = ()

def _load_item(self):
token_type, k = next(self._stream)
if token_type == TokenType.OPERATOR:
stream = self._stream
token_type, k = next(stream)
if token_type == OPERATOR:
if k == '}':
self._done()
self.streaming = False
raise StopIteration()
if k == ',':
token_type, k = next(self._stream)
if token_type != TokenType.STRING: # pragma: no cover
token_type, k = next(stream)
if token_type != STRING: # pragma: no cover
raise ValueError(f"Expecting string, comma or }}, got {k} ({token_type})")

token_type, token = next(self._stream)
if token_type != TokenType.OPERATOR or token != ":":
if next(stream) != COLON:
raise ValueError("Expecting :") # pragma: no cover

token_type, v = next(self._stream)
if token_type == TokenType.OPERATOR:
self._child = v = self.factory(v, self._stream, self._persistent_children)
token_type, v = next(stream)
if token_type == OPERATOR:
self._child = v = factory[self._persistent_children, v](stream)
return k, v

def _get__iter__(self):
@@ -251,6 +266,8 @@ def get(self, k, default=None) -> Any:


class PersistentStreamingJSONObject(PersistentStreamingJSONBase, StreamingJSONObject):
__slots__ = ()

def _init_persistent_data(self):
return OrderedDict()

@@ -277,6 +294,8 @@ def __getitem__(self, k) -> Any:


class TransientStreamingJSONObject(TransientStreamingJSONBase, StreamingJSONObject):
__slots__ = ()

def _find_item(self, k):
was_started = self._started
try:
Expand All @@ -299,3 +318,11 @@ def keys(self):
def values(self):
self._check_started()
return (v for k, v in self._iter_items())


factory = {
(True, '{'): PersistentStreamingJSONObject,
(True, '['): PersistentStreamingJSONList,
(False, '{'): TransientStreamingJSONObject,
(False, '['): TransientStreamingJSONList,
}
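
A minimal sketch of how the new module-level `factory` table replaces the removed `StreamingJSONBase.factory()` classmethod: dispatch is now a single dict lookup keyed by `(persistent, opening_token)`. The hand-built token stream below is illustrative only; it assumes the `OPERATOR`/`STRING` constants introduced in this diff and the documented behaviour that persistent containers cache items as they stream.

```python
from json_stream.base import factory
from json_stream.tokenizer import OPERATOR, STRING  # constants added in this diff

# Hand-built tokens for the document {"a": "b"}, as (token_type, value) pairs,
# the same shape the tokenizer yields. The opening '{' is omitted because the
# caller (load()) consumes it before dispatching.
tokens = iter([
    (STRING, 'a'),
    (OPERATOR, ':'),
    (STRING, 'b'),
    (OPERATOR, '}'),
])

# One dict lookup instead of the old if/else chain in the classmethod.
obj = factory[True, '{'](tokens)   # -> PersistentStreamingJSONObject
obj.read_all()                     # drain the stream into the persistent data
print(obj['a'])                    # 'b'
```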
7 changes: 4 additions & 3 deletions src/json_stream/loader.py
@@ -1,12 +1,13 @@
from json_stream.base import StreamingJSONBase, TokenType
from json_stream.base import factory
from json_stream.iterators import ensure_file
from json_stream.select_tokenizer import default_tokenizer
from json_stream.tokenizer import OPERATOR


def load(fp_or_iterable, persistent=False, tokenizer=default_tokenizer):
fp = ensure_file(fp_or_iterable)
token_stream = tokenizer(fp)
token_type, token = next(token_stream)
if token_type == TokenType.OPERATOR:
return StreamingJSONBase.factory(token, token_stream, persistent)
if token_type == OPERATOR:
return factory[persistent, token](token_stream)
return token
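
For context, a usage sketch of the updated `load()` entry point. This is an assumption-laden example, not part of the diff: it presumes `ensure_file`/`default_tokenizer` accept a text file-like object such as `io.StringIO`, and relies on the library's documented persistent-mode behaviour.

```python
import io
from json_stream.loader import load

# persistent=True selects the Persistent* classes from the factory table,
# so containers keep the values they have already streamed past.
data = load(io.StringIO('{"items": [1, 2, 3]}'), persistent=True)
print(data["items"][0])   # 1
print(data["items"][0])   # still 1: persistent data can be re-read
```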