Skip to content

Commit c25c651

Browse files
Add cluster discovery function to mesh connection
Make Mesh class compatible with original Connection Make mesh addrs validation parts Add warning on mesh config validation. Reworked tests on mesh class Reworked strategy class Autodocs on mesh class
1 parent b54ffb0 commit c25c651

File tree

8 files changed

+430
-19
lines changed

8 files changed

+430
-19
lines changed

doc/api/class-mesh-connection.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
2+
.. currentmodule:: tarantool.mesh_connection
3+
4+
class :class:`MeshConnection`
5+
-----------------------------
6+
7+
.. autoclass:: MeshConnection
8+

doc/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ API Reference
4040

4141
api/module-tarantool.rst
4242
api/class-connection.rst
43+
api/class-mesh-connection.rst
4344
api/class-space.rst
4445
api/class-response.rst
4546

doc/index.ru.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040

4141
api/module-tarantool.rst
4242
api/class-connection.rst
43+
api/class-mesh-connection.rst
4344
api/class-space.rst
4445
api/class-response.rst
4546

tarantool/const.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,5 @@
8686
RECONNECT_MAX_ATTEMPTS = 10
8787
# Default delay between attempts to reconnect (seconds)
8888
RECONNECT_DELAY = 0.1
89+
# Default cluster nodes list refresh interval (seconds)
90+
CLUSTER_DISCOVERY_DELAY = 60

tarantool/error.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,10 @@ class NetworkWarning(UserWarning):
152152
pass
153153

154154

155+
class ConfigurationWarning(UserWarning):
156+
'''Warning in configuration validation'''
157+
pass
158+
155159
# always print this warnings
156160
warnings.filterwarnings("always", category=NetworkWarning)
157161

tarantool/mesh_connection.py

