Skip to content

Commit 03f4125

Browse files
authored
Adding e2e scenario tests for maintenance push notifications handling. (#3758)
1 parent 4cdf082 commit 03f4125

File tree

11 files changed

+1284
-30
lines changed

11 files changed

+1284
-30
lines changed

.github/workflows/install_and_test.sh

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,11 @@ cd ${TESTDIR}
4040
# install, run tests
4141
pip install ${PKG}
4242
# Redis tests
43-
pytest -m 'not onlycluster'
43+
pytest -m 'not onlycluster' --ignore=tests/test_scenario
4444
# RedisCluster tests
4545
CLUSTER_URL="redis://localhost:16379/0"
4646
CLUSTER_SSL_URL="rediss://localhost:27379/0"
4747
pytest -m 'not onlynoncluster and not redismod and not ssl' \
48-
--redis-url="${CLUSTER_URL}" --redis-ssl-url="${CLUSTER_SSL_URL}"
48+
--ignore=tests/test_scenario \
49+
--redis-url="${CLUSTER_URL}" \
50+
--redis-ssl-url="${CLUSTER_SSL_URL}"

redis/_parsers/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ def parse_moving_msg(response):
191191
# Expected message format is: MOVING <seq_number> <time> <endpoint>
192192
id = response[1]
193193
ttl = response[2]
194-
if response[3] in [b"null", "null"]:
194+
if response[3] is None:
195195
host, port = None, None
196196
else:
197197
value = response[3]

redis/connection.py

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1720,7 +1720,7 @@ def __init__(
17201720
self._cache_factory = cache_factory
17211721

17221722
if connection_kwargs.get("cache_config") or connection_kwargs.get("cache"):
1723-
if connection_kwargs.get("protocol") not in [3, "3"]:
1723+
if self.connection_kwargs.get("protocol") not in [3, "3"]:
17241724
raise RedisError("Client caching is only supported with RESP version 3")
17251725

17261726
cache = self.connection_kwargs.get("cache")
@@ -1741,31 +1741,21 @@ def __init__(
17411741
connection_kwargs.pop("cache", None)
17421742
connection_kwargs.pop("cache_config", None)
17431743

1744-
if connection_kwargs.get(
1744+
if self.connection_kwargs.get(
17451745
"maintenance_events_pool_handler"
1746-
) or connection_kwargs.get("maintenance_events_config"):
1747-
if connection_kwargs.get("protocol") not in [3, "3"]:
1746+
) or self.connection_kwargs.get("maintenance_events_config"):
1747+
if self.connection_kwargs.get("protocol") not in [3, "3"]:
17481748
raise RedisError(
17491749
"Push handlers on connection are only supported with RESP version 3"
17501750
)
1751-
config = connection_kwargs.get("maintenance_events_config", None) or (
1752-
connection_kwargs.get("maintenance_events_pool_handler").config
1753-
if connection_kwargs.get("maintenance_events_pool_handler")
1751+
config = self.connection_kwargs.get("maintenance_events_config", None) or (
1752+
self.connection_kwargs.get("maintenance_events_pool_handler").config
1753+
if self.connection_kwargs.get("maintenance_events_pool_handler")
17541754
else None
17551755
)
17561756

17571757
if config and config.enabled:
1758-
connection_kwargs.update(
1759-
{
1760-
"orig_host_address": connection_kwargs.get("host"),
1761-
"orig_socket_timeout": connection_kwargs.get(
1762-
"socket_timeout", None
1763-
),
1764-
"orig_socket_connect_timeout": connection_kwargs.get(
1765-
"socket_connect_timeout", None
1766-
),
1767-
}
1768-
)
1758+
self._update_connection_kwargs_for_maintenance_events()
17691759

17701760
self._event_dispatcher = self.connection_kwargs.get("event_dispatcher", None)
17711761
if self._event_dispatcher is None:
@@ -1821,6 +1811,7 @@ def set_maintenance_events_pool_handler(
18211811
"maintenance_events_config": maintenance_events_pool_handler.config,
18221812
}
18231813
)
1814+
self._update_connection_kwargs_for_maintenance_events()
18241815

18251816
self._update_maintenance_events_configs_for_connections(
18261817
maintenance_events_pool_handler
@@ -1838,6 +1829,23 @@ def _update_maintenance_events_configs_for_connections(
18381829
conn.set_maintenance_event_pool_handler(maintenance_events_pool_handler)
18391830
conn.maintenance_events_config = maintenance_events_pool_handler.config
18401831

1832+
def _update_connection_kwargs_for_maintenance_events(self):
1833+
"""Store original connection parameters for maintenance events."""
1834+
if self.connection_kwargs.get("orig_host_address", None) is None:
1835+
# If orig_host_address is None it means we haven't
1836+
# configured the original values yet
1837+
self.connection_kwargs.update(
1838+
{
1839+
"orig_host_address": self.connection_kwargs.get("host"),
1840+
"orig_socket_timeout": self.connection_kwargs.get(
1841+
"socket_timeout", None
1842+
),
1843+
"orig_socket_connect_timeout": self.connection_kwargs.get(
1844+
"socket_connect_timeout", None
1845+
),
1846+
}
1847+
)
1848+
18411849
def reset(self) -> None:
18421850
self._created_connections = 0
18431851
self._available_connections = []

redis/maintenance_events.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -449,7 +449,7 @@ def __init__(
449449
self,
450450
enabled: bool = True,
451451
proactive_reconnect: bool = True,
452-
relax_timeout: Optional[Number] = 20,
452+
relax_timeout: Optional[Number] = 10,
453453
endpoint_type: Optional[EndpointType] = None,
454454
):
455455
"""

tasks.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,11 @@ def standalone_tests(
5858

5959
if uvloop:
6060
run(
61-
f"pytest {profile_arg} --protocol={protocol} {redis_mod_url} --cov=./ --cov-report=xml:coverage_resp{protocol}_uvloop.xml -m 'not onlycluster{extra_markers}' --uvloop --junit-xml=standalone-resp{protocol}-uvloop-results.xml"
61+
f"pytest {profile_arg} --protocol={protocol} {redis_mod_url} --ignore=tests/test_scenario --cov=./ --cov-report=xml:coverage_resp{protocol}_uvloop.xml -m 'not onlycluster{extra_markers}' --uvloop --junit-xml=standalone-resp{protocol}-uvloop-results.xml"
6262
)
6363
else:
6464
run(
65-
f"pytest {profile_arg} --protocol={protocol} {redis_mod_url} --cov=./ --cov-report=xml:coverage_resp{protocol}.xml -m 'not onlycluster{extra_markers}' --junit-xml=standalone-resp{protocol}-results.xml"
65+
f"pytest {profile_arg} --protocol={protocol} {redis_mod_url} --ignore=tests/test_scenario --cov=./ --cov-report=xml:coverage_resp{protocol}.xml -m 'not onlycluster{extra_markers}' --junit-xml=standalone-resp{protocol}-results.xml"
6666
)
6767

6868

@@ -74,11 +74,11 @@ def cluster_tests(c, uvloop=False, protocol=2, profile=False):
7474
cluster_tls_url = "rediss://localhost:27379/0"
7575
if uvloop:
7676
run(
77-
f"pytest {profile_arg} --protocol={protocol} --cov=./ --cov-report=xml:coverage_cluster_resp{protocol}_uvloop.xml -m 'not onlynoncluster and not redismod' --redis-url={cluster_url} --redis-ssl-url={cluster_tls_url} --junit-xml=cluster-resp{protocol}-uvloop-results.xml --uvloop"
77+
f"pytest {profile_arg} --protocol={protocol} --ignore=tests/test_scenario --cov=./ --cov-report=xml:coverage_cluster_resp{protocol}_uvloop.xml -m 'not onlynoncluster and not redismod' --redis-url={cluster_url} --redis-ssl-url={cluster_tls_url} --junit-xml=cluster-resp{protocol}-uvloop-results.xml --uvloop"
7878
)
7979
else:
8080
run(
81-
f"pytest {profile_arg} --protocol={protocol} --cov=./ --cov-report=xml:coverage_cluster_resp{protocol}.xml -m 'not onlynoncluster and not redismod' --redis-url={cluster_url} --redis-ssl-url={cluster_tls_url} --junit-xml=cluster-resp{protocol}-results.xml"
81+
f"pytest {profile_arg} --protocol={protocol} --ignore=tests/test_scenario --cov=./ --cov-report=xml:coverage_cluster_resp{protocol}.xml -m 'not onlynoncluster and not redismod' --redis-url={cluster_url} --redis-ssl-url={cluster_tls_url} --junit-xml=cluster-resp{protocol}-results.xml"
8282
)
8383

8484

tests/test_maintenance_events.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ def test_init_defaults(self):
381381
config = MaintenanceEventsConfig()
382382
assert config.enabled is True
383383
assert config.proactive_reconnect is True
384-
assert config.relax_timeout == 20
384+
assert config.relax_timeout == 10
385385

386386
def test_init_custom_values(self):
387387
"""Test MaintenanceEventsConfig initialization with custom values."""

tests/test_maintenance_events_handling.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -236,9 +236,7 @@ def send(self, data):
236236
# MOVING push message before SET key_receive_moving_none_X response
237237
# Format: >4\r\n$6\r\nMOVING\r\n:1\r\n:1\r\n+null\r\n (4 elements: MOVING, id, ttl, null)
238238
# Note: Using + instead of $ to send as simple string instead of bulk string
239-
moving_push = (
240-
f">4\r\n$6\r\nMOVING\r\n:1\r\n:{MOVING_TIMEOUT}\r\n+null\r\n"
241-
)
239+
moving_push = f">4\r\n$6\r\nMOVING\r\n:1\r\n:{MOVING_TIMEOUT}\r\n_\r\n"
242240
response = moving_push.encode() + response
243241
elif b"key_receive_moving_" in data:
244242
# MOVING push message before SET key_receive_moving_X response

tests/test_scenario/conftest.py

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
import json
2+
import logging
3+
import os
4+
from typing import Optional
5+
from urllib.parse import urlparse
6+
import pytest
7+
8+
from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
9+
from redis.client import Redis
10+
from redis.maintenance_events import EndpointType, MaintenanceEventsConfig
11+
from redis.retry import Retry
12+
from tests.test_scenario.fault_injector_client import FaultInjectorClient
13+
14+
RELAX_TIMEOUT = 30
15+
CLIENT_TIMEOUT = 5
16+
17+
DEFAULT_ENDPOINT_NAME = "m-standard"
18+
19+
20+
@pytest.fixture()
21+
def endpoint_name(request):
22+
return request.config.getoption("--endpoint-name") or os.getenv(
23+
"REDIS_ENDPOINT_NAME", DEFAULT_ENDPOINT_NAME
24+
)
25+
26+
27+
@pytest.fixture()
28+
def endpoints_config(endpoint_name: str):
29+
endpoints_config = os.getenv("REDIS_ENDPOINTS_CONFIG_PATH", None)
30+
31+
if not (endpoints_config and os.path.exists(endpoints_config)):
32+
raise FileNotFoundError(f"Endpoints config file not found: {endpoints_config}")
33+
34+
try:
35+
with open(endpoints_config, "r") as f:
36+
data = json.load(f)
37+
db = data[endpoint_name]
38+
return db
39+
except Exception as e:
40+
raise ValueError(
41+
f"Failed to load endpoints config file: {endpoints_config}"
42+
) from e
43+
44+
45+
@pytest.fixture()
46+
def fault_injector_client():
47+
url = os.getenv("FAULT_INJECTION_API_URL", "http://127.0.0.1:20324")
48+
return FaultInjectorClient(url)
49+
50+
51+
@pytest.fixture()
52+
def client_maint_events(endpoints_config):
53+
return _get_client_maint_events(endpoints_config)
54+
55+
56+
def _get_client_maint_events(
57+
endpoints_config,
58+
enable_maintenance_events: bool = True,
59+
endpoint_type: Optional[EndpointType] = None,
60+
enable_relax_timeout: bool = True,
61+
enable_proactive_reconnect: bool = True,
62+
disable_retries: bool = False,
63+
socket_timeout: Optional[float] = None,
64+
):
65+
"""Create Redis client with maintenance events enabled."""
66+
67+
# Get credentials from the configuration
68+
username = endpoints_config.get("username")
69+
password = endpoints_config.get("password")
70+
71+
# Parse host and port from endpoints URL
72+
endpoints = endpoints_config.get("endpoints", [])
73+
if not endpoints:
74+
raise ValueError("No endpoints found in configuration")
75+
76+
parsed = urlparse(endpoints[0])
77+
host = parsed.hostname
78+
port = parsed.port
79+
80+
tls_enabled = True if parsed.scheme == "rediss" else False
81+
82+
if not host:
83+
raise ValueError(f"Could not parse host from endpoint URL: {endpoints[0]}")
84+
85+
logging.info(f"Connecting to Redis Enterprise: {host}:{port} with user: {username}")
86+
87+
# Configure maintenance events
88+
maintenance_config = MaintenanceEventsConfig(
89+
enabled=enable_maintenance_events,
90+
proactive_reconnect=enable_proactive_reconnect,
91+
relax_timeout=RELAX_TIMEOUT if enable_relax_timeout else -1,
92+
endpoint_type=endpoint_type,
93+
)
94+
95+
# Create Redis client with maintenance events config
96+
# This will automatically create the MaintenanceEventPoolHandler
97+
if disable_retries:
98+
retry = Retry(NoBackoff(), 0)
99+
else:
100+
retry = Retry(backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3)
101+
102+
client = Redis(
103+
host=host,
104+
port=port,
105+
socket_timeout=CLIENT_TIMEOUT if socket_timeout is None else socket_timeout,
106+
username=username,
107+
password=password,
108+
ssl=tls_enabled,
109+
ssl_cert_reqs="none",
110+
ssl_check_hostname=False,
111+
protocol=3, # RESP3 required for push notifications
112+
maintenance_events_config=maintenance_config,
113+
retry=retry,
114+
)
115+
logging.info("Redis client created with maintenance events enabled")
116+
logging.info(f"Client uses Protocol: {client.connection_pool.get_protocol()}")
117+
maintenance_handler_exists = client.maintenance_events_pool_handler is not None
118+
logging.info(f"Maintenance events pool handler: {maintenance_handler_exists}")
119+
120+
return client

tests/test_scenario/fault_injector_client.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,23 @@
11
import json
2+
import logging
3+
import time
24
import urllib.request
35
from typing import Dict, Any, Optional, Union
46
from enum import Enum
57

8+
import pytest
9+
10+
11+
class TaskStatuses:
12+
"""Class to hold completed statuses constants."""
13+
14+
FAILED = "failed"
15+
FINISHED = "finished"
16+
SUCCESS = "success"
17+
RUNNING = "running"
18+
19+
COMPLETED_STATUSES = [FAILED, FINISHED, SUCCESS]
20+
621

722
class ActionType(str, Enum):
823
DMC_RESTART = "dmc_restart"
@@ -103,3 +118,32 @@ def execute_rladmin_command(
103118
error_body = json.loads(e.read().decode("utf-8"))
104119
raise ValueError(f"Validation Error: {error_body}")
105120
raise
121+
122+
def get_operation_result(
123+
self,
124+
action_id: str,
125+
timeout: int = 60,
126+
) -> Dict[str, Any]:
127+
"""Get the result of a specific action"""
128+
start_time = time.time()
129+
check_interval = 3
130+
while time.time() - start_time < timeout:
131+
try:
132+
status_result = self.get_action_status(action_id)
133+
operation_status = status_result.get("status", "unknown")
134+
135+
if operation_status in TaskStatuses.COMPLETED_STATUSES:
136+
logging.debug(
137+
f"Operation {action_id} completed with status: "
138+
f"{operation_status}"
139+
)
140+
if operation_status != TaskStatuses.SUCCESS:
141+
pytest.fail(f"Operation {action_id} failed: {status_result}")
142+
return status_result
143+
144+
time.sleep(check_interval)
145+
except Exception as e:
146+
logging.warning(f"Error checking operation status: {e}")
147+
time.sleep(check_interval)
148+
else:
149+
raise TimeoutError(f"Timeout waiting for operation {action_id}")

0 commit comments

Comments
 (0)