Skip to content

Commit 91b2cdd

Browse files
authored
Merge pull request #59 from plainbanana/rethink-pipeline
Implement custom libevent adapter to fix timeout issues with event prioritization
2 parents 9dffdae + 6e3568c commit 91b2cdd

File tree

6 files changed

+255
-49
lines changed

6 files changed

+255
-49
lines changed

META.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
"plainbanana <plainbanana@mustardon.tokyo>"
55
],
66
"dynamic_config" : 0,
7-
"generated_by" : "Minilla/v3.1.25, CPAN::Meta::Converter version 2.150010",
7+
"generated_by" : "Minilla/v3.1.26, CPAN::Meta::Converter version 2.150010",
88
"license" : [
99
"perl_5"
1010
],

README.md

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -167,31 +167,19 @@ do not execute fork() without issuing `disconnect` if all callbacks are not exec
167167

168168
## run\_event\_loop()
169169

170-
This method allows you to issue commands without waiting for their responses.
171-
You can then perform a blocking wait for those responses later, if needed.
170+
This method is nonblocking and allows you to issue commands without waiting for their responses.
172171

173-
Executes one iteration of the event loop to process any pending commands that have not yet been sent
174-
and any incoming responses from Redis.
175-
176-
If there are events that can be triggered immediately, they will all be processed.
177-
In other words, if there are unsent commands, they will be pipelined and sent,
172+
If there are unsent commands, they will be pipelined and sent,
178173
and if there are already-received responses, their corresponding callbacks will be executed.
179-
180-
If there are no events that can be triggered immediately: there are neither unsent commands nor any Redis responses available to read,
181-
but unprocessed callbacks remain, then this method will block for up to `command_timeout` while waiting for a response from Redis.
182174
When a timeout occurs, an error will be propagated to the corresponding callback(s).
183175

184-
The return value can be either 1 for success (e.g., commands sent or responses read),
176+
The return value can be either 1 for success
177+
(e.g., commands sent, responses read, or exit without waiting for any responses),
185178
0 for no callbacks remained, or undef for other errors.
186179

187180
### Notes
188181

189-
- Be aware that the timeout check will only be triggered when there are neither unsent commands nor Redis responses available to read.
190-
If a timeout occurs, all remaining commands on that node will time out as well.
191-
- Internally, this method calls `event_base_loop(..., EVLOOP_ONCE)`, which
192-
performs a single iteration of the event loop. A command will not be fully processed in a single call.
193-
- If you need to process multiple commands or wait for all responses, call
194-
this method repeatedly or use `wait_all_responses`.
182+
- If a timeout occurs, all remaining commands on that node will time out as well.
195183
- For a simpler, synchronous-like usage where you need at least one response,
196184
refer to `wait_one_response`. If you only need to block until all
197185
pending commands are processed, see `wait_all_responses`.
@@ -205,17 +193,17 @@ pending commands are processed, see `wait_all_responses`.
205193
# Send commands to Redis without waiting for responses
206194
$redis->run_event_loop();
207195

208-
# Possibly wait for responses
196+
# If any responses are available, read them immediately without waiting for the rest
209197
$redis->run_event_loop();
210198

211199
## wait\_one\_response()
212200

213-
If there are any unexcuted callbacks, it will block until at least one is executed.
201+
If there are any unexecuted callbacks, it will block until at least one is executed.
214202
The return value can be either 1 for success, 0 for no callbacks remained, or undef for other errors.
215203

216204
## wait\_all\_responses()
217205

218-
If there are any unexcuted callbacks, it will block until all of them are executed.
206+
If there are any unexecuted callbacks, it will block until all of them are executed.
219207
The return value can be either 1 for success, 0 for no callbacks remained, or undef for other errors.
220208

221209
## disconnect()

lib/Redis/Cluster/Fast.pm

