Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions ddp_reasoning.py
Original file line number Diff line number Diff line change
Expand Up @@ -947,6 +947,37 @@ def _cross_shard_instances(self, ce: OWLClassExpression, direct: bool = False) -
for shard_res in shard_results:
if ce_str in shard_res:
combined[ce_str] |= shard_res[ce_str]
elif isinstance(ce_obj, OWLObjectAllValuesFrom):
# 1. Keep what shards could prove locally (e.g., TBox closure, same-shard fillers)
local_results = set()
for shard_res in shard_results:
if ce_str in shard_res:
local_results |= shard_res[ce_str]

# 2. Bridge the cross-shard gap using Nominal Injection
filler_str = str(ce_obj.get_filler())
filler_iris = combined.get(filler_str, set())

cross_results = set()
if filler_iris:
# Package globally known C instances into a closed OWA set: {i_1, i_2, ...}
nominal_filler = OWLObjectOneOf([OWLNamedIndividual(iri) for iri in filler_iris])

# Create a new query: ∀ r.{i_1, i_2, ...}
cross_ce = OWLObjectAllValuesFrom(
property=ce_obj.get_property(),
filler=nominal_filler
)

# Dispatch this targeted cross-shard query
futures = [shard.query_instances.remote(cross_ce) for shard in self.shards]
shard_cross_results = ray.get(futures)

if shard_cross_results:
cross_results = set().union(*shard_cross_results)

# The final OWA-sound result is the union of local and cross-shard deductions
combined[ce_str] = local_results | cross_results
else:
# For other CEs (atomic classes, unions, intersections): union across
# shards is correct because the ABox is partitioned — each individual
Expand Down
Loading