Skip to content

Conversation

@plainbanana
Copy link
Owner

@plainbanana plainbanana commented Jun 13, 2025

Summary

This PR addresses timeout-related issues in Redis::Cluster::Fast by implementing a custom libevent adapter with event
prioritization.

Key Changes

  • Custom libevent adapter: Added src/adapters/libevent.h to handle timeout issues specific to our use case
  • Non-blocking event loop: Made run_event_loop truly non-blocking with EVLOOP_NONBLOCK
  • Enhanced error handling: Added proper error checks for event_base_priority_init
  • Documentation improvements: Fixed typos and clarified method descriptions
  • Code quality: Added explanatory comments and improved test clarity

Problem Background

When using hiredis-cluster's async context, we encountered situations where timeout events would fire even when Redis
responses were already available to read. This particularly affected scenarios where event loop execution was delayed due
to system load.

Approach Taken

After researching the issue and consulting with the hiredis community hiredis issue
#1286
, we learned that:

  1. libevent's event processing order: The library processes timeouts before I/O events as part of its design
  2. Upstream perspective: The hiredis maintainers appropriately noted that adding event prioritization to the
    general-purpose adapter could introduce starvation risks in multi-client scenarios
  3. Valid concern: In shared event base environments, high-traffic clients could potentially starve failing clients
    from receiving necessary timeouts

Our Solution

Given Redis::Cluster::Fast's specific usage patterns and requirements, we decided to maintain our own libevent adapter:

Why this works for our use case

  • Isolated event base: Redis::Cluster::Fast typically manages dedicated event bases
  • Controlled environment: Our specific client patterns reduce starvation risks
  • User benefit: Processing available responses before timeouts significantly improves user experience

Implementation Details

Our custom adapter (src/adapters/libevent.h) implements:

  • Separate event management: Independent handling of I/O and timer events
  • Event prioritization: I/O events (priority 0) before timer events (priority 1)
  • Deterministic behavior: Ensures available responses are processed before timeout evaluation
/* libevent adapter priority configuration
   Uses 2 priority levels to ensure I/O events are processed before timeouts:
   - Priority 0: I/O events (Redis responses) - highest priority
   - Priority 1: Timer events (timeouts) - lower priority
   EVENT_BASE_PRIORITY_NUMBER sets the total priorities for event_base_priority_init() */
  #define EVENT_BASE_PRIORITY_NUMBER 2

Test Plan

  • All existing tests pass
  • Memory leak tests (xt/02_leak.t, xt/08_leak_srandom.t)
  • Timeout handling tests (xt/04_timeout.t, xt/10_timeout_srandom.t)
  • Valgrind tests (xt/05_valgrind.t, xt/09_valgrind_srandom.t)
  • Fork safety tests (xt/03_fork.t, xt/11_fork_srandom.t)
  • Pipeline functionality verification

This change improves reliability for Redis::Cluster::Fast users while respecting the design decisions of the upstream
hiredis project.

…c operations. It introduces ev_io and ev_timer to handle timeout as intended.

After issuing run_event_loop and after the command_timeout has passed, when executing the event loop again to read the response, there is a problem that always results in a timeout. Now, fixed it and when there are any readable responses, it reads them.
Refactored the `run_event_loop` method to exit immediately when no events are available, ensuring it operates in non-blocking mode.
Removed the EVLOOP_ONCE flag from the event loop call as it is unnecessary when used with EVLOOP_NONBLOCK. This change simplifies the code and ensures clearer handling of the event loop behavior.
Replaced a duplicate `run_event_loop` call with `wait_one_response` for better clarity and correctness. This ensures the desired response is awaited effectively within the test.
Streamlined documentation for `run_event_loop` by removing redundant details about event processing and internal mechanics. Focused on clarifying return values, command handling, and timeout behavior for better readability.
Check the return value of event_base_priority_init and return an error
if the initialization fails. This improves error handling during the
connection setup phase.
Added detailed comments to explain the priority system for libevent:
- Priority 0 (highest): I/O events for immediate Redis response processing
- Priority 1 (lower): Timer events for timeout handling after I/O processing

