|
1 | 1 | """
|
2 | 2 | This module provides API for interaction with a Tarantool server.
|
3 | 3 | """
|
| 4 | +import fcntl |
4 | 5 | # pylint: disable=too-many-lines,duplicate-code
|
5 | 6 |
|
6 | 7 | import os
|
| 8 | +import select |
7 | 9 | import time
|
8 | 10 | import errno
|
9 | 11 | from enum import Enum
|
@@ -623,7 +625,7 @@ def __init__(self, host, port,
|
623 | 625 | Unix sockets.
|
624 | 626 | :type host: :obj:`str` or :obj:`None`
|
625 | 627 |
|
626 |
| - :param port: Server port or Unix socket path. |
| 628 | + :param port: Server port, socket fd or Unix socket path. |
627 | 629 | :type port: :obj:`int` or :obj:`str`
|
628 | 630 |
|
629 | 631 | :param user: User name for authentication on the Tarantool
|
@@ -897,10 +899,34 @@ def connect_basic(self):
|
897 | 899 | :meta private:
|
898 | 900 | """
|
899 | 901 |
|
900 |
| - if self.host is None: |
901 |
| - self.connect_unix() |
902 |
| - else: |
| 902 | + if self.host is not None: |
903 | 903 | self.connect_tcp()
|
| 904 | + elif isinstance(self.port, int): |
| 905 | + self.connect_socket_fd() |
| 906 | + else: |
| 907 | + self.connect_unix() |
| 908 | + |
| 909 | + def connect_socket_fd(self): |
| 910 | + """ |
| 911 | + Establish a connection using an existing socket fd. |
| 912 | +
|
| 913 | + :raise: :exc:`OSError` |
| 914 | +
|
| 915 | + :meta private: |
| 916 | + """ |
| 917 | + |
| 918 | + # If old socket already exists - close it and re-create. |
| 919 | + self.connected = True |
| 920 | + if self._socket: |
| 921 | + self._socket.close() |
| 922 | + |
| 923 | + socket_fd = self.port |
| 924 | + self._socket = socket.socket(fileno=socket_fd) |
| 925 | + |
| 926 | + is_non_blocking = fcntl.fcntl(socket_fd, fcntl.F_GETFL) & os.O_NONBLOCK != 0 |
| 927 | + if is_non_blocking: |
| 928 | + # Explicitly specify, because otherwise it will not be initialized correctly. |
| 929 | + self._socket.settimeout(0) |
904 | 930 |
|
905 | 931 | def connect_tcp(self):
|
906 | 932 | """
|
@@ -1120,9 +1146,12 @@ def _recv(self, to_read):
|
1120 | 1146 | :meta private:
|
1121 | 1147 | """
|
1122 | 1148 |
|
| 1149 | + is_non_blocking = self._socket.gettimeout() == 0 |
1123 | 1150 | buf = b""
|
1124 | 1151 | while to_read > 0:
|
1125 | 1152 | try:
|
| 1153 | + if is_non_blocking: |
| 1154 | + select.select([self._socket.fileno()], [], []) |
1126 | 1155 | tmp = self._socket.recv(to_read)
|
1127 | 1156 | except OverflowError as exc:
|
1128 | 1157 | self._socket.close()
|
@@ -1163,6 +1192,41 @@ def _read_response(self):
|
1163 | 1192 | # Read the packet
|
1164 | 1193 | return self._recv(length)
|
1165 | 1194 |
|
| 1195 | + def _sendall(self, bytes): |
| 1196 | + """ |
| 1197 | + Sends bytes to the transport (socket). |
| 1198 | +
|
| 1199 | + :param bytes: message to send. |
| 1200 | + :type bytes: :obj:`bytes` |
| 1201 | +
|
| 1202 | + :raise: :exc:`~tarantool.error.NetworkError` |
| 1203 | +
|
| 1204 | + :meta: private: |
| 1205 | + """ |
| 1206 | + is_non_blocking = self._socket.gettimeout() == 0 |
| 1207 | + total_sent = 0 |
| 1208 | + while total_sent < len(bytes): |
| 1209 | + try: |
| 1210 | + if is_non_blocking: |
| 1211 | + select.select([], [self._socket.fileno()], []) |
| 1212 | + sent = self._socket.send(bytes[total_sent:]) |
| 1213 | + if sent == 0: |
| 1214 | + err = socket.error( |
| 1215 | + errno.ECONNRESET, |
| 1216 | + "Lost connection to server during query" |
| 1217 | + ) |
| 1218 | + raise NetworkError(err) |
| 1219 | + total_sent += sent |
| 1220 | + except BlockingIOError as exc: |
| 1221 | + total_sent += exc.characters_written |
| 1222 | + continue |
| 1223 | + except socket.error as exc: |
| 1224 | + err = socket.error( |
| 1225 | + errno.ECONNRESET, |
| 1226 | + "Lost connection to server during query" |
| 1227 | + ) |
| 1228 | + raise NetworkError(err) from exc |
| 1229 | + |
1166 | 1230 | def _send_request_wo_reconnect(self, request, on_push=None, on_push_ctx=None):
|
1167 | 1231 | """
|
1168 | 1232 | Send request without trying to reconnect.
|
@@ -1191,7 +1255,7 @@ def _send_request_wo_reconnect(self, request, on_push=None, on_push_ctx=None):
|
1191 | 1255 | response = None
|
1192 | 1256 | while True:
|
1193 | 1257 | try:
|
1194 |
| - self._socket.sendall(bytes(request)) |
| 1258 | + self._sendall(bytes(request)) |
1195 | 1259 | response = request.response_class(self, self._read_response())
|
1196 | 1260 | break
|
1197 | 1261 | except SchemaReloadException as exc:
|
|
0 commit comments