Skip to content

Commit b12e79d

Browse files
committed
Implement getting the cluster nodes list from nodes
Periodically call a user-defined Lua function on the current node to obtain or refresh the full cluster nodes list. Resolves: #134
1 parent a6b6d56 commit b12e79d

File tree

4 files changed

+190
-6
lines changed

4 files changed

+190
-6
lines changed

tarantool/connection.py

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,7 @@ def __init__(self, host, port,
126126
self.encoding = encoding
127127
self.call_16 = call_16
128128
self.connection_timeout = connection_timeout
129+
self.authenticated = False
129130
if connect_now:
130131
self.connect()
131132

@@ -200,9 +201,11 @@ def connect(self):
200201
:raise: `NetworkError`
201202
'''
202203
try:
204+
self.authenticated = False
203205
self.connect_basic()
204206
self.handshake()
205207
self.load_schema()
208+
self.authenticated = True
206209
except Exception as e:
207210
self.connected = False
208211
raise NetworkError(e)
@@ -370,14 +373,33 @@ def call(self, func_name, *args):
370373
371374
:rtype: `Response` instance
372375
'''
373-
assert isinstance(func_name, str)
374376

375377
# This allows to use a tuple or list as an argument
376378
if len(args) == 1 and isinstance(args[0], (list, tuple)):
377379
args = args[0]
378380

381+
return self.call_ex(func_name, args, True)
382+
383+
def call_ex(self, func_name, args=[], reconnect=True):
384+
'''
385+
Execute CALL request. Call stored Lua function.
386+
387+
:param func_name: stored Lua function name
388+
:type func_name: str
389+
:param args: list of function arguments
390+
:type args: list or tuple
391+
:param reconnect: reconnect before call
392+
:type reconnect: boolean
393+
394+
:rtype: `Response` instance
395+
'''
396+
assert isinstance(func_name, str)
397+
379398
request = RequestCall(self, func_name, args, self.call_16)
380-
response = self._send_request(request)
399+
if reconnect:
400+
response = self._send_request(request)
401+
else:
402+
response = self._send_request_wo_reconnect(request)
381403
return response
382404

383405
def eval(self, expr, *args):
@@ -391,14 +413,33 @@ def eval(self, expr, *args):
391413
392414
:rtype: `Response` instance
393415
'''
394-
assert isinstance(expr, str)
395416

396417
# This allows to use a tuple or list as an argument
397418
if len(args) == 1 and isinstance(args[0], (list, tuple)):
398419
args = args[0]
399420

421+
return self.eval_ex(expr, args, True)
422+
423+
def eval_ex(self, expr, args=[], reconnect=True):
424+
'''
425+
Execute EVAL request. Eval Lua expression.
426+
427+
:param expr: Lua expression
428+
:type expr: str
429+
:param args: list of function arguments
430+
:type args: list or tuple
431+
:param reconnect: reconnect before call
432+
:type reconnect: boolean
433+
434+
:rtype: `Response` instance
435+
'''
436+
assert isinstance(expr, str)
437+
400438
request = RequestEval(self, expr, args)
401-
response = self._send_request(request)
439+
if reconnect:
440+
response = self._send_request(request)
441+
else:
442+
response = self._send_request_wo_reconnect(request)
402443
return response
403444

404445
def replace(self, space_name, values):

tarantool/const.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,5 @@
8686
RECONNECT_MAX_ATTEMPTS = 10
8787
# Default delay between attempts to reconnect (seconds)
8888
RECONNECT_DELAY = 0.1
89+
# Default cluster nodes list refresh interval (seconds)
90+
NODES_REFRESH_INTERVAL = 300

tarantool/mesh_connection.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@
44
between tarantool instances and basic Round-Robin strategy.
55
'''
66

7+
import time
78
from tarantool.connection import Connection
89
from tarantool.error import NetworkError
910
from tarantool.utils import ENCODING_DEFAULT
1011
from tarantool.const import (
1112
SOCKET_TIMEOUT,
1213
RECONNECT_MAX_ATTEMPTS,
13-
RECONNECT_DELAY
14+
RECONNECT_DELAY,
15+
NODES_REFRESH_INTERVAL
1416
)
1517

1618

@@ -34,12 +36,16 @@ def __init__(self, addrs,
3436
reconnect_delay=RECONNECT_DELAY,
3537
connect_now=True,
3638
encoding=ENCODING_DEFAULT,
37-
strategy_class=RoundRobinStrategy):
39+
strategy_class=RoundRobinStrategy,
40+
nodes_refresh_interval=NODES_REFRESH_INTERVAL):
3841
self.nattempts = 2 * len(addrs) + 1
3942
self.strategy = strategy_class(addrs)
43+
self.strategy_class = strategy_class
4044
addr = self.strategy.getnext()
4145
host = addr['host']
4246
port = addr['port']
47+
self.nodes_refresh_interval = nodes_refresh_interval
48+
self.last_nodes_refresh = 0
4349
super(MeshConnection, self).__init__(host=host,
4450
port=port,
4551
user=user,
@@ -63,3 +69,31 @@ def _opt_reconnect(self):
6369
self.port = addr['port']
6470
else:
6571
raise NetworkError
72+
73+
if self.authenticated:
74+
now = time.time()
75+
if now - self.last_nodes_refresh > self.nodes_refresh_interval:
76+
self.refresh_nodes(now)
77+
78+
def refresh_nodes(self, cur_time):
79+
resp = super(MeshConnection, self).eval_ex('return get_nodes ~= nil',
80+
[], reconnect=False)
81+
if not (resp.data and resp.data[0]):
82+
return
83+
84+
resp = super(MeshConnection, self).call_ex('get_nodes', [],
85+
reconnect=False)
86+
87+
if not (resp.data and resp.data[0]):
88+
return
89+
90+
addrs = resp.data[0]
91+
if type(addrs) is list:
92+
self.strategy = self.strategy_class(addrs)
93+
self.last_nodes_refresh = cur_time
94+
if not {'host': self.host, 'port': self.port} in addrs:
95+
addr = self.strategy.getnext()
96+
self.host = addr['host']
97+
self.port = addr['port']
98+
self.close()
99+
self._opt_reconnect()

unit/suites/test_reconnect.py

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ def setUpClass(self):
1616
print('-' * 70, file=sys.stderr)
1717
self.srv = TarantoolServer()
1818
self.srv.script = 'unit/suites/box.lua'
19+
self.srv2 = TarantoolServer()
20+
self.srv2.script = 'unit/suites/box.lua'
1921

2022
def setUp(self):
2123
# prevent a remote tarantool from clean our session
@@ -66,6 +68,111 @@ def test_02_wrong_auth(self):
6668
con.close()
6769
self.srv.stop()
6870

71+
def test_03_mesh(self):
72+
# Start two servers
73+
self.srv.start()
74+
self.srv.admin("box.schema.user.create('test', { password = 'test', if_not_exists = true })")
75+
self.srv.admin("box.schema.user.grant('test', 'read,write,execute', 'universe')")
76+
77+
self.srv2.start()
78+
self.srv2.admin("box.schema.user.create('test', { password = 'test', if_not_exists = true })")
79+
self.srv2.admin("box.schema.user.grant('test', 'read,write,execute', 'universe')")
80+
81+
# get_nodes function contains both servers' addresses
82+
get_nodes = " \
83+
function get_nodes() \
84+
return { \
85+
{ \
86+
host = '%s', \
87+
port = tonumber(%d) \
88+
}, \
89+
{ \
90+
host = '%s', \
91+
port = tonumber(%d) \
92+
} \
93+
} \
94+
end" % (self.srv.host, self.srv.args['primary'], self.srv2.host, self.srv2.args['primary'])
95+
96+
# Create get_nodes function on servers
97+
self.srv.admin(get_nodes)
98+
self.srv2.admin(get_nodes)
99+
100+
# Create srv_id function (for testing purposes)
101+
self.srv.admin("function srv_id() return 1 end")
102+
self.srv2.admin("function srv_id() return 2 end")
103+
104+
# Create a mesh connection, pass only the first server address
105+
con = tarantool.MeshConnection([{
106+
'host': self.srv.host, 'port': self.srv.args['primary']}],
107+
user='test',
108+
password='test',
109+
connect_now=True)
110+
111+
# Check we work with the first server
112+
resp = con.call('srv_id')
113+
self.assertIs(resp.data and resp.data[0] == 1, True)
114+
115+
# Stop the first server
116+
self.srv.stop()
117+
118+
# Check we work with the second server
119+
resp = con.call('srv_id')
120+
self.assertIs(resp.data and resp.data[0] == 2, True)
121+
122+
# Stop the second server
123+
self.srv2.stop()
124+
125+
# Close the connection
126+
con.close()
127+
128+
def test_04_mesh_exclude_node(self):
129+
# Start two servers
130+
self.srv.start()
131+
self.srv.admin("box.schema.user.create('test', { password = 'test', if_not_exists = true })")
132+
self.srv.admin("box.schema.user.grant('test', 'read,write,execute', 'universe')")
133+
134+
self.srv2.start()
135+
self.srv2.admin("box.schema.user.create('test', { password = 'test', if_not_exists = true })")
136+
self.srv2.admin("box.schema.user.grant('test', 'read,write,execute', 'universe')")
137+
138+
# get_nodes function contains only the second server address
139+
get_nodes = " \
140+
function get_nodes() \
141+
return { \
142+
{ \
143+
host = '%s', \
144+
port = tonumber(%d) \
145+
} \
146+
} \
147+
end" % (self.srv2.host, self.srv2.args['primary'])
148+
149+
# Create get_nodes function on servers
150+
self.srv.admin(get_nodes)
151+
self.srv2.admin(get_nodes)
152+
153+
# Create srv_id function (for testing purposes)
154+
self.srv.admin("function srv_id() return 1 end")
155+
self.srv2.admin("function srv_id() return 2 end")
156+
157+
# Create a mesh connection, pass only the first server address
158+
con = tarantool.MeshConnection([{
159+
'host': self.srv.host, 'port': self.srv.args['primary']}],
160+
user='test',
161+
password='test',
162+
connect_now=True)
163+
164+
# Check we work with the second server
165+
resp = con.call('srv_id')
166+
self.assertIs(resp.data and resp.data[0] == 2, True)
167+
168+
# Stop servers
169+
self.srv.stop()
170+
self.srv2.stop()
171+
172+
# Close the connection
173+
con.close()
174+
69175
@classmethod
70176
def tearDownClass(self):
71177
self.srv.clean()
178+
self.srv2.clean()

0 commit comments

Comments
 (0)