Skip to content

Commit 807def0

Browse files
committed
Client: refactor network provider
Make it work with connections directly instead of descriptors. Part of #28
1 parent acf4f5b commit 807def0

File tree

4 files changed

+45
-81
lines changed

4 files changed

+45
-81
lines changed

src/Client/Connection.hpp

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,10 @@ struct ConnectionImpl
6666
ConnectionImpl& operator = (const ConnectionImpl& impl) = delete;
6767
~ConnectionImpl();
6868

69+
public:
6970
void ref();
7071
void unref();
71-
public:
72+
7273
Connector<BUFFER, NetProvider> &connector;
7374
BUFFER inBuf;
7475
BUFFER outBuf;
@@ -101,7 +102,8 @@ ConnectionImpl<BUFFER, NetProvider>::~ConnectionImpl()
101102
{
102103
assert(refs == 0);
103104
if (socket >= 0) {
104-
connector.close(socket);
105+
Connection<BUFFER, NetProvider> conn(this);
106+
connector.close(conn);
105107
socket = -1;
106108
}
107109
}
@@ -130,12 +132,16 @@ class Connection
130132
public:
131133
class Space;
132134
Space space;
135+
using Impl_t = ConnectionImpl<BUFFER, NetProvider>;
133136

134137
Connection(Connector<BUFFER, NetProvider> &connector);
138+
Connection(Impl_t *a);
135139
~Connection();
136140
Connection(const Connection& connection);
137141
Connection& operator = (const Connection& connection);
138142

143+
Impl_t *getImpl() { return impl; }
144+
139145
//Required for storing Connections in hash tables (std::unordered_map)
140146
friend bool operator == (const Connection<BUFFER, NetProvider>& lhs,
141147
const Connection<BUFFER, NetProvider>& rhs)
@@ -325,6 +331,13 @@ Connection<BUFFER, NetProvider>::Connection(Connector<BUFFER, NetProvider> &conn
325331
impl->ref();
326332
}
327333

334+
template<class BUFFER, class NetProvider>
335+
Connection<BUFFER, NetProvider>::Connection(ConnectionImpl<BUFFER, NetProvider> *a) :
336+
space(*this), impl(a)
337+
{
338+
impl->ref();
339+
}
340+
328341
template<class BUFFER, class NetProvider>
329342
Connection<BUFFER, NetProvider>::Connection(const Connection& connection) :
330343
space(*this), impl(connection.impl)

src/Client/Connector.hpp

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ class Connector
7474
void finishSend(const Connection<BUFFER, NetProvider> &conn);
7575

7676
std::set<Connection<BUFFER, NetProvider>> m_ReadyToSend;
77-
void close(int socket);
7877
void close(Connection<BUFFER, NetProvider> &conn);
7978
private:
8079
//Timeout of Connector::connect() method.
@@ -109,19 +108,12 @@ Connector<BUFFER, NetProvider>::connect(Connection<BUFFER, NetProvider> &conn,
109108
return 0;
110109
}
111110

112-
template<class BUFFER, class NetProvider>
113-
void
114-
Connector<BUFFER, NetProvider>::close(int socket)
115-
{
116-
m_NetProvider.close(socket);
117-
}
118-
119111
template<class BUFFER, class NetProvider>
120112
void
121113
Connector<BUFFER, NetProvider>::close(Connection<BUFFER, NetProvider> &conn)
122114
{
123115
assert(conn.getSocket() >= 0);
124-
m_NetProvider.close(conn.getSocket());
116+
m_NetProvider.close(conn);
125117
conn.setSocket(-1);
126118
}
127119

src/Client/EpollNetProvider.hpp

