Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,117 changes: 547 additions & 570 deletions Cargo.lock

Large diffs are not rendered by default.

36 changes: 16 additions & 20 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
[package]
name = "hyperdriver"
version = "0.11.2"
version = "0.12.0-rc2"
edition = "2021"
description = "The missing middle for Hyper - Servers and Clients with ergonomic APIs"
license = "MIT"
repository = "https://github.com/alexrudy/hyperdriver"
documentation = "https://docs.rs/hyperdriver"
rust-version = "1.76"
rust-version = "1.87"
readme = "README.md"
authors = ["Alex Rudy <[email protected]>"]
categories = [
Expand All @@ -32,28 +32,25 @@ futures-util = "0.3"
http = { version = "1" }
http-body = { version = "1" }
http-body-util = { version = "0.1" }
hyper = { version = "1", features = ["full"] }
hyper = { version = "1.7", features = ["full"] }
ouroboros = { version = "0.18", optional = true }
parking_lot = { version = "0.12", optional = true, features = ["arc_lock"] }
pin-project = { version = "1" }
rustls-native-certs = { version = "0.8.1", optional = true }
socket2 = { version = "0.5", optional = true }
thiserror = { version = "2", optional = true }
thiserror = { version = "2" }
tokio = { version = "1", features = ["full"] }
tracing = { version = "^0.1" }

[dependencies.chateau]
version = "0.2.0-rc2"
features = ["client", "server", "duplex"]

[dependencies.rustls]
version = ">=0.23.18" # RUSTSEC-2024-0399 is fixed in 0.23.18
features = ["tls12"]
default-features = false
optional = true

[dependencies.tokio-rustls]
version = "0.26"
features = ["tls12"]
default-features = false
optional = true

[dependencies.tower]
version = "0.5"
features = ["make", "util"]
Expand All @@ -79,28 +76,27 @@ static-assertions = { version = "1", package = "static_assertions" }
tempfile = "3"
tracing-opentelemetry = "0.31"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
tracing-test = { version = "0.2", features = [] }
webpki-roots.version = "1.0"

[features]
axum = ["dep:axum"]
client = [
"incoming",
"dep:parking_lot",
"dep:socket2",
"dep:thiserror",
"dep:tower-http",
"tower/timeout",
]
default = ["client", "server", "stream"]
docs = []
incoming = []
mocks = []
server = ["incoming", "dep:ouroboros", "dep:thiserror"]
server = ["incoming", "dep:ouroboros"]
sni = []
stream = []
tls = ["dep:rustls-native-certs", "dep:rustls", "dep:tokio-rustls"]
tls-aws-lc = ["rustls/aws_lc_rs", "tokio-rustls/aws_lc_rs"]
tls-ring = ["rustls/ring", "tokio-rustls/ring"]
tls = ["dep:rustls-native-certs", "dep:rustls", "chateau/tls"]
tls-aws-lc = ["rustls/aws_lc_rs", "chateau/tls-aws-lc"]
tls-ring = ["rustls/ring", "chateau/tls-ring"]

[[example]]
name = "google"
Expand All @@ -125,12 +121,12 @@ required-features = ["client", "tls", "tls-ring"]
[[example]]
name = "h2-server"
path = "examples/server/h2.rs"
required-features = ["server", "tls", "tls-ring"]
required-features = ["server", "stream", "tls", "tls-ring"]

[[example]]
name = "single-threaded"
path = "examples/single_threaded.rs"
required-features = ["server", "client"]
required-features = ["server", "client", "tls", "tls-ring"]


[[test]]
Expand Down Expand Up @@ -191,7 +187,7 @@ required-features = ["server", "stream"]
[[test]]
name = "tls"
path = "tests/tls.rs"
required-features = ["client", "server", "tls", "tls-ring"]
required-features = ["client", "server", "stream", "tls", "tls-ring"]

[[test]]
name = "upgrades"
Expand Down
3 changes: 1 addition & 2 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ wildcards = "allow"
highlight = "all"
workspace-default-features = "allow"
external-default-features = "allow"
skip-tree = ["windows-targets", "windows-sys"]
skip = ["sync_wrapper"]
skip-tree = ["windows-targets", "windows-sys", "aws-lc-sys"]

[sources]
unknown-registry = "warn"
Expand Down
5 changes: 4 additions & 1 deletion examples/server/h2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
use std::convert::Infallible;
use std::net::{SocketAddr, SocketAddrV4};

use hyperdriver::server::ServerAcceptorExt;
use hyperdriver::server::ServerConnectionInfoExt;
use hyperdriver::server::ServerProtocolExt;
use hyperdriver::Body;
use tracing_subscriber::EnvFilter;

Expand Down Expand Up @@ -61,7 +64,7 @@ async fn main() {

let server = hyperdriver::server::Server::builder()
.with_incoming(incoming)
.with_tls(tls_config("localhost"))
.with_tls(tls_config("localhost").into())
.with_shared_service(svc)
.with_connection_info()
.with_tls_connection_info()
Expand Down
63 changes: 40 additions & 23 deletions examples/single_threaded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,35 +5,36 @@
use std::cell::Cell;
use std::future::Future;
use std::marker::PhantomData;
use std::net::SocketAddr;
use std::net::{SocketAddr, ToSocketAddrs as _};
use std::pin::{pin, Pin};
use std::rc::Rc;
use std::task::{ready, Context, Poll};
use std::thread;

use chateau::client::conn::service::ClientExecutorService;
use chateau::client::conn::transport::tcp::{TcpConnectionError, TcpTransport};
use chateau::client::conn::Transport;
use chateau::services::make_service_fn;
use chateau::stream::tcp::TcpStream;
use futures_util::FutureExt;
use http_body_util::BodyExt;
use hyper::body::{Body as HttpBody, Bytes, Frame, Incoming};
use hyper::Request;
use hyper::{Error, Response};
use hyperdriver::bridge::io::TokioIo;
use hyperdriver::bridge::service::TowerHyperService;
use hyperdriver::client::conn::dns::GaiResolver;
use hyperdriver::client::conn::protocol::auto;
use hyperdriver::client::conn::protocol::auto::HttpConnection;
use hyperdriver::client::conn::transport::tcp::{TcpConnectionError, TcpTransport};
use hyperdriver::client::conn::transport::TransportExt;
use hyperdriver::client::conn::Transport;
use hyperdriver::client::pool::{Pooled, UriKey};
use hyperdriver::client::ConnectionPoolLayer;
use hyperdriver::client::conn::AutoTlsTransport;
use hyperdriver::client::{ConnectionPoolLayer, UriKey};
use hyperdriver::info::HasConnectionInfo;
use hyperdriver::server::Accept;
use hyperdriver::service::{make_service_fn, RequestExecutor};
use hyperdriver::stream::TcpStream;
use pin_project::pin_project;
use tokio::io::{self, AsyncWriteExt};
use tokio::net::TcpListener;
use tokio::sync::oneshot;
use tower::service_fn;
use tower::ServiceExt;

struct Body {
// Our Body type is !Send and !Sync:
Expand Down Expand Up @@ -183,7 +184,8 @@ async fn http1_client(
let host = url.host().expect("uri has no host");
let port = url.port_u16().unwrap_or(80);
let addr = format!("{host}:{port}");
let stream = TcpStream::connect(addr).await?;
let stream =
TcpStream::connect(addr.to_socket_addrs()?.next().expect("No resolved address")).await?;

let io = TokioIo::new(IOTypeNotSend::new(stream));

Expand Down Expand Up @@ -239,7 +241,7 @@ async fn http1_client(
}

async fn http2_server(rx: oneshot::Receiver<()>) -> Result<(), Box<dyn std::error::Error>> {
use hyper::server::conn::http2;
use hyperdriver::server::conn::Http2Builder;

let mut stdout = io::stdout();

Expand All @@ -257,7 +259,7 @@ async fn http2_server(rx: oneshot::Receiver<()>) -> Result<(), Box<dyn std::erro

let server = hyperdriver::Server::builder()
.with_acceptor(AcceptNotSend::new(listener))
.with_protocol(http2::Builder::new(LocalExec))
.with_protocol(Http2Builder::new(LocalExec))
.with_make_service(make_service_fn(|_| {
let counter = counter.clone();
async move {
Expand Down Expand Up @@ -294,12 +296,14 @@ async fn http2_client(
let client = tower::ServiceBuilder::new()
.layer(
ConnectionPoolLayer::<_, _, _, UriKey>::new(
TcpTransport::<_, TcpStream>::default().without_tls(),
auto::HttpConnectionBuilder::<Body>::default(),
AutoTlsTransport::new(TransportNotSend {
tcp: TcpTransport::<GaiResolver>::default(),
}),
auto::AlpnHttpConnectionBuilder::<Body>::default(),
)
.with_optional_pool(Some(Default::default())),
)
.service(RequestExecutor::<Pooled<HttpConnection<Body>, Body>, _>::new());
.service(ClientExecutorService::new());

let authority = url.authority().unwrap().clone();

Expand All @@ -311,7 +315,7 @@ async fn http2_client(
.header(http::header::HOST, authority.as_str())
.body(Body::from("test".to_string()))?;

let mut res = client.request(req).await?;
let mut res = client.clone().oneshot(req).await?;

let mut stdout = io::stdout();
stdout
Expand Down Expand Up @@ -353,6 +357,16 @@ where
}
}

impl<F> chateau::rt::Executor<F> for LocalExec
where
F: std::future::Future + 'static,
F::Output: 'static,
{
fn execute(&self, fut: F) {
tokio::task::spawn_local(fut);
}
}

#[derive(Debug)]
#[pin_project]
struct AcceptNotSend(#[pin] TcpListener);
Expand All @@ -364,40 +378,43 @@ impl AcceptNotSend {
}

impl Accept for AcceptNotSend {
type Conn = IOTypeNotSend;
type Connection = IOTypeNotSend;

type Error = std::io::Error;

fn poll_accept(
self: std::pin::Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Self::Conn, Self::Error>> {
) -> Poll<Result<Self::Connection, Self::Error>> {
let stream = ready!(self.project().0.poll_accept(cx)).map(IOTypeNotSend::new);
Poll::Ready(stream)
}
}

#[derive(Clone)]
struct TransportNotSend {
tcp: TcpTransport,
tcp: TcpTransport<GaiResolver>,
}

impl Transport for TransportNotSend {
impl<B> Transport<http::Request<B>> for TransportNotSend
where
B: Send + 'static,
{
type IO = TcpStream;

type Error = TcpConnectionError;

type Future = Pin<Box<dyn Future<Output = Result<Self::IO, Self::Error>> + Send>>;

fn connect(&mut self, req: http::request::Parts) -> <Self as Transport>::Future {
fn connect(&mut self, req: &http::Request<B>) -> Self::Future {
self.tcp.connect(req).boxed()
}

fn poll_ready(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Result<(), <Self as Transport>::Error>> {
self.tcp.poll_ready(cx)
) -> std::task::Poll<Result<(), Self::Error>> {
Transport::<http::Request<B>>::poll_ready(&mut self.tcp, cx)
}
}

Expand Down
9 changes: 4 additions & 5 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@


nightly := "nightly-2025-06-20"
msrv := "1.76"
msrv := "1.87"
rust := env("RUSTUP_TOOLCHAIN", "stable")

# Run all checks
Expand Down Expand Up @@ -39,11 +39,11 @@ cargo-hack-args := "--target-dir target/hack/"

[private]
check-hack-each:
cargo +{{rust}} hack check {{cargo-hack-args}} --each-feature
cargo +{{rust}} hack check {{cargo-hack-args}} --each-feature --skip tls,tls-ring,tls-aws-lc

[private]
check-hack-powerset:
cargo +{{rust}} hack check {{cargo-hack-args}} --feature-powerset --skip docs,axum,sni,tls-ring,tls-aws-lc
cargo +{{rust}} hack check {{cargo-hack-args}} --feature-powerset --group-features tls,tls-ring --group-features tls,tls-aws-lc --skip docs,axum,sni

[private]
check-hack-tests: (check-hack-targets "tests")
Expand All @@ -59,8 +59,7 @@ check-hack-all-targets: (check-hack-targets "all-targets")

# Check compilation combinations for a specific target
check-hack-targets targets='tests':
cargo +{{rust}} hack check --{{targets}} {{cargo-hack-args}} --no-private --feature-powerset --exclude-no-default-features --include-features mocks,tls-ring,server,client,stream

cargo +{{rust}} hack check --{{targets}} {{cargo-hack-args}} --no-private --feature-powerset --exclude-no-default-features --group-features tls,tls-ring --group-features tls,tls-aws-lc --skip docs,axum,sni
# Build the library in release mode
build:
cargo +{{rust}} build --release
Expand Down
24 changes: 23 additions & 1 deletion src/bridge/rt.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use hyper::rt::Executor;
use chateau::rt::Executor;
use hyper::rt::Executor as HyperExecutor;

/// A tokio executor for running futures within hyper.
#[derive(Debug, Default, Clone, Copy)]
pub struct TokioExecutor;
Expand All @@ -20,6 +22,16 @@ where
}
}

impl<F> HyperExecutor<F> for TokioExecutor
where
F: std::future::Future + Send + 'static,
F::Output: Send + 'static,
{
fn execute(&self, future: F) {
tokio::spawn(future);
}
}

/// A tokio executor for running futures on the current thread.
#[derive(Debug, Default, Clone, Copy)]
pub struct TokioCurrentThreadExecutor;
Expand All @@ -40,3 +52,13 @@ where
tokio::task::spawn_local(future);
}
}

impl<F> HyperExecutor<F> for TokioCurrentThreadExecutor
where
F: std::future::Future + 'static,
F::Output: 'static,
{
fn execute(&self, future: F) {
tokio::task::spawn_local(future);
}
}
Loading
Loading