|
1 | 1 | import binascii
|
2 | 2 | import datetime
|
| 3 | +import random |
3 | 4 | import select
|
4 | 5 | import socket
|
5 | 6 | import socketserver
|
|
8 | 9 | import warnings
|
9 | 10 | from queue import LifoQueue, Queue
|
10 | 11 | from time import sleep
|
| 12 | +from unittest import mock |
11 | 13 | from unittest.mock import DEFAULT, Mock, call, patch
|
12 | 14 |
|
13 | 15 | import pytest
|
|
25 | 27 | REDIS_CLUSTER_HASH_SLOTS,
|
26 | 28 | REPLICA,
|
27 | 29 | ClusterNode,
|
| 30 | + LoadBalancer, |
28 | 31 | NodesManager,
|
29 | 32 | RedisCluster,
|
30 | 33 | get_node_name,
|
@@ -927,7 +930,7 @@ def raise_error(target_node, *args, **kwargs):
|
927 | 930 | rc = get_mocked_redis_client(
|
928 | 931 | host=default_host,
|
929 | 932 | port=default_port,
|
930 |
| - retry=Retry(ConstantBackoff(1), 3), |
| 933 | + retry=Retry(ConstantBackoff(1), 10), |
931 | 934 | )
|
932 | 935 |
|
933 | 936 | with pytest.raises(error):
|
@@ -2655,6 +2658,37 @@ def test_allow_custom_queue_class(self, queue_class):
|
2655 | 2658 | for node in rc.nodes_manager.nodes_cache.values():
|
2656 | 2659 | assert node.redis_connection.connection_pool.queue_class == queue_class
|
2657 | 2660 |
|
| 2661 | + @pytest.mark.parametrize("invalid_index", [-10, 10]) |
| 2662 | + def test_return_primary_if_invalid_node_index_is_returned(self, invalid_index): |
| 2663 | + rc = get_mocked_redis_client( |
| 2664 | + url="redis://[email protected]:7000", |
| 2665 | + cluster_slots=default_cluster_slots, |
| 2666 | + ) |
| 2667 | + random_slot = random.randint( |
| 2668 | + default_cluster_slots[0][0], default_cluster_slots[0][1] |
| 2669 | + ) |
| 2670 | + |
| 2671 | + ports = set() |
| 2672 | + for _ in range(0, 10): |
| 2673 | + ports.add( |
| 2674 | + rc.nodes_manager.get_node_from_slot( |
| 2675 | + random_slot, read_from_replicas=True |
| 2676 | + ).port |
| 2677 | + ) |
| 2678 | + assert ports == {default_port, 7003} |
| 2679 | + |
| 2680 | + ports = set() |
| 2681 | + with mock.patch.object( |
| 2682 | + LoadBalancer, "get_server_index", return_value=invalid_index |
| 2683 | + ): |
| 2684 | + for _ in range(0, 10): |
| 2685 | + ports.add( |
| 2686 | + rc.nodes_manager.get_node_from_slot( |
| 2687 | + random_slot, read_from_replicas=True |
| 2688 | + ).port |
| 2689 | + ) |
| 2690 | + assert ports == {default_port} |
| 2691 | + |
2658 | 2692 |
|
2659 | 2693 | @pytest.mark.onlycluster
|
2660 | 2694 | class TestClusterPubSubObject:
|
@@ -3085,6 +3119,33 @@ def test_empty_stack(self, r):
|
3085 | 3119 | result = p.execute()
|
3086 | 3120 | assert result == []
|
3087 | 3121 |
|
| 3122 | + @pytest.mark.parametrize("error", [ConnectionError, TimeoutError]) |
| 3123 | + def test_additional_backoff_cluster_pipeline(self, r, error): |
| 3124 | + with patch.object(ConstantBackoff, "compute") as compute: |
| 3125 | + |
| 3126 | + def _compute(target_node, *args, **kwargs): |
| 3127 | + return 1 |
| 3128 | + |
| 3129 | + compute.side_effect = _compute |
| 3130 | + with patch("redis.cluster.get_connection") as get_connection: |
| 3131 | + |
| 3132 | + def raise_error(target_node, *args, **kwargs): |
| 3133 | + get_connection.failed_calls += 1 |
| 3134 | + raise error("mocked error") |
| 3135 | + |
| 3136 | + get_connection.side_effect = raise_error |
| 3137 | + |
| 3138 | + r.set_retry(Retry(ConstantBackoff(1), 10)) |
| 3139 | + pipeline = r.pipeline() |
| 3140 | + |
| 3141 | + with pytest.raises(error): |
| 3142 | + pipeline.get("bar") |
| 3143 | + pipeline.get("bar") |
| 3144 | + pipeline.execute() |
| 3145 | + # cluster pipeline does one more back off than a single Redis command |
| 3146 | + # this is not required, but it's just how it's implemented as of now |
| 3147 | + assert compute.call_count == r.cluster_error_retry_attempts + 1 |
| 3148 | + |
3088 | 3149 |
|
3089 | 3150 | @pytest.mark.onlycluster
|
3090 | 3151 | class TestReadOnlyPipeline:
|
|
0 commit comments