Skip to content

Commit 7a4659b

Browse files
committed
Implement getting the cluster nodes list from nodes
Periodically call a user-defined Lua function on the current node to obtain or refresh the full cluster nodes list. Resolves: #134
1 parent a6b6d56 commit 7a4659b

File tree

4 files changed

+137
-6
lines changed

4 files changed

+137
-6
lines changed

tarantool/connection.py

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ def __init__(self, host, port,
126126
self.encoding = encoding
127127
self.call_16 = call_16
128128
self.connection_timeout = connection_timeout
129+
self.authenticated = False
129130
if connect_now:
130131
self.connect()
131132

@@ -200,9 +201,11 @@ def connect(self):
200201
:raise: `NetworkError`
201202
'''
202203
try:
204+
self.authenticated = False
203205
self.connect_basic()
204206
self.handshake()
205207
self.load_schema()
208+
self.authenticated = True
206209
except Exception as e:
207210
self.connected = False
208211
raise NetworkError(e)
@@ -370,14 +373,33 @@ def call(self, func_name, *args):
370373
371374
:rtype: `Response` instance
372375
'''
373-
assert isinstance(func_name, str)
374376

375377
# This allows to use a tuple or list as an argument
376378
if len(args) == 1 and isinstance(args[0], (list, tuple)):
377379
args = args[0]
378380

381+
return self.call_ex(func_name, args, True)
382+
383+
def call_ex(self, func_name, args=[], reconnect=True):
384+
'''
385+
Execute CALL request. Call stored Lua function.
386+
387+
:param func_name: stored Lua function name
388+
:type func_name: str
389+
:param args: list of function arguments
390+
:type args: list or tuple
391+
:param reconnect: reconnect before call
392+
:type reconnect: boolean
393+
394+
:rtype: `Response` instance
395+
'''
396+
assert isinstance(func_name, str)
397+
379398
request = RequestCall(self, func_name, args, self.call_16)
380-
response = self._send_request(request)
399+
if reconnect:
400+
response = self._send_request(request)
401+
else:
402+
response = self._send_request_wo_reconnect(request)
381403
return response
382404

383405
def eval(self, expr, *args):
@@ -391,14 +413,33 @@ def eval(self, expr, *args):
391413
392414
:rtype: `Response` instance
393415
'''
394-
assert isinstance(expr, str)
395416

396417
# This allows to use a tuple or list as an argument
397418
if len(args) == 1 and isinstance(args[0], (list, tuple)):
398419
args = args[0]
399420

421+
return self.eval_ex(expr, args, True)
422+
423+
def eval_ex(self, expr, args=[], reconnect=True):
424+
'''
425+
Execute EVAL request. Eval Lua expression.
426+
427+
:param expr: Lua expression
428+
:type expr: str
429+
:param args: list of function arguments
430+
:type args: list or tuple
431+
:param reconnect: reconnect before call
432+
:type reconnect: boolean
433+
434+
:rtype: `Response` instance
435+
'''
436+
assert isinstance(expr, str)
437+
400438
request = RequestEval(self, expr, args)
401-
response = self._send_request(request)
439+
if reconnect:
440+
response = self._send_request(request)
441+
else:
442+
response = self._send_request_wo_reconnect(request)
402443
return response
403444

404445
def replace(self, space_name, values):

tarantool/const.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,5 @@
8686
RECONNECT_MAX_ATTEMPTS = 10
8787
# Default delay between attempts to reconnect (seconds)
8888
RECONNECT_DELAY = 0.1
89+
# Default cluster nodes list refresh interval (seconds)
90+
NODES_REFRESH_INTERVAL = 300

tarantool/mesh_connection.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
between tarantool instances and basic Round-Robin strategy.
55
'''
66

7+
import time
78
from tarantool.connection import Connection
89
from tarantool.error import NetworkError
910
from tarantool.utils import ENCODING_DEFAULT
1011
from tarantool.const import (
1112
SOCKET_TIMEOUT,
1213
RECONNECT_MAX_ATTEMPTS,
13-
RECONNECT_DELAY
14+
RECONNECT_DELAY,
15+
NODES_REFRESH_INTERVAL
1416
)
1517

1618

@@ -34,12 +36,15 @@ def __init__(self, addrs,
3436
reconnect_delay=RECONNECT_DELAY,
3537
connect_now=True,
3638
encoding=ENCODING_DEFAULT,
37-
strategy_class=RoundRobinStrategy):
39+
strategy_class=RoundRobinStrategy,
40+
nodes_refresh_interval=NODES_REFRESH_INTERVAL):
3841
self.nattempts = 2 * len(addrs) + 1
3942
self.strategy = strategy_class(addrs)
4043
addr = self.strategy.getnext()
4144
host = addr['host']
4245
port = addr['port']
46+
self.nodes_refresh_interval = nodes_refresh_interval
47+
self.last_nodes_refresh = 0
4348
super(MeshConnection, self).__init__(host=host,
4449
port=port,
4550
user=user,
@@ -63,3 +68,28 @@ def _opt_reconnect(self):
6368
self.port = addr['port']
6469
else:
6570
raise NetworkError
71+
72+
if self.authenticated:
73+
now = time.time()
74+
if now - self.last_nodes_refresh > self.nodes_refresh_interval:
75+
self.refresh_nodes(now)
76+
77+
def refresh_nodes(self, cur_time):
78+
resp = super(MeshConnection, self).eval_ex('return get_nodes ~= nil',
79+
[], reconnect=False)
80+
if not (resp.data and resp.data[0]):
81+
return
82+
83+
resp = super(MeshConnection, self).call_ex('get_nodes', [],
84+
reconnect=False)
85+
86+
if not (resp.data and resp.data[0]):
87+
return
88+
89+
addrs = resp.data[0]
90+
if type(addrs) is list:
91+
self.strategy.addrs = addrs
92+
self.strategy.pos = 0
93+
self.last_nodes_refresh = cur_time
94+
if not {'host': self.host, 'port': self.port} in addrs:
95+
self._opt_reconnect()

unit/suites/test_reconnect.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,64 @@ def test_02_wrong_auth(self):
6666
con.close()
6767
self.srv.stop()
6868

69+
def test_03_mesh(self):
70+
# Start two servers
71+
self.srv.start()
72+
self.srv.admin("box.schema.user.create('test', { password = 'test', if_not_exists = true })")
73+
self.srv.admin("box.schema.user.grant('test', 'read,write,execute', 'universe')")
74+
75+
self.srv2 = TarantoolServer()
76+
self.srv2.script = 'unit/suites/box.lua'
77+
self.srv2.start()
78+
self.srv2.admin("box.schema.user.create('test', { password = 'test', if_not_exists = true })")
79+
self.srv2.admin("box.schema.user.grant('test', 'read,write,execute', 'universe')")
80+
81+
get_nodes = " \
82+
function get_nodes() \
83+
return { \
84+
{ \
85+
host = '%s', \
86+
port = tonumber(%d) \
87+
}, \
88+
{ \
89+
host = '%s', \
90+
port = tonumber(%d) \
91+
} \
92+
} \
93+
end" % (self.srv.host, self.srv.args['primary'], self.srv2.host, self.srv2.args['primary'])
94+
95+
# Create get_nodes function
96+
self.srv.admin(get_nodes)
97+
self.srv2.admin(get_nodes)
98+
99+
# Create srv_id function (for testing purposes)
100+
self.srv.admin("function srv_id() return 1 end")
101+
self.srv2.admin("function srv_id() return 2 end")
102+
103+
# Create a mesh connection, pass only the first server address
104+
con = tarantool.MeshConnection([{
105+
'host': self.srv.host, 'port': self.srv.args['primary']}],
106+
user='test',
107+
password='test',
108+
connect_now=True)
109+
110+
# Check we work with the first server
111+
resp = con.call('srv_id')
112+
self.assertIs(resp.data and resp.data[0] == 1, True)
113+
114+
# Stop the first server.
115+
self.srv.stop()
116+
117+
# Check we work with the second server
118+
resp = con.call('srv_id')
119+
self.assertIs(resp.data and resp.data[0] == 2, True)
120+
121+
# Stop the second server.
122+
self.srv2.stop()
123+
124+
# Close the connection
125+
con.close()
126+
69127
@classmethod
70128
def tearDownClass(self):
71129
self.srv.clean()

0 commit comments

Comments
 (0)