Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,14 @@ testpaths = ["tests"]
# -n 4: 4 xdist workers. With the 2-thread BLAS cap set in conftest.py
# that's 4 * 2 = 8 threads max. Measured: 118s single -> 66s (-44%).
# Override per run via `pytest -n 0` (serial) or `pytest -n auto`.
addopts = "-n 4 --timeout=60"
#
# --dist loadgroup: respect @pytest.mark.xdist_group so modules that share
# a module-scoped fixture (e.g. tests/test_e2e_real_embedder.py with a
# persistent GraphStore across classes) stay on one worker. Default
# LoadScheduling ignored the marker and fragmented module state across
# workers, breaking tests that depended on ingest state established in
# an earlier test class in the same module.
addopts = "-n 4 --timeout=60 --dist loadgroup"
markers = [
"needs_embedder: embedder test (model2vec is now core, kept for compat)",
"needs_fastembed: requires graphstore[embedders-extra] (fastembed)",
Expand Down
6 changes: 4 additions & 2 deletions src/graphstore/core/evolve/_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,8 @@ def compute_signals(self) -> dict:
from graphstore.core.memory import measure
m = measure(store, gs._vector_store, gs._document_store)
total_bytes = m.get("total", 0)
except Exception:
except Exception as err:
logger.debug("memory.measure() failed during evolve telemetry: %s", err)
total_bytes = 0

ceiling = getattr(store, "_ceiling_bytes", 1) or 1
Expand All @@ -281,7 +282,8 @@ def compute_signals(self) -> dict:
h = health_check(store, gs._vector_store, gs._document_store)
tombstone_ratio = h.get("tombstone_ratio", 0.0)
string_bloat = h.get("string_bloat", 0.0)
except Exception:
except Exception as err:
logger.debug("optimizer.health_check() failed during evolve telemetry: %s", err)
tombstone_ratio = 0.0
string_bloat = 0.0

Expand Down
7 changes: 5 additions & 2 deletions src/graphstore/core/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
graphstore store/vector/document types.
"""

import logging
import sys

logger = logging.getLogger(__name__)
from graphstore.core.errors import CeilingExceeded
from graphstore.algos.measure import (
estimate_bytes as _algo_estimate_bytes,
Expand Down Expand Up @@ -81,8 +84,8 @@ def measure(store, vector_store=None, document_store=None, skip_csr: bool = Fals
try:
stats = document_store.stats()
report["document_store_disk"] = stats.get("total_bytes", 0)
except Exception:
pass
except Exception as err:
logger.debug("document_store.stats() failed during memory accounting: %s", err)

return report

Expand Down
8 changes: 6 additions & 2 deletions src/graphstore/core/optimizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@

from __future__ import annotations

import logging

import numpy as np

logger = logging.getLogger(__name__)

from graphstore.algos.compact import (
apply_slot_remap_to_edges as _algo_apply_slot_remap,
build_live_mask as _algo_build_live_mask,
Expand Down Expand Up @@ -632,8 +636,8 @@ def _evict_nodes(store: CoreStore, slots_to_evict: list[int], vector_store=None,
if document_store is not None:
try:
document_store.delete_document(slot)
except Exception:
pass
except Exception as err:
logger.debug("document_store.delete_document(%s) failed during eviction: %s", slot, err)

# Tombstone
store.columns.clear(slot)
Expand Down
8 changes: 4 additions & 4 deletions src/graphstore/dsl/handlers/ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -260,12 +260,12 @@ def _ingest_body(self, q, result, chunks, parent_id, parent_kind,
ent_id = f"ent:{s}"
try:
self.store.put_node(ent_id, "entity", {"name": ent.text})
except Exception:
pass
except Exception as err:
logger.debug("put_node(%s) skipped during ingest entity link: %s", ent_id, err)
try:
self.store.put_edge(chunk_id, ent_id, "mentions")
except Exception:
pass
except Exception as err:
logger.debug("put_edge(%s -> %s) skipped during ingest entity link: %s", chunk_id, ent_id, err)

# Embed chunk text for vector retrieval
embed_batch.append((chunk_slot, embed_text))
Expand Down
11 changes: 7 additions & 4 deletions src/graphstore/dsl/handlers/mutations.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
"""Mutation handlers for the DSL executor (create, update, delete, merge, batch)."""

import logging
import time
from collections import deque

logger = logging.getLogger(__name__)

import numpy as np
from scipy.sparse import csr_matrix

Expand Down Expand Up @@ -311,12 +314,12 @@ def _create_node(self, q: CreateNode) -> Result:
ent_id = f"ent:{ent_slug_val}"
try:
self.store.put_node(ent_id, "entity", {"name": ent_display})
except Exception:
pass
except Exception as err:
logger.debug("put_node(%s) skipped during mutation entity link: %s", ent_id, err)
try:
self.store.put_edge(sent_id, ent_id, "mentions")
except Exception:
pass
except Exception as err:
logger.debug("put_edge(%s -> %s) skipped during mutation entity link: %s", sent_id, ent_id, err)

if batch_embed_sentences:
self._batch_embed_and_store(list(zip(sent_slots, sentences)))
Expand Down
4 changes: 2 additions & 2 deletions src/graphstore/dsl/sys/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,8 @@ def _consolidate(self, q: SysConsolidate) -> Result:
if ev_id:
try:
store.put_edge(obs_id, ev_id, "evidence")
except Exception:
pass
except Exception as err:
logger.debug("evidence edge %s -> %s skipped: %s", obs_id, ev_id, err)

created += 1
except Exception:
Expand Down
4 changes: 2 additions & 2 deletions src/graphstore/ingest/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ def connect_all(store, vector_store, threshold=0.85, where_expr=None, executor=N
if progress_callback is not None and slot % 100 == 0:
try:
progress_callback(slot, n)
except Exception:
pass
except Exception as err:
logger.debug("user progress_callback raised (ignored): %s", err)

if not live[slot] or not vector_store.has_vector(slot):
continue
Expand Down
Loading