Skip to content

Commit 73ea54d

Browse files
committed
✨(backend) throttle indexation tasks instead of debounce (simpler)
Replace the indexer_debounce_lock|release functions with indexer_throttle_acquire(). Instead of a mutex-like mechanism, simply set a flag in the cache for an amount of time that blocks any other task creation. Signed-off-by: Fabre Florian <ffabre@hybird.org>
1 parent c5ae5ac commit 73ea54d

File tree

3 files changed

+105
-62
lines changed

3 files changed

+105
-62
lines changed

src/backend/core/tasks/search.py

Lines changed: 38 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -5,56 +5,50 @@
55
from django.conf import settings
66
from django.core.cache import cache
77

8-
from impress.celery_app import app
8+
from django_redis.cache import RedisCache
99

10-
logger = getLogger(__file__)
10+
from core import models
11+
from core.services.search_indexers import (
12+
get_batch_accesses_by_users_and_teams,
13+
get_document_indexer,
14+
)
1115

16+
from impress.celery_app import app
1217

13-
def indexer_debounce_lock(document_id):
14-
"""Increase or reset counter"""
15-
key = f"doc-indexer-debounce-{document_id}"
18+
logger = getLogger(__file__)
1619

17-
try:
18-
return cache.incr(key)
19-
except ValueError:
20-
cache.set(key, 1)
21-
return 1
2220

21+
def indexer_throttle_acquire(document_id, timeout=0, atomic=True):
22+
"""
23+
Enable the task throttle flag for a delay.
24+
Uses redis locks if available to ensure atomic changes
25+
"""
26+
key = f"doc-indexer-throttle-{document_id}"
2327

24-
def indexer_debounce_release(document_id):
25-
"""Decrease or reset counter"""
26-
key = f"doc-indexer-debounce-{document_id}"
28+
if isinstance(cache, RedisCache) and atomic:
29+
with cache.locks(key):
30+
return indexer_throttle_acquire(document_id, timeout, atomic=False)
2731

28-
try:
29-
return cache.decr(key)
30-
except ValueError:
31-
cache.set(key, 0)
32-
return 0
32+
# Use add() here :
33+
# - set the flag and returns true if not exist
34+
# - do nothing and return false if exist
35+
return cache.add(key, 1, timeout=timeout)
3336

3437

3538
@app.task
3639
def document_indexer_task(document_id):
3740
"""Celery Task : Sends indexation query for a document."""
38-
# Prevents some circular imports
39-
# pylint: disable=import-outside-toplevel
40-
from core import models # noqa : PLC0415
41-
from core.services.search_indexers import ( # noqa : PLC0415
42-
get_batch_accesses_by_users_and_teams,
43-
get_document_indexer,
44-
)
45-
46-
# check if the counter : if still up, skip the task. only the last one
47-
# within the countdown delay will do the query.
48-
if indexer_debounce_release(document_id) > 0:
49-
logger.info("Skip document %s indexation", document_id)
50-
return
51-
5241
indexer = get_document_indexer()
5342

5443
if indexer is None:
5544
return
5645

57-
doc = models.Document.objects.get(pk=document_id)
46+
try:
47+
doc = models.Document.objects.get(pk=document_id)
48+
except models.Document.DoesNotExist:
49+
# Skip the task if the document does not exist.
50+
return
51+
5852
accesses = get_batch_accesses_by_users_and_teams((doc.path,))
5953

6054
data = indexer.serialize_document(document=doc, accesses=accesses)
@@ -76,14 +70,15 @@ def trigger_document_indexer(document):
7670
if not settings.SEARCH_INDEXER_CLASS:
7771
return
7872

79-
logger.info(
80-
"Add task for document %s indexation in %.2f seconds",
81-
document.pk,
82-
countdown,
83-
)
84-
85-
# Each time this method is called during the countdown, we increment the
73+
# Each time this method is called during a countdown, we increment the
8674
# counter and each task decrease it, so the index be run only once.
87-
indexer_debounce_lock(document.pk)
88-
89-
document_indexer_task.apply_async(args=[document.pk], countdown=countdown)
75+
if indexer_throttle_acquire(document.pk, timeout=countdown):
76+
logger.info(
77+
"Add task for document %s indexation in %.2f seconds",
78+
document.pk,
79+
countdown,
80+
)
81+
82+
document_indexer_task.apply_async(args=[document.pk])
83+
else:
84+
logger.info("Skip task for document %s indexation", document.pk)

