Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -761,9 +761,14 @@ def next_command(self):
client_port = client_info[5:]
client_type = "unix"
else:
# use rsplit as ipv6 addresses contain colons
client_address, client_port = client_info.rsplit(":", 1)
client_type = "tcp"
if client_info == "":
client_address = ""
client_port = ""
client_type = "unknown"
else:
# use rsplit as ipv6 addresses contain colons
client_address, client_port = client_info.rsplit(":", 1)
client_type = "tcp"
return {
"time": float(command_time),
"db": int(db_id),
Expand Down
12 changes: 6 additions & 6 deletions tests/test_asyncio/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -1321,7 +1321,7 @@ async def test_msetex_expiration_pxat_and_nx_with_cluster_client(self, r):
for ttl in old_ttls:
assert 10 < ttl <= 30
for ttl in new_ttls:
assert ttl <= 10
assert ttl <= 11
assert await r.mget(*mapping.keys(), "new:{test:1}", "new_2:{test:1}") == [
b"1",
b"2",
Expand Down Expand Up @@ -1370,8 +1370,8 @@ async def test_msetex_expiration_exat_and_xx_with_cluster_client(self, r):
== 1
)
ttls = await asyncio.gather(*[r.ttl(key) for key in mapping.keys()])
assert ttls[0] <= 10
assert ttls[1] <= 10
assert ttls[0] <= 11
assert ttls[1] <= 11
assert 10 < ttls[2] <= 30
assert await r.mget(
"1:{test:1}", "2:{test:1}", "3:{test:1}", "new:{test:1}"
Expand Down Expand Up @@ -1483,7 +1483,7 @@ async def test_msetex_expiration_pxat_and_nx(self, r):
for ttl in old_ttls:
assert 10 < ttl <= 30
for ttl in new_ttls:
assert ttl <= 10
assert ttl <= 11
assert await r.mget(*mapping.keys(), "new", "new_2") == [
b"1",
b"2",
Expand Down Expand Up @@ -1527,8 +1527,8 @@ async def test_msetex_expiration_exat_and_xx(self, r):
== 1
)
ttls = await asyncio.gather(*[r.ttl(key) for key in mapping.keys()])
assert ttls[0] <= 10
assert ttls[1] <= 10
assert ttls[0] <= 11
assert ttls[1] <= 11
assert 10 < ttls[2] <= 30
assert await r.mget("1", "2", "3", "new") == [
b"new_value",
Expand Down
86 changes: 43 additions & 43 deletions tests/test_asyncio/test_multidb/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ async def test_execute_command_against_correct_db_on_successful_initialization(
databases = create_weighted_list(mock_db, mock_db1, mock_db2)
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
):
with patch.object(mock_multi_db_config, "databases", return_value=databases):
mock_db1.client.execute_command = AsyncMock(return_value="OK1")

mock_hc.check_health.return_value = True
Expand Down Expand Up @@ -71,9 +69,7 @@ async def test_execute_command_against_correct_db_and_closed_circuit(
databases = create_weighted_list(mock_db, mock_db1, mock_db2)
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
):
with patch.object(mock_multi_db_config, "databases", return_value=databases):
mock_db1.client.execute_command = AsyncMock(return_value="OK1")

mock_hc.check_health.side_effect = [
Expand All @@ -88,7 +84,8 @@ async def test_execute_command_against_correct_db_and_closed_circuit(

client = MultiDBClient(mock_multi_db_config)
assert mock_multi_db_config.failover_strategy.set_databases.call_count == 1
assert await client.set("key", "value") == "OK1"
result = await client.set("key", "value")
assert result == "OK1"
assert mock_hc.check_health.call_count == 7

assert mock_db.circuit.state == CBState.CLOSED
Expand Down Expand Up @@ -185,9 +182,7 @@ async def mock_check_health(database):
mock_hc.check_health.side_effect = mock_check_health
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
):
with patch.object(mock_multi_db_config, "databases", return_value=databases):
mock_db.client.execute_command.return_value = "OK"
mock_db1.client.execute_command.return_value = "OK1"
mock_db2.client.execute_command.return_value = "OK2"
Expand All @@ -201,6 +196,7 @@ async def mock_check_health(database):
assert await db1_became_unhealthy.wait(), (
"Timeout waiting for mock_db1 to become unhealthy"
)

await asyncio.sleep(0.01)

assert await client.set("key", "value") == "OK2"
Expand All @@ -209,7 +205,14 @@ async def mock_check_health(database):
assert await db2_became_unhealthy.wait(), (
"Timeout waiting for mock_db2 to become unhealthy"
)
await asyncio.sleep(0.01)

# Wait for circuit breaker state to actually reflect the unhealthy status
# (instead of just sleeping)
max_retries = 20
for _ in range(max_retries):
if cb2.state == CBState.OPEN: # Circuit is open (unhealthy)
break
await asyncio.sleep(0.01)

assert await client.set("key", "value") == "OK"

Expand Down Expand Up @@ -258,9 +261,7 @@ async def mock_check_health(database):
mock_hc.check_health.side_effect = mock_check_health
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
):
with patch.object(mock_multi_db_config, "databases", return_value=databases):
mock_db.client.execute_command.return_value = "OK"
mock_db1.client.execute_command.return_value = "OK1"
mock_db2.client.execute_command.return_value = "OK2"
Expand All @@ -271,6 +272,14 @@ async def mock_check_health(database):
async with MultiDBClient(mock_multi_db_config) as client:
assert await client.set("key", "value") == "OK1"
await error_event.wait()
# Wait for circuit breaker to actually open (not just the event)
max_retries = 20
for _ in range(max_retries):
if mock_db1.circuit.state == CBState.OPEN: # Circuit is open
break
await asyncio.sleep(0.01)

# Now the failover strategy will select mock_db2
assert await client.set("key", "value") == "OK2"
await asyncio.sleep(0.5)
assert await client.set("key", "value") == "OK1"
Expand Down Expand Up @@ -312,9 +321,7 @@ async def mock_check_health(database):
mock_hc.check_health.side_effect = mock_check_health
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
):
with patch.object(mock_multi_db_config, "databases", return_value=databases):
mock_db.client.execute_command.return_value = "OK"
mock_db1.client.execute_command.return_value = "OK1"
mock_db2.client.execute_command.return_value = "OK2"
Expand All @@ -325,6 +332,15 @@ async def mock_check_health(database):
async with MultiDBClient(mock_multi_db_config) as client:
assert await client.set("key", "value") == "OK1"
await error_event.wait()
# Wait for circuit breaker state to actually reflect the unhealthy status
# (instead of just sleeping)
max_retries = 20
for _ in range(max_retries):
if (
mock_db1.circuit.state == CBState.OPEN
): # Circuit is open (unhealthy)
break
await asyncio.sleep(0.01)
assert await client.set("key", "value") == "OK2"
await asyncio.sleep(0.5)
assert await client.set("key", "value") == "OK2"
Expand All @@ -348,9 +364,7 @@ async def test_execute_command_throws_exception_on_failed_initialization(
databases = create_weighted_list(mock_db, mock_db1, mock_db2)
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
):
with patch.object(mock_multi_db_config, "databases", return_value=databases):
mock_hc.check_health.return_value = False

client = MultiDBClient(mock_multi_db_config)
Expand Down Expand Up @@ -382,9 +396,7 @@ async def test_add_database_throws_exception_on_same_database(
databases = create_weighted_list(mock_db, mock_db1, mock_db2)
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
):
with patch.object(mock_multi_db_config, "databases", return_value=databases):
mock_hc.check_health.return_value = False

client = MultiDBClient(mock_multi_db_config)
Expand Down Expand Up @@ -413,9 +425,7 @@ async def test_add_database_makes_new_database_active(
databases = create_weighted_list(mock_db, mock_db2)
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
):
with patch.object(mock_multi_db_config, "databases", return_value=databases):
mock_db1.client.execute_command.return_value = "OK1"
mock_db2.client.execute_command.return_value = "OK2"

Expand Down Expand Up @@ -451,9 +461,7 @@ async def test_remove_highest_weighted_database(
databases = create_weighted_list(mock_db, mock_db1, mock_db2)
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
):
with patch.object(mock_multi_db_config, "databases", return_value=databases):
mock_db1.client.execute_command.return_value = "OK1"
mock_db2.client.execute_command.return_value = "OK2"

Expand Down Expand Up @@ -487,9 +495,7 @@ async def test_update_database_weight_to_be_highest(
databases = create_weighted_list(mock_db, mock_db1, mock_db2)
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
):
with patch.object(mock_multi_db_config, "databases", return_value=databases):
mock_db1.client.execute_command.return_value = "OK1"
mock_db2.client.execute_command.return_value = "OK2"

Expand Down Expand Up @@ -525,9 +531,7 @@ async def test_add_new_failure_detector(
databases = create_weighted_list(mock_db, mock_db1, mock_db2)
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
):
with patch.object(mock_multi_db_config, "databases", return_value=databases):
mock_db1.client.execute_command.return_value = "OK1"
mock_multi_db_config.event_dispatcher = EventDispatcher()
mock_fd = mock_multi_db_config.failure_detectors[0]
Expand All @@ -546,7 +550,7 @@ async def test_add_new_failure_detector(
assert mock_hc.check_health.call_count == 9

# Simulate failing command events that lead to a failure detection
for i in range(5):
for _ in range(5):
await mock_multi_db_config.event_dispatcher.dispatch_async(
command_fail_event
)
Expand All @@ -557,7 +561,7 @@ async def test_add_new_failure_detector(
client.add_failure_detector(another_fd)

# Simulate failing command events that lead to a failure detection
for i in range(5):
for _ in range(5):
await mock_multi_db_config.event_dispatcher.dispatch_async(
command_fail_event
)
Expand All @@ -584,9 +588,7 @@ async def test_add_new_health_check(
databases = create_weighted_list(mock_db, mock_db1, mock_db2)
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
):
with patch.object(mock_multi_db_config, "databases", return_value=databases):
mock_db1.client.execute_command.return_value = "OK1"

mock_hc.check_health.return_value = True
Expand Down Expand Up @@ -624,9 +626,7 @@ async def test_set_active_database(
databases = create_weighted_list(mock_db, mock_db1, mock_db2)
mock_multi_db_config.health_checks = [mock_hc]

with (
patch.object(mock_multi_db_config, "databases", return_value=databases),
):
with patch.object(mock_multi_db_config, "databases", return_value=databases):
mock_db1.client.execute_command.return_value = "OK1"
mock_db.client.execute_command.return_value = "OK"

Expand Down
2 changes: 1 addition & 1 deletion tests/test_asyncio/test_vsets.py
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ async def test_vsim_truth_no_thread_enabled(d_client):
elements_count = 5000
vector_dim = 50
for i in range(1, elements_count + 1):
float_array = [random.uniform(10 * i, 1000 * i) for x in range(vector_dim)]
float_array = [i for _ in range(vector_dim)]
await d_client.vset().vadd("myset", float_array, f"elem_{i}")

await d_client.vset().vadd("myset", [-22 for _ in range(vector_dim)], "elem_man_2")
Expand Down
13 changes: 7 additions & 6 deletions tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -1896,6 +1896,7 @@ def test_msetex_expiration_pxat_and_nx_with_cluster_client(self, r):
)
== 0
)

ttls = [r.ttl(key) for key in mapping.keys()]
for ttl in ttls:
assert 10 < ttl <= 30
Expand All @@ -1915,7 +1916,7 @@ def test_msetex_expiration_pxat_and_nx_with_cluster_client(self, r):
for ttl in old_ttls:
assert 10 < ttl <= 30
for ttl in new_ttls:
assert ttl <= 10
assert ttl <= 11
assert r.mget(*mapping.keys(), "new:{test:1}", "new_2:{test:1}") == [
b"1",
b"2",
Expand Down Expand Up @@ -1959,8 +1960,8 @@ def test_msetex_expiration_exat_and_xx_with_cluster_client(self, r):
== 1
)
ttls = [r.ttl(key) for key in mapping.keys()]
assert ttls[0] <= 10
assert ttls[1] <= 10
assert ttls[0] <= 11
assert ttls[1] <= 11
assert 10 < ttls[2] <= 30
assert r.mget("1:{test:1}", "2:{test:1}", "3:{test:1}", "new:{test:1}") == [
b"new_value",
Expand Down Expand Up @@ -2070,7 +2071,7 @@ def test_msetex_expiration_pxat_and_nx(self, r):
for ttl in old_ttls:
assert 10 < ttl <= 30
for ttl in new_ttls:
assert ttl <= 10
assert ttl <= 11
assert r.mget(*mapping.keys(), "new", "new_2") == [
b"1",
b"2",
Expand Down Expand Up @@ -2114,8 +2115,8 @@ def test_msetex_expiration_exat_and_xx(self, r):
== 1
)
ttls = [r.ttl(key) for key in mapping.keys()]
assert ttls[0] <= 10
assert ttls[1] <= 10
assert ttls[0] <= 11
assert ttls[1] <= 11
assert 10 < ttls[2] <= 30
assert r.mget("1", "2", "3", "new") == [
b"new_value",
Expand Down
Loading