This clarifies why I/O events are prioritized over timer events in the
event processing loop.
Corrected the spelling of "unexcuted" to "unexecuted" in method documentation. This improves clarity and accuracy in the code comments.
Replaced single-line comments with multi-line block comments for event priority explanations.
Moved variable initialization before unused parameter casting for consistency and clarity.
Improved the explanation of libevent priority levels in the comments. Clarified the purpose of priorities 0 and 1 and added context on total priority configuration with `EVENT_BASE_PRIORITY_NUMBER`.
@@ -0,0 +1,224 @@
/*
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MEMO]

I consolidated libevent adapters from hiredis, hiredis-cluster, and added some diff.

$ diff -u deps/hiredis/adapters/libevent.h src/adapters/libevent.h
--- deps/hiredis/adapters/libevent.h    2025-02-11 04:22:52.000000000 +0900
+++ src/adapters/libevent.h     2025-09-24 01:29:25.710571704 +0900
@@ -31,17 +31,17 @@
 #ifndef __HIREDIS_LIBEVENT_H__
 #define __HIREDIS_LIBEVENT_H__
 #include <event2/event.h>
-#include "../hiredis.h"
-#include "../async.h"
+#include "hiredis/hiredis.h"
+#include "hiredis/async.h"

 #define REDIS_LIBEVENT_DELETED 0x01
 #define REDIS_LIBEVENT_ENTERED 0x02

 typedef struct redisLibeventEvents {
     redisAsyncContext *context;
-    struct event *ev;
+    struct event *ev_io;
+    struct event *ev_timer;
     struct event_base *base;
-    struct timeval tv;
     short flags;
     short state;
 } redisLibeventEvents;
@@ -51,8 +51,8 @@
 }

 static void redisLibeventHandler(evutil_socket_t fd, short event, void *arg) {
-    ((void)fd);
     redisLibeventEvents *e = (redisLibeventEvents*)arg;
+    ((void)fd);
     e->state |= REDIS_LIBEVENT_ENTERED;

     #define CHECK_DELETED() if (e->state & REDIS_LIBEVENT_DELETED) {\
@@ -60,11 +60,6 @@
         return; \
     }

-    if ((event & EV_TIMEOUT) && (e->state & REDIS_LIBEVENT_DELETED) == 0) {
-        redisAsyncHandleTimeout(e->context);
-        CHECK_DELETED();
-    }
-
     if ((event & EV_READ) && e->context && (e->state & REDIS_LIBEVENT_DELETED) == 0) {
         redisAsyncHandleRead(e->context);
         CHECK_DELETED();
@@ -79,9 +74,28 @@
     #undef CHECK_DELETED
 }

+static void redisLibeventTimerHandler(evutil_socket_t fd, short event, void *arg) {
+    redisLibeventEvents *e = (redisLibeventEvents*)arg;
+    ((void)fd);
+    ((void)event);
+    e->state |= REDIS_LIBEVENT_ENTERED;
+
+    #define CHECK_DELETED() if (e->state & REDIS_LIBEVENT_DELETED) {\
+        redisLibeventDestroy(e);\
+        return; \
+    }
+
+    if (e->context && (e->state & REDIS_LIBEVENT_DELETED) == 0) {
+        redisAsyncHandleTimeout(e->context);
+        CHECK_DELETED();
+    }
+
+    e->state &= ~REDIS_LIBEVENT_ENTERED;
+    #undef CHECK_DELETED
+}
+
 static void redisLibeventUpdate(void *privdata, short flag, int isRemove) {
     redisLibeventEvents *e = (redisLibeventEvents *)privdata;
-    const struct timeval *tv = e->tv.tv_sec || e->tv.tv_usec ? &e->tv : NULL;

     if (isRemove) {
         if ((e->flags & flag) == 0) {
@@ -97,10 +111,11 @@
         }
     }

-    event_del(e->ev);
-    event_assign(e->ev, e->base, e->context->c.fd, e->flags | EV_PERSIST,
+    event_del(e->ev_io);
+    event_assign(e->ev_io, e->base, e->context->c.fd, e->flags | EV_PERSIST,
                  redisLibeventHandler, privdata);
-    event_add(e->ev, tv);
+    event_priority_set(e->ev_io, 0);
+    event_add(e->ev_io, NULL);
 }

 static void redisLibeventAddRead(void *privdata) {
@@ -124,9 +139,16 @@
     if (!e) {
         return;
     }
-    event_del(e->ev);
-    event_free(e->ev);
-    e->ev = NULL;
+    if (e->ev_io) {
+        event_del(e->ev_io);
+        event_free(e->ev_io);
+        e->ev_io = NULL;
+    }
+    if (e->ev_timer) {
+        evtimer_del(e->ev_timer);
+        event_free(e->ev_timer);
+        e->ev_timer = NULL;
+    }

     if (e->state & REDIS_LIBEVENT_ENTERED) {
         e->state |= REDIS_LIBEVENT_DELETED;
@@ -137,10 +159,8 @@

 static void redisLibeventSetTimeout(void *privdata, struct timeval tv) {
     redisLibeventEvents *e = (redisLibeventEvents *)privdata;
-    short flags = e->flags;
-    e->flags = 0;
-    e->tv = tv;
-    redisLibeventUpdate(e, flags, 0);
+    evtimer_del(e->ev_timer);
+    evtimer_add(e->ev_timer, &tv);
 }

 static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) {
@@ -168,8 +188,37 @@
     ac->ev.data = e;

     /* Initialize and install read/write events */