src/backend/core/tests/documents/test_api_documents_search.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ def test_api_documents_search_endpoint_is_none(indexer_settings):
9393
"path": document.path,
9494
"title": document.title,
9595
"updated_at": document.updated_at.isoformat().replace("+00:00", "Z"),
96+
"deleted_at": None,
9697
"user_role": access.role,
9798
}
9899

@@ -184,6 +185,7 @@ def test_api_documents_search_format(indexer_settings):
184185
"path": document.path,
185186
"title": document.title,
186187
"updated_at": document.updated_at.isoformat().replace("+00:00", "Z"),
188+
"deleted_at": None,
187189
"user_role": access.role,
188190
}
189191

src/backend/core/tests/test_models_documents.py

Lines changed: 65 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222

2323
from core import factories, models
2424
from core.services.search_indexers import SearchIndexer
25+
from core.tasks.search import document_indexer_task
2526

2627
pytestmark = pytest.mark.django_db
2728

@@ -1473,10 +1474,10 @@ def test_models_documents_post_save_indexer(mock_push, indexer_settings):
14731474
key=itemgetter("id"),
14741475
)
14751476

1476-
# The debounce counters should be reset
1477-
assert cache.get(f"doc-indexer-debounce-{doc1.pk}") == 0
1478-
assert cache.get(f"doc-indexer-debounce-{doc2.pk}") == 0
1479-
assert cache.get(f"doc-indexer-debounce-{doc3.pk}") == 0
1477+
# The throttle counters should be reset
1478+
assert cache.get(f"doc-indexer-throttle-{doc1.pk}") is None
1479+
assert cache.get(f"doc-indexer-throttle-{doc2.pk}") is None
1480+
assert cache.get(f"doc-indexer-throttle-{doc3.pk}") is None
14801481

14811482

14821483
@mock.patch.object(SearchIndexer, "push")
@@ -1486,10 +1487,31 @@ def test_models_documents_post_save_indexer_not_configured(mock_push, indexer_se
14861487
indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0
14871488
indexer_settings.SEARCH_INDEXER_CLASS = None
14881489

1490+
user = factories.UserFactory()
1491+
1492+
with transaction.atomic():
1493+
doc = factories.DocumentFactory()
1494+
factories.UserDocumentAccessFactory(document=doc, user=user)
1495+
1496+
assert mock_push.assert_not_called
1497+
1498+
1499+
@mock.patch.object(SearchIndexer, "push")
1500+
@pytest.mark.django_db(transaction=True)
1501+
def test_models_documents_post_save_indexer_wrongly_configured(
1502+
mock_push, indexer_settings
1503+
):
1504+
"""Task should not start an indexation when disabled"""
1505+
indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0
1506+
indexer_settings.SEARCH_INDEXER_URL = None
1507+
1508+
user = factories.UserFactory()
1509+
14891510
with transaction.atomic():
1490-
factories.DocumentFactory()
1511+
doc = factories.DocumentFactory()
1512+
factories.UserDocumentAccessFactory(document=doc, user=user)
14911513

1492-
assert mock_push.call_args_list == []
1514+
assert mock_push.assert_not_called
14931515

14941516

14951517
@mock.patch.object(SearchIndexer, "push")
@@ -1526,10 +1548,10 @@ def test_models_documents_post_save_indexer_with_accesses(mock_push, indexer_set
15261548
key=itemgetter("id"),
15271549
)
15281550

1529-
# The debounce counters should be reset
1530-
assert cache.get(f"doc-indexer-debounce-{doc1.pk}") == 0
1531-
assert cache.get(f"doc-indexer-debounce-{doc2.pk}") == 0
1532-
assert cache.get(f"doc-indexer-debounce-{doc3.pk}") == 0
1551+
# The throttle counters should be reset
1552+
assert cache.get(f"doc-indexer-throttle-{doc1.pk}") is None
1553+
assert cache.get(f"doc-indexer-throttle-{doc2.pk}") is None
1554+
assert cache.get(f"doc-indexer-throttle-{doc3.pk}") is None
15331555

15341556

15351557
@mock.patch.object(SearchIndexer, "push")
@@ -1588,10 +1610,34 @@ def test_models_documents_post_save_indexer_deleted(mock_push, indexer_settings)
15881610
key=itemgetter("id"),
15891611
)
15901612