Lines changed: 26 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,13 @@ class EpollNetProvider {
5656
~EpollNetProvider();
5757
int connect(Conn_t &conn, const std::string_view& addr, unsigned port,
5858
size_t timeout);
59-
void close(int socket);
59+
void close(Conn_t &conn);
6060
/** Read and write to sockets; polling using epoll. */
6161
int wait(int timeout);
6262

6363
bool check(Conn_t &conn);
6464
private:
6565
static constexpr size_t DEFAULT_TIMEOUT = 100;
66-
static constexpr size_t EVENT_POLL_COUNT_MAX = 64;
67-
static constexpr size_t EPOLL_QUEUE_LEN = 1024;
6866
static constexpr size_t EPOLL_EVENTS_MAX = 128;
6967

7068
//return 0 if all data from buffer was processed (sent or read);
@@ -73,13 +71,10 @@ class EpollNetProvider {
7371
int send(Conn_t &conn);
7472
int recv(Conn_t &conn);
7573

76-
int poll(struct ConnectionEvent *fds, size_t *fd_count,
77-
int timeout = DEFAULT_TIMEOUT);
78-
void setPollSetting(int socket, int setting);
79-
void registerEpoll(int socket);
74+
void setPollSetting(Conn_t &conn, int setting);
75+
void registerEpoll(Conn_t &conn);
8076

8177
/** <socket : connection> map. Contains both ready to read/send connections */
82-
std::map<int, Conn_t > m_Connections;
8378
Connector_t &m_Connector;
8479
int m_EpollFd;
8580
};
@@ -88,7 +83,7 @@ template<class BUFFER, class NETWORK>
8883
EpollNetProvider<BUFFER, NETWORK>::EpollNetProvider(Connector_t &connector) :
8984
m_Connector(connector)
9085
{
91-
m_EpollFd = epoll_create(EPOLL_QUEUE_LEN);
86+
m_EpollFd = epoll_create1(EPOLL_CLOEXEC);
9287
if (m_EpollFd == -1) {
9388
LOG_ERROR("Failed to initialize epoll: ", strerror(errno));
9489
abort();
@@ -99,22 +94,20 @@ template<class BUFFER, class NETWORK>
9994
EpollNetProvider<BUFFER, NETWORK>::~EpollNetProvider()
10095
{
10196
::close(m_EpollFd);
102-
m_EpollFd = 0;
103-
104-
for (auto conn = m_Connections.begin(); conn != m_Connections.end();)
105-
conn = m_Connections.erase(conn);
97+
m_EpollFd = -1;
10698
}
10799

108100
template<class BUFFER, class NETWORK>
109101
void
110-
EpollNetProvider<BUFFER, NETWORK>::registerEpoll(int socket)
102+
EpollNetProvider<BUFFER, NETWORK>::registerEpoll(Conn_t &conn)
111103
{
112104
/* Configure epoll with new socket. */
113105
assert(m_EpollFd >= 0);
114106
struct epoll_event event;
115107
event.events = EPOLLIN;
116-
event.data.fd = socket;
117-
if (epoll_ctl(m_EpollFd, EPOLL_CTL_ADD, socket, &event) != 0) {
108+
event.data.ptr = conn.getImpl();
109+
conn.getImpl()->ref();
110+
if (epoll_ctl(m_EpollFd, EPOLL_CTL_ADD, conn.getSocket(), &event) != 0) {
118111
LOG_ERROR("Failed to add socket to epoll: "
119112
"epoll_ctl() returned with errno: ",
120113
strerror(errno));
@@ -124,11 +117,11 @@ EpollNetProvider<BUFFER, NETWORK>::registerEpoll(int socket)
124117

125118
template<class BUFFER, class NETWORK>
126119
void
127-
EpollNetProvider<BUFFER, NETWORK>::setPollSetting(int socket, int setting) {
120+
EpollNetProvider<BUFFER, NETWORK>::setPollSetting(Conn_t &conn, int setting) {
128121
struct epoll_event event;
129122
event.events = setting;
130-
event.data.fd = socket;
131-
if (epoll_ctl(m_EpollFd, EPOLL_CTL_MOD, socket, &event) != 0) {
123+
event.data.ptr = conn.getImpl();
124+
if (epoll_ctl(m_EpollFd, EPOLL_CTL_MOD, conn.getSocket(), &event) != 0) {
132125
LOG_ERROR("Failed to change epoll mode: "
133126
"epoll_ctl() returned with errno: ",
134127
strerror(errno));
@@ -173,16 +166,16 @@ EpollNetProvider<BUFFER, NETWORK>::connect(Conn_t &conn,
173166
LOG_DEBUG("Greetings are decoded");
174167
LOG_DEBUG("Authentication processing...");
175168
//TODO: add authentication step.
176-
registerEpoll(socket);
177169
conn.setSocket(socket);
178-
m_Connections.insert({socket, conn});
170+
registerEpoll(conn);
179171
return 0;
180172
}
181173

182174
template<class BUFFER, class NETWORK>
183175
void
184-
EpollNetProvider<BUFFER, NETWORK>::close(int socket)
176+
EpollNetProvider<BUFFER, NETWORK>::close(Conn_t& conn)
185177
{
178+
int socket = conn.getSocket();
186179
assert(socket >= 0);
187180
#ifndef NDEBUG
188181
struct sockaddr sa;
@@ -201,46 +194,15 @@ EpollNetProvider<BUFFER, NETWORK>::close(int socket)
201194
" corresponding to address ", addr);
202195
}
203196
#endif
197+
conn.getImpl()->unref();
204198
NETWORK::close(socket);
205-
struct epoll_event event;
206-
event.events = EPOLLIN;
207-
event.data.fd = socket;
208199
/*
209200
* Descriptor is automatically removed from epoll handler
210201
* when all descriptors are closed. So in case
211202
* there's other descriptors on open socket, invoke
212203
* epoll_ctl manually.
213204
*/
214-
epoll_ctl(m_EpollFd, EPOLL_CTL_DEL, socket, &event);
215-
//close can be called during epoll provider destruction. In this case
216-
//all connections staying alive only due to the presence in m_Connections
217-
//map. While cleaning up m_Connections destructors of connections will be
218-
//called. So to avoid double-free presence check in m_Connections is required.
219-
if (m_Connections.find(socket) != m_Connections.end()) {
220-
assert(m_Connections.find(socket)->second.getSocket() == socket);
221-
m_Connections.erase(socket);
222-
}
223-
}
224-
225-
template<class BUFFER, class NETWORK>
226-
int
227-
EpollNetProvider<BUFFER, NETWORK>::poll(struct ConnectionEvent *fds,
228-
size_t *fd_count, int timeout)
229-
{
230-
static struct epoll_event events[EPOLL_EVENTS_MAX];
231-
*fd_count = 0;
232-
int event_cnt = epoll_wait(m_EpollFd, events, EPOLL_EVENTS_MAX,
233-
timeout);
234-
if (event_cnt == -1)
235-
return -1;
236-
assert(event_cnt >= 0);
237-
for (int i = 0; i < event_cnt; ++i) {
238-
fds[*fd_count].sock = events[i].data.fd;
239-
fds[*fd_count].event = events[i].events;
240-
(*fd_count)++;
241-
}
242-
assert(*fd_count == (size_t) event_cnt);
243-
return 0;
205+
epoll_ctl(m_EpollFd, EPOLL_CTL_DEL, socket, nullptr);
244206
}
245207

246208
template<class BUFFER, class NETWORK>
@@ -286,7 +248,7 @@ EpollNetProvider<BUFFER, NETWORK>::send(Conn_t &conn)
286248
hasSentBytes(conn, sent_bytes);
287249
if (rc != 0) {
288250
if (netWouldBlock(errno)) {
289-
setPollSetting(conn.getSocket(), EPOLLIN | EPOLLOUT);
251+
setPollSetting(conn, EPOLLIN | EPOLLOUT);
290252
return 1;
291253
}
292254
conn.setError(std::string("Failed to send request: ") +
@@ -316,19 +278,17 @@ EpollNetProvider<BUFFER, NETWORK>::wait(int timeout)
316278
}
317279

318280
/* Firstly poll connections to point out if there's data to read. */
319-
static struct ConnectionEvent events[EVENT_POLL_COUNT_MAX];
320-
size_t event_cnt = 0;
321-
if (poll((ConnectionEvent *)&events, &event_cnt, timeout) != 0) {
281+
struct epoll_event events[EPOLL_EVENTS_MAX];
282+
int event_cnt = epoll_wait(m_EpollFd, events, EPOLL_EVENTS_MAX, timeout);
283+
if (event_cnt < 0) {
322284
//Poll error doesn't belong to any connection so just global
323285
//log it.
324286
LOG_ERROR("Poll failed: ", strerror(errno));
325287
return -1;
326288
}
327-
for (size_t i = 0; i < event_cnt; ++i) {
328-
assert(m_Connections.find(events[i].sock) != m_Connections.end());
329-
if ((events[i].event & EPOLLIN) != 0) {
330-
Conn_t conn = m_Connections.find(events[i].sock)->second;
331-
assert(conn.getSocket() == events[i].sock);
289+
for (int i = 0; i < event_cnt; ++i) {
290+
Conn_t conn((typename Conn_t::Impl_t *)events[i].data.ptr);
291+
if ((events[i].events & EPOLLIN) != 0) {
332292
LOG_DEBUG("Registered poll event ", i, ": ",
333293
conn.getSocket(), " socket is ready to read");
334294
/*
@@ -342,9 +302,7 @@ EpollNetProvider<BUFFER, NETWORK>::wait(int timeout)
342302
m_Connector.readyToDecode(conn);
343303
}
344304

345-
if ((events[i].event & EPOLLOUT) != 0) {
346-
Conn_t conn = m_Connections.find(events[i].sock)->second;
347-
assert(conn.getSocket() == events[i].sock);
305+
if ((events[i].events & EPOLLOUT) != 0) {
348306
LOG_DEBUG("Registered poll event ", i, ": ",
349307
conn.getSocket(), " socket is ready to write");
350308
int rc = send(conn);
@@ -353,7 +311,7 @@ EpollNetProvider<BUFFER, NETWORK>::wait(int timeout)
353311
/* All data from connection has been successfully written. */
354312
if (rc == 0) {
355313
m_Connector.finishSend(conn);
356-
setPollSetting(conn.getSocket(), EPOLLIN);
314+
setPollSetting(conn, EPOLLIN);
357315
}
358316
}
359317
}

src/Client/LibevNetProvider.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ class LibevNetProvider {
8888
LibevNetProvider(Connector_t &connector, struct ev_loop *loop = nullptr);
8989
int connect(Conn_t &conn, const std::string_view& addr, unsigned port,
9090
size_t timeout);
91-
void close(int socket);
91+
void close(Conn_t &conn);
9292
int wait(int timeout);
9393
bool check(Conn_t &conn);
9494

@@ -316,8 +316,9 @@ LibevNetProvider<BUFFER, NETWORK>::connect(Conn_t &conn,
316316

317317
template<class BUFFER, class NETWORK>
318318
void
319-
LibevNetProvider<BUFFER, NETWORK>::close(int socket)
319+
LibevNetProvider<BUFFER, NETWORK>::close(Conn_t &conn)
320320
{
321+
int socket = conn.getSocket();
321322
NETWORK::close(socket);
322323
//close can be called during libev provider destruction. In this case
323324
//all connections staying alive only due to the presence in m_Watchers

0 commit comments

Comments
 (0)