Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions examples/postgres/pooled-with-rustls/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,8 @@ fn establish_connection(config: &str) -> BoxFuture<ConnectionResult<AsyncPgConne
let (client, conn) = tokio_postgres::connect(config, tls)
.await
.map_err(|e| ConnectionError::BadConnection(e.to_string()))?;
tokio::spawn(async move {
if let Err(e) = conn.await {
eprintln!("Database connection: {e}");
}
});
AsyncPgConnection::try_from(client).await

AsyncPgConnection::try_from_client_and_connection(client, conn).await
};
fut.boxed()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,7 @@ fn establish_connection(config: &str) -> BoxFuture<ConnectionResult<AsyncPgConne
let (client, conn) = tokio_postgres::connect(config, tls)
.await
.map_err(|e| ConnectionError::BadConnection(e.to_string()))?;
tokio::spawn(async move {
if let Err(e) = conn.await {
eprintln!("Database connection: {e}");
}
});
AsyncPgConnection::try_from(client).await
AsyncPgConnection::try_from_client_and_connection(client, conn).await
};
fut.boxed()
}
Expand Down
75 changes: 64 additions & 11 deletions src/pg/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ const FAKE_OID: u32 = 0;
///
/// [tokio_postgres]: https://docs.rs/tokio-postgres/0.7.6/tokio_postgres/config/struct.Config.html#url
///
/// ## Pipelining
///
/// This connection supports *pipelined* requests. Pipelining can improve performance in use cases in which multiple,
/// independent queries need to be executed. In a traditional workflow, each query is sent to the server after the
/// previous query completes. In contrast, pipelining allows the client to send all of the queries to the server up
Expand Down Expand Up @@ -106,6 +108,18 @@ const FAKE_OID: u32 = 0;
/// assert_eq!(res.1, 2);
/// # Ok(())
/// # }
/// ```
///
/// ## TLS
///
/// Connections created by [`AsyncPgConnection::establish`] do not support TLS.
///
/// TLS support for tokio_postgres connections is implemented by external crates, e.g. [tokio_postgres_rustls].
///
/// [`AsyncPgConnection::try_from_client_and_connection`] can be used to construct a connection from an existing
/// [`tokio_postgres::Connection`] with TLS enabled.
///
/// [tokio_postgres_rustls]: https://docs.rs/tokio-postgres-rustls/0.12.0/tokio_postgres_rustls/
pub struct AsyncPgConnection {
conn: Arc<tokio_postgres::Client>,
stmt_cache: Arc<Mutex<StmtCache<diesel::pg::Pg, Statement>>>,
Expand Down Expand Up @@ -156,24 +170,17 @@ impl AsyncConnection for AsyncPgConnection {
let (client, connection) = tokio_postgres::connect(database_url, tokio_postgres::NoTls)
.await
.map_err(ErrorHelper)?;
let (tx, rx) = tokio::sync::broadcast::channel(1);
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
tokio::spawn(async move {
match futures_util::future::select(shutdown_rx, connection).await {
Either::Left(_) | Either::Right((Ok(_), _)) => {}
Either::Right((Err(e), _)) => {
let _ = tx.send(Arc::new(e));
}
}
});

let (error_rx, shutdown_tx) = drive_connection(connection);

let r = Self::setup(
client,
Some(rx),
Some(error_rx),
Some(shutdown_tx),
Arc::clone(&instrumentation),
)
.await;

instrumentation
.lock()
.unwrap_or_else(|e| e.into_inner())
Expand Down Expand Up @@ -367,6 +374,28 @@ impl AsyncPgConnection {
.await
}

/// Constructs a new `AsyncPgConnection` from an existing [`tokio_postgres::Client`] and
/// [`tokio_postgres::Connection`]
pub async fn try_from_client_and_connection<S>(
client: tokio_postgres::Client,
conn: tokio_postgres::Connection<tokio_postgres::Socket, S>,
) -> ConnectionResult<Self>
where
S: tokio_postgres::tls::TlsStream + Unpin + Send + 'static,
{
let (error_rx, shutdown_tx) = drive_connection(conn);

Self::setup(
client,
Some(error_rx),
Some(shutdown_tx),
Arc::new(std::sync::Mutex::new(
diesel::connection::get_default_instrumentation(),
)),
)
.await
}

async fn setup(
conn: tokio_postgres::Client,
connection_future: Option<broadcast::Receiver<Arc<tokio_postgres::Error>>>,
Expand Down Expand Up @@ -826,6 +855,30 @@ async fn drive_future<R>(
}
}

fn drive_connection<S>(
conn: tokio_postgres::Connection<tokio_postgres::Socket, S>,
) -> (
broadcast::Receiver<Arc<tokio_postgres::Error>>,
oneshot::Sender<()>,
)
where
S: tokio_postgres::tls::TlsStream + Unpin + Send + 'static,
{
let (error_tx, error_rx) = tokio::sync::broadcast::channel(1);
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();

tokio::spawn(async move {
match futures_util::future::select(shutdown_rx, conn).await {
Either::Left(_) | Either::Right((Ok(_), _)) => {}
Either::Right((Err(e), _)) => {
let _ = error_tx.send(Arc::new(e));
}
}
});

(error_rx, shutdown_tx)
}

#[cfg(any(
feature = "deadpool",
feature = "bb8",
Expand Down