-    e->ev = event_new(base, c->fd, EV_READ | EV_WRITE, redisLibeventHandler, e);
+    e->ev_io = event_new(base, c->fd, EV_READ | EV_WRITE, redisLibeventHandler, e);
+    event_priority_set(e->ev_io, 0);
+
+    /* Initialize and install timer events */
+    e->ev_timer = evtimer_new(base, redisLibeventTimerHandler, e);
+    event_priority_set(e->ev_timer, 1);
+
     e->base = base;
     return REDIS_OK;
 }
 #endif
+
+#ifndef __HIREDIS_CLUSTER_LIBEVENT_H__
+#define __HIREDIS_CLUSTER_LIBEVENT_H__
+#include "hiredis_cluster/hircluster.h"
+
+static int redisLibeventAttach_link(redisAsyncContext *ac, void *base) {
+    return redisLibeventAttach(ac, (struct event_base *)base);
+}
+
+static int redisClusterLibeventAttach(redisClusterAsyncContext *acc,
+                                      struct event_base *base) {
+
+    if (acc == NULL || base == NULL) {
+        return REDIS_ERR;
+    }
+
+    acc->adapter = base;
+    acc->attach_fn = redisLibeventAttach_link;
+
+    return REDIS_OK;
+}
+
+#endif
$ diff -u deps/hiredis-cluster/adapters/libevent.h src/adapters/libevent.h
--- deps/hiredis-cluster/adapters/libevent.h    2024-09-06 12:36:32.000000000 +0900
+++ src/adapters/libevent.h     2025-09-24 01:29:25.710571704 +0900
@@ -28,11 +28,181 @@
  * POSSIBILITY OF SUCH DAMAGE.
  */