1591-
# The debounce counters should be reset
1592-
assert cache.get(f"doc-indexer-debounce-{doc.pk}") == 0
1593-
assert cache.get(f"doc-indexer-debounce-{doc_deleted.pk}") == 0
1594-
assert cache.get(f"doc-indexer-debounce-{doc_ancestor_deleted.pk}") == 0
1613+
# The throttle counters should be reset
1614+
assert cache.get(f"doc-indexer-throttle-{doc.pk}") is None
1615+
assert cache.get(f"doc-indexer-throttle-{doc_deleted.pk}") is None
1616+
assert cache.get(f"doc-indexer-throttle-{doc_ancestor_deleted.pk}") is None
1617+
1618+
1619+
@pytest.mark.django_db(transaction=True)
1620+
def test_models_documents_indexer_hard_deleted(indexer_settings):
1621+
"""Indexation task on hard deleted document"""
1622+
indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0
1623+
1624+
user = factories.UserFactory()
1625+
1626+
with transaction.atomic():
1627+
doc = factories.DocumentFactory(
1628+
link_reach=models.LinkReachChoices.AUTHENTICATED
1629+
)
1630+
factories.UserDocumentAccessFactory(document=doc, user=user)
1631+
1632+
doc_id = doc.pk
1633+
doc.delete()
1634+
1635+
# Call task on deleted document.
1636+
document_indexer_task.apply(args=[doc_id])
1637+
1638+
with mock.patch.object(SearchIndexer, "push") as mock_push:
1639+
# Hard delete document are not re-indexed.
1640+
assert mock_push.assert_not_called
15951641

15961642

15971643
@mock.patch.object(SearchIndexer, "push")
@@ -1664,7 +1710,7 @@ def test_models_documents_post_save_indexer_restored(mock_push, indexer_settings
16641710

16651711

16661712
@pytest.mark.django_db(transaction=True)
1667-
def test_models_documents_post_save_indexer_debounce(indexer_settings):
1713+
def test_models_documents_post_save_indexer_throttle(indexer_settings):
16681714
"""Test indexation task skipping on document update"""
16691715
indexer_settings.SEARCH_INDEXER_COUNTDOWN = 0
16701716

@@ -1681,19 +1727,19 @@ def test_models_documents_post_save_indexer_debounce(indexer_settings):
16811727
}
16821728

16831729
with mock.patch.object(SearchIndexer, "push") as mock_push:
1684-
# Simulate 1 waiting task
1685-
cache.set(f"doc-indexer-debounce-{doc.pk}", 1)
1730+
# Simulate 1 running task
1731+
cache.set(f"doc-indexer-throttle-{doc.pk}", 1)
16861732

16871733
# save doc to trigger the indexer, but nothing should be done since
1688-
# the counter is over 0
1734+
# the flag is up
16891735
with transaction.atomic():
16901736
doc.save()
16911737

16921738
assert [call.args[0] for call in mock_push.call_args_list] == []
16931739

16941740
with mock.patch.object(SearchIndexer, "push") as mock_push:
16951741
# No waiting task
1696-
cache.set(f"doc-indexer-debounce-{doc.pk}", 0)
1742+
cache.delete(f"doc-indexer-throttle-{doc.pk}")
16971743

16981744
with transaction.atomic():
16991745
doc = models.Document.objects.get(pk=doc.pk)

0 commit comments

Comments
 (0)