Skip to content

Commit 39815c1

Browse files
committed
conn: create from socket fd
This patch adds the ability to create Tarantool connection using an existing socket fd. To achieve this, several changes have been made to work with non-blocking sockets, as `socket.socketpair` creates such [1]. The authentication [2] might have already occured when we establish such a connection. If that's the case, there is no need to pass 'user' argument. On success, connect takes ownership of the `fd`. 1. tarantool/tarantool#8944 2. https://www.tarantool.io/en/doc/latest/dev_guide/internals/iproto/authentication/ Closes #304
1 parent 02818fb commit 39815c1

File tree

8 files changed

+279
-19
lines changed

8 files changed

+279
-19
lines changed

CHANGELOG.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,11 @@ All notable changes to this project will be documented in this file.
44
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66

7+
## Unreleased
8+
9+
### Added
10+
- The ability to connect to the Tarantool using an existing socket fd (#304).
11+
712
## 1.1.2 - 2023-09-20
813

914
### Fixed

tarantool/connection.py

Lines changed: 57 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
# pylint: disable=too-many-lines,duplicate-code
55

66
import os
7+
import select
78
import time
89
import errno
910
from enum import Enum
@@ -623,7 +624,7 @@ def __init__(self, host, port,
623624
Unix sockets.
624625
:type host: :obj:`str` or :obj:`None`
625626
626-
:param port: Server port or Unix socket path.
627+
:param port: Server port, socket fd or Unix socket path.
627628
:type port: :obj:`int` or :obj:`str`
628629
629630
:param user: User name for authentication on the Tarantool
@@ -897,10 +898,25 @@ def connect_basic(self):
897898
:meta private:
898899
"""
899900

900-
if self.host is None:
901-
self.connect_unix()
902-
else:
901+
if self.host is not None:
903902
self.connect_tcp()
903+
elif isinstance(self.port, int):
904+
self.connect_socket_fd()
905+
else:
906+
self.connect_unix()
907+
908+
def connect_socket_fd(self):
909+
"""
910+
Establish a connection using an existing socket fd.
911+
912+
:meta private:
913+
"""
914+
self.connected = True
915+
if self._socket:
916+
self._socket.close()
917+
918+
socket_fd = self.port
919+
self._socket = socket.socket(fileno=socket_fd)
904920

905921
def connect_tcp(self):
906922
"""
@@ -1124,6 +1140,9 @@ def _recv(self, to_read):
11241140
while to_read > 0:
11251141
try:
11261142
tmp = self._socket.recv(to_read)
1143+
except BlockingIOError:
1144+
select.select([self._socket.fileno()], [], [])
1145+
continue
11271146
except OverflowError as exc:
11281147
self._socket.close()
11291148
err = socket.error(
@@ -1163,6 +1182,39 @@ def _read_response(self):
11631182
# Read the packet
11641183
return self._recv(length)
11651184

1185+
def _sendall(self, bytes_to_send):
1186+
"""
1187+
Sends bytes to the transport (socket).
1188+
1189+
:param bytes_to_send: message to send.
1190+
:type bytes_to_send: :obj:`bytes`
1191+
1192+
:raise: :exc:`~tarantool.error.NetworkError`
1193+
1194+
:meta: private:
1195+
"""
1196+
total_sent = 0
1197+
while total_sent < len(bytes_to_send):
1198+
try:
1199+
sent = self._socket.send(bytes_to_send[total_sent:])
1200+
if sent == 0:
1201+
err = socket.error(
1202+
errno.ECONNRESET,
1203+
"Lost connection to server during query"
1204+
)
1205+
raise NetworkError(err)
1206+
total_sent += sent
1207+
except BlockingIOError as exc:
1208+
total_sent += exc.characters_written
1209+
select.select([], [self._socket.fileno()], [])
1210+
continue
1211+
except socket.error as exc:
1212+
err = socket.error(
1213+
errno.ECONNRESET,
1214+
"Lost connection to server during query"
1215+
)
1216+
raise NetworkError(err) from exc
1217+
11661218
def _send_request_wo_reconnect(self, request, on_push=None, on_push_ctx=None):
11671219
"""
11681220
Send request without trying to reconnect.
@@ -1191,7 +1243,7 @@ def _send_request_wo_reconnect(self, request, on_push=None, on_push_ctx=None):
11911243
response = None
11921244
while True:
11931245
try:
1194-
self._socket.sendall(bytes(request))
1246+
self._sendall(bytes(request))
11951247
response = request.response_class(self, self._read_response())
11961248
break
11971249
except SchemaReloadException as exc:

tarantool/mesh_connection.py

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -139,23 +139,23 @@ def format_error(address, err):
139139
result[key] = val
140140

141141
if isinstance(result['port'], int):
142-
# Looks like an inet address.
142+
# Looks like an inet address or socket fd.
143143

144144
# Validate host.
145-
if 'host' not in result or result['host'] is None:
146-
return format_error(result,
147-
'host is mandatory for an inet result')
148-
if not isinstance(result['host'], str):
149-
return format_error(result,
150-
'host must be a string for an inet result')
145+
if ('host' in result and result['host'] is not None
146+
and not isinstance(result['host'], str)):
147+
return format_error(result, 'host must be a string for an inet result, '
148+
'or None for a socket fd result')
151149

152150
# Validate port.
153151
if not isinstance(result['port'], int):
154152
return format_error(result,
155-
'port must be an int for an inet result')
156-
if result['port'] < 1 or result['port'] > 65535:
157-
return format_error(result, 'port must be in range [1, 65535] '
158-
'for an inet result')
153+
'port must be an int for an inet/socket fd result')
154+
if 'host' in result and isinstance(result['host'], str):
155+
# Check that the port is correct.
156+
if result['port'] < 1 or result['port'] > 65535:
157+
return format_error(result, 'port must be in range [1, 65535] '
158+
'for an inet result')
159159

160160
# Looks okay.
161161
return result, None
@@ -447,7 +447,8 @@ def __init__(self, host=None, port=None,
447447
# Don't change user provided arguments.
448448
addrs = addrs[:]
449449

450-
if host and port:
450+
if port:
451+
# host can be None in the case of socket fd or Unix socket.
451452
addrs.insert(0, {'host': host,
452453
'port': port,
453454
'transport': transport,

test/suites/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from .test_execute import TestSuiteExecute
1616
from .test_dbapi import TestSuiteDBAPI
1717
from .test_encoding import TestSuiteEncoding
18+
from .test_socket_fd import TestSuiteSocketFD
1819
from .test_ssl import TestSuiteSsl
1920
from .test_decimal import TestSuiteDecimal
2021
from .test_uuid import TestSuiteUUID
@@ -33,7 +34,7 @@
3334
TestSuiteEncoding, TestSuitePool, TestSuiteSsl,
3435
TestSuiteDecimal, TestSuiteUUID, TestSuiteDatetime,
3536
TestSuiteInterval, TestSuitePackage, TestSuiteErrorExt,
36-
TestSuitePush, TestSuiteConnection, TestSuiteCrud,)
37+
TestSuitePush, TestSuiteConnection, TestSuiteCrud, TestSuiteSocketFD)
3738

3839

3940
def load_tests(loader, tests, pattern):

test/suites/lib/skip.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,3 +306,14 @@ def skip_or_run_iproto_basic_features_test(func):
306306

307307
return skip_or_run_test_tarantool(func, '2.10.0',
308308
'does not support iproto ID and iproto basic features')
309+
310+
311+
def skip_or_run_box_session_new_tests(func):
312+
"""
313+
Decorator to skip or run tests that use box.session.new.
314+
315+
Tarantool supports box.session.new only in current master since
316+
commit 324872a.
317+
See https://github.com/tarantool/tarantool/issues/8801.
318+
"""
319+
return skip_or_run_test_tarantool(func, '3.0.0', 'does not support box.session.new')

test/suites/sidecar.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
# pylint: disable=missing-module-docstring
2+
import os
3+
4+
import tarantool
5+
6+
socket_fd = int(os.environ["SOCKET_FD"])
7+
8+
conn = tarantool.connect(host=None, port=socket_fd)
9+
10+
# Check user.
11+
assert conn.eval("return box.session.user()").data[0] == "test"
12+
13+
# Check db operations.
14+
conn.insert("test", [1])
15+
conn.insert("test", [2])
16+
assert conn.select("test").data == [[1], [2]]

test/suites/test_mesh.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,6 @@ def test_01_contructor(self):
135135
# Verify that a bad address given at initialization leads
136136
# to an error.
137137
bad_addrs = [
138-
{"port": 1234}, # no host
139138
{"host": "localhost"}, # no port
140139
{"host": "localhost", "port": "1234"}, # port is str
141140
]

test/suites/test_socket_fd.py

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,175 @@
1+
"""
2+
This module tests work with connection over socket fd.
3+
"""
4+
import os.path
5+
# pylint: disable=missing-class-docstring,missing-function-docstring
6+
7+
import socket
8+
import sys
9+
import unittest
10+
11+
import tarantool
12+
from .lib.skip import skip_or_run_box_session_new_tests
13+
from .lib.tarantool_server import TarantoolServer, find_port
14+
from .utils import assert_admin_success
15+
16+
17+
def find_python():
18+
for _dir in os.environ["PATH"].split(os.pathsep):
19+
exe = os.path.join(_dir, "python")
20+
if os.access(exe, os.X_OK):
21+
return os.path.abspath(exe)
22+
raise RuntimeError("Can't find python executable in " + os.environ["PATH"])
23+
24+
25+
class TestSuiteSocketFD(unittest.TestCase):
26+
EVAL_USER = "return box.session.user()"
27+
28+
@classmethod
29+
def setUpClass(cls):
30+
print(' SOCKET FD '.center(70, '='), file=sys.stderr)
31+
print('-' * 70, file=sys.stderr)
32+
33+
cls.srv = TarantoolServer()
34+
cls.srv.script = 'test/suites/box.lua'
35+
cls.srv.start()
36+
cls.tcp_port = find_port()
37+
38+
# Start tcp server to test work with blocking sockets.
39+
# pylint:disable=C0209
40+
resp = cls.srv.admin("""
41+
local socket = require('socket')
42+
43+
box.cfg{}
44+
box.schema.user.create('test', {password = 'test', if_not_exists = true})
45+
box.schema.user.grant('test', 'read,write,execute,create', 'universe',
46+
nil, {if_not_exists = true})
47+
box.schema.user.grant('guest', 'execute', 'universe',
48+
nil, {if_not_exists = true})
49+
50+
socket.tcp_server('0.0.0.0', %d, function(s)
51+
if not s:nonblock(true) then
52+
s:close()
53+
return
54+
end
55+
box.session.new({
56+
type = 'binary',
57+
fd = s:fd(),
58+
user = 'test',
59+
})
60+
s:detach()
61+
end)
62+
63+
box.schema.create_space('test', {
64+
format = {{type='unsigned', name='id'}},
65+
if_not_exists = true,
66+
})
67+
box.space.test:create_index('primary')
68+
69+
return true
70+
""" % cls.tcp_port)
71+
assert_admin_success(resp)
72+
73+
@skip_or_run_box_session_new_tests
74+
def setUp(self):
75+
# Prevent a remote tarantool from clean our session.
76+
if self.srv.is_started():
77+
self.srv.touch_lock()
78+
79+
def _get_tt_sock(self):
80+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
81+
sock.connect((self.srv.host, self.tcp_port))
82+
return sock
83+
84+
def test_01_socket_fd_connect(self):
85+
sock = self._get_tt_sock()
86+
conn = tarantool.connect(host=None, port=sock.fileno())
87+
sock.detach()
88+
try:
89+
self.assertSequenceEqual(conn.eval(self.EVAL_USER), ["test"])
90+
finally:
91+
conn.close()
92+
93+
def test_02_socket_fd_re_auth(self):
94+
sock = self._get_tt_sock()
95+
conn = tarantool.connect(host=None, port=sock.fileno(), user="guest")
96+
sock.detach()
97+
try:
98+
self.assertSequenceEqual(conn.eval(self.EVAL_USER), ["guest"])
99+
finally:
100+
conn.close()
101+
102+
def test_03_socket_fd_mesh(self):
103+
sock = self._get_tt_sock()
104+
pool = tarantool.ConnectionPool(
105+
addrs=[{'host': None, 'port': sock.fileno()}]
106+
)
107+
sock.detach()
108+
try:
109+
self.assertSequenceEqual(pool.eval(self.EVAL_USER, mode=tarantool.Mode.ANY), ["test"])
110+
finally:
111+
pool.close()
112+
113+
def test_04_socket_fd_pool(self):
114+
sock = self._get_tt_sock()
115+
mesh = tarantool.MeshConnection(
116+
host=None, port=sock.fileno()
117+
)
118+
sock.detach()
119+
try:
120+
self.assertSequenceEqual(mesh.eval(self.EVAL_USER), ["test"])
121+
finally:
122+
mesh.close()
123+
124+
@unittest.skipIf(sys.platform.startswith("win"),
125+
"Skip on Windows since it uses remote server")
126+
def test_05_tarantool_made_socket(self):
127+
python_exe = find_python()
128+
cwd = os.getcwd()
129+
side_script_path = os.path.join(cwd, "test", "suites", "sidecar.py")
130+
131+
# pylint:disable=C0209
132+
ret_code, err = self.srv.admin("""
133+
local socket = require('socket')
134+
local popen = require('popen')
135+
local os = require('os')
136+
local s1, s2 = socket.socketpair('AF_UNIX', 'SOCK_STREAM', 0)
137+
138+
--[[ Tell sidecar which fd use to connect. --]]
139+
os.setenv('SOCKET_FD', tostring(s2:fd()))
140+
141+
--[[ Tell sidecar where find `tarantool` module. --]]
142+
os.setenv('PYTHONPATH', (os.getenv('PYTHONPATH') or '') .. ':' .. '%s')
143+
144+
box.session.new({
145+
type = 'binary',
146+
fd = s1:fd(),
147+
user = 'test',
148+
})
149+
s1:detach()
150+
151+
local ph, err = popen.new({'%s', '%s'}, {
152+
stdout = popen.opts.PIPE,
153+
stderr = popen.opts.PIPE,
154+
inherit_fds = {s2:fd()},
155+
})
156+
157+
if err ~= nil then
158+
return 1, err
159+
end
160+
161+
ph:wait()
162+
163+
local status_code = ph:info().status.exit_code
164+
local stderr = ph:read({stderr=true}):rstrip()
165+
return status_code, stderr
166+
""" % (cwd, python_exe, side_script_path))
167+
if err is not None:
168+
print(err)
169+
self.assertEqual(ret_code, 0)
170+
self.assertIsNone(err)
171+
172+
@classmethod
173+
def tearDownClass(cls):
174+
cls.srv.stop()
175+
cls.srv.clean()

0 commit comments

Comments
 (0)