Skip to content

Commit 0681908

Browse files
committed
added sync version of connect & exec using std::promise wrappers around async versions
1 parent f6596d6 commit 0681908

File tree

2 files changed

+66
-13
lines changed

2 files changed

+66
-13
lines changed

postgres_asio/postgres_asio.cpp

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1+
#include <future>
12
#include <boost/bind.hpp>
23
#include "postgres_asio.h"
3-
44
#include <boost/log/core.hpp>
55
#include <boost/log/trivial.hpp>
66
#include <boost/log/expressions.hpp>
@@ -68,6 +68,30 @@ namespace postgres_asio
6868
_bg_ios.post(boost::bind(&connection::_bg_connect, this, self, "", cb));
6969
}
7070

71+
int connection::connect(std::string connect_string)
72+
{
73+
std::promise<int> p;
74+
std::future<int> f = p.get_future();
75+
connect(connect_string, [&p](int ec)
76+
{
77+
p.set_value(ec);
78+
});
79+
f.wait();
80+
return f.get();
81+
}
82+
83+
int connection::connect()
84+
{
85+
std::promise<int> p;
86+
std::future<int> f = p.get_future();
87+
connect([&p](int ec)
88+
{
89+
p.set_value(ec);
90+
});
91+
f.wait();
92+
return f.get();
93+
}
94+
7195
// connect syncrounous and run callcack from fg thread event loop
7296
void connection::_bg_connect(boost::shared_ptr<connection> self, std::string connect_string, on_connect_callback cb)
7397
{
@@ -77,12 +101,17 @@ namespace postgres_asio
77101
int32_t duration = (int32_t)(now() - _start_ts);
78102
if (status == CONNECTION_OK)
79103
{
80-
BOOST_LOG_TRIVIAL(info) << _log_id << ", postgres::connect(async) PQconnectdb complete, t=" << duration;
104+
if (duration > _warn_timeout)
105+
{
106+
BOOST_LOG_TRIVIAL(warning) << _log_id << ", postgres::connect - took long time, t=" << duration;
107+
}
108+
109+
BOOST_LOG_TRIVIAL(info) << _log_id << ", postgres::connect PQconnectdb complete, t=" << duration;
81110
_socket.assign(boost::asio::ip::tcp::v4(), socket());
82111
_fg_ios.post([this, self, cb](){ cb(0); });
83112
return;
84113
}
85-
BOOST_LOG_TRIVIAL(error) << _log_id << ", postgres::connect(async) PQconnectdb failed, status=" << status << ", t=" << duration;
114+
BOOST_LOG_TRIVIAL(error) << _log_id << ", postgres::connect PQconnectdb failed, status=" << status << ", t=" << duration;
86115
_fg_ios.post([this, self, status, cb](){ cb(status); });
87116
}
88117

@@ -94,27 +123,40 @@ namespace postgres_asio
94123
_current_statement = statement;
95124
if (PQsendQuery(_pg_conn, statement.c_str()) == 0) // 1 os good, 0 is bad...
96125
{
97-
BOOST_LOG_TRIVIAL(error) << _log_id << ", postgres::exec(async) PQsendQuery failed fast: s=" << statement.substr(0, STATEMENT_LOG_BYTES);
126+
BOOST_LOG_TRIVIAL(error) << _log_id << ", postgres::exec PQsendQuery failed fast: s=" << statement.substr(0, STATEMENT_LOG_BYTES);
98127
_fg_ios.post([this, self, cb](){ cb(PGRES_FATAL_ERROR, NULL); });
99128
return;
100129
}
101130
_socket.async_read_some(boost::asio::null_buffers(), boost::bind(&connection::_fg_socket_rx_cb, this, boost::asio::placeholders::error, self, cb));
102131
}
103132