Lines changed: 213 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,42 +4,173 @@
44
between tarantool instances and basic Round-Robin strategy.
55
'''
66

7+
import time
8+
from itertools import cycle
9+
10+
711
from tarantool.connection import Connection
8-
from tarantool.error import NetworkError
12+
from tarantool.error import warn, NetworkError, ConfigurationWarning
913
from tarantool.utils import ENCODING_DEFAULT
1014
from tarantool.const import (
15+
CONNECTION_TIMEOUT,
1116
SOCKET_TIMEOUT,
1217
RECONNECT_MAX_ATTEMPTS,
13-
RECONNECT_DELAY
18+
RECONNECT_DELAY,
19+
CLUSTER_DISCOVERY_DELAY,
20+
)
21+
22+
from tarantool.request import (
23+
RequestCall
1424
)
1525

26+
try:
27+
string_types = basestring
28+
except NameError:
29+
string_types = str
30+
1631

1732
class RoundRobinStrategy(object):
33+
"""
34+
Simple round-robin address rotation
35+
"""
1836
def __init__(self, addrs):
19-
self.addrs = addrs
20-
self.pos = 0
37+
self.reload(addrs)
38+
39+
def __validate(self, address):
40+
err = []
41+
42+
if "host" not in address:
43+
err.append("host key must be set")
44+
elif not isinstance(address["host"], string_types):
45+
err.append("host value must be string type")
46+
47+
if "port" not in address:
48+
err.append("port is not set")
49+
elif not isinstance(address["port"], int):
50+
err.append("port value must be int type")
51+
elif address["port"] == 0:
52+
err.append("port value must not be zero")
53+
54+
if err:
55+
wrn = 'Address validation : %s ' % ", ".join(err)
56+
warn(wrn, ConfigurationWarning)
57+
return False
58+
return True
59+
60+
def reload(self, addrs):
61+
self.addrs = []
62+
for i in addrs:
63+
if self.__validate(i) and i not in self.addrs:
64+
self.addrs.append(i)
65+
66+
self.__gen = cycle(self.addrs)
2167

2268
def getnext(self):
23-
tmp = self.pos
24-
self.pos = (self.pos + 1) % len(self.addrs)
25-
return self.addrs[tmp]
69+
return next(self.__gen)
70+
71+
72+
def parse_uri(uri_str):
73+
if not uri_str or uri_str.count(":") != 1:
74+
return
75+
uri = uri_str.split(':', 1)
76+
host = uri[0]
77+
try:
78+
port = int(uri[1])
79+
except ValueError:
80+
return
81+
82+
if host and port:
83+
return {'host': host, 'port': port}
2684

2785

2886
class MeshConnection(Connection):
29-
def __init__(self, addrs,
87+
'''
88+
Represents a connection to a cluster of Tarantool servers.
89+
90+
This class uses Connection to connect to one of the nodes of the cluster.
91+
The initial list of nodes is passed to the constructor in 'addrs' parameter.
92+
The class set in 'strategy_class' parameter is used to select a node from
93+
the list and switch nodes in case of unavailability of the current node.
94+
95+
'cluster_discovery_function' param of the constructor sets the name of a stored
96+
Lua function used to refresh the list of available nodes. The function takes
97+
no parameters and returns a list of strings in format 'host:port'. A generic
98+
function for getting the list of nodes looks like this:
99+
100+
.. code-block:: lua
101+
102+
function get_cluster_nodes()
103+
return {
104+
'192.168.0.1:3301',
105+
'192.168.0.2:3302',
106+
-- ...
107+
}
108+
end
109+
110+
You may put in this list whatever you need depending on your cluster
111+
topology. Chances are you'll want to make the list of nodes from nodes'
112+
replication config. Here is an example for it:
113+
114+
.. code-block:: lua
115+
116+
local uri_lib = require('uri')
117+
118+
function get_cluster_nodes()
119+
local nodes = {}
120+
121+
local replicas = box.cfg.replication
122+
123+
for i = 1, #replicas do
124+
local uri = uri_lib.parse(replicas[i])
125+
126+
if uri.host and uri.service then
127+
table.insert(nodes, uri.host .. ':' .. uri.service)
128+
end
129+
end
130+
131+
-- if your replication config doesn't contain the current node
132+
-- you have to add it manually like this:
133+
table.insert(nodes, '192.168.0.1:3301')
134+
135+
return nodes
136+
end
137+
'''
138+
139+
def __init__(self, host=None, port=None,
30140
user=None,
31141
password=None,
32142
socket_timeout=SOCKET_TIMEOUT,
33143
reconnect_max_attempts=RECONNECT_MAX_ATTEMPTS,
34144
reconnect_delay=RECONNECT_DELAY,
35145
connect_now=True,
36146
encoding=ENCODING_DEFAULT,
37-
strategy_class=RoundRobinStrategy):
38-
self.nattempts = 2 * len(addrs) + 1
147+
call_16=False,
148+
connection_timeout=CONNECTION_TIMEOUT,
149+
addrs=None,
150+
strategy_class=RoundRobinStrategy,
151+
cluster_discovery_function=None,
152+
cluster_discovery_delay=CLUSTER_DISCOVERY_DELAY):
153+
154+
if addrs is None:
155+
addrs = []
156+
157+
if host and port:
158+
addrs.insert(0, {'host': host, 'port': port})
159+
160+
self.strategy_class = strategy_class
39161
self.strategy = strategy_class(addrs)
162+
163+
if not self.strategy.addrs and connect_now:
164+
raise ValueError("Host/port or addrs list must be set")
165+
40166
addr = self.strategy.getnext()
41167
host = addr['host']
42168
port = addr['port']
169+
170+
self.cluster_discovery_function = cluster_discovery_function
171+
self.cluster_discovery_delay = cluster_discovery_delay
172+
self.last_nodes_refresh = 0
173+
43174
super(MeshConnection, self).__init__(host=host,
44175
port=port,
45176
user=user,
@@ -48,18 +179,82 @@ def __init__(self, addrs,
48179
reconnect_max_attempts=reconnect_max_attempts,
49180
reconnect_delay=reconnect_delay,
50181
connect_now=connect_now,
51-
encoding=encoding)
182+
encoding=encoding,
183+
call_16=call_16,
184+
connection_timeout=connection_timeout)
185+
if connect_now and self.cluster_discovery_function:
186+
self._opt_refresh_instances()
52187

53188
def _opt_reconnect(self):
54-
nattempts = self.nattempts
55-
while nattempts > 0:
189+
'''
190+
Use original opt_reconnect with address rotation
191+
'''
192+
193+
for i in range(len(self.strategy.addrs)+1):
56194
try:
57195
super(MeshConnection, self)._opt_reconnect()
58-
break
59196
except NetworkError:
60-
nattempts -= 1
61197
addr = self.strategy.getnext()
62-
self.host = addr['host']
63-
self.port = addr['port']
64-
else:
198+
self.host = addr["host"]
199+
self.port = addr["port"]
200+
continue
201+
202+
if not self._socket:
65203
raise NetworkError
204+
205+
def _opt_refresh_instances(self):
206+
'''
207+
Refresh list of cluster instances.
208+
If current connection not in server list will change connection.
209+
'''
210+
now = time.time()
211+
212+
if not self.connected or not self.cluster_discovery_function:
213+
return
214+
215+
if now - self.last_nodes_refresh > self.cluster_discovery_delay:
216+
request = RequestCall(self,
217+
self.cluster_discovery_function,
218+
(),
219+
self.call_16)
220+
221+
self._opt_reconnect()
222+
resp = self._send_request_wo_reconnect(request)
223+
224+
# got data to refresh
225+
if resp.data and resp.data[0]:
226+
addrs = []
227+
for i in resp.data[0]:
228+
addr = parse_uri(i)
229+
if addr:
230+
addrs.append(addr)
231+
232+
self.strategy.reload(addrs)
233+
self.last_nodes_refresh = now
234+
235+
if not self.strategy.addrs:
236+
raise NetworkError('No addrs to connect')
237+
238+
# Check current addr in list
239+
if {'host': self.host, 'port': self.port} not in self.strategy.addrs:
240+
self.close()
241+
addr = self.strategy.getnext()
242+
self.host = addr['host']
243+
self.port = addr['port']
244+
self._opt_reconnect()
245+
246+
def _send_request(self, request):
247+
'''
248+
Send the request to the server through the socket.
249+
Return an instance of `Response` class.
250+
251+
Update instances list from server `cluster_discovery_function` function.
252+
253+
:param request: object representing a request
254+
:type request: `Request` instance
255+
256+
:rtype: `Response` instance
257+
'''
258+
self._opt_refresh_instances()
259+
260+
return super(MeshConnection, self)._send_request(request)

unit/suites/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@
88
from .test_dml import TestSuite_Request
99
from .test_protocol import TestSuite_Protocol
1010
from .test_reconnect import TestSuite_Reconnect
11+
from .test_mesh import TestSuite_Mesh
1112

1213
test_cases = (TestSuite_Schema, TestSuite_Request, TestSuite_Protocol,
13-
TestSuite_Reconnect)
14+
TestSuite_Reconnect, TestSuite_Mesh)
1415

1516
def load_tests(loader, tests, pattern):
1617
suite = unittest.TestSuite()

0 commit comments

Comments
 (0)