Skip to content

Commit 3101d5e

Browse files
committed
⚡️ (manager) Don't reiterate store at finish
1 parent 663aa83 commit 3101d5e

File tree

2 files changed

+10
-10
lines changed

2 files changed

+10
-10
lines changed

ingestors/manager.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -3,19 +3,19 @@
33
from functools import cache
44
from tempfile import mkdtemp
55
from timeit import default_timer
6-
from typing import Any, Generator
6+
from typing import Any
77

88
import magic
99
from banal import ensure_list
1010
from followthemoney import model
1111
from followthemoney.helpers import entity_filename
1212
from followthemoney.namespace import Namespace
13-
from followthemoney.proxy import EntityProxy
1413
from ftmq.store.fragments import get_fragments
1514
from ftmq.store.fragments.utils import safe_fragment
1615
from normality import stringify
1716
from openaleph_procrastinate import defer
18-
from procrastinate import App
17+
from openaleph_procrastinate.app import App
18+
from openaleph_procrastinate.util import make_checksum_entity
1919
from prometheus_client import Counter, Histogram
2020
from rigour.mime import normalize_mimetype
2121
from servicelayer.archive import init_archive
@@ -106,7 +106,7 @@ def __init__(self, app: App, dataset: str, context: dict[str, Any]):
106106
self.context = context
107107
self.ns = Namespace(self.context["namespace"])
108108
self.work_path = ensure_path(mkdtemp(prefix="ingestor-"))
109-
self.emitted = set()
109+
self.emitted = []
110110
self.archive = get_archive()
111111

112112
def make_entity(self, schema, parent=None):
@@ -138,7 +138,7 @@ def apply_context(self, entity, source):
138138
def emit_entity(self, entity, fragment=None):
139139
entity = self.ns.apply(entity)
140140
self.writer.put(entity.to_dict(), fragment)
141-
self.emitted.add(entity.id)
141+
self.emitted.append(make_checksum_entity(entity, quiet=True))
142142

143143
def emit_text_fragment(self, entity, texts, fragment):
144144
texts = [t for t in ensure_list(texts) if filter_text(t)]
@@ -259,6 +259,3 @@ def delegate(self, ingestor_class, file_path, entity):
259259
def close(self):
260260
self.writer.flush()
261261
remove_directory(self.work_path)
262-
263-
def iterate_emitted(self) -> Generator[EntityProxy, None, None]:
264-
yield from self.db.iterate(self.emitted)

ingestors/tasks.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,16 @@ def ingest(job: DatasetJob) -> None:
3939
finally:
4040
manager.close()
4141

42-
for entity in manager.iterate_emitted():
42+
for entity in manager.emitted:
4343
if entity.schema.is_a("Analyzable"):
4444
to_analyze.append(entity)
4545

4646
to_index.append(entity)
4747

48-
job.log.info(f"Emitted {len(manager.emitted)} entities.", emitted=manager.emitted)
48+
job.log.info(
49+
f"Emitted {len(manager.emitted)} entities.",
50+
emitted=[e.id for e in manager.emitted],
51+
)
4952
if to_analyze:
5053
defer.analyze(app, job.dataset, to_analyze, batch=job.batch, **job.context)
5154
if to_index:

0 commit comments

Comments
 (0)