Skip to content

Add support for ZMPOP #1852

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 14 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 12 additions & 63 deletions redis/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -387,8 +387,7 @@ def __init__(
port=6379,
startup_nodes=None,
cluster_error_retry_attempts=3,
require_full_coverage=True,
skip_full_coverage_check=False,
require_full_coverage=False,
reinitialize_steps=10,
read_from_replicas=False,
url=None,
Expand All @@ -404,16 +403,15 @@ def __init__(
:port: 'int'
Can be used to point to a startup node
:require_full_coverage: 'bool'
If set to True, as it is by default, all slots must be covered.
If set to False and not all slots are covered, the instance
creation will succeed only if 'cluster-require-full-coverage'
configuration is set to 'no' in all of the cluster's nodes.
Otherwise, RedisClusterException will be thrown.
:skip_full_coverage_check: 'bool'
If require_full_coverage is set to False, a check of
cluster-require-full-coverage config will be executed against all
nodes. Set skip_full_coverage_check to True to skip this check.
Useful for clusters without the CONFIG command (like ElastiCache)
When set to False (default value): the client will not require a
full coverage of the slots. However, if not all slots are covered,
and at least one node has 'cluster-require-full-coverage' set to
'yes', the server will throw a ClusterDownError for some key-based
commands. See -
https://redis.io/topics/cluster-tutorial#redis-cluster-configuration-parameters
When set to True: all slots must be covered to construct the
cluster client. If not all slots are covered, RedisClusterException
will be thrown.
:read_from_replicas: 'bool'
Enable read from replicas in READONLY mode. You can read possibly
stale data.
Expand Down Expand Up @@ -510,7 +508,6 @@ def __init__(
startup_nodes=startup_nodes,
from_url=from_url,
require_full_coverage=require_full_coverage,
skip_full_coverage_check=skip_full_coverage_check,
**kwargs,
)

Expand Down Expand Up @@ -1111,8 +1108,7 @@ def __init__(
self,
startup_nodes,
from_url=False,
require_full_coverage=True,
skip_full_coverage_check=False,
require_full_coverage=False,
lock=None,
**kwargs,
):
Expand All @@ -1123,7 +1119,6 @@ def __init__(
self.populate_startup_nodes(startup_nodes)
self.from_url = from_url
self._require_full_coverage = require_full_coverage
self._skip_full_coverage_check = skip_full_coverage_check
self._moved_exception = None
self.connection_kwargs = kwargs
self.read_load_balancer = LoadBalancer()
Expand Down Expand Up @@ -1249,32 +1244,6 @@ def populate_startup_nodes(self, nodes):
for n in nodes:
self.startup_nodes[n.name] = n

def cluster_require_full_coverage(self, cluster_nodes):
    """
    Return True if at least one node in ``cluster_nodes`` has the
    'cluster-require-full-coverage' configuration set to 'yes'.

    If that config is 'no' on the servers, the cluster can still serve
    requests even when some hash slots are uncovered. A node that cannot
    be reached is treated as not requiring full coverage; any other
    failure of CONFIG GET is surfaced as a RedisClusterException.
    """

    def _node_requires_full_coverage(node):
        # Keep the try body minimal: only the CONFIG GET call can raise.
        try:
            config = node.redis_connection.config_get(
                "cluster-require-full-coverage"
            )
        except ConnectionError:
            # Unreachable node: assume it does not force full coverage.
            return False
        except Exception as e:
            raise RedisClusterException(
                'ERROR sending "config get cluster-require-full-coverage"'
                f" command to redis server: {node.name}, {e}"
            )
        return "yes" in config.values()

    # One node requiring full coverage is enough for the whole cluster.
    for node in cluster_nodes.values():
        if _node_requires_full_coverage(node):
            return True
    return False

def check_slots_coverage(self, slots_cache):
# Validate if all slots are covered or if we should try next
# startup node
Expand Down Expand Up @@ -1450,29 +1419,9 @@ def initialize(self):
# isn't a full coverage
raise RedisClusterException(
f"All slots are not covered after query all startup_nodes. "
f"{len(self.slots_cache)} of {REDIS_CLUSTER_HASH_SLOTS} "
f"{len(tmp_slots)} of {REDIS_CLUSTER_HASH_SLOTS} "
f"covered..."
)
elif not fully_covered and not self._require_full_coverage:
# The user set require_full_coverage to False.
# In case of full coverage requirement in the cluster's Redis
# configurations, we will raise an exception. Otherwise, we may
# continue with partial coverage.
# see Redis Cluster configuration parameters in
# https://redis.io/topics/cluster-tutorial
if (
not self._skip_full_coverage_check
and self.cluster_require_full_coverage(tmp_nodes_cache)
):
raise RedisClusterException(
"Not all slots are covered but the cluster's "
"configuration requires full coverage. Set "
"cluster-require-full-coverage configuration to no on "
"all of the cluster nodes if you wish the cluster to "
"be able to serve without being fully covered."
f"{len(self.slots_cache)} of {REDIS_CLUSTER_HASH_SLOTS} "
f"covered..."
)

# Set the tmp variables to the real variables
self.nodes_cache = tmp_nodes_cache
Expand Down
27 changes: 27 additions & 0 deletions redis/commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import hashlib
import time
import warnings
from typing import List, Optional

from redis.exceptions import ConnectionError, DataError, NoScriptError, RedisError

Expand Down Expand Up @@ -3255,6 +3256,32 @@ def bzpopmin(self, keys, timeout=0):
keys.append(timeout)
return self.execute_command("BZPOPMIN", *keys)

def zmpop(
    self,
    num_keys: int,
    keys: List[str],
    min: bool = False,
    max: bool = False,
    count: int = 1,
) -> list:
    """
    Pop ``count`` values (default 1) off of the first non-empty sorted set
    named in the ``keys`` list.

    Exactly one of ``min`` or ``max`` must be set to True: ``min`` pops the
    lowest-scored members, ``max`` pops the highest-scored ones.

    :param num_keys: number of keys provided in ``keys``.
    :param keys: the sorted-set keys to try, in order.
    :param count: how many members to pop from the first non-empty set.
    :raises DataError: if neither or both of ``min``/``max`` are True.

    For more information check https://redis.io/commands/zmpop
    """
    # Validate before doing any work: the caller must pick exactly one
    # of MIN/MAX, and a silent DataError would be hard to diagnose.
    if (min and max) or (not min and not max):
        raise DataError("Either min or max, but not both, must be set")
    args = [num_keys, *keys, "MIN" if min else "MAX"]
    # COUNT defaults to 1 server-side, so only send it when it differs.
    if count != 1:
        args.extend(["COUNT", count])

    return self.execute_command("ZMPOP", *args)

def _zrange(
self,
command,
Expand Down
4 changes: 2 additions & 2 deletions redis/commands/graph/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def query(self, q, params=None, timeout=None, read_only=False, profile=False):

Args:

q :
q : str
The query.
params : dict
Query parameters.
Expand Down Expand Up @@ -178,7 +178,7 @@ def config(self, name, value=None, set=False):
name : str
The name of the configuration
value :
The value we want to ser (can be used only when `set` is on)
The value we want to set (can be used only when `set` is on)
set : bool
Turn on to set a configuration. Default behavior is get.
"""
Expand Down
3 changes: 1 addition & 2 deletions redis/commands/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ def _get_pubsub_keys(self, *args):
return None
args = [str_if_bytes(arg) for arg in args]
command = args[0].upper()
keys = None
if command == "PUBSUB":
# the second argument is a part of the command name, e.g.
# ['PUBSUB', 'NUMSUB', 'foo'].
Expand All @@ -117,6 +118,4 @@ def _get_pubsub_keys(self, *args):
# format example:
# PUBLISH channel message
keys = [args[1]]
else:
keys = None
return keys
78 changes: 58 additions & 20 deletions redis/commands/search/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,12 @@

NOOFFSETS = "NOOFFSETS"
NOFIELDS = "NOFIELDS"
NOHL = "NOHL"
NOFREQS = "NOFREQS"
MAXTEXTFIELDS = "MAXTEXTFIELDS"
TEMPORARY = "TEMPORARY"
STOPWORDS = "STOPWORDS"
SKIPINITIALSCAN = "SKIPINITIALSCAN"
WITHSCORES = "WITHSCORES"
FUZZY = "FUZZY"
WITHPAYLOADS = "WITHPAYLOADS"
Expand All @@ -66,27 +71,57 @@ def create_index(
no_field_flags=False,
stopwords=None,
definition=None,
max_text_fields=False,
temporary=None,
no_highlight=False,
no_term_frequencies=False,
skip_initial_scan=False,
):
"""
Create the search index. The index must not already exist.

### Parameters:

- **fields**: a list of TextField or NumericField objects
- **no_term_offsets**: If true, we will not save term offsets in the index
- **no_field_flags**: If true, we will not save field flags that allow searching in specific fields
- **stopwords**: If not None, we create the index with this custom stopword list. The list can be empty
- **no_term_offsets**: If true, we will not save term offsets in
the index
- **no_field_flags**: If true, we will not save field flags that
allow searching in specific fields
- **stopwords**: If not None, we create the index with this custom
stopword list. The list can be empty
- **max_text_fields**: If true, we will encode indexes as if there
were more than 32 text fields which allows you to add additional
fields (beyond 32).
- **temporary**: Create a lightweight temporary index which will
expire after the specified period of inactivity (in seconds). The
internal idle timer is reset whenever the index is searched or added to.
- **no_highlight**: If true, disables highlighting support.
Also implied by no_term_offsets.
- **no_term_frequencies**: If true, we avoid saving the term frequencies
in the index.
- **skip_initial_scan**: If true, we do not scan and index.

For more information: https://oss.redis.com/redisearch/Commands/#ftcreate
""" # noqa

args = [CREATE_CMD, self.index_name]
if definition is not None:
args += definition.args
if max_text_fields:
args.append(MAXTEXTFIELDS)
if temporary is not None and isinstance(temporary, int):
args.append(TEMPORARY)
args.append(temporary)
if no_term_offsets:
args.append(NOOFFSETS)
if no_highlight:
args.append(NOHL)
if no_field_flags:
args.append(NOFIELDS)
if no_term_frequencies:
args.append(NOFREQS)
if skip_initial_scan:
args.append(SKIPINITIALSCAN)
if stopwords is not None and isinstance(stopwords, (list, tuple, set)):
args += [STOPWORDS, len(stopwords)]
if len(stopwords) > 0:
Expand Down Expand Up @@ -129,7 +164,6 @@ def dropindex(self, delete_documents=False):
### Parameters:

- **delete_documents**: If `True`, all documents will be deleted.

For more information: https://oss.redis.com/redisearch/Commands/#ftdropindex
""" # noqa
keep_str = "" if delete_documents else "KEEPDOCS"
Expand Down Expand Up @@ -217,23 +251,27 @@ def add_document(
### Parameters

- **doc_id**: the id of the saved document.
- **nosave**: if set to true, we just index the document, and don't \
save a copy of it. This means that searches will just return ids.
- **score**: the document ranking, between 0.0 and 1.0.
- **payload**: optional inner-index payload we can save for fast access in scoring functions
- **replace**: if True, and the document already is in the index, \
- **nosave**: if set to true, we just index the document, and don't
save a copy of it. This means that searches will just
return ids.
- **score**: the document ranking, between 0.0 and 1.0
- **payload**: optional inner-index payload we can save for fast
access in scoring functions
- **replace**: if True, and the document already is in the index,
we perform an update and reindex the document
- **partial**: if True, the fields specified will be added to the \
existing document. \
This has the added benefit that any fields specified \
with `no_index` will not be reindexed again. Implies `replace`
- **partial**: if True, the fields specified will be added to the
existing document.
This has the added benefit that any fields specified
with `no_index`
will not be reindexed again. Implies `replace`
- **language**: Specify the language used for document tokenization.
- **no_create**: if True, the document is only updated and reindexed \
if it already exists. If the document does not exist, an error will be \
returned. Implies `replace`
- **fields** kwargs dictionary of the document fields to be saved and/or indexed.

NOTE: Geo points should be encoded as strings of "lon,lat"
- **no_create**: if True, the document is only updated and reindexed
if it already exists.
If the document does not exist, an error will be
returned. Implies `replace`
- **fields** kwargs dictionary of the document fields to be saved
and/or indexed.
NOTE: Geo points should be encoded as strings of "lon,lat"

For more information: https://oss.redis.com/redisearch/Commands/#ftadd
""" # noqa
Expand Down Expand Up @@ -481,7 +519,7 @@ def spellcheck(self, query, distance=None, include=None, exclude=None):

**query**: search query.
**distance***: the maximal Levenshtein distance for spelling
suggestions (default: 1, max: 4).
suggestions (default: 1, max: 4).
**include**: specifies an inclusion custom dictionary.
**exclude**: specifies an exclusion custom dictionary.

Expand Down
2 changes: 1 addition & 1 deletion redis/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -1317,7 +1317,7 @@ def get_connection(self, command_name, *keys, **options):
try:
if connection.can_read():
raise ConnectionError("Connection has data")
except ConnectionError:
except (ConnectionError, OSError):
connection.disconnect()
connection.connect()
if connection.can_read():
Expand Down
4 changes: 2 additions & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
deprecated
packaging
deprecated>=1.2.3
packaging>=20.4
8 changes: 7 additions & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,18 @@
]
),
url="https://github.com/redis/redis-py",
project_urls={
"Documentation": "https://redis.readthedocs.io/en/latest/",
"Changes": "https://github.com/redis/redis-py/releases",
"Code": "https://github.com/redis/redis-py",
"Issue tracker": "https://github.com/redis/redis-py/issues",
},
author="Redis Inc.",
author_email="[email protected]",
python_requires=">=3.6",
install_requires=[
"deprecated>=1.2.3",
"packaging>=21.3",
"packaging>=20.4",
'importlib-metadata >= 1.0; python_version < "3.8"',
],
classifiers=[
Expand Down
Loading