+#ifndef __HIREDIS_LIBEVENT_H__
+#define __HIREDIS_LIBEVENT_H__
+#include <event2/event.h>
+#include "hiredis/hiredis.h"
+#include "hiredis/async.h"
+
+#define REDIS_LIBEVENT_DELETED 0x01
+#define REDIS_LIBEVENT_ENTERED 0x02
+
+typedef struct redisLibeventEvents {
+    redisAsyncContext *context;
+    struct event *ev_io;
+    struct event *ev_timer;
+    struct event_base *base;
+    short flags;
+    short state;
+} redisLibeventEvents;
+
+static void redisLibeventDestroy(redisLibeventEvents *e) {
+    hi_free(e);
+}
+
+static void redisLibeventHandler(evutil_socket_t fd, short event, void *arg) {
+    redisLibeventEvents *e = (redisLibeventEvents*)arg;
+    ((void)fd);
+    e->state |= REDIS_LIBEVENT_ENTERED;
+
+    #define CHECK_DELETED() if (e->state & REDIS_LIBEVENT_DELETED) {\
+        redisLibeventDestroy(e);\
+        return; \
+    }
+
+    if ((event & EV_READ) && e->context && (e->state & REDIS_LIBEVENT_DELETED) == 0) {
+        redisAsyncHandleRead(e->context);
+        CHECK_DELETED();
+    }
+
+    if ((event & EV_WRITE) && e->context && (e->state & REDIS_LIBEVENT_DELETED) == 0) {
+        redisAsyncHandleWrite(e->context);
+        CHECK_DELETED();
+    }
+
+    e->state &= ~REDIS_LIBEVENT_ENTERED;
+    #undef CHECK_DELETED
+}
+
+static void redisLibeventTimerHandler(evutil_socket_t fd, short event, void *arg) {
+    redisLibeventEvents *e = (redisLibeventEvents*)arg;
+    ((void)fd);
+    ((void)event);
+    e->state |= REDIS_LIBEVENT_ENTERED;
+
+    #define CHECK_DELETED() if (e->state & REDIS_LIBEVENT_DELETED) {\
+        redisLibeventDestroy(e);\
+        return; \
+    }
+
+    if (e->context && (e->state & REDIS_LIBEVENT_DELETED) == 0) {
+        redisAsyncHandleTimeout(e->context);
+        CHECK_DELETED();
+    }
+
+    e->state &= ~REDIS_LIBEVENT_ENTERED;
+    #undef CHECK_DELETED
+}
+
+static void redisLibeventUpdate(void *privdata, short flag, int isRemove) {
+    redisLibeventEvents *e = (redisLibeventEvents *)privdata;
+
+    if (isRemove) {
+        if ((e->flags & flag) == 0) {
+            return;
+        } else {
+            e->flags &= ~flag;
+        }
+    } else {
+        if (e->flags & flag) {
+            return;
+        } else {
+            e->flags |= flag;
+        }
+    }
+
+    event_del(e->ev_io);
+    event_assign(e->ev_io, e->base, e->context->c.fd, e->flags | EV_PERSIST,
+                 redisLibeventHandler, privdata);
+    event_priority_set(e->ev_io, 0);
+    event_add(e->ev_io, NULL);
+}
+
+static void redisLibeventAddRead(void *privdata) {
+    redisLibeventUpdate(privdata, EV_READ, 0);
+}
+
+static void redisLibeventDelRead(void *privdata) {
+    redisLibeventUpdate(privdata, EV_READ, 1);
+}
+
+static void redisLibeventAddWrite(void *privdata) {
+    redisLibeventUpdate(privdata, EV_WRITE, 0);
+}
+
+static void redisLibeventDelWrite(void *privdata) {
+    redisLibeventUpdate(privdata, EV_WRITE, 1);
+}
+
+static void redisLibeventCleanup(void *privdata) {
+    redisLibeventEvents *e = (redisLibeventEvents*)privdata;
+    if (!e) {
+        return;
+    }
+    if (e->ev_io) {
+        event_del(e->ev_io);
+        event_free(e->ev_io);
+        e->ev_io = NULL;
+    }
+    if (e->ev_timer) {
+        evtimer_del(e->ev_timer);
+        event_free(e->ev_timer);
+        e->ev_timer = NULL;
+    }
+
+    if (e->state & REDIS_LIBEVENT_ENTERED) {
+        e->state |= REDIS_LIBEVENT_DELETED;
+    } else {
+        redisLibeventDestroy(e);
+    }
+}
+
+static void redisLibeventSetTimeout(void *privdata, struct timeval tv) {
+    redisLibeventEvents *e = (redisLibeventEvents *)privdata;
+    evtimer_del(e->ev_timer);
+    evtimer_add(e->ev_timer, &tv);
+}
+
+static int redisLibeventAttach(redisAsyncContext *ac, struct event_base *base) {
+    redisContext *c = &(ac->c);
+    redisLibeventEvents *e;
+
+    /* Nothing should be attached when something is already attached */
+    if (ac->ev.data != NULL)
+        return REDIS_ERR;
+
+    /* Create container for context and r/w events */
+    e = (redisLibeventEvents*)hi_calloc(1, sizeof(*e));
+    if (e == NULL)
+        return REDIS_ERR;
+
+    e->context = ac;
+
+    /* Register functions to start/stop listening for events */
+    ac->ev.addRead = redisLibeventAddRead;
+    ac->ev.delRead = redisLibeventDelRead;
+    ac->ev.addWrite = redisLibeventAddWrite;
+    ac->ev.delWrite = redisLibeventDelWrite;
+    ac->ev.cleanup = redisLibeventCleanup;
+    ac->ev.scheduleTimer = redisLibeventSetTimeout;
+    ac->ev.data = e;
+
+    /* Initialize and install read/write events */
+    e->ev_io = event_new(base, c->fd, EV_READ | EV_WRITE, redisLibeventHandler, e);
+    event_priority_set(e->ev_io, 0);
+
+    /* Initialize and install timer events */
+    e->ev_timer = evtimer_new(base, redisLibeventTimerHandler, e);
+    event_priority_set(e->ev_timer, 1);
+
+    e->base = base;
+    return REDIS_OK;
+}
+#endif
+
 #ifndef __HIREDIS_CLUSTER_LIBEVENT_H__
 #define __HIREDIS_CLUSTER_LIBEVENT_H__
-
-#include "../hircluster.h"
-#include <hiredis/adapters/libevent.h>
+#include "hiredis_cluster/hircluster.h"

 static int redisLibeventAttach_link(redisAsyncContext *ac, void *base) {
     return redisLibeventAttach(ac, (struct event_base *)base);

return; \
}

if (e->context && (e->state & REDIS_LIBEVENT_DELETED) == 0) {
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MEMO]

Previously, the context was not checked. I added the check assuming it was probably an oversight. Note that if context is NULL, calling redisAsyncHandleTimeout would cause a segmentation fault.

});
ok $redis->run_event_loop;
ok $redis->run_event_loop;
ok $redis->wait_one_response;
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[MEMO]

Now, run_event_loop is nonblocking function. In order to wait an incoming response, call wait_one_response.

@plainbanana plainbanana merged commit 91b2cdd into master Sep 25, 2025
8 of 24 checks passed
@plainbanana plainbanana deleted the rethink-pipeline branch September 25, 2025 16:18
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants