44
55from django .conf import settings
66from django .core .cache import cache
7+ from django .db .models import Q
78
89from django_redis .cache import RedisCache
910
1011from core import models
1112from core .services .search_indexers import (
12- get_batch_accesses_by_users_and_teams ,
1313 get_document_indexer ,
1414)
1515
1818logger = getLogger (__file__ )
1919
2020
21- def indexer_throttle_acquire (document_id , timeout = 0 , atomic = True ):
@app.task
def document_indexer_task(document_id):
    """
    Celery task: send the indexation query for a single document.

    Nothing is done when ``get_document_indexer()`` returns ``None``.
    Otherwise the document is handed to the indexer as a queryset filtered
    on its primary key.
    """
    search_indexer = get_document_indexer()

    if search_indexer is not None:
        logger.info("Start document %s indexation", document_id)
        single_document = models.Document.objects.filter(pk=document_id)
        search_indexer.index(single_document)
31+
32+
33+ def batch_indexer_throttle_acquire (timeout : int = 0 , atomic : bool = True ):
2234 """
2335 Enable the task throttle flag for a delay.
2436 Uses redis locks if available to ensure atomic changes
2537 """
26- key = f"doc- indexer-throttle- { document_id } "
38+ key = "document-batch- indexer-throttle"
2739
40+ # Redis is used as cache database (not in tests). Use the lock feature here
41+ # to ensure atomicity of changes to the throttle flag.
2842 if isinstance (cache , RedisCache ) and atomic :
2943 with cache .locks (key ):
30- return indexer_throttle_acquire ( document_id , timeout , atomic = False )
44+ return batch_indexer_throttle_acquire ( timeout , atomic = False )
3145
3246 # Use add() here :
3347 # - set the flag and returns true if not exist
@@ -36,49 +50,48 @@ def indexer_throttle_acquire(document_id, timeout=0, atomic=True):
3650
3751
@app.task
def batch_document_indexer_task(timestamp):
    """
    Celery task: send the indexation query for a batch of documents.

    A document belongs to the batch when its ``updated_at``, ``deleted_at``
    or ``ancestors_deleted_at`` value is greater than or equal to
    ``timestamp``. Nothing is done when ``get_document_indexer()`` returns a
    falsy value.
    """
    search_indexer = get_document_indexer()
    if not search_indexer:
        return

    # Any of the three dates reaching the timestamp selects the document.
    touched_since = (
        Q(updated_at__gte=timestamp)
        | Q(deleted_at__gte=timestamp)
        | Q(ancestors_deleted_at__gte=timestamp)
    )
    documents = models.Document.objects.filter(touched_since)

    indexed = search_indexer.index(documents)
    logger.info("Indexed %d documents", indexed)
5866
5967
def trigger_batch_document_indexer(item):
    """
    Trigger the indexation of ``item``, throttled by SEARCH_INDEXER_COUNTDOWN.

    - Indexation disabled (``SEARCH_INDEXER_CLASS`` is falsy): nothing is done.
    - Countdown greater than zero: a batch task is scheduled after
      ``countdown`` seconds, but only by the call that acquires the throttle
      flag; calls made while the flag is held are skipped.
    - Countdown of zero or less: ``document_indexer_task`` is executed right
      away for this single document through ``.apply()``.

    Args:
        item: The instance whose change triggers the indexation. Its ``pk``
            and ``updated_at`` attributes are read. Presumably a Document;
            confirm against callers.
    """
    # DO NOT create a task if indexation is disabled. This check runs before
    # the countdown is parsed, so a missing or invalid countdown setting
    # cannot raise while indexation is turned off.
    if not settings.SEARCH_INDEXER_CLASS:
        return

    countdown = int(settings.SEARCH_INDEXER_COUNTDOWN)

    if countdown <= 0:
        # No delay configured: index this document only, without throttling.
        document_indexer_task.apply(args=[item.pk])
        return

    # Only the call that acquires the throttle flag schedules a batch task.
    # The batch task selects the documents changed since the timestamp it
    # receives, so skipped calls do not need a task of their own.
    if not batch_indexer_throttle_acquire(timeout=countdown):
        logger.info("Skip task for batch document %s indexation", item.pk)
        return

    logger.info(
        "Add task for batch document indexation from updated_at=%s in %d seconds",
        item.updated_at.isoformat(),
        countdown,
    )
    batch_document_indexer_task.apply_async(
        args=[item.updated_at], countdown=countdown
    )
0 commit comments