Skip to content

Commit 78b9cb6

Browse files
committed
wip: groups 1:1
Signed-off-by: Kairo de Araujo <kairo@dearaujo.nl>
1 parent ca394b6 commit 78b9cb6

File tree

1 file changed

+39
-31
lines changed

1 file changed

+39
-31
lines changed

app.py

+39-31
Original file line numberDiff line numberDiff line change
@@ -168,40 +168,48 @@ def bump_online_roles(expired: bool = False) -> None:
168168
c = chain(_end_chain_callback.s(None, start_time))()
169169
return
170170

171+
# if the LOCK_BEAT is already locked, it means that another task is running
172+
# and we should skip this one
173+
if repository._redis.lock("LOCK_BEAT").locked():
174+
logging.info("LOCK_BEAT is already locked. Skipping bump_online_roles")
175+
c = chain(_end_chain_callback.s(None, start_time))()
176+
return
177+
171178
status_lock_targets = False
172179
# Lock to avoid race conditions. See `LOCK_TIMEOUT` in the Worker
173180
# development guide documentation.
174-
try:
175-
with repository._redis.lock("LOCK_TARGETS", repository._timeout):
176-
chunks_size = 500
177-
roles = repository.get_delegated_rolenames(expired=expired)
178-
group_update_roles = _update_online_role.chunks(zip(roles), chunks_size).group()
179-
# c = chain(
180-
# group(_update_online_role.s(role) for role in roles)(),
181-
# _update_snapshot_timestamp.s(),
182-
# _end_chain_callback.s(start_time),
183-
# )(queue="rstuf_internals")
184-
c = chain(
185-
group_update_roles,
186-
_update_snapshot_timestamp.s(),
187-
_end_chain_callback.s(start_time),
188-
)(queue="rstuf_internals")
189-
return c
190-
except redis.exceptions.LockNotOwnedError:
191-
# The LockNotOwnedError happens when the task exceeds the timeout,
192-
# and another task owns the lock.
193-
# If the task time out, the lock is released. If it doesn't finish
194-
# properly, it will raise (fail) the task. Otherwise, the ignores
195-
# the error because another task didn't lock it.
196-
if status_lock_targets is False:
197-
logging.error(
198-
"The task to bump all online roles exceeded the timeout "
199-
f"of {repository._timeout} seconds."
200-
)
201-
raise redis.exceptions.LockError(
202-
f"RSTUF: Task exceed `LOCK_TIMEOUT` ({repository._timeout} "
203-
"seconds)"
204-
)
181+
with repository._redis.lock("LOCK_BEAT"):
182+
try:
183+
with repository._redis.lock("LOCK_TARGETS", repository._timeout):
184+
# chunks_size = 500
185+
roles = repository.get_delegated_rolenames(expired=expired)
186+
# group_update_roles = _update_online_role.chunks(zip(roles), chunks_size).group()
187+
c = chain(
188+
group(_update_online_role.s(role) for role in roles)(),
189+
_update_snapshot_timestamp.s(),
190+
_end_chain_callback.s(start_time),
191+
)(queue="rstuf_internals")
192+
# c = chain(
193+
# group_update_roles,
194+
# _update_snapshot_timestamp.s(),
195+
# _end_chain_callback.s(start_time),
196+
# )(queue="rstuf_internals")
197+
return c
198+
except redis.exceptions.LockNotOwnedError:
199+
# The LockNotOwnedError happens when the task exceeds the timeout,
200+
# and another task owns the lock.
201+
# If the task time out, the lock is released. If it doesn't finish
202+
# properly, it will raise (fail) the task. Otherwise, the ignores
203+
# the error because another task didn't lock it.
204+
if status_lock_targets is False:
205+
logging.error(
206+
"The task to bump all online roles exceeded the timeout "
207+
f"of {repository._timeout} seconds."
208+
)
209+
raise redis.exceptions.LockError(
210+
f"RSTUF: Task exceed `LOCK_TIMEOUT` ({repository._timeout} "
211+
"seconds)"
212+
)
205213

206214

207215
def _publish_signals(

0 commit comments

Comments
 (0)