From 46afaeaa1717b2a87e1ee3b722fa8d8fbf4a6f6e Mon Sep 17 00:00:00 2001 From: vladvildanov Date: Fri, 4 Jul 2025 15:22:55 +0300 Subject: [PATCH 1/9] Added support for Active-Active pipeline --- redis/event.py | 8 +- redis/multidb/client.py | 90 ++++++++++++++++++--- redis/multidb/command_executor.py | 31 +++++-- redis/multidb/event.py | 4 +- tests/test_multidb/test_client.py | 6 +- tests/test_multidb/test_command_executor.py | 10 +-- 6 files changed, 116 insertions(+), 33 deletions(-) diff --git a/redis/event.py b/redis/event.py index 8bc1bd4f41..76c51ae958 100644 --- a/redis/event.py +++ b/redis/event.py @@ -249,23 +249,23 @@ def nodes(self) -> dict: def credential_provider(self) -> Union[CredentialProvider, None]: return self._credential_provider -class OnCommandFailEvent: +class OnCommandsFailEvent: """ Event fired whenever a command fails during the execution. """ def __init__( self, - command: tuple, + commands: tuple, exception: Exception, client, ): - self._command = command + self._commands = commands self._exception = exception self._client = client @property def command(self) -> tuple: - return self._command + return self._commands @property def exception(self) -> Exception: diff --git a/redis/multidb/client.py b/redis/multidb/client.py index 96433bead0..4def2b5a4e 100644 --- a/redis/multidb/client.py +++ b/redis/multidb/client.py @@ -1,4 +1,5 @@ import threading +from typing import List, Any from redis.background import BackgroundScheduler from redis.commands import RedisModuleCommands, CoreCommands, SentinelCommands @@ -25,18 +26,19 @@ def __init__(self, config: MultiDbConfig): self._failover_strategy.set_databases(self._databases) self._auto_fallback_interval = config.auto_fallback_interval self._event_dispatcher = config.event_dispatcher - self._command_executor = DefaultCommandExecutor( + self.command_executor = DefaultCommandExecutor( failure_detectors=self._failure_detectors, databases=self._databases, failover_strategy=self._failover_strategy, event_dispatcher=self._event_dispatcher, auto_fallback_interval=self._auto_fallback_interval, ) - self._initialized = False + self.initialized = False self._hc_lock = threading.RLock() self._bg_scheduler = BackgroundScheduler() + self._config = config - def _initialize(self): + def initialize(self): """ Perform initialization of databases to define their initial state. 
""" @@ -59,7 +61,7 @@ def _initialize(self): # Set states according to a weights and circuit state if database.circuit.state == CBState.CLOSED and not is_active_db: database.state = DBState.ACTIVE - self._command_executor.active_database = database + self.command_executor.active_database = database is_active_db = True elif database.circuit.state == CBState.CLOSED and is_active_db: database.state = DBState.PASSIVE @@ -69,7 +71,7 @@ def _initialize(self): if not is_active_db: raise NoValidDatabaseException('Initial connection failed - no active database found') - self._initialized = True + self.initialized = True def get_databases(self) -> Databases: """ @@ -96,7 +98,7 @@ def set_active_database(self, database: AbstractDatabase) -> None: highest_weighted_db, _ = self._databases.get_top_n(1)[0] highest_weighted_db.state = DBState.PASSIVE database.state = DBState.ACTIVE - self._command_executor.active_database = database + self.command_executor.active_database = database return raise NoValidDatabaseException('Cannot set active database, database is unhealthy') @@ -116,7 +118,7 @@ def add_database(self, database: AbstractDatabase): if database.weight > highest_weight and database.circuit.state == CBState.CLOSED: database.state = DBState.ACTIVE - self._command_executor.active_database = database + self.command_executor.active_database = database highest_weighted_db.state = DBState.PASSIVE def remove_database(self, database: Database): @@ -128,7 +130,7 @@ def remove_database(self, database: Database): if highest_weight <= weight and highest_weighted_db.circuit.state == CBState.CLOSED: highest_weighted_db.state = DBState.ACTIVE - self._command_executor.active_database = highest_weighted_db + self.command_executor.active_database = highest_weighted_db def update_database_weight(self, database: AbstractDatabase, weight: float): """ @@ -148,7 +150,7 @@ def update_database_weight(self, database: AbstractDatabase, weight: float): if weight > highest_weight and database.circuit.state == CBState.CLOSED: database.state = DBState.ACTIVE - self._command_executor.active_database = database + self.command_executor.active_database = database highest_weighted_db.state = DBState.PASSIVE def add_failure_detector(self, failure_detector: FailureDetector): @@ -168,10 +170,13 @@ def execute_command(self, *args, **options): """ Executes a single command and return its result. """ - if not self._initialized: - self._initialize() + if not self.initialized: + self.initialize() - return self._command_executor.execute_command(*args, **options) + return self.command_executor.execute_command(*args, **options) + + def pipeline(self): + return Pipeline(self) def _check_db_health(self, database: AbstractDatabase) -> None: """ @@ -190,8 +195,9 @@ def _check_db_health(self, database: AbstractDatabase) -> None: def _check_databases_health(self): """ - Runs health checks as recurring task. + Runs health checks against all databases. """ + for database, _ in self._databases: self._check_db_health(database) @@ -206,3 +212,61 @@ def _on_circuit_state_change_callback(self, circuit: CircuitBreaker, old_state: def _half_open_circuit(circuit: CircuitBreaker): circuit.state = CBState.HALF_OPEN + +class Pipeline(RedisModuleCommands, CoreCommands, SentinelCommands): + """ + Pipeline implementation for multiple logical Redis databases. 
+ """ + def __init__(self, client: MultiDBClient): + self._command_stack = [] + self._client = client + + def __enter__(self) -> "Pipeline": + return self + + def __exit__(self, exc_type, exc_value, traceback): + self.reset() + + def __del__(self): + try: + self.reset() + except Exception: + pass + + def __len__(self) -> int: + return len(self._command_stack) + + def __bool__(self) -> bool: + """Pipeline instances should always evaluate to True""" + return True + + def reset(self) -> None: + self._command_stack = [] + + def close(self) -> None: + """Close the pipeline""" + self.reset() + + def pipeline_execute_command(self, *args, **options) -> "Pipeline": + """ + Stage a command to be executed when execute() is next called + + Returns the current Pipeline object back so commands can be + chained together, such as: + + pipe = pipe.set('foo', 'bar').incr('baz').decr('bang') + + At some other point, you can then run: pipe.execute(), + which will execute all commands queued in the pipe. + """ + self._command_stack.append((args, options)) + return self + + def execute_command(self, *args, **kwargs): + return self.pipeline_execute_command(*args, **kwargs) + + def execute(self) -> List[Any]: + if not self._client.initialized: + self._client.initialize() + + return self._client.command_executor.execute_pipeline(tuple(self._command_stack)) diff --git a/redis/multidb/command_executor.py b/redis/multidb/command_executor.py index 60dbeca36b..15f6e71990 100644 --- a/redis/multidb/command_executor.py +++ b/redis/multidb/command_executor.py @@ -1,10 +1,10 @@ import socket from abc import ABC, abstractmethod from datetime import datetime, timedelta -from typing import List, Union, Optional +from typing import List, Union, Optional, Any, Callable from redis.exceptions import ConnectionError, TimeoutError -from redis.event import EventDispatcherInterface, OnCommandFailEvent +from redis.event import EventDispatcherInterface, OnCommandsFailEvent from redis.multidb.config import DEFAULT_AUTO_FALLBACK_INTERVAL from redis.multidb.database import Database, AbstractDatabase, Databases from redis.multidb.circuit import State as CBState @@ -128,6 +128,25 @@ def auto_fallback_interval(self, auto_fallback_interval: int) -> None: self._auto_fallback_interval = auto_fallback_interval def execute_command(self, *args, **options): + def callback(database): + return database.client.execute_command(*args, **options) + + return self._execute_with_failure_detection(callback, args) + + def execute_pipeline(self, command_stack: tuple): + def callback(database): + with database.client.pipeline() as pipe: + for command, options in command_stack: + pipe.execute_command(*command, **options) + + return pipe.execute() + + return self._execute_with_failure_detection(callback, command_stack) + + def _execute_with_failure_detection(self, callback: Callable, cmds: tuple): + """ + Execute a commands execution callback with failure detection. 
+ """ if ( self._active_database is None or self._active_database.circuit.state != CBState.CLOSED @@ -140,13 +159,13 @@ def execute_command(self, *args, **options): self._schedule_next_fallback() try: - return self._active_database.client.execute_command(*args, **options) + return callback(self._active_database) except (ConnectionError, TimeoutError, socket.timeout) as e: # Register command failure - self._event_dispatcher.dispatch(OnCommandFailEvent(args, e, self.active_database.client)) + self._event_dispatcher.dispatch(OnCommandsFailEvent(cmds, e, self.active_database.client)) # Retry until failure detector will trigger opening of circuit - return self.execute_command(*args, **options) + return self._execute_with_failure_detection(callback, cmds) def _schedule_next_fallback(self) -> None: if self._auto_fallback_interval == DEFAULT_AUTO_FALLBACK_INTERVAL: @@ -160,5 +179,5 @@ def _setup_event_dispatcher(self): """ event_listener = RegisterCommandFailure(self._failure_detectors, self._databases) self._event_dispatcher.register_listeners({ - OnCommandFailEvent: [event_listener], + OnCommandsFailEvent: [event_listener], }) \ No newline at end of file diff --git a/redis/multidb/event.py b/redis/multidb/event.py index 3d366dab77..08b633b512 100644 --- a/redis/multidb/event.py +++ b/redis/multidb/event.py @@ -1,6 +1,6 @@ from typing import List -from redis.event import EventListenerInterface, OnCommandFailEvent +from redis.event import EventListenerInterface, OnCommandsFailEvent from redis.multidb.config import Databases from redis.multidb.failure_detector import FailureDetector @@ -13,7 +13,7 @@ def __init__(self, failure_detectors: List[FailureDetector], databases: Database self._failure_detectors = failure_detectors self._databases = databases - def listen(self, event: OnCommandFailEvent) -> None: + def listen(self, event: OnCommandsFailEvent) -> None: matching_database = None for database, _ in self._databases: diff --git a/tests/test_multidb/test_client.py b/tests/test_multidb/test_client.py index 96ab0c0a64..a6a961f72f 100644 --- a/tests/test_multidb/test_client.py +++ b/tests/test_multidb/test_client.py @@ -4,7 +4,7 @@ import pybreaker import pytest -from redis.event import EventDispatcher, OnCommandFailEvent +from redis.event import EventDispatcher, OnCommandsFailEvent from redis.multidb.circuit import State as CBState, PBCircuitBreakerAdapter from redis.multidb.config import DEFAULT_HEALTH_CHECK_RETRIES, DEFAULT_HEALTH_CHECK_BACKOFF, DEFAULT_FAILOVER_RETRIES, \ DEFAULT_FAILOVER_BACKOFF @@ -423,8 +423,8 @@ def test_add_new_failure_detector( mock_fd = mock_multi_db_config.failure_detectors[0] # Event fired if command against mock_db1 would fail - command_fail_event = OnCommandFailEvent( - command=('SET', 'key', 'value'), + command_fail_event = OnCommandsFailEvent( + commands=('SET', 'key', 'value'), exception=Exception(), client=mock_db1.client ) diff --git a/tests/test_multidb/test_command_executor.py b/tests/test_multidb/test_command_executor.py index 54c6d38e1d..2b9e39ab29 100644 --- a/tests/test_multidb/test_command_executor.py +++ b/tests/test_multidb/test_command_executor.py @@ -3,7 +3,7 @@ import pytest -from redis.event import EventDispatcher, OnCommandFailEvent +from redis.event import EventDispatcher, OnCommandsFailEvent from redis.multidb.circuit import State as CBState from redis.multidb.command_executor import DefaultCommandExecutor from redis.multidb.failure_detector import CommandFailureDetector @@ -139,8 +139,8 @@ def 
test_execute_command_fallback_to_another_db_after_failure_detection( databases = create_weighted_list(mock_db, mock_db1, mock_db2) # Event fired if command against mock_db1 would fail - command_fail_event = OnCommandFailEvent( - command=('SET', 'key', 'value'), + command_fail_event = OnCommandsFailEvent( + commands=('SET', 'key', 'value'), exception=Exception(), client=mock_db1.client ) @@ -161,8 +161,8 @@ def test_execute_command_fallback_to_another_db_after_failure_detection( assert executor.execute_command('SET', 'key', 'value') == 'OK2' - command_fail_event = OnCommandFailEvent( - command=('SET', 'key', 'value'), + command_fail_event = OnCommandsFailEvent( + commands=('SET', 'key', 'value'), exception=Exception(), client=mock_db2.client ) From f16b6462f0fa1b15a0e7316c8bdd1571ebd7aa82 Mon Sep 17 00:00:00 2001 From: vladvildanov Date: Mon, 7 Jul 2025 12:52:23 +0300 Subject: [PATCH 2/9] Added Pipeline testing --- tests/test_multidb/test_pipeline.py | 200 ++++++++++++++++++++++++++++ 1 file changed, 200 insertions(+) create mode 100644 tests/test_multidb/test_pipeline.py diff --git a/tests/test_multidb/test_pipeline.py b/tests/test_multidb/test_pipeline.py new file mode 100644 index 0000000000..33008bea09 --- /dev/null +++ b/tests/test_multidb/test_pipeline.py @@ -0,0 +1,200 @@ +from time import sleep +from unittest.mock import patch, Mock + +import pybreaker +import pytest + +from redis.client import Pipeline +from redis.multidb.circuit import State as CBState, PBCircuitBreakerAdapter +from redis.multidb.client import MultiDBClient +from redis.multidb.config import DEFAULT_HEALTH_CHECK_RETRIES, DEFAULT_HEALTH_CHECK_BACKOFF, DEFAULT_FAILOVER_RETRIES, \ + DEFAULT_FAILOVER_BACKOFF +from redis.multidb.failover import WeightBasedFailoverStrategy +from redis.multidb.healthcheck import EchoHealthCheck +from redis.retry import Retry +from tests.test_multidb.conftest import create_weighted_list + +def mock_pipe() -> Pipeline: + mock_pipe = Mock(spec=Pipeline) + mock_pipe.__enter__ = Mock(return_value=mock_pipe) + mock_pipe.__exit__ = Mock(return_value=None) + return mock_pipe + +class TestPipeline: + @pytest.mark.parametrize( + 'mock_multi_db_config,mock_db, mock_db1, mock_db2', + [ + ( + {}, + {'weight': 0.2, 'circuit': {'state': CBState.CLOSED}}, + {'weight': 0.7, 'circuit': {'state': CBState.CLOSED}}, + {'weight': 0.5, 'circuit': {'state': CBState.CLOSED}}, + ), + ], + indirect=True, + ) + def test_executes_pipeline_against_correct_db( + self, mock_multi_db_config, mock_db, mock_db1, mock_db2 + ): + databases = create_weighted_list(mock_db, mock_db1, mock_db2) + + with patch.object( + mock_multi_db_config, + 'databases', + return_value=databases + ): + pipe = mock_pipe() + pipe.execute.return_value = ['OK1', 'value1'] + mock_db1.client.pipeline.return_value = pipe + + for hc in mock_multi_db_config.health_checks: + hc.check_health.return_value = True + + client = MultiDBClient(mock_multi_db_config) + assert mock_multi_db_config.failover_strategy.set_databases.call_count == 1 + + pipe = client.pipeline() + pipe.set('key1', 'value1') + pipe.get('key1') + + assert pipe.execute() == ['OK1', 'value1'] + + for hc in mock_multi_db_config.health_checks: + assert hc.check_health.call_count == 3 + + @pytest.mark.parametrize( + 'mock_multi_db_config,mock_db, mock_db1, mock_db2', + [ + ( + {}, + {'weight': 0.2, 'circuit': {'state': CBState.CLOSED}}, + {'weight': 0.5, 'circuit': {'state': CBState.CLOSED}}, + {'weight': 0.7, 'circuit': {'state': CBState.OPEN}}, + ), + ], + indirect=True, + ) + def 
test_execute_pipeline_against_correct_db_and_closed_circuit( + self, mock_multi_db_config, mock_db, mock_db1, mock_db2 + ): + databases = create_weighted_list(mock_db, mock_db1, mock_db2) + + with patch.object( + mock_multi_db_config, + 'databases', + return_value=databases + ): + pipe = mock_pipe() + pipe.execute.return_value = ['OK1', 'value1'] + mock_db1.client.pipeline.return_value = pipe + + for hc in mock_multi_db_config.health_checks: + hc.check_health.side_effect = [False, True, True] + + client = MultiDBClient(mock_multi_db_config) + assert mock_multi_db_config.failover_strategy.set_databases.call_count == 1 + + with client.pipeline() as pipe: + pipe.set('key1', 'value1') + pipe.get('key1') + + assert pipe.execute() == ['OK1', 'value1'] + + for hc in mock_multi_db_config.health_checks: + assert hc.check_health.call_count == 3 + + assert mock_db.circuit.state == CBState.CLOSED + assert mock_db1.circuit.state == CBState.CLOSED + assert mock_db2.circuit.state == CBState.OPEN + + @pytest.mark.parametrize( + 'mock_multi_db_config,mock_db, mock_db1, mock_db2', + [ + ( + {}, + {'weight': 0.2, 'circuit': {'state': CBState.CLOSED}}, + {'weight': 0.7, 'circuit': {'state': CBState.CLOSED}}, + {'weight': 0.5, 'circuit': {'state': CBState.CLOSED}}, + ), + ], + indirect=True, + ) + def test_execute_pipeline_against_correct_db_on_background_health_check_determine_active_db_unhealthy( + self, mock_multi_db_config, mock_db, mock_db1, mock_db2 + ): + cb = PBCircuitBreakerAdapter(pybreaker.CircuitBreaker(reset_timeout=5)) + cb.database = mock_db + mock_db.circuit = cb + + cb1 = PBCircuitBreakerAdapter(pybreaker.CircuitBreaker(reset_timeout=5)) + cb1.database = mock_db1 + mock_db1.circuit = cb1 + + cb2 = PBCircuitBreakerAdapter(pybreaker.CircuitBreaker(reset_timeout=5)) + cb2.database = mock_db2 + mock_db2.circuit = cb2 + + databases = create_weighted_list(mock_db, mock_db1, mock_db2) + + with patch.object( + mock_multi_db_config, + 'databases', + return_value=databases + ): + mock_db.client.execute_command.side_effect = ['healthcheck', 'healthcheck', 'healthcheck', 'error'] + mock_db1.client.execute_command.side_effect = ['healthcheck', 'error', 'error', 'healthcheck'] + mock_db2.client.execute_command.side_effect = ['healthcheck', 'healthcheck', 'error', 'error'] + + pipe = mock_pipe() + pipe.execute.return_value = ['OK', 'value'] + mock_db.client.pipeline.return_value = pipe + + pipe1 = mock_pipe() + pipe1.execute.return_value = ['OK1', 'value'] + mock_db1.client.pipeline.return_value = pipe1 + + pipe2 = mock_pipe() + pipe2.execute.return_value = ['OK2', 'value'] + mock_db2.client.pipeline.return_value = pipe2 + + mock_multi_db_config.health_check_interval = 0.1 + mock_multi_db_config.health_checks = [ + EchoHealthCheck( + retry=Retry(retries=DEFAULT_HEALTH_CHECK_RETRIES, backoff=DEFAULT_HEALTH_CHECK_BACKOFF) + ) + ] + mock_multi_db_config.failover_strategy = WeightBasedFailoverStrategy( + retry=Retry(retries=DEFAULT_FAILOVER_RETRIES, backoff=DEFAULT_FAILOVER_BACKOFF) + ) + + client = MultiDBClient(mock_multi_db_config) + + with client.pipeline() as pipe: + pipe.set('key1', 'value') + pipe.get('key1') + + assert pipe.execute() == ['OK1', 'value'] + + sleep(0.15) + + with client.pipeline() as pipe: + pipe.set('key1', 'value') + pipe.get('key1') + + assert pipe.execute() == ['OK2', 'value'] + + sleep(0.1) + + with client.pipeline() as pipe: + pipe.set('key1', 'value') + pipe.get('key1') + + assert pipe.execute() == ['OK', 'value'] + + sleep(0.1) + + with client.pipeline() as pipe: + pipe.set('key1', 
'value') + pipe.get('key1') + + assert pipe.execute() == ['OK1', 'value'] From 7e43b40a35f8746cc3b2ce41f71eebc36d5a0010 Mon Sep 17 00:00:00 2001 From: vladvildanov Date: Mon, 14 Jul 2025 17:09:04 +0300 Subject: [PATCH 3/9] Added support for transactions --- redis/multidb/client.py | 14 ++- redis/multidb/command_executor.py | 11 +- tests/test_multidb/test_pipeline.py | 154 +++++++++++++++++++++++++++- 3 files changed, 175 insertions(+), 4 deletions(-) diff --git a/redis/multidb/client.py b/redis/multidb/client.py index b23f1cd696..936eeda1a3 100644 --- a/redis/multidb/client.py +++ b/redis/multidb/client.py @@ -1,6 +1,6 @@ import threading import socket -from typing import List, Any +from typing import List, Any, Callable from redis.background import BackgroundScheduler from redis.exceptions import ConnectionError, TimeoutError @@ -178,8 +178,20 @@ def execute_command(self, *args, **options): return self.command_executor.execute_command(*args, **options) def pipeline(self): + """ + Enters into pipeline mode of the client. + """ return Pipeline(self) + def transaction(self, func: Callable[["Pipeline"], None], *watches, **options): + """ + Executes callable as transaction. + """ + if not self.initialized: + self.initialize() + + return self.command_executor.execute_transaction(func, *watches, *options) + def _check_db_health(self, database: AbstractDatabase) -> None: """ Runs health checks on the given database until first failure. diff --git a/redis/multidb/command_executor.py b/redis/multidb/command_executor.py index 15f6e71990..ec2fc61e06 100644 --- a/redis/multidb/command_executor.py +++ b/redis/multidb/command_executor.py @@ -1,8 +1,9 @@ import socket from abc import ABC, abstractmethod from datetime import datetime, timedelta -from typing import List, Union, Optional, Any, Callable +from typing import List, Union, Optional, Callable +from redis.client import Pipeline from redis.exceptions import ConnectionError, TimeoutError from redis.event import EventDispatcherInterface, OnCommandsFailEvent from redis.multidb.config import DEFAULT_AUTO_FALLBACK_INTERVAL @@ -143,7 +144,13 @@ def callback(database): return self._execute_with_failure_detection(callback, command_stack) - def _execute_with_failure_detection(self, callback: Callable, cmds: tuple): + def execute_transaction(self, transaction: Callable[[Pipeline], None], *watches, **options): + def callback(database): + return database.client.transaction(transaction, *watches, **options) + + return self._execute_with_failure_detection(callback) + + def _execute_with_failure_detection(self, callback: Callable, cmds: tuple = ()): """ Execute a commands execution callback with failure detection. 
""" diff --git a/tests/test_multidb/test_pipeline.py b/tests/test_multidb/test_pipeline.py index 33008bea09..9caad235df 100644 --- a/tests/test_multidb/test_pipeline.py +++ b/tests/test_multidb/test_pipeline.py @@ -4,11 +4,13 @@ import pybreaker import pytest +from redis.event import EventDispatcher +from redis.exceptions import ConnectionError from redis.client import Pipeline from redis.multidb.circuit import State as CBState, PBCircuitBreakerAdapter from redis.multidb.client import MultiDBClient from redis.multidb.config import DEFAULT_HEALTH_CHECK_RETRIES, DEFAULT_HEALTH_CHECK_BACKOFF, DEFAULT_FAILOVER_RETRIES, \ - DEFAULT_FAILOVER_BACKOFF + DEFAULT_FAILOVER_BACKOFF, DEFAULT_FAILURES_THRESHOLD from redis.multidb.failover import WeightBasedFailoverStrategy from redis.multidb.healthcheck import EchoHealthCheck from redis.retry import Retry @@ -198,3 +200,153 @@ def test_execute_pipeline_against_correct_db_on_background_health_check_determin pipe.get('key1') assert pipe.execute() == ['OK1', 'value'] + +class TestTransaction: + + @pytest.mark.parametrize( + 'mock_multi_db_config,mock_db, mock_db1, mock_db2', + [ + ( + {}, + {'weight': 0.2, 'circuit': {'state': CBState.CLOSED}}, + {'weight': 0.7, 'circuit': {'state': CBState.CLOSED}}, + {'weight': 0.5, 'circuit': {'state': CBState.CLOSED}}, + ), + ], + indirect=True, + ) + def test_executes_transaction_against_correct_db( + self, mock_multi_db_config, mock_db, mock_db1, mock_db2 + ): + databases = create_weighted_list(mock_db, mock_db1, mock_db2) + + with patch.object( + mock_multi_db_config, + 'databases', + return_value=databases + ): + mock_db1.client.transaction.return_value = ['OK1', 'value1'] + + for hc in mock_multi_db_config.health_checks: + hc.check_health.return_value = True + + client = MultiDBClient(mock_multi_db_config) + assert mock_multi_db_config.failover_strategy.set_databases.call_count == 1 + + def callback(pipe: Pipeline): + pipe.set('key1', 'value1') + pipe.get('key1') + + assert client.transaction(callback) == ['OK1', 'value1'] + + for hc in mock_multi_db_config.health_checks: + assert hc.check_health.call_count == 3 + + @pytest.mark.parametrize( + 'mock_multi_db_config,mock_db, mock_db1, mock_db2', + [ + ( + {}, + {'weight': 0.2, 'circuit': {'state': CBState.CLOSED}}, + {'weight': 0.5, 'circuit': {'state': CBState.CLOSED}}, + {'weight': 0.7, 'circuit': {'state': CBState.OPEN}}, + ), + ], + indirect=True, + ) + def test_execute_transaction_against_correct_db_and_closed_circuit( + self, mock_multi_db_config, mock_db, mock_db1, mock_db2 + ): + databases = create_weighted_list(mock_db, mock_db1, mock_db2) + + with patch.object( + mock_multi_db_config, + 'databases', + return_value=databases + ): + mock_db1.client.transaction.return_value = ['OK1', 'value1'] + + for hc in mock_multi_db_config.health_checks: + hc.check_health.side_effect = [False, True, True] + + client = MultiDBClient(mock_multi_db_config) + assert mock_multi_db_config.failover_strategy.set_databases.call_count == 1 + + def callback(pipe: Pipeline): + pipe.set('key1', 'value1') + pipe.get('key1') + + assert client.transaction(callback) == ['OK1', 'value1'] + + for hc in mock_multi_db_config.health_checks: + assert hc.check_health.call_count == 3 + + assert mock_db.circuit.state == CBState.CLOSED + assert mock_db1.circuit.state == CBState.CLOSED + assert mock_db2.circuit.state == CBState.OPEN + + @pytest.mark.parametrize( + 'mock_multi_db_config,mock_db, mock_db1, mock_db2', + [ + ( + {}, + {'weight': 0.2, 'circuit': {'state': CBState.CLOSED}}, + {'weight': 
0.7, 'circuit': {'state': CBState.CLOSED}}, + {'weight': 0.5, 'circuit': {'state': CBState.CLOSED}}, + ), + ], + indirect=True, + ) + def test_execute_transaction_against_correct_db_on_background_health_check_determine_active_db_unhealthy( + self, mock_multi_db_config, mock_db, mock_db1, mock_db2 + ): + cb = PBCircuitBreakerAdapter(pybreaker.CircuitBreaker(reset_timeout=5)) + cb.database = mock_db + mock_db.circuit = cb + + cb1 = PBCircuitBreakerAdapter(pybreaker.CircuitBreaker(reset_timeout=5)) + cb1.database = mock_db1 + mock_db1.circuit = cb1 + + cb2 = PBCircuitBreakerAdapter(pybreaker.CircuitBreaker(reset_timeout=5)) + cb2.database = mock_db2 + mock_db2.circuit = cb2 + + databases = create_weighted_list(mock_db, mock_db1, mock_db2) + + with patch.object( + mock_multi_db_config, + 'databases', + return_value=databases + ): + mock_db.client.execute_command.side_effect = ['healthcheck', 'healthcheck', 'healthcheck', 'error'] + mock_db1.client.execute_command.side_effect = ['healthcheck', 'error', 'error', 'healthcheck'] + mock_db2.client.execute_command.side_effect = ['healthcheck', 'healthcheck', 'error', 'error'] + + mock_db.client.transaction.return_value = ['OK', 'value'] + mock_db1.client.transaction.return_value = ['OK1', 'value'] + mock_db2.client.transaction.return_value = ['OK2', 'value'] + + mock_multi_db_config.health_check_interval = 0.1 + mock_multi_db_config.health_checks = [ + EchoHealthCheck( + retry=Retry(retries=DEFAULT_HEALTH_CHECK_RETRIES, backoff=DEFAULT_HEALTH_CHECK_BACKOFF) + ) + ] + mock_multi_db_config.failover_strategy = WeightBasedFailoverStrategy( + retry=Retry(retries=DEFAULT_FAILOVER_RETRIES, backoff=DEFAULT_FAILOVER_BACKOFF) + ) + + client = MultiDBClient(mock_multi_db_config) + + def callback(pipe: Pipeline): + pipe.set('key1', 'value1') + pipe.get('key1') + + assert client.transaction(callback) == ['OK1', 'value'] + sleep(0.15) + assert client.transaction(callback) == ['OK2', 'value'] + sleep(0.1) + assert client.transaction(callback) == ['OK', 'value'] + sleep(0.1) + assert client.transaction(callback) == ['OK1', 'value'] \ No newline at end of file From 05630243706477f98994d227f50977564bc34739 Mon Sep 17 00:00:00 2001 From: vladvildanov Date: Tue, 15 Jul 2025 10:38:08 +0300 Subject: [PATCH 4/9] Added missing doc blocks --- redis/multidb/client.py | 2 +- redis/multidb/command_executor.py | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/redis/multidb/client.py b/redis/multidb/client.py index fc40a950c2..ed6883b110 100644 --- a/redis/multidb/client.py +++ b/redis/multidb/client.py @@ -65,7 +65,7 @@ def initialize(self): database.state = DBState.ACTIVE self.command_executor.active_database = database is_active_db_found = True - elif database.circuit.state == CBState.CLOSED and is_active_db: + elif database.circuit.state == CBState.CLOSED and is_active_db_found: database.state = DBState.PASSIVE else: database.state = DBState.DISCONNECTED diff --git a/redis/multidb/command_executor.py b/redis/multidb/command_executor.py index ec2fc61e06..3b600a9ebb 100644 --- a/redis/multidb/command_executor.py +++ b/redis/multidb/command_executor.py @@ -129,12 +129,16 @@ def auto_fallback_interval(self, auto_fallback_interval: int) -> None: self._auto_fallback_interval = auto_fallback_interval def execute_command(self, *args, **options): + """Executes a command and returns the result.""" def callback(database): return database.client.execute_command(*args, **options) return self._execute_with_failure_detection(callback, args) def execute_pipeline(self, 
command_stack: tuple): + """ + Executes a stack of commands in pipeline. + """ def callback(database): with database.client.pipeline() as pipe: for command, options in command_stack: @@ -145,6 +149,9 @@ def callback(database): return self._execute_with_failure_detection(callback, command_stack) def execute_transaction(self, transaction: Callable[[Pipeline], None], *watches, **options): + """ + Executes a transaction block wrapped in callback. + """ def callback(database): return database.client.transaction(transaction, *watches, **options) From 8922aa812226265c10f195fc30b665c375ad687b Mon Sep 17 00:00:00 2001 From: vladvildanov Date: Thu, 24 Jul 2025 15:12:12 +0300 Subject: [PATCH 5/9] Added scenario tests for Pipeline and Transaction --- redis/multidb/client.py | 5 +- tests/test_scenario/test_active_active.py | 205 +++++++++++++++++++++- 2 files changed, 206 insertions(+), 4 deletions(-) diff --git a/redis/multidb/client.py b/redis/multidb/client.py index 85aa84acb5..16517b2ad3 100644 --- a/redis/multidb/client.py +++ b/redis/multidb/client.py @@ -305,4 +305,7 @@ def execute(self) -> List[Any]: if not self._client.initialized: self._client.initialize() - return self._client.command_executor.execute_pipeline(tuple(self._command_stack)) + try: + return self._client.command_executor.execute_pipeline(tuple(self._command_stack)) + finally: + self.reset() diff --git a/tests/test_scenario/test_active_active.py b/tests/test_scenario/test_active_active.py index 93e251ed4b..b92707ba7e 100644 --- a/tests/test_scenario/test_active_active.py +++ b/tests/test_scenario/test_active_active.py @@ -5,6 +5,7 @@ import pytest from redis.backoff import NoBackoff +from redis.client import Pipeline from redis.exceptions import ConnectionError from redis.retry import Retry from tests.test_scenario.conftest import get_endpoint_config @@ -50,7 +51,7 @@ def test_multi_db_client_failover_to_another_db(self, r_multi_db, fault_injector thread.start() r_multi_db.set('key', 'value') - current_active_db = r_multi_db._command_executor.active_database + current_active_db = r_multi_db.command_executor.active_database # Execute commands before network failure while not event.is_set(): @@ -58,7 +59,7 @@ def test_multi_db_client_failover_to_another_db(self, r_multi_db, fault_injector sleep(0.1) # Active db has been changed. - assert current_active_db != r_multi_db._command_executor.active_database + assert current_active_db != r_multi_db.command_executor.active_database # Execute commands after network failure for _ in range(3): @@ -86,7 +87,205 @@ def test_multi_db_client_throws_error_on_retry_exceed(self, r_multi_db, fault_in thread.start() with pytest.raises(ConnectionError): - # Retries count > failure threshold, so a client gives up earlier. + # Retries count < failure threshold, so a client gives up earlier. 
while not event.is_set(): assert r_multi_db.get('key') == 'value' + sleep(0.1) + +class TestActiveActiveStandalonePipeline: + @pytest.mark.parametrize( + "r_multi_db", + [ + {"failure_threshold": 2} + ], + indirect=True + ) + def test_context_manager_pipeline_failover_to_another_db(self, r_multi_db, fault_injector_client): + event = threading.Event() + thread = threading.Thread( + target=trigger_network_failure_action, + daemon=True, + args=(fault_injector_client,event) + ) + thread.start() + + current_active_db = r_multi_db.command_executor.active_database + + # Execute pipeline before network failure + while not event.is_set(): + with r_multi_db.pipeline() as pipe: + pipe.set('{hash}key1', 'value1') + pipe.set('{hash}key2', 'value2') + pipe.set('{hash}key3', 'value3') + pipe.get('{hash}key1') + pipe.get('{hash}key2') + pipe.get('{hash}key3') + assert pipe.execute() == [True, True, True, 'value1', 'value2', 'value3'] + sleep(0.1) + + # Active db has been changed. + assert current_active_db != r_multi_db.command_executor.active_database + + # Execute pipeline after network failure + for _ in range(3): + with r_multi_db.pipeline() as pipe: + pipe.set('{hash}key1', 'value1') + pipe.set('{hash}key2', 'value2') + pipe.set('{hash}key3', 'value3') + pipe.get('{hash}key1') + pipe.get('{hash}key2') + pipe.get('{hash}key3') + assert pipe.execute() == [True, True, True, 'value1', 'value2', 'value3'] + sleep(0.1) + + @pytest.mark.parametrize( + "r_multi_db", + [ + {"failure_threshold": 2} + ], + indirect=True + ) + def test_chaining_pipeline_failover_to_another_db(self, r_multi_db, fault_injector_client): + event = threading.Event() + thread = threading.Thread( + target=trigger_network_failure_action, + daemon=True, + args=(fault_injector_client,event) + ) + thread.start() + + current_active_db = r_multi_db.command_executor.active_database + + pipe = r_multi_db.pipeline() + + # Execute pipeline before network failure + while not event.is_set(): + pipe.set('{hash}key1', 'value1') + pipe.set('{hash}key2', 'value2') + pipe.set('{hash}key3', 'value3') + pipe.get('{hash}key1') + pipe.get('{hash}key2') + pipe.get('{hash}key3') + assert pipe.execute() == [True, True, True, 'value1', 'value2', 'value3'] + sleep(0.1) + + # Active db has been changed. + assert current_active_db != r_multi_db.command_executor.active_database + + # Execute pipeline after network failure + for _ in range(3): + pipe.set('{hash}key1', 'value1') + pipe.set('{hash}key2', 'value2') + pipe.set('{hash}key3', 'value3') + pipe.get('{hash}key1') + pipe.get('{hash}key2') + pipe.get('{hash}key3') + assert pipe.execute() == [True, True, True, 'value1', 'value2', 'value3'] + sleep(0.1) + + @pytest.mark.parametrize( + "r_multi_db", + [ + { + "failure_threshold": 15, + "command_retry": Retry(NoBackoff(), retries=5), + "health_check_interval": 100, + } + ], + indirect=True + ) + def test_pipeline_client_throws_error_on_retry_exceed(self, r_multi_db, fault_injector_client): + event = threading.Event() + thread = threading.Thread( + target=trigger_network_failure_action, + daemon=True, + args=(fault_injector_client,event) + ) + thread.start() + + with pytest.raises(ConnectionError): + # Retries count < failure threshold, so a client gives up earlier. 
+ while not event.is_set(): + with r_multi_db.pipeline() as pipe: + pipe.set('{hash}key1', 'value1') + pipe.set('{hash}key2', 'value2') + pipe.set('{hash}key3', 'value3') + pipe.get('{hash}key1') + pipe.get('{hash}key2') + pipe.get('{hash}key3') + assert pipe.execute() == [True, True, True, 'value1', 'value2', 'value3'] + sleep(0.1) + +class TestActiveActiveStandaloneTransaction: + @pytest.mark.parametrize( + "r_multi_db", + [ + {"failure_threshold": 2} + ], + indirect=True + ) + def test_transaction_failover_to_another_db(self, r_multi_db, fault_injector_client): + event = threading.Event() + thread = threading.Thread( + target=trigger_network_failure_action, + daemon=True, + args=(fault_injector_client,event) + ) + thread.start() + + current_active_db = r_multi_db.command_executor.active_database + + def callback(pipe: Pipeline): + pipe.set('{hash}key1', 'value1') + pipe.set('{hash}key2', 'value2') + pipe.set('{hash}key3', 'value3') + pipe.get('{hash}key1') + pipe.get('{hash}key2') + pipe.get('{hash}key3') + + # Execute pipeline before network failure + while not event.is_set(): + r_multi_db.transaction(callback) + sleep(0.1) + + # Active db has been changed. + assert current_active_db != r_multi_db.command_executor.active_database + + # Execute pipeline after network failure + for _ in range(3): + r_multi_db.transaction(callback) + sleep(0.1) + + @pytest.mark.parametrize( + "r_multi_db", + [ + { + "failure_threshold": 15, + "command_retry": Retry(NoBackoff(), retries=5), + "health_check_interval": 100, + } + ], + indirect=True + ) + def test_transaction_client_throws_error_on_retry_exceed(self, r_multi_db, fault_injector_client): + event = threading.Event() + thread = threading.Thread( + target=trigger_network_failure_action, + daemon=True, + args=(fault_injector_client,event) + ) + thread.start() + + def callback(pipe: Pipeline): + pipe.set('{hash}key1', 'value1') + pipe.set('{hash}key2', 'value2') + pipe.set('{hash}key3', 'value3') + pipe.get('{hash}key1') + pipe.get('{hash}key2') + pipe.get('{hash}key3') + + with pytest.raises(ConnectionError): + # Retries count < failure threshold, so a client gives up earlier. 
+ while not event.is_set(): + r_multi_db.transaction(callback) sleep(0.1) \ No newline at end of file From 94eff21ab4efcea8e5968f19a8b88f7f7292d931 Mon Sep 17 00:00:00 2001 From: vladvildanov Date: Thu, 24 Jul 2025 16:03:03 +0300 Subject: [PATCH 6/9] Added handling of ConnectionRefusedError, added timeouts so cluster could recover --- redis/multidb/client.py | 6 ++++-- tests/test_scenario/test_active_active.py | 18 ++++++++++++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/redis/multidb/client.py b/redis/multidb/client.py index 16517b2ad3..85c719fc1a 100644 --- a/redis/multidb/client.py +++ b/redis/multidb/client.py @@ -30,10 +30,12 @@ def __init__(self, config: MultiDbConfig): self._failover_strategy.set_databases(self._databases) self._auto_fallback_interval = config.auto_fallback_interval self._event_dispatcher = config.event_dispatcher + self._command_retry = config.command_retry + self._command_retry.update_supported_errors((ConnectionRefusedError,)) self.command_executor = DefaultCommandExecutor( failure_detectors=self._failure_detectors, databases=self._databases, - command_retry=config.command_retry, + command_retry=self._command_retry, failover_strategy=self._failover_strategy, event_dispatcher=self._event_dispatcher, auto_fallback_interval=self._auto_fallback_interval, @@ -219,7 +221,7 @@ def _check_db_health(self, database: AbstractDatabase, on_error: Callable[[Excep database.circuit.state = CBState.OPEN elif is_healthy and database.circuit.state != CBState.CLOSED: database.circuit.state = CBState.CLOSED - except (ConnectionError, TimeoutError, socket.timeout) as e: + except (ConnectionError, TimeoutError, socket.timeout, ConnectionRefusedError) as e: if database.circuit.state != CBState.OPEN: database.circuit.state = CBState.OPEN is_healthy = False diff --git a/tests/test_scenario/test_active_active.py b/tests/test_scenario/test_active_active.py index b92707ba7e..61ad847bd0 100644 --- a/tests/test_scenario/test_active_active.py +++ b/tests/test_scenario/test_active_active.py @@ -66,6 +66,9 @@ def test_multi_db_client_failover_to_another_db(self, r_multi_db, fault_injector assert r_multi_db.get('key') == 'value' sleep(0.1) + # Timeout so cluster could recover from network failure. + sleep(2) + @pytest.mark.parametrize( "r_multi_db", [ @@ -92,6 +95,9 @@ def test_multi_db_client_throws_error_on_retry_exceed(self, r_multi_db, fault_in assert r_multi_db.get('key') == 'value' sleep(0.1) + # Timeout so cluster could recover from network failure. + sleep(2) + class TestActiveActiveStandalonePipeline: @pytest.mark.parametrize( "r_multi_db", @@ -138,6 +144,9 @@ def test_context_manager_pipeline_failover_to_another_db(self, r_multi_db, fault assert pipe.execute() == [True, True, True, 'value1', 'value2', 'value3'] sleep(0.1) + # Timeout so cluster could recover from network failure. + sleep(2) + @pytest.mark.parametrize( "r_multi_db", [ @@ -183,6 +192,9 @@ def test_chaining_pipeline_failover_to_another_db(self, r_multi_db, fault_inject assert pipe.execute() == [True, True, True, 'value1', 'value2', 'value3'] sleep(0.1) + # Timeout so cluster could recover from network failure. + sleep(2) + @pytest.mark.parametrize( "r_multi_db", [ @@ -216,6 +228,9 @@ def test_pipeline_client_throws_error_on_retry_exceed(self, r_multi_db, fault_in assert pipe.execute() == [True, True, True, 'value1', 'value2', 'value3'] sleep(0.1) + # Timeout so cluster could recover from network failure. 
+ sleep(2) + class TestActiveActiveStandaloneTransaction: @pytest.mark.parametrize( "r_multi_db", @@ -256,6 +271,9 @@ def callback(pipe: Pipeline): r_multi_db.transaction(callback) sleep(0.1) + # Timeout so cluster could recover from network failure. + sleep(2) + @pytest.mark.parametrize( "r_multi_db", [ From 7fa7c077bae79055dac977be95404df327bf3f73 Mon Sep 17 00:00:00 2001 From: vladvildanov Date: Thu, 24 Jul 2025 16:19:31 +0300 Subject: [PATCH 7/9] Increased timeouts --- tests/test_scenario/test_active_active.py | 41 ++++++++++++----------- 1 file changed, 22 insertions(+), 19 deletions(-) diff --git a/tests/test_scenario/test_active_active.py b/tests/test_scenario/test_active_active.py index 61ad847bd0..15732fb913 100644 --- a/tests/test_scenario/test_active_active.py +++ b/tests/test_scenario/test_active_active.py @@ -58,16 +58,16 @@ def test_multi_db_client_failover_to_another_db(self, r_multi_db, fault_injector assert r_multi_db.get('key') == 'value' sleep(0.1) - # Active db has been changed. - assert current_active_db != r_multi_db.command_executor.active_database - # Execute commands after network failure for _ in range(3): assert r_multi_db.get('key') == 'value' sleep(0.1) + # Active db has been changed. + assert current_active_db != r_multi_db.command_executor.active_database + # Timeout so cluster could recover from network failure. - sleep(2) + sleep(3) @pytest.mark.parametrize( "r_multi_db", @@ -96,7 +96,7 @@ def test_multi_db_client_throws_error_on_retry_exceed(self, r_multi_db, fault_in sleep(0.1) # Timeout so cluster could recover from network failure. - sleep(2) + sleep(3) class TestActiveActiveStandalonePipeline: @pytest.mark.parametrize( @@ -129,9 +129,6 @@ def test_context_manager_pipeline_failover_to_another_db(self, r_multi_db, fault assert pipe.execute() == [True, True, True, 'value1', 'value2', 'value3'] sleep(0.1) - # Active db has been changed. - assert current_active_db != r_multi_db.command_executor.active_database - # Execute pipeline after network failure for _ in range(3): with r_multi_db.pipeline() as pipe: @@ -144,8 +141,11 @@ def test_context_manager_pipeline_failover_to_another_db(self, r_multi_db, fault assert pipe.execute() == [True, True, True, 'value1', 'value2', 'value3'] sleep(0.1) + # Active db has been changed. + assert current_active_db != r_multi_db.command_executor.active_database + # Timeout so cluster could recover from network failure. - sleep(2) + sleep(3) @pytest.mark.parametrize( "r_multi_db", @@ -178,9 +178,6 @@ def test_chaining_pipeline_failover_to_another_db(self, r_multi_db, fault_inject assert pipe.execute() == [True, True, True, 'value1', 'value2', 'value3'] sleep(0.1) - # Active db has been changed. - assert current_active_db != r_multi_db.command_executor.active_database - # Execute pipeline after network failure for _ in range(3): pipe.set('{hash}key1', 'value1') @@ -192,8 +189,11 @@ def test_chaining_pipeline_failover_to_another_db(self, r_multi_db, fault_inject assert pipe.execute() == [True, True, True, 'value1', 'value2', 'value3'] sleep(0.1) + # Active db has been changed. + assert current_active_db != r_multi_db.command_executor.active_database + # Timeout so cluster could recover from network failure. - sleep(2) + sleep(3) @pytest.mark.parametrize( "r_multi_db", @@ -229,7 +229,7 @@ def test_pipeline_client_throws_error_on_retry_exceed(self, r_multi_db, fault_in sleep(0.1) # Timeout so cluster could recover from network failure. 
- sleep(2) + sleep(3) class TestActiveActiveStandaloneTransaction: @pytest.mark.parametrize( @@ -263,16 +263,16 @@ def callback(pipe: Pipeline): r_multi_db.transaction(callback) sleep(0.1) - # Active db has been changed. - assert current_active_db != r_multi_db.command_executor.active_database - # Execute pipeline after network failure for _ in range(3): r_multi_db.transaction(callback) sleep(0.1) + # Active db has been changed. + assert current_active_db != r_multi_db.command_executor.active_database + # Timeout so cluster could recover from network failure. - sleep(2) + sleep(3) @pytest.mark.parametrize( "r_multi_db", @@ -306,4 +306,7 @@ def callback(pipe: Pipeline): # Retries count < failure threshold, so a client gives up earlier. while not event.is_set(): r_multi_db.transaction(callback) - sleep(0.1) \ No newline at end of file + sleep(0.1) + + # Timeout so cluster could recover from network failure. + sleep(3) \ No newline at end of file From 2cb8cac18a0936772b082297802ee68c08052541 Mon Sep 17 00:00:00 2001 From: vladvildanov Date: Fri, 25 Jul 2025 11:57:57 +0300 Subject: [PATCH 8/9] Refactored integration tests --- tests/test_scenario/test_active_active.py | 178 +++++----------------- 1 file changed, 36 insertions(+), 142 deletions(-) diff --git a/tests/test_scenario/test_active_active.py b/tests/test_scenario/test_active_active.py index 15732fb913..2b9bfc7e74 100644 --- a/tests/test_scenario/test_active_active.py +++ b/tests/test_scenario/test_active_active.py @@ -17,7 +17,7 @@ def trigger_network_failure_action(fault_injector_client, event: threading.Event endpoint_config = get_endpoint_config('re-active-active') action_request = ActionRequest( action_type=ActionType.NETWORK_FAILURE, - parameters={"bdb_id": endpoint_config['bdb_id'], "delay": 3, "cluster_index": 0} + parameters={"bdb_id": endpoint_config['bdb_id'], "delay": 1, "cluster_index": 0} ) result = fault_injector_client.trigger_action(action_request) @@ -34,6 +34,11 @@ def trigger_network_failure_action(fault_injector_client, event: threading.Event logger.info(f"Action completed. Status: {status_result['status']}") class TestActiveActiveStandalone: + + def teardown_method(self, method): + # Timeout so the cluster could recover from network failure. + sleep(3) + @pytest.mark.parametrize( "r_multi_db", [ @@ -48,10 +53,10 @@ def test_multi_db_client_failover_to_another_db(self, r_multi_db, fault_injector daemon=True, args=(fault_injector_client,event) ) - thread.start() + # Client initialized on the first command. r_multi_db.set('key', 'value') - current_active_db = r_multi_db.command_executor.active_database + thread.start() # Execute commands before network failure while not event.is_set(): @@ -63,42 +68,6 @@ def test_multi_db_client_failover_to_another_db(self, r_multi_db, fault_injector assert r_multi_db.get('key') == 'value' sleep(0.1) - # Active db has been changed. - assert current_active_db != r_multi_db.command_executor.active_database - - # Timeout so cluster could recover from network failure. 
- sleep(3) - - @pytest.mark.parametrize( - "r_multi_db", - [ - { - "failure_threshold": 15, - "command_retry": Retry(NoBackoff(), retries=5), - "health_check_interval": 100, - } - ], - indirect=True - ) - def test_multi_db_client_throws_error_on_retry_exceed(self, r_multi_db, fault_injector_client): - event = threading.Event() - thread = threading.Thread( - target=trigger_network_failure_action, - daemon=True, - args=(fault_injector_client,event) - ) - thread.start() - - with pytest.raises(ConnectionError): - # Retries count < failure threshold, so a client gives up earlier. - while not event.is_set(): - assert r_multi_db.get('key') == 'value' - sleep(0.1) - - # Timeout so cluster could recover from network failure. - sleep(3) - -class TestActiveActiveStandalonePipeline: @pytest.mark.parametrize( "r_multi_db", [ @@ -113,9 +82,18 @@ def test_context_manager_pipeline_failover_to_another_db(self, r_multi_db, fault daemon=True, args=(fault_injector_client,event) ) - thread.start() - current_active_db = r_multi_db.command_executor.active_database + # Client initialized on first pipe execution. + with r_multi_db.pipeline() as pipe: + pipe.set('{hash}key1', 'value1') + pipe.set('{hash}key2', 'value2') + pipe.set('{hash}key3', 'value3') + pipe.get('{hash}key1') + pipe.get('{hash}key2') + pipe.get('{hash}key3') + assert pipe.execute() == [True, True, True, 'value1', 'value2', 'value3'] + + thread.start() # Execute pipeline before network failure while not event.is_set(): @@ -141,12 +119,6 @@ def test_context_manager_pipeline_failover_to_another_db(self, r_multi_db, fault assert pipe.execute() == [True, True, True, 'value1', 'value2', 'value3'] sleep(0.1) - # Active db has been changed. - assert current_active_db != r_multi_db.command_executor.active_database - - # Timeout so cluster could recover from network failure. - sleep(3) - @pytest.mark.parametrize( "r_multi_db", [ @@ -161,11 +133,18 @@ def test_chaining_pipeline_failover_to_another_db(self, r_multi_db, fault_inject daemon=True, args=(fault_injector_client,event) ) - thread.start() - - current_active_db = r_multi_db.command_executor.active_database + # Client initialized on first pipe execution. pipe = r_multi_db.pipeline() + pipe.set('{hash}key1', 'value1') + pipe.set('{hash}key2', 'value2') + pipe.set('{hash}key3', 'value3') + pipe.get('{hash}key1') + pipe.get('{hash}key2') + pipe.get('{hash}key3') + assert pipe.execute() == [True, True, True, 'value1', 'value2', 'value3'] + + thread.start() # Execute pipeline before network failure while not event.is_set(): @@ -176,7 +155,7 @@ def test_chaining_pipeline_failover_to_another_db(self, r_multi_db, fault_inject pipe.get('{hash}key2') pipe.get('{hash}key3') assert pipe.execute() == [True, True, True, 'value1', 'value2', 'value3'] - sleep(0.1) + sleep(0.1) # Execute pipeline after network failure for _ in range(3): @@ -187,51 +166,8 @@ def test_chaining_pipeline_failover_to_another_db(self, r_multi_db, fault_inject pipe.get('{hash}key2') pipe.get('{hash}key3') assert pipe.execute() == [True, True, True, 'value1', 'value2', 'value3'] - sleep(0.1) - - # Active db has been changed. - assert current_active_db != r_multi_db.command_executor.active_database - - # Timeout so cluster could recover from network failure. 
- sleep(3) - - @pytest.mark.parametrize( - "r_multi_db", - [ - { - "failure_threshold": 15, - "command_retry": Retry(NoBackoff(), retries=5), - "health_check_interval": 100, - } - ], - indirect=True - ) - def test_pipeline_client_throws_error_on_retry_exceed(self, r_multi_db, fault_injector_client): - event = threading.Event() - thread = threading.Thread( - target=trigger_network_failure_action, - daemon=True, - args=(fault_injector_client,event) - ) - thread.start() - - with pytest.raises(ConnectionError): - # Retries count < failure threshold, so a client gives up earlier. - while not event.is_set(): - with r_multi_db.pipeline() as pipe: - pipe.set('{hash}key1', 'value1') - pipe.set('{hash}key2', 'value2') - pipe.set('{hash}key3', 'value3') - pipe.get('{hash}key1') - pipe.get('{hash}key2') - pipe.get('{hash}key3') - assert pipe.execute() == [True, True, True, 'value1', 'value2', 'value3'] - sleep(0.1) - - # Timeout so cluster could recover from network failure. - sleep(3) + sleep(0.1) -class TestActiveActiveStandaloneTransaction: @pytest.mark.parametrize( "r_multi_db", [ @@ -246,9 +182,6 @@ def test_transaction_failover_to_another_db(self, r_multi_db, fault_injector_cli daemon=True, args=(fault_injector_client,event) ) - thread.start() - - current_active_db = r_multi_db.command_executor.active_database def callback(pipe: Pipeline): pipe.set('{hash}key1', 'value1') @@ -258,6 +191,10 @@ def callback(pipe: Pipeline): pipe.get('{hash}key2') pipe.get('{hash}key3') + # Client initialized on first transaction execution. + r_multi_db.transaction(callback) + thread.start() + # Execute pipeline before network failure while not event.is_set(): r_multi_db.transaction(callback) @@ -266,47 +203,4 @@ def callback(pipe: Pipeline): # Execute pipeline after network failure for _ in range(3): r_multi_db.transaction(callback) - sleep(0.1) - - # Active db has been changed. - assert current_active_db != r_multi_db.command_executor.active_database - - # Timeout so cluster could recover from network failure. - sleep(3) - - @pytest.mark.parametrize( - "r_multi_db", - [ - { - "failure_threshold": 15, - "command_retry": Retry(NoBackoff(), retries=5), - "health_check_interval": 100, - } - ], - indirect=True - ) - def test_transaction_client_throws_error_on_retry_exceed(self, r_multi_db, fault_injector_client): - event = threading.Event() - thread = threading.Thread( - target=trigger_network_failure_action, - daemon=True, - args=(fault_injector_client,event) - ) - thread.start() - - def callback(pipe: Pipeline): - pipe.set('{hash}key1', 'value1') - pipe.set('{hash}key2', 'value2') - pipe.set('{hash}key3', 'value3') - pipe.get('{hash}key1') - pipe.get('{hash}key2') - pipe.get('{hash}key3') - - with pytest.raises(ConnectionError): - # Retries count < failure threshold, so a client gives up earlier. - while not event.is_set(): - r_multi_db.transaction(callback) - sleep(0.1) - - # Timeout so cluster could recover from network failure. 
- sleep(3) \ No newline at end of file + sleep(0.1) \ No newline at end of file From ed93cfc362474aa5fee789c309db54cbe0a42d22 Mon Sep 17 00:00:00 2001 From: vladvildanov Date: Thu, 31 Jul 2025 11:47:22 +0300 Subject: [PATCH 9/9] Fixed property name --- redis/event.py | 2 +- redis/multidb/event.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/redis/event.py b/redis/event.py index bb40c32f34..1fa66f0587 100644 --- a/redis/event.py +++ b/redis/event.py @@ -264,7 +264,7 @@ def __init__( self._exception = exception @property - def command(self) -> tuple: + def commands(self) -> tuple: return self._commands @property diff --git a/redis/multidb/event.py b/redis/multidb/event.py index e86ee15358..315802e812 100644 --- a/redis/multidb/event.py +++ b/redis/multidb/event.py @@ -13,4 +13,4 @@ def __init__(self, failure_detectors: List[FailureDetector]): def listen(self, event: OnCommandsFailEvent) -> None: for failure_detector in self._failure_detectors: - failure_detector.register_failure(event.exception, event.command) + failure_detector.register_failure(event.exception, event.commands)
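
Usage sketch (not part of the patch series above): the scenario tests in patches 5–8 exercise the new MultiDBClient pipeline and transaction APIs roughly as follows. This is an illustrative, hedged example; construction of MultiDbConfig (databases, health checks, failover strategy, retries) is assumed and elided, and the keys/values are arbitrary.

    from redis.multidb.client import MultiDBClient

    def run_example(config):
        # The client initializes lazily on the first command, pipeline
        # execution, or transaction (see MultiDBClient.initialize()).
        client = MultiDBClient(config)

        # Pipeline mode: commands are buffered client-side and replayed
        # against the currently active database on execute(); on connection
        # errors the command executor retries against the database selected
        # by the failover strategy.
        with client.pipeline() as pipe:
            pipe.set('{hash}key1', 'value1')
            pipe.get('{hash}key1')
            pipe_results = pipe.execute()

        # Transaction mode: the callback stages commands on a regular
        # redis-py Pipeline belonging to whichever database is active.
        def callback(pipe):
            pipe.set('{hash}key2', 'value2')
            pipe.get('{hash}key2')

        tx_results = client.transaction(callback)
        return pipe_results, tx_results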