diff --git a/redis/client.py b/redis/client.py index 51b699721e..d3ab3cfcfe 100755 --- a/redis/client.py +++ b/redis/client.py @@ -761,9 +761,14 @@ def next_command(self): client_port = client_info[5:] client_type = "unix" else: - # use rsplit as ipv6 addresses contain colons - client_address, client_port = client_info.rsplit(":", 1) - client_type = "tcp" + if client_info == "": + client_address = "" + client_port = "" + client_type = "unknown" + else: + # use rsplit as ipv6 addresses contain colons + client_address, client_port = client_info.rsplit(":", 1) + client_type = "tcp" return { "time": float(command_time), "db": int(db_id), diff --git a/tests/test_asyncio/test_commands.py b/tests/test_asyncio/test_commands.py index 1072779192..3a10a50b93 100644 --- a/tests/test_asyncio/test_commands.py +++ b/tests/test_asyncio/test_commands.py @@ -1321,7 +1321,7 @@ async def test_msetex_expiration_pxat_and_nx_with_cluster_client(self, r): for ttl in old_ttls: assert 10 < ttl <= 30 for ttl in new_ttls: - assert ttl <= 10 + assert ttl <= 11 assert await r.mget(*mapping.keys(), "new:{test:1}", "new_2:{test:1}") == [ b"1", b"2", @@ -1370,8 +1370,8 @@ async def test_msetex_expiration_exat_and_xx_with_cluster_client(self, r): == 1 ) ttls = await asyncio.gather(*[r.ttl(key) for key in mapping.keys()]) - assert ttls[0] <= 10 - assert ttls[1] <= 10 + assert ttls[0] <= 11 + assert ttls[1] <= 11 assert 10 < ttls[2] <= 30 assert await r.mget( "1:{test:1}", "2:{test:1}", "3:{test:1}", "new:{test:1}" @@ -1483,7 +1483,7 @@ async def test_msetex_expiration_pxat_and_nx(self, r): for ttl in old_ttls: assert 10 < ttl <= 30 for ttl in new_ttls: - assert ttl <= 10 + assert ttl <= 11 assert await r.mget(*mapping.keys(), "new", "new_2") == [ b"1", b"2", @@ -1527,8 +1527,8 @@ async def test_msetex_expiration_exat_and_xx(self, r): == 1 ) ttls = await asyncio.gather(*[r.ttl(key) for key in mapping.keys()]) - assert ttls[0] <= 10 - assert ttls[1] <= 10 + assert ttls[0] <= 11 + assert ttls[1] <= 11 assert 10 < ttls[2] <= 30 assert await r.mget("1", "2", "3", "new") == [ b"new_value", diff --git a/tests/test_asyncio/test_multidb/test_client.py b/tests/test_asyncio/test_multidb/test_client.py index e028655c38..537fdb0c82 100644 --- a/tests/test_asyncio/test_multidb/test_client.py +++ b/tests/test_asyncio/test_multidb/test_client.py @@ -36,9 +36,7 @@ async def test_execute_command_against_correct_db_on_successful_initialization( databases = create_weighted_list(mock_db, mock_db1, mock_db2) mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_db1.client.execute_command = AsyncMock(return_value="OK1") mock_hc.check_health.return_value = True @@ -71,9 +69,7 @@ async def test_execute_command_against_correct_db_and_closed_circuit( databases = create_weighted_list(mock_db, mock_db1, mock_db2) mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_db1.client.execute_command = AsyncMock(return_value="OK1") mock_hc.check_health.side_effect = [ @@ -88,7 +84,8 @@ async def test_execute_command_against_correct_db_and_closed_circuit( client = MultiDBClient(mock_multi_db_config) assert mock_multi_db_config.failover_strategy.set_databases.call_count == 1 - assert await client.set("key", "value") == "OK1" + result = await client.set("key", "value") + assert result == "OK1" assert mock_hc.check_health.call_count == 7 assert mock_db.circuit.state == CBState.CLOSED @@ -185,9 +182,7 @@ async def mock_check_health(database): mock_hc.check_health.side_effect = mock_check_health mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_db.client.execute_command.return_value = "OK" mock_db1.client.execute_command.return_value = "OK1" mock_db2.client.execute_command.return_value = "OK2" @@ -201,6 +196,7 @@ async def mock_check_health(database): assert await db1_became_unhealthy.wait(), ( "Timeout waiting for mock_db1 to become unhealthy" ) + await asyncio.sleep(0.01) assert await client.set("key", "value") == "OK2" @@ -209,7 +205,14 @@ async def mock_check_health(database): assert await db2_became_unhealthy.wait(), ( "Timeout waiting for mock_db2 to become unhealthy" ) - await asyncio.sleep(0.01) + + # Wait for circuit breaker state to actually reflect the unhealthy status + # (instead of just sleeping) + max_retries = 20 + for _ in range(max_retries): + if cb2.state == CBState.OPEN: # Circuit is open (unhealthy) + break + await asyncio.sleep(0.01) assert await client.set("key", "value") == "OK" @@ -258,9 +261,7 @@ async def mock_check_health(database): mock_hc.check_health.side_effect = mock_check_health mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_db.client.execute_command.return_value = "OK" mock_db1.client.execute_command.return_value = "OK1" mock_db2.client.execute_command.return_value = "OK2" @@ -271,6 +272,14 @@ async def mock_check_health(database): async with MultiDBClient(mock_multi_db_config) as client: assert await client.set("key", "value") == "OK1" await error_event.wait() + # Wait for circuit breaker to actually open (not just the event) + max_retries = 20 + for _ in range(max_retries): + if mock_db1.circuit.state == CBState.OPEN: # Circuit is open + break + await asyncio.sleep(0.01) + + # Now the failover strategy will select mock_db2 assert await client.set("key", "value") == "OK2" await asyncio.sleep(0.5) assert await client.set("key", "value") == "OK1" @@ -312,9 +321,7 @@ async def mock_check_health(database): mock_hc.check_health.side_effect = mock_check_health mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_db.client.execute_command.return_value = "OK" mock_db1.client.execute_command.return_value = "OK1" mock_db2.client.execute_command.return_value = "OK2" @@ -325,6 +332,15 @@ async def mock_check_health(database): async with MultiDBClient(mock_multi_db_config) as client: assert await client.set("key", "value") == "OK1" await error_event.wait() + # Wait for circuit breaker state to actually reflect the unhealthy status + # (instead of just sleeping) + max_retries = 20 + for _ in range(max_retries): + if ( + mock_db1.circuit.state == CBState.OPEN + ): # Circuit is open (unhealthy) + break + await asyncio.sleep(0.01) assert await client.set("key", "value") == "OK2" await asyncio.sleep(0.5) assert await client.set("key", "value") == "OK2" @@ -348,9 +364,7 @@ async def test_execute_command_throws_exception_on_failed_initialization( databases = create_weighted_list(mock_db, mock_db1, mock_db2) mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_hc.check_health.return_value = False client = MultiDBClient(mock_multi_db_config) @@ -382,9 +396,7 @@ async def test_add_database_throws_exception_on_same_database( databases = create_weighted_list(mock_db, mock_db1, mock_db2) mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_hc.check_health.return_value = False client = MultiDBClient(mock_multi_db_config) @@ -413,9 +425,7 @@ async def test_add_database_makes_new_database_active( databases = create_weighted_list(mock_db, mock_db2) mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_db1.client.execute_command.return_value = "OK1" mock_db2.client.execute_command.return_value = "OK2" @@ -451,9 +461,7 @@ async def test_remove_highest_weighted_database( databases = create_weighted_list(mock_db, mock_db1, mock_db2) mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_db1.client.execute_command.return_value = "OK1" mock_db2.client.execute_command.return_value = "OK2" @@ -487,9 +495,7 @@ async def test_update_database_weight_to_be_highest( databases = create_weighted_list(mock_db, mock_db1, mock_db2) mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_db1.client.execute_command.return_value = "OK1" mock_db2.client.execute_command.return_value = "OK2" @@ -525,9 +531,7 @@ async def test_add_new_failure_detector( databases = create_weighted_list(mock_db, mock_db1, mock_db2) mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_db1.client.execute_command.return_value = "OK1" mock_multi_db_config.event_dispatcher = EventDispatcher() mock_fd = mock_multi_db_config.failure_detectors[0] @@ -546,7 +550,7 @@ async def test_add_new_failure_detector( assert mock_hc.check_health.call_count == 9 # Simulate failing command events that lead to a failure detection - for i in range(5): + for _ in range(5): await mock_multi_db_config.event_dispatcher.dispatch_async( command_fail_event ) @@ -557,7 +561,7 @@ async def test_add_new_failure_detector( client.add_failure_detector(another_fd) # Simulate failing command events that lead to a failure detection - for i in range(5): + for _ in range(5): await mock_multi_db_config.event_dispatcher.dispatch_async( command_fail_event ) @@ -584,9 +588,7 @@ async def test_add_new_health_check( databases = create_weighted_list(mock_db, mock_db1, mock_db2) mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_db1.client.execute_command.return_value = "OK1" mock_hc.check_health.return_value = True @@ -624,9 +626,7 @@ async def test_set_active_database( databases = create_weighted_list(mock_db, mock_db1, mock_db2) mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_db1.client.execute_command.return_value = "OK1" mock_db.client.execute_command.return_value = "OK" diff --git a/tests/test_asyncio/test_vsets.py b/tests/test_asyncio/test_vsets.py index 896d98b192..447e474e9c 100644 --- a/tests/test_asyncio/test_vsets.py +++ b/tests/test_asyncio/test_vsets.py @@ -454,7 +454,7 @@ async def test_vsim_truth_no_thread_enabled(d_client): elements_count = 5000 vector_dim = 50 for i in range(1, elements_count + 1): - float_array = [random.uniform(10 * i, 1000 * i) for x in range(vector_dim)] + float_array = [i for _ in range(vector_dim)] await d_client.vset().vadd("myset", float_array, f"elem_{i}") await d_client.vset().vadd("myset", [-22 for _ in range(vector_dim)], "elem_man_2") diff --git a/tests/test_commands.py b/tests/test_commands.py index cd427ef62f..4925329a21 100644 --- a/tests/test_commands.py +++ b/tests/test_commands.py @@ -1896,6 +1896,7 @@ def test_msetex_expiration_pxat_and_nx_with_cluster_client(self, r): ) == 0 ) + ttls = [r.ttl(key) for key in mapping.keys()] for ttl in ttls: assert 10 < ttl <= 30 @@ -1915,7 +1916,7 @@ def test_msetex_expiration_pxat_and_nx_with_cluster_client(self, r): for ttl in old_ttls: assert 10 < ttl <= 30 for ttl in new_ttls: - assert ttl <= 10 + assert ttl <= 11 assert r.mget(*mapping.keys(), "new:{test:1}", "new_2:{test:1}") == [ b"1", b"2", @@ -1959,8 +1960,8 @@ def test_msetex_expiration_exat_and_xx_with_cluster_client(self, r): == 1 ) ttls = [r.ttl(key) for key in mapping.keys()] - assert ttls[0] <= 10 - assert ttls[1] <= 10 + assert ttls[0] <= 11 + assert ttls[1] <= 11 assert 10 < ttls[2] <= 30 assert r.mget("1:{test:1}", "2:{test:1}", "3:{test:1}", "new:{test:1}") == [ b"new_value", @@ -2070,7 +2071,7 @@ def test_msetex_expiration_pxat_and_nx(self, r): for ttl in old_ttls: assert 10 < ttl <= 30 for ttl in new_ttls: - assert ttl <= 10 + assert ttl <= 11 assert r.mget(*mapping.keys(), "new", "new_2") == [ b"1", b"2", @@ -2114,8 +2115,8 @@ def test_msetex_expiration_exat_and_xx(self, r): == 1 ) ttls = [r.ttl(key) for key in mapping.keys()] - assert ttls[0] <= 10 - assert ttls[1] <= 10 + assert ttls[0] <= 11 + assert ttls[1] <= 11 assert 10 < ttls[2] <= 30 assert r.mget("1", "2", "3", "new") == [ b"new_value", diff --git a/tests/test_multidb/test_client.py b/tests/test_multidb/test_client.py index b342b4b91b..490126fdf2 100644 --- a/tests/test_multidb/test_client.py +++ b/tests/test_multidb/test_client.py @@ -1,6 +1,6 @@ import threading from time import sleep -from unittest.mock import patch, Mock +from unittest.mock import MagicMock, patch, Mock import pybreaker import pytest @@ -36,9 +36,7 @@ def test_execute_command_against_correct_db_on_successful_initialization( databases = create_weighted_list(mock_db, mock_db1, mock_db2) mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_db1.client.execute_command.return_value = "OK1" mock_hc.check_health.return_value = True @@ -70,10 +68,8 @@ def test_execute_command_against_correct_db_and_closed_circuit( databases = create_weighted_list(mock_db, mock_db1, mock_db2) mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): - mock_db1.client.execute_command.return_value = "OK1" + with patch.object(mock_multi_db_config, "databases", return_value=databases): + mock_db1.client.execute_command = MagicMock(return_value="OK1") mock_hc.check_health.side_effect = [ False, @@ -87,7 +83,8 @@ def test_execute_command_against_correct_db_and_closed_circuit( client = MultiDBClient(mock_multi_db_config) assert mock_multi_db_config.failover_strategy.set_databases.call_count == 1 - assert client.set("key", "value") == "OK1" + result = client.set("key", "value") + assert result == "OK1" assert mock_hc.check_health.call_count == 7 assert mock_db.circuit.state == CBState.CLOSED @@ -183,9 +180,7 @@ def mock_check_health(database): mock_hc.check_health.side_effect = mock_check_health mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_multi_db_config.health_check_interval = 0.1 mock_multi_db_config.failover_strategy = WeightBasedFailoverStrategy() mock_db.client.execute_command.return_value = "OK" @@ -255,9 +250,7 @@ def mock_check_health(database): mock_hc.check_health.side_effect = mock_check_health mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_db.client.execute_command.return_value = "OK" mock_db1.client.execute_command.return_value = "OK1" mock_db2.client.execute_command.return_value = "OK2" @@ -268,6 +261,15 @@ def mock_check_health(database): client = MultiDBClient(mock_multi_db_config) assert client.set("key", "value") == "OK1" error_event.wait(timeout=0.5) + + # Wait for circuit breaker to actually open (not just the event) + max_retries = 20 + for _ in range(max_retries): + if mock_db1.circuit.state == CBState.OPEN: # Circuit is open + break + sleep(0.01) + + # Now the failover strategy will select mock_db2 assert client.set("key", "value") == "OK2" sleep(0.5) assert client.set("key", "value") == "OK1" @@ -308,9 +310,7 @@ def mock_check_health(database): mock_hc.check_health.side_effect = mock_check_health mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_db.client.execute_command.return_value = "OK" mock_db1.client.execute_command.return_value = "OK1" mock_db2.client.execute_command.return_value = "OK2" @@ -321,6 +321,15 @@ def mock_check_health(database): client = MultiDBClient(mock_multi_db_config) assert client.set("key", "value") == "OK1" error_event.wait(timeout=0.5) + # Wait for circuit breaker state to actually reflect the unhealthy status + # (instead of just sleeping) + max_retries = 20 + for _ in range(max_retries): + if ( + mock_db1.circuit.state == CBState.OPEN + ): # Circuit is open (unhealthy) + break + sleep(0.01) assert client.set("key", "value") == "OK2" sleep(0.5) assert client.set("key", "value") == "OK2" @@ -343,9 +352,7 @@ def test_execute_command_throws_exception_on_failed_initialization( databases = create_weighted_list(mock_db, mock_db1, mock_db2) mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_hc.check_health.return_value = False client = MultiDBClient(mock_multi_db_config) @@ -377,9 +384,7 @@ def test_add_database_throws_exception_on_same_database( databases = create_weighted_list(mock_db, mock_db1, mock_db2) mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_hc.check_health.return_value = False client = MultiDBClient(mock_multi_db_config) @@ -407,9 +412,7 @@ def test_add_database_makes_new_database_active( databases = create_weighted_list(mock_db, mock_db2) mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_db1.client.execute_command.return_value = "OK1" mock_db2.client.execute_command.return_value = "OK2" @@ -444,9 +447,7 @@ def test_remove_highest_weighted_database( databases = create_weighted_list(mock_db, mock_db1, mock_db2) mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_db1.client.execute_command.return_value = "OK1" mock_db2.client.execute_command.return_value = "OK2" @@ -480,9 +481,7 @@ def test_update_database_weight_to_be_highest( databases = create_weighted_list(mock_db, mock_db1, mock_db2) mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_db1.client.execute_command.return_value = "OK1" mock_db2.client.execute_command.return_value = "OK2" @@ -517,9 +516,7 @@ def test_add_new_failure_detector( databases = create_weighted_list(mock_db, mock_db1, mock_db2) mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_db1.client.execute_command.return_value = "OK1" mock_multi_db_config.event_dispatcher = EventDispatcher() mock_fd = mock_multi_db_config.failure_detectors[0] @@ -571,9 +568,7 @@ def test_add_new_health_check( databases = create_weighted_list(mock_db, mock_db1, mock_db2) mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_db1.client.execute_command.return_value = "OK1" mock_hc.check_health.return_value = True @@ -610,9 +605,7 @@ def test_set_active_database( databases = create_weighted_list(mock_db, mock_db1, mock_db2) mock_multi_db_config.health_checks = [mock_hc] - with ( - patch.object(mock_multi_db_config, "databases", return_value=databases), - ): + with patch.object(mock_multi_db_config, "databases", return_value=databases): mock_db1.client.execute_command.return_value = "OK1" mock_db.client.execute_command.return_value = "OK" diff --git a/tests/test_vsets.py b/tests/test_vsets.py index 36295201e7..6376eeb898 100644 --- a/tests/test_vsets.py +++ b/tests/test_vsets.py @@ -456,7 +456,7 @@ def test_vsim_truth_no_thread_enabled(d_client): elements_count = 5000 vector_dim = 50 for i in range(1, elements_count + 1): - float_array = [random.uniform(10 * i, 1000 * i) for x in range(vector_dim)] + float_array = [i for _ in range(vector_dim)] d_client.vset().vadd("myset", float_array, f"elem_{i}") d_client.vset().vadd("myset", [-22 for _ in range(vector_dim)], "elem_man_2")