diff --git a/pyproject.toml b/pyproject.toml index 1263cab..7a3d123 100755 --- a/pyproject.toml +++ b/pyproject.toml @@ -128,7 +128,14 @@ testpaths = ["tests"] # -n 4: 4 xdist workers. With the 2-thread BLAS cap set in conftest.py # that's 4 * 2 = 8 threads max. Measured: 118s single -> 66s (-44%). # Override per run via `pytest -n 0` (serial) or `pytest -n auto`. -addopts = "-n 4 --timeout=60" +# +# --dist loadgroup: respect @pytest.mark.xdist_group so modules that share +# a module-scoped fixture (e.g. tests/test_e2e_real_embedder.py with a +# persistent GraphStore across classes) stay on one worker. Default +# LoadScheduling ignored the marker and fragmented module state across +# workers, breaking tests that depended on ingest state established in +# an earlier test class in the same module. +addopts = "-n 4 --timeout=60 --dist loadgroup" markers = [ "needs_embedder: embedder test (model2vec is now core, kept for compat)", "needs_fastembed: requires graphstore[embedders-extra] (fastembed)", diff --git a/src/graphstore/core/evolve/_impl.py b/src/graphstore/core/evolve/_impl.py index fab4f33..9a4d6c3 100644 --- a/src/graphstore/core/evolve/_impl.py +++ b/src/graphstore/core/evolve/_impl.py @@ -265,7 +265,8 @@ def compute_signals(self) -> dict: from graphstore.core.memory import measure m = measure(store, gs._vector_store, gs._document_store) total_bytes = m.get("total", 0) - except Exception: + except Exception as err: + logger.debug("memory.measure() failed during evolve telemetry: %s", err) total_bytes = 0 ceiling = getattr(store, "_ceiling_bytes", 1) or 1 @@ -281,7 +282,8 @@ def compute_signals(self) -> dict: h = health_check(store, gs._vector_store, gs._document_store) tombstone_ratio = h.get("tombstone_ratio", 0.0) string_bloat = h.get("string_bloat", 0.0) - except Exception: + except Exception as err: + logger.debug("optimizer.health_check() failed during evolve telemetry: %s", err) tombstone_ratio = 0.0 string_bloat = 0.0 diff --git a/src/graphstore/core/memory.py b/src/graphstore/core/memory.py index d6dd72b..f5e7ef3 100755 --- a/src/graphstore/core/memory.py +++ b/src/graphstore/core/memory.py @@ -4,7 +4,10 @@ graphstore store/vector/document types. """ +import logging import sys + +logger = logging.getLogger(__name__) from graphstore.core.errors import CeilingExceeded from graphstore.algos.measure import ( estimate_bytes as _algo_estimate_bytes, @@ -81,8 +84,8 @@ def measure(store, vector_store=None, document_store=None, skip_csr: bool = Fals try: stats = document_store.stats() report["document_store_disk"] = stats.get("total_bytes", 0) - except Exception: - pass + except Exception as err: + logger.debug("document_store.stats() failed during memory accounting: %s", err) return report diff --git a/src/graphstore/core/optimizer.py b/src/graphstore/core/optimizer.py index 7b4844d..039e863 100644 --- a/src/graphstore/core/optimizer.py +++ b/src/graphstore/core/optimizer.py @@ -6,8 +6,12 @@ from __future__ import annotations +import logging + import numpy as np +logger = logging.getLogger(__name__) + from graphstore.algos.compact import ( apply_slot_remap_to_edges as _algo_apply_slot_remap, build_live_mask as _algo_build_live_mask, @@ -632,8 +636,8 @@ def _evict_nodes(store: CoreStore, slots_to_evict: list[int], vector_store=None, if document_store is not None: try: document_store.delete_document(slot) - except Exception: - pass + except Exception as err: + logger.debug("document_store.delete_document(%s) failed during eviction: %s", slot, err) # Tombstone store.columns.clear(slot) diff --git a/src/graphstore/dsl/handlers/ingest.py b/src/graphstore/dsl/handlers/ingest.py index b95f9f7..75ff2be 100644 --- a/src/graphstore/dsl/handlers/ingest.py +++ b/src/graphstore/dsl/handlers/ingest.py @@ -260,12 +260,12 @@ def _ingest_body(self, q, result, chunks, parent_id, parent_kind, ent_id = f"ent:{s}" try: self.store.put_node(ent_id, "entity", {"name": ent.text}) - except Exception: - pass + except Exception as err: + logger.debug("put_node(%s) skipped during ingest entity link: %s", ent_id, err) try: self.store.put_edge(chunk_id, ent_id, "mentions") - except Exception: - pass + except Exception as err: + logger.debug("put_edge(%s -> %s) skipped during ingest entity link: %s", chunk_id, ent_id, err) # Embed chunk text for vector retrieval embed_batch.append((chunk_slot, embed_text)) diff --git a/src/graphstore/dsl/handlers/mutations.py b/src/graphstore/dsl/handlers/mutations.py index 5b22dbf..01b6b47 100644 --- a/src/graphstore/dsl/handlers/mutations.py +++ b/src/graphstore/dsl/handlers/mutations.py @@ -1,8 +1,11 @@ """Mutation handlers for the DSL executor (create, update, delete, merge, batch).""" +import logging import time from collections import deque +logger = logging.getLogger(__name__) + import numpy as np from scipy.sparse import csr_matrix @@ -311,12 +314,12 @@ def _create_node(self, q: CreateNode) -> Result: ent_id = f"ent:{ent_slug_val}" try: self.store.put_node(ent_id, "entity", {"name": ent_display}) - except Exception: - pass + except Exception as err: + logger.debug("put_node(%s) skipped during mutation entity link: %s", ent_id, err) try: self.store.put_edge(sent_id, ent_id, "mentions") - except Exception: - pass + except Exception as err: + logger.debug("put_edge(%s -> %s) skipped during mutation entity link: %s", sent_id, ent_id, err) if batch_embed_sentences: self._batch_embed_and_store(list(zip(sent_slots, sentences))) diff --git a/src/graphstore/dsl/sys/pipeline.py b/src/graphstore/dsl/sys/pipeline.py index 56a40b3..2a7524a 100644 --- a/src/graphstore/dsl/sys/pipeline.py +++ b/src/graphstore/dsl/sys/pipeline.py @@ -248,8 +248,8 @@ def _consolidate(self, q: SysConsolidate) -> Result: if ev_id: try: store.put_edge(obs_id, ev_id, "evidence") - except Exception: - pass + except Exception as err: + logger.debug("evidence edge %s -> %s skipped: %s", obs_id, ev_id, err) created += 1 except Exception: diff --git a/src/graphstore/ingest/connector.py b/src/graphstore/ingest/connector.py index 1c6d0c2..7a6cfdf 100755 --- a/src/graphstore/ingest/connector.py +++ b/src/graphstore/ingest/connector.py @@ -40,8 +40,8 @@ def connect_all(store, vector_store, threshold=0.85, where_expr=None, executor=N if progress_callback is not None and slot % 100 == 0: try: progress_callback(slot, n) - except Exception: - pass + except Exception as err: + logger.debug("user progress_callback raised (ignored): %s", err) if not live[slot] or not vector_store.has_vector(slot): continue