Commit 5427f18

✨(backend) use batches in indexing task
Reduce the number of Find API calls by grouping the latest changes for indexation: send all the documents updated or deleted since the task was triggered.

Signed-off-by: Fabre Florian <ffabre@hybird.org>
1 parent a83f140 commit 5427f18

7 files changed: +498 −370 lines changed


src/backend/core/models.py

Lines changed: 2 additions & 1 deletion
@@ -904,7 +904,8 @@ def soft_delete(self):
 
         # Mark all descendants as soft deleted
         self.get_descendants().filter(ancestors_deleted_at__isnull=True).update(
-            ancestors_deleted_at=self.ancestors_deleted_at
+            ancestors_deleted_at=self.ancestors_deleted_at,
+            updated_at=self.updated_at,
         )
 
     @transaction.atomic
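
The added kwarg is what makes batching work: the batch task in src/backend/core/tasks/search.py (below) picks up every document whose updated_at, deleted_at, or ancestors_deleted_at is on or after a recorded timestamp, so stamping descendants with the parent's updated_at pulls a whole soft-deleted subtree into the same indexation window. A minimal sketch of that selection, assuming a Document model with these three timestamp fields:

# Sketch of the batch selection the new `updated_at=` propagation feeds into;
# it mirrors the Q filter added in core/tasks/search.py below.
from django.db.models import Q

from core import models


def documents_touched_since(timestamp):
    """Queryset of documents updated or (soft-)deleted since `timestamp`."""
    return models.Document.objects.filter(
        Q(updated_at__gte=timestamp)
        | Q(deleted_at__gte=timestamp)
        | Q(ancestors_deleted_at__gte=timestamp)
    )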

src/backend/core/services/search_indexers.py

Lines changed: 3 additions & 2 deletions
@@ -130,16 +130,17 @@ def __init__(self, batch_size=None):
                 "SEARCH_INDEXER_QUERY_URL must be set in Django settings."
             )
 
-    def index(self):
+    def index(self, queryset=None):
         """
         Fetch documents in batches, serialize them, and push to the search backend.
         """
         last_id = 0
         count = 0
+        queryset = queryset or models.Document.objects.all()
 
         while True:
             documents_batch = list(
-                models.Document.objects.filter(
+                queryset.filter(
                     id__gt=last_id,
                 ).order_by("id")[: self.batch_size]
             )
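
The surrounding loop (unchanged here) is keyset pagination: fetch at most batch_size rows with id greater than the last id seen, ordered by id, then advance the cursor. Passing queryset=None preserves the old full-table behaviour while letting the batch task hand in a pre-filtered queryset. A self-contained sketch of the pattern; iterate_in_batches is a hypothetical helper, not part of the commit:

def iterate_in_batches(queryset, batch_size=100):
    """Yield lists of rows using keyset pagination on the `id` column."""
    last_id = 0
    while True:
        # Each query is an indexed range scan, cheap even on large tables.
        batch = list(queryset.filter(id__gt=last_id).order_by("id")[:batch_size])
        if not batch:
            break
        yield batch
        last_id = batch[-1].id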

src/backend/core/signals.py

Lines changed: 5 additions & 3 deletions
@@ -9,7 +9,7 @@
 from django.dispatch import receiver
 
 from . import models
-from .tasks.search import trigger_document_indexer
+from .tasks.search import trigger_batch_document_indexer
 
 
 @receiver(signals.post_save, sender=models.Document)
@@ -19,7 +19,7 @@ def document_post_save(sender, instance, **kwargs):  # pylint: disable=unused-ar
     Note : Within the transaction we can have an empty content and a serialization
     error.
     """
-    transaction.on_commit(partial(trigger_document_indexer, instance))
+    transaction.on_commit(partial(trigger_batch_document_indexer, instance))
 
 
 @receiver(signals.post_save, sender=models.DocumentAccess)
@@ -28,4 +28,6 @@ def document_access_post_save(sender, instance, created, **kwargs):  # pylint: d
     Asynchronous call to the document indexer at the end of the transaction.
     """
     if not created:
-        transaction.on_commit(partial(trigger_document_indexer, instance.document))
+        transaction.on_commit(
+            partial(trigger_batch_document_indexer, instance.document)
+        )
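
Both receivers keep the same shape: the trigger is wrapped in functools.partial and handed to transaction.on_commit, so nothing is enqueued if the surrounding transaction rolls back, and the indexer never reads a row the database has not committed yet. A minimal sketch of the pattern, with a hypothetical notify() standing in for trigger_batch_document_indexer:

from functools import partial

from django.db import transaction


def notify(document_pk):
    """Hypothetical stand-in for trigger_batch_document_indexer."""
    print(f"would debounce indexing for document {document_pk}")


def rename(document, title):
    with transaction.atomic():
        document.title = title
        document.save()
        # Deferred until COMMIT; silently dropped on rollback.
        transaction.on_commit(partial(notify, document.pk))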

src/backend/core/tasks/search.py

Lines changed: 46 additions & 33 deletions
@@ -4,12 +4,12 @@
 
 from django.conf import settings
 from django.core.cache import cache
+from django.db.models import Q
 
 from django_redis.cache import RedisCache
 
 from core import models
 from core.services.search_indexers import (
-    get_batch_accesses_by_users_and_teams,
     get_document_indexer,
 )
 
@@ -18,16 +18,30 @@
 logger = getLogger(__file__)
 
 
-def indexer_throttle_acquire(document_id, timeout=0, atomic=True):
+@app.task
+def document_indexer_task(document_id):
+    """Celery Task : Sends indexation query for a document."""
+    indexer = get_document_indexer()
+
+    if indexer is None:
+        return
+
+    logger.info("Start document %s indexation", document_id)
+    indexer.index(models.Document.objects.filter(pk=document_id))
+
+
+def batch_indexer_throttle_acquire(timeout: int = 0, atomic: bool = True):
     """
     Enable the task throttle flag for a delay.
     Uses redis locks if available to ensure atomic changes
     """
-    key = f"doc-indexer-throttle-{document_id}"
+    key = "document-batch-indexer-throttle"
 
+    # Redis is used as cache database (not in tests). Use the lock feature here
+    # to ensure atomicity of changes to the throttle flag.
     if isinstance(cache, RedisCache) and atomic:
         with cache.locks(key):
-            return indexer_throttle_acquire(document_id, timeout, atomic=False)
+            return batch_indexer_throttle_acquire(timeout, atomic=False)
 
     # Use add() here :
     # - set the flag and returns true if not exist
@@ -36,49 +50,48 @@ def indexer_throttle_acquire(document_id, timeout=0, atomic=True):
 
 
 @app.task
-def document_indexer_task(document_id):
-    """Celery Task : Sends indexation query for a document."""
+def batch_document_indexer_task(timestamp):
+    """Celery Task : Sends indexation query for a batch of documents."""
     indexer = get_document_indexer()
 
-    if indexer is None:
-        return
-
-    try:
-        doc = models.Document.objects.get(pk=document_id)
-    except models.Document.DoesNotExist:
-        # Skip the task if the document does not exist.
-        return
-
-    accesses = get_batch_accesses_by_users_and_teams((doc.path,))
-
-    data = indexer.serialize_document(document=doc, accesses=accesses)
+    if indexer:
+        queryset = models.Document.objects.filter(
+            Q(updated_at__gte=timestamp)
+            | Q(deleted_at__gte=timestamp)
+            | Q(ancestors_deleted_at__gte=timestamp)
+        )
 
-    logger.info("Start document %s indexation", document_id)
-    indexer.push(data)
+        count = indexer.index(queryset)
+        logger.info("Indexed %d documents", count)
 
 
-def trigger_document_indexer(document):
+def trigger_batch_document_indexer(item):
     """
     Trigger indexation task with debounce a delay set by the SEARCH_INDEXER_COUNTDOWN setting.
 
     Args:
         document (Document): The document instance.
     """
-    countdown = settings.SEARCH_INDEXER_COUNTDOWN
+    countdown = int(settings.SEARCH_INDEXER_COUNTDOWN)
 
     # DO NOT create a task if indexation if disabled
     if not settings.SEARCH_INDEXER_CLASS:
         return
 
-    # Each time this method is called during a countdown, we increment the
-    # counter and each task decrease it, so the index be run only once.
-    if indexer_throttle_acquire(document.pk, timeout=countdown):
-        logger.info(
-            "Add task for document %s indexation in %.2f seconds",
-            document.pk,
-            countdown,
-        )
-
-        document_indexer_task.apply_async(args=[document.pk])
+    if countdown > 0:
+        # Each time this method is called during a countdown, we increment the
+        # counter and each task decrease it, so the index be run only once.
+        if batch_indexer_throttle_acquire(timeout=countdown):
+            logger.info(
+                "Add task for batch document indexation from updated_at=%s in %d seconds",
+                item.updated_at.isoformat(),
+                countdown,
+            )
+
+            batch_document_indexer_task.apply_async(
+                args=[item.updated_at], countdown=countdown
+            )
+        else:
+            logger.info("Skip task for batch document %s indexation", item.pk)
     else:
-        logger.info("Skip task for document %s indexation", document.pk)
+        document_indexer_task.apply(args=[item.pk])
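
The whole debounce rests on cache.add(), which writes the throttle key and returns True only when the key is absent, then lets it expire after timeout seconds; the Redis lock around the recursive call just makes that check-and-set race-free across workers. A minimal sketch of the semantics against Django's cache API (the key name comes from the commit; schedule() is a hypothetical stand-in for apply_async):

from django.core.cache import cache


def debounced_trigger(countdown, schedule):
    """Run `schedule` at most once per `countdown`-second window."""
    # add() sets the key and returns True only if it did not exist yet,
    # so concurrent callers cannot both enqueue a task.
    if cache.add("document-batch-indexer-throttle", True, timeout=countdown):
        schedule()  # first caller in the window wins
    # Later callers fall through: a batch task is already scheduled and
    # will pick up their documents via the timestamp filter.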

src/backend/core/tests/conftest.py

Lines changed: 1 addition & 0 deletions
@@ -45,6 +45,7 @@ def indexer_settings_fixture(settings):
     settings.SEARCH_INDEXER_QUERY_URL = (
         "http://localhost:8081/api/v1.0/documents/search/"
     )
+    settings.SEARCH_INDEXER_COUNTDOWN = 1
 
     yield settings
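Setting SEARCH_INDEXER_COUNTDOWN = 1 makes tests take the countdown > 0 debounce branch instead of the synchronous document_indexer_task.apply(...) fallback. A hedged sketch of a test exercising the throttle; the fixture request name and the mock-based assertions are assumptions, not code from this commit:

from unittest import mock

from django.utils import timezone

from core.tasks import search


def test_trigger_is_debounced(indexer_settings_fixture):
    # Hypothetical document stand-in; only `pk` and `updated_at` are read.
    document = mock.Mock(pk=1, updated_at=timezone.now())

    with mock.patch.object(search.batch_document_indexer_task, "apply_async") as spy:
        search.trigger_batch_document_indexer(document)
        search.trigger_batch_document_indexer(document)  # within the window

    # Only the first call should enqueue; the second hits the throttle flag.
    spy.assert_called_once()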