Skip to content

Commit fd3a99c

Browse files
committed
Don't spawn off connection in blocking impl
We can now directly return fatal errors, and intercept notifications
1 parent 928a716 commit fd3a99c

File tree

11 files changed

+219
-142
lines changed

11 files changed

+219
-142
lines changed

postgres/src/binary_copy.rs

+13-9
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
//! Utilities for working with the PostgreSQL binary copy format.
22
3+
use crate::connection::ConnectionRef;
34
use crate::types::{ToSql, Type};
4-
use crate::{CopyInWriter, CopyOutReader, Error, Rt};
5+
use crate::{CopyInWriter, CopyOutReader, Error};
56
use fallible_iterator::FallibleIterator;
67
use futures::StreamExt;
78
use std::pin::Pin;
@@ -13,7 +14,7 @@ use tokio_postgres::binary_copy::{self, BinaryCopyOutStream};
1314
///
1415
/// The copy *must* be explicitly completed via the `finish` method. If it is not, the copy will be aborted.
1516
pub struct BinaryCopyInWriter<'a> {
16-
runtime: Rt<'a>,
17+
connection: ConnectionRef<'a>,
1718
sink: Pin<Box<binary_copy::BinaryCopyInWriter>>,
1819
}
1920

@@ -26,7 +27,7 @@ impl<'a> BinaryCopyInWriter<'a> {
2627
.expect("writer has already been written to");
2728

2829
BinaryCopyInWriter {
29-
runtime: writer.runtime,
30+
connection: writer.connection,
3031
sink: Box::pin(binary_copy::BinaryCopyInWriter::new(stream, types)),
3132
}
3233
}
@@ -37,7 +38,7 @@ impl<'a> BinaryCopyInWriter<'a> {
3738
///
3839
/// Panics if the number of values provided does not match the number expected.
3940
pub fn write(&mut self, values: &[&(dyn ToSql + Sync)]) -> Result<(), Error> {
40-
self.runtime.block_on(self.sink.as_mut().write(values))
41+
self.connection.block_on(self.sink.as_mut().write(values))
4142
}
4243

4344
/// A maximally-flexible version of `write`.
@@ -50,20 +51,21 @@ impl<'a> BinaryCopyInWriter<'a> {
5051
I: IntoIterator<Item = &'b dyn ToSql>,
5152
I::IntoIter: ExactSizeIterator,
5253
{
53-
self.runtime.block_on(self.sink.as_mut().write_raw(values))
54+
self.connection
55+
.block_on(self.sink.as_mut().write_raw(values))
5456
}
5557

5658
/// Completes the copy, returning the number of rows added.
5759
///
5860
/// This method *must* be used to complete the copy process. If it is not, the copy will be aborted.
5961
pub fn finish(mut self) -> Result<u64, Error> {
60-
self.runtime.block_on(self.sink.as_mut().finish())
62+
self.connection.block_on(self.sink.as_mut().finish())
6163
}
6264
}
6365

6466
/// An iterator of rows deserialized from the PostgreSQL binary copy format.
6567
pub struct BinaryCopyOutIter<'a> {
66-
runtime: Rt<'a>,
68+
connection: ConnectionRef<'a>,
6769
stream: Pin<Box<BinaryCopyOutStream>>,
6870
}
6971

@@ -76,7 +78,7 @@ impl<'a> BinaryCopyOutIter<'a> {
7678
.expect("reader has already been read from");
7779

7880
BinaryCopyOutIter {
79-
runtime: reader.runtime,
81+
connection: reader.connection,
8082
stream: Box::pin(BinaryCopyOutStream::new(stream, types)),
8183
}
8284
}
@@ -87,6 +89,8 @@ impl FallibleIterator for BinaryCopyOutIter<'_> {
8789
type Error = Error;
8890

8991
fn next(&mut self) -> Result<Option<BinaryCopyOutRow>, Error> {
90-
self.runtime.block_on(self.stream.next()).transpose()
92+
let stream = &mut self.stream;
93+
self.connection
94+
.block_on(async { stream.next().await.transpose() })
9195
}
9296
}

postgres/src/client.rs