133+
std::pair<int, boost::shared_ptr<PGresult>> connection::exec(std::string statement)
134+
{
135+
std::promise<std::pair<int, boost::shared_ptr<PGresult>>> p;
136+
std::future<std::pair<int, boost::shared_ptr<PGresult>>> f = p.get_future();
137+
exec(statement, [&p](int ec, boost::shared_ptr<PGresult> res)
138+
{
139+
std::pair<int, boost::shared_ptr<PGresult>> val(ec, res);
140+
p.set_value(val);
141+
});
142+
f.wait();
143+
return f.get();
144+
}
145+
104146
void connection::_fg_socket_rx_cb(const boost::system::error_code& ec, boost::shared_ptr<connection> self, on_query_callback cb)
105147
{
106148
BOOST_LOG_TRIVIAL(trace) << _log_id << ", " << BOOST_CURRENT_FUNCTION;
107149
if (ec)
108150
{
109-
BOOST_LOG_TRIVIAL(warning) << _log_id << ", postgres::exec(async) asio ec:" << ec.message();
151+
BOOST_LOG_TRIVIAL(warning) << _log_id << ", postgres::exec asio ec:" << ec.message();
110152
cb(ec.value(), NULL);
111153
return;
112154
}
113155

114156
int res = PQconsumeInput(_pg_conn);
115157
if (!res)
116158
{
117-
BOOST_LOG_TRIVIAL(warning) << _log_id << ", postgres::exec(async) PQconsumeInput read error";
159+
BOOST_LOG_TRIVIAL(warning) << _log_id << ", postgres::exec PQconsumeInput read error";
118160
cb(PGRES_FATAL_ERROR, NULL); // we reuse a error code here...
119161
return;
120162
}
@@ -145,7 +187,7 @@ namespace postgres_asio
145187

146188
if (_results.size() == 0)
147189
{
148-
BOOST_LOG_TRIVIAL(error) << _log_id << ", postgres::exec(async) returned no result, t=" << duration << ", s=" << _current_statement.substr(0, STATEMENT_LOG_BYTES);
190+
BOOST_LOG_TRIVIAL(error) << _log_id << ", postgres::exec returned no result, t=" << duration << ", s=" << _current_statement.substr(0, STATEMENT_LOG_BYTES);
149191
cb(PGRES_FATAL_ERROR, NULL); // we reuse a error code here...
150192
return;
151193
}
@@ -164,20 +206,20 @@ namespace postgres_asio
164206
case PGRES_NONFATAL_ERROR:
165207
case PGRES_COPY_BOTH:
166208
case PGRES_SINGLE_TUPLE:
167-
BOOST_LOG_TRIVIAL(debug) << _log_id << ", postgres::exec(async) complete, t=" << duration << ", s=" << _current_statement.substr(0, STATEMENT_LOG_BYTES);
209+
BOOST_LOG_TRIVIAL(debug) << _log_id << ", postgres::exec complete, t=" << duration << ", s=" << _current_statement.substr(0, STATEMENT_LOG_BYTES);
168210
if (duration > _warn_timeout)
169211
{
170-
BOOST_LOG_TRIVIAL(warning) << _log_id << ", postgres::exec(async) complete - took long time, t=" << duration << ", s = " << _current_statement.substr(0, STATEMENT_LOG_BYTES);
212+
BOOST_LOG_TRIVIAL(warning) << _log_id << ", postgres::exec complete - took long time, t=" << duration << ", s = " << _current_statement.substr(0, STATEMENT_LOG_BYTES);
171213
}
172214
cb(0, std::move(last_result));
173215
break;
174216
case PGRES_BAD_RESPONSE:
175217
case PGRES_FATAL_ERROR:
176-
BOOST_LOG_TRIVIAL(error) << _log_id << ", postgres::exec(async) failed, t=" << duration << ", s=" << _current_statement.substr(0, STATEMENT_LOG_BYTES);
218+
BOOST_LOG_TRIVIAL(error) << _log_id << ", postgres::exec failed, t=" << duration << ", s=" << _current_statement.substr(0, STATEMENT_LOG_BYTES);
177219
cb(status, std::move(last_result));
178220
break;
179221
default:
180-
BOOST_LOG_TRIVIAL(warning) << _log_id << ", postgres::exec(async) unknown status code, t=" << duration << ", s=" << _current_statement.substr(0, STATEMENT_LOG_BYTES);
222+
BOOST_LOG_TRIVIAL(warning) << _log_id << ", postgres::exec unknown status code, t=" << duration << ", s=" << _current_statement.substr(0, STATEMENT_LOG_BYTES);
181223
cb(status, std::move(last_result));
182224
break;
183225
}

postgres_asio/postgres_asio.h

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
#pragma once
2-
2+
#include <utility>
33
#include <boost/asio.hpp>
44
#include <boost/function.hpp>
55
#include <boost/chrono/system_clocks.hpp>
@@ -36,11 +36,19 @@ namespace postgres_asio
3636
password -- password used if the backend demands password authentication.
3737
options -- trace/debug options to send to backend.
3838
tty -- file or tty for optional debug output from backend.
39+
40+
async connect
3941
*/
4042
void connect(std::string connect_string, on_connect_callback cb);
43+
//async connect
4144
void connect(on_connect_callback cb);
4245

43-
//status
46+
//sync connect
47+
int connect(std::string connect_string);
48+
//sync connect
49+
int connect();
50+
51+
//status (non blocking)
4452
std::string user_name() const;
4553
std::string password() const;
4654
std::string host_name() const;
@@ -55,7 +63,10 @@ namespace postgres_asio
5563
std::string get_log_id() const;
5664
void set_warning_timout(uint32_t ms);
5765

66+
//async exec
5867
void exec(std::string statement, on_query_callback cb);
68+
//sync exec
69+
std::pair<int, boost::shared_ptr<PGresult>> exec(std::string statement);
5970
private:
6071
int socket() const;
6172

0 commit comments

Comments
 (0)