Skip to content

Commit 3f68e9b

Browse files
committed
Implement getting the cluster nodes list from nodes
Periodically call a user-defined Lua function on the current node to obtain or refresh the full cluster nodes list. Resolves: #134
1 parent a6b6d56 commit 3f68e9b

File tree

3 files changed

+79
-6
lines changed

3 files changed

+79
-6
lines changed

tarantool/connection.py

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ def __init__(self, host, port,
126126
self.encoding = encoding
127127
self.call_16 = call_16
128128
self.connection_timeout = connection_timeout
129+
self.authenticated = False
129130
if connect_now:
130131
self.connect()
131132

@@ -200,9 +201,11 @@ def connect(self):
200201
:raise: `NetworkError`
201202
'''
202203
try:
204+
self.authenticated = False
203205
self.connect_basic()
204206
self.handshake()
205207
self.load_schema()
208+
self.authenticated = True
206209
except Exception as e:
207210
self.connected = False
208211
raise NetworkError(e)
@@ -370,14 +373,33 @@ def call(self, func_name, *args):
370373
371374
:rtype: `Response` instance
372375
'''
373-
assert isinstance(func_name, str)
374376

375377
# This allows to use a tuple or list as an argument
376378
if len(args) == 1 and isinstance(args[0], (list, tuple)):
377379
args = args[0]
378380

381+
return self.call_ex(func_name, args, True)
382+
383+
def call_ex(self, func_name, args=[], reconnect=True):
384+
'''
385+
Execute CALL request. Call stored Lua function.
386+
387+
:param func_name: stored Lua function name
388+
:type func_name: str
389+
:param args: list of function arguments
390+
:type args: list or tuple
391+
:param reconnect: reconnect before call
392+
:type reconnect: boolean
393+
394+
:rtype: `Response` instance
395+
'''
396+
assert isinstance(func_name, str)
397+
379398
request = RequestCall(self, func_name, args, self.call_16)
380-
response = self._send_request(request)
399+
if reconnect:
400+
response = self._send_request(request)
401+
else:
402+
response = self._send_request_wo_reconnect(request)
381403
return response
382404

383405
def eval(self, expr, *args):
@@ -391,14 +413,33 @@ def eval(self, expr, *args):
391413
392414
:rtype: `Response` instance
393415
'''
394-
assert isinstance(expr, str)
395416

396417
# This allows to use a tuple or list as an argument
397418
if len(args) == 1 and isinstance(args[0], (list, tuple)):
398419
args = args[0]
399420

421+
return self.eval_ex(expr, args, True)
422+
423+
def eval_ex(self, expr, args=[], reconnect=True):
424+
'''
425+
Execute EVAL request. Eval Lua expression.
426+
427+
:param expr: Lua expression
428+
:type expr: str
429+
:param args: list of function arguments
430+
:type args: list or tuple
431+
:param reconnect: reconnect before call
432+
:type reconnect: boolean
433+
434+
:rtype: `Response` instance
435+
'''
436+
assert isinstance(expr, str)
437+
400438
request = RequestEval(self, expr, args)
401-
response = self._send_request(request)
439+
if reconnect:
440+
response = self._send_request(request)
441+
else:
442+
response = self._send_request_wo_reconnect(request)
402443
return response
403444

404445
def replace(self, space_name, values):

tarantool/const.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,5 @@
8686
RECONNECT_MAX_ATTEMPTS = 10
8787
# Default delay between attempts to reconnect (seconds)
8888
RECONNECT_DELAY = 0.1
89+
# Default cluster nodes list refresh interval (seconds)
90+
NODES_REFRESH_INTERVAL = 300

tarantool/mesh_connection.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
between tarantool instances and basic Round-Robin strategy.
55
'''
66

7+
import time
78
from tarantool.connection import Connection
89
from tarantool.error import NetworkError
910
from tarantool.utils import ENCODING_DEFAULT
1011
from tarantool.const import (
1112
SOCKET_TIMEOUT,
1213
RECONNECT_MAX_ATTEMPTS,
13-
RECONNECT_DELAY
14+
RECONNECT_DELAY,
15+
NODES_REFRESH_INTERVAL
1416
)
1517

1618

@@ -34,12 +36,15 @@ def __init__(self, addrs,
3436
reconnect_delay=RECONNECT_DELAY,
3537
connect_now=True,
3638
encoding=ENCODING_DEFAULT,
37-
strategy_class=RoundRobinStrategy):
39+
strategy_class=RoundRobinStrategy,
40+
nodes_refresh_interval=NODES_REFRESH_INTERVAL):
3841
self.nattempts = 2 * len(addrs) + 1
3942
self.strategy = strategy_class(addrs)
4043
addr = self.strategy.getnext()
4144
host = addr['host']
4245
port = addr['port']
46+
self.nodes_refresh_interval = nodes_refresh_interval
47+
self.last_nodes_refresh = 0
4348
super(MeshConnection, self).__init__(host=host,
4449
port=port,
4550
user=user,
@@ -63,3 +68,28 @@ def _opt_reconnect(self):
6368
self.port = addr['port']
6469
else:
6570
raise NetworkError
71+
72+
if self.authenticated:
73+
now = time.time()
74+
if now - self.last_nodes_refresh > self.nodes_refresh_interval:
75+
self.refresh_nodes(now)
76+
77+
def refresh_nodes(self, cur_time):
78+
resp = super(MeshConnection, self).eval_ex('return get_nodes ~= nil',
79+
[], reconnect=False)
80+
if not (resp.data and resp.data[0]):
81+
return
82+
83+
resp = super(MeshConnection, self).call_ex('get_nodes', [],
84+
reconnect=False)
85+
86+
if not (resp.data and resp.data[0]):
87+
return
88+
89+
addrs = resp.data[0]
90+
if type(addrs) is list:
91+
self.strategy.addrs = addrs
92+
self.strategy.pos = 0
93+
self.last_nodes_refresh = cur_time
94+
if not {'host': self.host, 'port': self.port} in addrs:
95+
self._opt_reconnect()

0 commit comments

Comments
 (0)