+23-49
Original file line numberDiff line numberDiff line change
@@ -1,45 +1,21 @@
1+
use crate::connection::Connection;
12
use crate::{
23
CancelToken, Config, CopyInWriter, CopyOutReader, RowIter, Statement, ToStatement, Transaction,
34
TransactionBuilder,
45
};
5-
use std::ops::{Deref, DerefMut};
6-
use tokio::runtime::Runtime;
76
use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
87
use tokio_postgres::types::{ToSql, Type};
98
use tokio_postgres::{Error, Row, SimpleQueryMessage, Socket};
109

11-
pub(crate) struct Rt<'a>(pub &'a mut Runtime);
12-
13-
// no-op impl to extend the borrow until drop
14-
impl Drop for Rt<'_> {
15-
fn drop(&mut self) {}
16-
}
17-
18-
impl Deref for Rt<'_> {
19-
type Target = Runtime;
20-
21-
#[inline]
22-
fn deref(&self) -> &Runtime {
23-
self.0
24-
}
25-
}
26-
27-
impl DerefMut for Rt<'_> {
28-
#[inline]
29-
fn deref_mut(&mut self) -> &mut Runtime {
30-
self.0
31-
}
32-
}
33-
3410
/// A synchronous PostgreSQL client.
3511
pub struct Client {
36-
runtime: Runtime,
12+
connection: Connection,
3713
client: tokio_postgres::Client,
3814
}
3915

