Skip to content

Commit acbb9b9

Browse files
committed
⚡️ (manager) Use in-memory emitted entity store
1 parent 3101d5e commit acbb9b9

File tree

2 files changed

+8
-5
lines changed

2 files changed

+8
-5
lines changed

ingestors/manager.py

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -10,6 +10,7 @@
1010
from followthemoney import model
1111
from followthemoney.helpers import entity_filename
1212
from followthemoney.namespace import Namespace
13+
from ftmq.store import get_store
1314
from ftmq.store.fragments import get_fragments
1415
from ftmq.store.fragments.utils import safe_fragment
1516
from normality import stringify
@@ -106,7 +107,7 @@ def __init__(self, app: App, dataset: str, context: dict[str, Any]):
106107
self.context = context
107108
self.ns = Namespace(self.context["namespace"])
108109
self.work_path = ensure_path(mkdtemp(prefix="ingestor-"))
109-
self.emitted = []
110+
self.emitted = get_store("memory://")
110111
self.archive = get_archive()
111112

112113
def make_entity(self, schema, parent=None):
@@ -138,7 +139,8 @@ def apply_context(self, entity, source):
138139
def emit_entity(self, entity, fragment=None):
139140
entity = self.ns.apply(entity)
140141
self.writer.put(entity.to_dict(), fragment)
141-
self.emitted.append(make_checksum_entity(entity, quiet=True))
142+
with self.emitted.writer() as bulk:
143+
bulk.add_entity(make_checksum_entity(entity, quiet=True))
142144

143145
def emit_text_fragment(self, entity, texts, fragment):
144146
texts = [t for t in ensure_list(texts) if filter_text(t)]

ingestors/tasks.py

Lines changed: 4 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -39,15 +39,16 @@ def ingest(job: DatasetJob) -> None:
3939
finally:
4040
manager.close()
4141

42-
for entity in manager.emitted:
42+
emitted = list(manager.emitted.iterate())
43+
for entity in emitted:
4344
if entity.schema.is_a("Analyzable"):
4445
to_analyze.append(entity)
4546

4647
to_index.append(entity)
4748

4849
job.log.info(
49-
f"Emitted {len(manager.emitted)} entities.",
50-
emitted=[e.id for e in manager.emitted],
50+
f"Emitted {len(emitted)} entities.",
51+
emitted=[e.id for e in emitted],
5152
)
5253
if to_analyze:
5354
defer.analyze(app, job.dataset, to_analyze, batch=job.batch, **job.context)

0 commit comments

Comments
 (0)