Lines changed: 7 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -301,21 +301,14 @@ do not execute fork() without issuing C<disconnect> if all callbacks are not exe
301301
302302
=head2 run_event_loop()
303303
304-
This method allows you to issue commands without waiting for their responses.
305-
You can then perform a blocking wait for those responses later, if needed.
304+
This method is nonblocking and allows you to issue commands without waiting for their responses.
306305
307-
Executes one iteration of the event loop to process any pending commands that have not yet been sent
308-
and any incoming responses from Redis.
309-
310-
If there are events that can be triggered immediately, they will all be processed.
311-
In other words, if there are unsent commands, they will be pipelined and sent,
306+
If there are unsent commands, they will be pipelined and sent,
312307
and if there are already-received responses, their corresponding callbacks will be executed.
313-
314-
If there are no events that can be triggered immediately: there are neither unsent commands nor any Redis responses available to read,
315-
but unprocessed callbacks remain, then this method will block for up to C<command_timeout> while waiting for a response from Redis.
316308
When a timeout occurs, an error will be propagated to the corresponding callback(s).
317309
318-
The return value can be either 1 for success (e.g., commands sent or responses read),
310+
The return value can be either 1 for success
311+
(e.g., commands sent, responses read, or exit without waiting for any responses),
319312
0 for no callbacks remained, or undef for other errors.
320313
321314
=head3 Notes
@@ -324,21 +317,10 @@ The return value can be either 1 for success (e.g., commands sent or responses r
324317
325318
=item *
326319
327-
Be aware that the timeout check will only be triggered when there are neither unsent commands nor Redis responses available to read.
328320
If a timeout occurs, all remaining commands on that node will time out as well.
329321
330322
=item *
331323
332-
Internally, this method calls C<event_base_loop(..., EVLOOP_ONCE)>, which
333-
performs a single iteration of the event loop. A command will not be fully processed in a single call.
334-
335-
=item *
336-
337-
If you need to process multiple commands or wait for all responses, call
338-
this method repeatedly or use C<wait_all_responses>.
339-
340-
=item *
341-
342324
For a simpler, synchronous-like usage where you need at least one response,
343325
refer to C<wait_one_response>. If you only need to block until all
344326
pending commands are processed, see C<wait_all_responses>.
@@ -354,17 +336,17 @@ pending commands are processed, see C<wait_all_responses>.
354336
# Send commands to Redis without waiting for responses
355337
$redis->run_event_loop();
356338
357-
# Possibly wait for responses
339+
# If any responses are available, read them immediately without waiting for the rest
358340
$redis->run_event_loop();
359341
360342
=head2 wait_one_response()
361343
362-
If there are any unexcuted callbacks, it will block until at least one is executed.
344+
If there are any unexecuted callbacks, it will block until at least one is executed.
363345
The return value can be either 1 for success, 0 for no callbacks remained, or undef for other errors.
364346
365347
=head2 wait_all_responses()
366348
367-
If there are any unexcuted callbacks, it will block until all of them are executed.
349+
If there are any unexecuted callbacks, it will block until all of them are executed.
368350
The return value can be either 1 for success, 0 for no callbacks remained, or undef for other errors.
369351
370352
=head2 disconnect()

src/Fast.xs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ extern "C" {
1111
#include <unistd.h>
1212
#include <string.h>
1313
#include <time.h>
14-
#include "hiredis_cluster/adapters/libevent.h"
14+
#include "adapters/libevent.h"
1515
#include "hiredis_cluster/hircluster.h"
1616

1717
#ifdef __cplusplus
@@ -27,6 +27,13 @@ extern "C" {
2727

2828
#define MIN_ATTEMPT_TO_GET_RESULT 2
2929

30+
/* libevent adapter priority configuration
31+
Uses 2 priority levels to ensure I/O events are processed before timeouts:
32+
- Priority 0: I/O events (Redis responses) - highest priority
33+
- Priority 1: Timer events (timeouts) - lower priority
34+
EVENT_BASE_PRIORITY_NUMBER sets the total priorities for event_base_priority_init() */
35+
#define EVENT_BASE_PRIORITY_NUMBER 2
36+
3037
#define DEBUG_MSG(fmt, ...) \
3138
if (self->debug) { \
3239
fprintf(stderr, "[%d][%d][%s:%d:%s]: ", getpid(), getppid(), __FILE__, __LINE__, __func__); \
@@ -272,6 +279,11 @@ SV *Redis__Cluster__Fast_connect(pTHX_ Redis__Cluster__Fast self) {
272279
}
273280

274281
self->cluster_event_base = event_base_new();
282+
283+
if (event_base_priority_init(self->cluster_event_base, EVENT_BASE_PRIORITY_NUMBER) != 0) {
284+
return newSVpvf("%s", "failed to initialize event base priorities");
285+
}
286+
275287
if (redisClusterLibeventAttach(self->acc, self->cluster_event_base) != REDIS_OK) {
276288
return newSVpvf("%s", "failed to attach event base");
277289
}
@@ -471,7 +483,7 @@ int Redis__Cluster__Fast_run_event_loop(pTHX_ Redis__Cluster__Fast self) {
471483
return 0;
472484
}
473485
DEBUG_EVENT_BASE();
474-
event_loop_error = event_base_loop(self->cluster_event_base, EVLOOP_ONCE);
486+
event_loop_error = event_base_loop(self->cluster_event_base, EVLOOP_NONBLOCK);
475487
if (event_loop_error != 0) {
476488
return -1;
477489
}

0 commit comments

Comments
 (0)