4016
impl Client {
41-
pub(crate) fn new(runtime: Runtime, client: tokio_postgres::Client) -> Client {
42-
Client { runtime, client }
17+
pub(crate) fn new(connection: Connection, client: tokio_postgres::Client) -> Client {
18+
Client { connection, client }
4319
}
4420

4521
/// A convenience function which parses a configuration string into a `Config` and then connects to the database.
@@ -62,10 +38,6 @@ impl Client {
6238
Config::new()
6339
}
6440

65-
fn rt(&mut self) -> Rt<'_> {
66-
Rt(&mut self.runtime)
67-
}
68-
6941
/// Executes a statement, returning the number of rows modified.
7042
///
7143
/// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
@@ -104,7 +76,7 @@ impl Client {
10476
where
10577
T: ?Sized + ToStatement,
10678
{
107-
self.runtime.block_on(self.client.execute(query, params))
79+
self.connection.block_on(self.client.execute(query, params))
10880
}
10981

11082
/// Executes a statement, returning the resulting rows.
@@ -140,7 +112,7 @@ impl Client {
140112
where
141113
T: ?Sized + ToStatement,
142114
{
143-
self.runtime.block_on(self.client.query(query, params))
115+
self.connection.block_on(self.client.query(query, params))
144116
}
145117

146118
/// Executes a statement which returns a single row, returning it.
@@ -177,7 +149,8 @@ impl Client {
177149
where
178150
T: ?Sized + ToStatement,
179151
{
180-
self.runtime.block_on(self.client.query_one(query, params))
152+
self.connection
153+
.block_on(self.client.query_one(query, params))
181154
}
182155

183156
/// Executes a statement which returns zero or one rows, returning it.
@@ -223,7 +196,8 @@ impl Client {
223196
where
224197
T: ?Sized + ToStatement,
225198
{
226-
self.runtime.block_on(self.client.query_opt(query, params))
199+
self.connection
200+
.block_on(self.client.query_opt(query, params))
227201
}
228202

229203
/// A maximally-flexible version of `query`.
@@ -289,9 +263,9 @@ impl Client {
289263
I::IntoIter: ExactSizeIterator,
290264
{
291265
let stream = self
292-
.runtime
266+
.connection
293267
.block_on(self.client.query_raw(query, params))?;
294-
Ok(RowIter::new(self.rt(), stream))
268+
Ok(RowIter::new(self.connection.as_ref(), stream))
295269
}
296270

297271
/// Creates a new prepared statement.
@@ -318,7 +292,7 @@ impl Client {
318292
/// # }
319293
/// ```
320294
pub fn prepare(&mut self, query: &str) -> Result<Statement, Error> {
321-
self.runtime.block_on(self.client.prepare(query))
295+
self.connection.block_on(self.client.prepare(query))
322296
}
323297

324298
/// Like `prepare`, but allows the types of query parameters to be explicitly specified.
@@ -349,7 +323,7 @@ impl Client {
349323
/// # }
350324
/// ```
351325
pub fn prepare_typed(&mut self, query: &str, types: &[Type]) -> Result<Statement, Error> {
352-
self.runtime
326+
self.connection
353327
.block_on(self.client.prepare_typed(query, types))
354328
}
355329

@@ -380,8 +354,8 @@ impl Client {
380354
where
381355
T: ?Sized + ToStatement,
382356
{
383-
let sink = self.runtime.block_on(self.client.copy_in(query))?;
384-
Ok(CopyInWriter::new(self.rt(), sink))
357+
let sink = self.connection.block_on(self.client.copy_in(query))?;
358+
Ok(CopyInWriter::new(self.connection.as_ref(), sink))
385359
}
386360

387361
/// Executes a `COPY TO STDOUT` statement, returning a reader of the resulting data.
@@ -408,8 +382,8 @@ impl Client {
408382
where
409383
T: ?Sized + ToStatement,
410384
{
411-
let stream = self.runtime.block_on(self.client.copy_out(query))?;
412-
Ok(CopyOutReader::new(self.rt(), stream))
385+
let stream = self.connection.block_on(self.client.copy_out(query))?;
386+
Ok(CopyOutReader::new(self.connection.as_ref(), stream))
413387
}
414388

415389
/// Executes a sequence of SQL statements using the simple query protocol.
@@ -428,7 +402,7 @@ impl Client {
428402
/// functionality to safely imbed that data in the request. Do not form statements via string concatenation and pass
429403
/// them to this method!
430404
pub fn simple_query(&mut self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
431-
self.runtime.block_on(self.client.simple_query(query))
405+
self.connection.block_on(self.client.simple_query(query))
432406
}
433407

434408
/// Executes a sequence of SQL statements using the simple query protocol.
@@ -442,7 +416,7 @@ impl Client {
442416
/// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
443417
/// them to this method!
444418
pub fn batch_execute(&mut self, query: &str) -> Result<(), Error> {
445-
self.runtime.block_on(self.client.batch_execute(query))
419+
self.connection.block_on(self.client.batch_execute(query))
446420
}
447421

448422
/// Begins a new database transaction.
@@ -466,8 +440,8 @@ impl Client {
466440
/// # }
467441
/// ```
468442
pub fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
469-
let transaction = self.runtime.block_on(self.client.transaction())?;
470-
Ok(Transaction::new(&mut self.runtime, transaction))
443+
let transaction = self.connection.block_on(self.client.transaction())?;
444+
Ok(Transaction::new(self.connection.as_ref(), transaction))
471445
}
472446

473447
/// Returns a builder for a transaction with custom settings.
@@ -494,7 +468,7 @@ impl Client {
494468
/// # }
495469
/// ```
496470
pub fn build_transaction(&mut self) -> TransactionBuilder<'_> {
497-
TransactionBuilder::new(&mut self.runtime, self.client.build_transaction())
471+
TransactionBuilder::new(self.connection.as_ref(), self.client.build_transaction())
498472
}
499473

500474
/// Constructs a cancellation token that can later be used to request

postgres/src/config.rs

+3-11
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,8 @@
22
//!
33
//! Requires the `runtime` Cargo feature (enabled by default).
44
5+
use crate::connection::Connection;
56
use crate::Client;
6-
use futures::FutureExt;
7-
use log::error;
87
use std::fmt;
98
use std::path::Path;
109
use std::str::FromStr;
@@ -324,15 +323,8 @@ impl Config {
324323

325324
let (client, connection) = runtime.block_on(self.config.connect(tls))?;
326325

327-
// FIXME don't spawn this so error reporting is less weird.
328-
let connection = connection.map(|r| {
329-
if let Err(e) = r {
330-
error!("postgres connection error: {}", e)
331-
}
332-
});
333-
runtime.spawn(connection);
334-
335-
Ok(Client::new(runtime, client))
326+
let connection = Connection::new(runtime, connection);
327+
Ok(Client::new(connection, client))
336328
}
337329
}
338330

0 commit comments

Comments
 (0)