2828 TimeoutError ,
2929 TryAgainError ,
3030)
31+ from redis .lock import Lock
3132from redis .utils import (
3233 dict_merge ,
3334 list_keys_to_dict ,
@@ -227,6 +228,7 @@ class RedisCluster(RedisClusterCommands):
227228 "ACL SETUSER" ,
228229 "ACL USERS" ,
229230 "ACL WHOAMI" ,
231+ "AUTH" ,
230232 "CLIENT LIST" ,
231233 "CLIENT SETNAME" ,
232234 "CLIENT GETNAME" ,
@@ -282,19 +284,29 @@ class RedisCluster(RedisClusterCommands):
282284 "READONLY" ,
283285 "READWRITE" ,
284286 "TIME" ,
287+ "GRAPH.CONFIG" ,
285288 ],
286289 DEFAULT_NODE ,
287290 ),
288291 list_keys_to_dict (
289292 [
290293 "FLUSHALL" ,
291294 "FLUSHDB" ,
295+ "FUNCTION DELETE" ,
296+ "FUNCTION FLUSH" ,
297+ "FUNCTION LIST" ,
298+ "FUNCTION LOAD" ,
299+ "FUNCTION RESTORE" ,
292300 "SCRIPT EXISTS" ,
293301 "SCRIPT FLUSH" ,
294302 "SCRIPT LOAD" ,
295303 ],
296304 PRIMARIES ,
297305 ),
306+ list_keys_to_dict (
307+ ["FUNCTION DUMP" ],
308+ RANDOM ,
309+ ),
298310 list_keys_to_dict (
299311 [
300312 "CLUSTER COUNTKEYSINSLOT" ,
@@ -742,6 +754,76 @@ def pipeline(self, transaction=None, shard_hint=None):
742754 reinitialize_steps = self .reinitialize_steps ,
743755 )
744756
757+ def lock (
758+ self ,
759+ name ,
760+ timeout = None ,
761+ sleep = 0.1 ,
762+ blocking_timeout = None ,
763+ lock_class = None ,
764+ thread_local = True ,
765+ ):
766+ """
767+ Return a new Lock object using key ``name`` that mimics
768+ the behavior of threading.Lock.
769+
770+ If specified, ``timeout`` indicates a maximum life for the lock.
771+ By default, it will remain locked until release() is called.
772+
773+ ``sleep`` indicates the amount of time to sleep per loop iteration
774+ when the lock is in blocking mode and another client is currently
775+ holding the lock.
776+
777+ ``blocking_timeout`` indicates the maximum amount of time in seconds to
778+ spend trying to acquire the lock. A value of ``None`` indicates
779+ continue trying forever. ``blocking_timeout`` can be specified as a
780+ float or integer, both representing the number of seconds to wait.
781+
782+ ``lock_class`` forces the specified lock implementation. Note that as
783+ of redis-py 3.0, the only lock class we implement is ``Lock`` (which is
784+ a Lua-based lock). So, it's unlikely you'll need this parameter, unless
785+ you have created your own custom lock class.
786+
787+ ``thread_local`` indicates whether the lock token is placed in
788+ thread-local storage. By default, the token is placed in thread local
789+ storage so that a thread only sees its token, not a token set by
790+ another thread. Consider the following timeline:
791+
792+ time: 0, thread-1 acquires `my-lock`, with a timeout of 5 seconds.
793+ thread-1 sets the token to "abc"
794+ time: 1, thread-2 blocks trying to acquire `my-lock` using the
795+ Lock instance.
796+ time: 5, thread-1 has not yet completed. redis expires the lock
797+ key.
798+ time: 5, thread-2 acquired `my-lock` now that it's available.
799+ thread-2 sets the token to "xyz"
800+ time: 6, thread-1 finishes its work and calls release(). if the
801+ token is *not* stored in thread local storage, then
802+ thread-1 would see the token value as "xyz" and would be
803+ able to successfully release the thread-2's lock.
804+
805+ In some use cases it's necessary to disable thread local storage. For
806+ example, if you have code where one thread acquires a lock and passes
807+ that lock instance to a worker thread to release later. If thread
808+ local storage isn't disabled in this case, the worker thread won't see
809+ the token set by the thread that acquired the lock. Our assumption
810+ is that these cases aren't common and as such default to using
811+ thread local storage."""
812+ if lock_class is None :
813+ lock_class = Lock
814+ return lock_class (
815+ self ,
816+ name ,
817+ timeout = timeout ,
818+ sleep = sleep ,
819+ blocking_timeout = blocking_timeout ,
820+ thread_local = thread_local ,
821+ )
822+
823+ def set_response_callback (self , command , callback ):
824+ """Set a custom Response Callback"""
825+ self .cluster_response_callbacks [command ] = callback
826+
745827 def _determine_nodes (self , * args , ** kwargs ):
746828 command = args [0 ]
747829 nodes_flag = kwargs .pop ("nodes_flag" , None )
@@ -843,6 +925,10 @@ def determine_slot(self, *args):
843925 else :
844926 keys = self ._get_command_keys (* args )
845927 if keys is None or len (keys ) == 0 :
928+ # FCALL can call a function with 0 keys, that means the function
929+ # can be run on any node so we can just return a random slot
930+ if command in ("FCALL" , "FCALL_RO" ):
931+ return random .randrange (0 , REDIS_CLUSTER_HASH_SLOTS )
846932 raise RedisClusterException (
847933 "No way to dispatch this command to Redis Cluster. "
848934 "Missing key.\n You can execute the command by specifying "
@@ -1113,6 +1199,20 @@ def _process_result(self, command, res, **kwargs):
11131199 else :
11141200 return res
11151201
1202+ def load_external_module (
1203+ self ,
1204+ funcname ,
1205+ func ,
1206+ ):
1207+ """
1208+ This function can be used to add externally defined redis modules,
1209+ and their namespaces to the redis client.
1210+
1211+ ``funcname`` - A string containing the name of the function to create
1212+ ``func`` - The function, being added to this class.
1213+ """
1214+ setattr (self , funcname , func )
1215+
11161216
11171217class ClusterNode :
11181218 def __init__ (self , host , port , server_type = None , redis_connection = None ):
@@ -1958,7 +2058,13 @@ def _send_cluster_commands(
19582058
19592059 # turn the response back into a simple flat array that corresponds
19602060 # to the sequence of commands issued in the stack in pipeline.execute()
1961- response = [c .result for c in sorted (stack , key = lambda x : x .position )]
2061+ response = []
2062+ for c in sorted (stack , key = lambda x : x .position ):
2063+ if c .args [0 ] in self .cluster_response_callbacks :
2064+ c .result = self .cluster_response_callbacks [c .args [0 ]](
2065+ c .result , ** c .options
2066+ )
2067+ response .append (c .result )
19622068
19632069 if raise_on_error :
19642070 self .raise_first_error (stack )
@@ -1972,6 +2078,9 @@ def _fail_on_redirect(self, allow_redirections):
19722078 "ASK & MOVED redirection not allowed in this pipeline"
19732079 )
19742080
2081+ def exists (self , * keys ):
2082+ return self .execute_command ("EXISTS" , * keys )
2083+
19752084 def eval (self ):
19762085 """ """
19772086 raise RedisClusterException ("method eval() is not implemented" )
0 commit comments