Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion crates/tuic-server/src/wind_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ impl TuicRouter {
}
}


/// Build the DNS resolver selected by the configuration.
fn build_resolver(cfg: &crate::Config) -> eyre::Result<Arc<dyn Resolver>> {
let default_ip_mode = cfg.outbound.default.ip_mode.unwrap_or(StackPrefer::V4first);
Expand Down Expand Up @@ -281,6 +280,10 @@ async fn create_quinn_inbound(ctx: &Arc<TuicAppContext>) -> eyre::Result<TuicInb
initial_mtu: quinn.initial_mtu,
min_mtu: quinn.min_mtu,
gso: quinn.gso,
// `CongestionController` is a type alias for `wind_tuic::quinn::CongestionControl`,
// so the configured controller and initial window flow straight through.
congestion_control: quinn.congestion_control.controller,
initial_window: quinn.congestion_control.initial_window,
};
tracing::info!("Initializing quinn (wind-tuic) backend");
Ok(TuicInbound::new(wind_ctx, opts))
Expand Down
14 changes: 12 additions & 2 deletions crates/wind-base/src/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ impl OutboundAction for DirectOutbound {
async move {
let target_sa = resolve_target(&target, self.resolver.as_ref()).await?;
let mut target_stream = connect_direct_tcp(target_sa, &self.opts).await?;
if let Err(e) = tokio::io::copy_bidirectional(&mut stream, &mut target_stream).await {
if let (_, _, Some(e)) = wind_core::io::copy_io(&mut stream, &mut target_stream).await {
tracing::debug!(error = %e, "direct copy_bidirectional ended");
}
Ok(())
Expand Down Expand Up @@ -82,7 +82,17 @@ pub async fn connect_direct_tcp(addr: SocketAddr, opts: &DirectOutboundOpts) ->
socket.bind_device(Some(dev.as_bytes()))?;
}

Ok(socket.connect(addr).await?)
let stream = socket.connect(addr).await?;
// Disable Nagle's algorithm. Proxied browser traffic is dominated by small
// writes (TLS records, HTTP request headers); with Nagle on, each small
// segment waits for the previous one to be ACKed, and interacts with the
// peer's delayed-ACK timer to add up to ~40 ms of latency per round trip —
// the classic "browsing feels laggy through the proxy" symptom. The QUIC
// inbound has no such buffering, so this is the only hop that needs it.
if let Err(e) = stream.set_nodelay(true) {
tracing::debug!(error = %e, "failed to set TCP_NODELAY on direct outbound");
}
Ok(stream)
}

async fn relay_udp_direct(_opts: DirectOutboundOpts, resolver: Arc<dyn Resolver>, udp_stream: UdpStream) -> eyre::Result<()> {
Expand Down
28 changes: 20 additions & 8 deletions crates/wind-core/src/io.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,33 @@
use tokio::io::{AsyncRead, AsyncWrite};

/// Per-direction copy buffer size used by [`copy_io`].
///
/// `tokio::io::copy_bidirectional`'s default is only 8 KiB, which caps
/// single-stream throughput over high bandwidth-delay-product links (a QUIC
/// tunnel to a distant peer is exactly that): every 8 KiB requires a fresh
/// read/write/wakeup cycle, so the relay can't keep enough bytes in flight to
/// fill the congestion/flow-control window. 64 KiB lets each direction move an
/// order of magnitude more data per syscall while staying well within typical
/// stream receive windows.
pub const RELAY_BUF_SIZE: usize = 64 * 1024;

/// Bidirectionally relay bytes between two duplex streams until BOTH sides
/// have closed.
///
/// Delegates to [`tokio::io::copy_bidirectional`], which correctly handles
/// half-close: when one direction sees EOF, it calls `shutdown()` on the
/// opposite writer and continues pumping the remaining direction. The
/// previous hand-rolled implementation broke out of the outer loop on the
/// FIRST EOF, dropping any in-flight bytes flowing the other way — a common
/// problem for HTTP, where a client sends its request and FINs while the
/// server is still streaming the response.
/// Delegates to [`tokio::io::copy_bidirectional_with_sizes`] (with
/// [`RELAY_BUF_SIZE`] per direction), which correctly handles half-close: when
/// one direction sees EOF, it calls `shutdown()` on the opposite writer and
/// continues pumping the remaining direction. The previous hand-rolled
/// implementation broke out of the outer loop on the FIRST EOF, dropping any
/// in-flight bytes flowing the other way — a common problem for HTTP, where a
/// client sends its request and FINs while the server is still streaming the
/// response.
pub async fn copy_io<A, B>(a: &mut A, b: &mut B) -> (usize, usize, Option<std::io::Error>)
where
A: AsyncRead + AsyncWrite + Unpin + ?Sized,
B: AsyncRead + AsyncWrite + Unpin + ?Sized,
{
match tokio::io::copy_bidirectional(a, b).await {
match tokio::io::copy_bidirectional_with_sizes(a, b, RELAY_BUF_SIZE, RELAY_BUF_SIZE).await {
Ok((a2b, b2a)) => (a2b as usize, b2a as usize, None),
Err(e) => (0, 0, Some(e)),
}
Expand Down
8 changes: 7 additions & 1 deletion crates/wind-socks/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl OutboundAction for Socks5Action {
let span = tracing::debug_span!("socks5_tcp", target = %target, addr = %self.opts.addr);
async move {
let mut socks_stream = connect_socks5_tcp(&self.opts.addr, &target, &self.opts).await?;
if let Err(e) = tokio::io::copy_bidirectional(&mut stream, &mut socks_stream).await {
if let (_, _, Some(e)) = wind_core::io::copy_io(&mut stream, &mut socks_stream).await {
tracing::debug!(error = %e, "socks5 copy_bidirectional ended");
}
Ok(())
Expand Down Expand Up @@ -77,5 +77,11 @@ async fn connect_socks5_tcp(
.map_err(|e| eyre::eyre!("SOCKS5 connect failed: {}", e))?,
};

// Disable Nagle on the hop to the SOCKS proxy — the same small-write latency
// concern as the direct path (see `wind_base::direct::connect_direct_tcp`).
if let Err(e) = stream.get_socket_ref().set_nodelay(true) {
tracing::debug!(error = %e, "failed to set TCP_NODELAY on socks5 outbound");
}

Ok(stream)
}
12 changes: 10 additions & 2 deletions crates/wind-socks/src/outbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,9 @@ impl AbstractOutbound for SocksOutbound {
.map_err(|e| eyre!(e))?;
p.request(Socks5Command::TCPConnect, socks_addr).await.map_err(|e| eyre!(e))?;
let mut p: Box<dyn AbstractTcpStream> = Box::new(p);
tokio::io::copy_bidirectional(&mut stream, &mut p).await?;
if let (_, _, Some(e)) = wind_core::io::copy_io(&mut stream, &mut p).await {
return Err(eyre!(e));
}
Ok::<(), eyre::Report>(())
};

Expand All @@ -70,12 +72,18 @@ impl AbstractOutbound for SocksOutbound {
} else {
// Direct connection to the SOCKS server
let tcp_stream = tokio::net::TcpStream::connect(self.opts.server_addr).await?;
// Disable Nagle on the hop to the SOCKS proxy (small-write latency).
if let Err(e) = tcp_stream.set_nodelay(true) {
tracing::debug!(error = %e, "failed to set TCP_NODELAY on socks5 outbound");
}
let mut p = Socks5Stream::use_stream(tcp_stream, auth, socks_config)
.await
.map_err(|e| eyre!(e))?;
p.request(Socks5Command::TCPConnect, socks_addr).await.map_err(|e| eyre!(e))?;
let mut proxy_stream: Box<dyn AbstractTcpStream> = Box::new(p);
tokio::io::copy_bidirectional(&mut stream, &mut proxy_stream).await?;
if let (_, _, Some(e)) = wind_core::io::copy_io(&mut stream, &mut proxy_stream).await {
return Err(eyre!(e));
}
}

Ok(())
Expand Down
46 changes: 44 additions & 2 deletions crates/wind-tuic/src/quinn/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ use wind_core::{
udp::{UdpPacket, UdpStream as CoreUdpStream},
};

use crate::proto::{CmdType, Command};
use crate::{
proto::{CmdType, Command},
quinn::CongestionControl,
};

async fn spawn_logged(label: &str, fut: impl std::future::Future<Output = eyre::Result<()>>) {
if let Err(err) = fut.await {
Expand Down Expand Up @@ -156,6 +159,14 @@ pub struct TuicInboundOpts {
pub min_mtu: u16,

pub gso: bool,

/// Congestion control algorithm for the QUIC transport.
pub congestion_control: CongestionControl,

/// Initial congestion window, in bytes. A larger value lets short-lived
/// connections (e.g. one TCP-over-QUIC stream per browser request) ramp out
/// of slow-start faster instead of trickling the first few round trips.
pub initial_window: u64,
}

impl Default for TuicInboundOpts {
Expand All @@ -177,6 +188,8 @@ impl Default for TuicInboundOpts {
initial_mtu: 1200,
min_mtu: 1200,
gso: true,
congestion_control: CongestionControl::Bbr,
initial_window: 1024 * 1024,
}
}
}
Expand Down Expand Up @@ -248,12 +261,41 @@ impl TuicInbound {
))
.initial_mtu(self.opts.initial_mtu)
.min_mtu(self.opts.min_mtu)
.enable_segmentation_offload(self.opts.gso);
.enable_segmentation_offload(self.opts.gso)
.congestion_controller_factory(self.congestion_controller_factory());

config.transport_config(Arc::new(transport));

Ok(config)
}

/// Build the QUIC congestion-controller factory selected by
/// [`TuicInboundOpts::congestion_control`], applying the configured initial
/// window. Previously the server always used quinn's default (CUBIC with a
/// small initial window) and the operator's congestion-control settings
/// were silently ignored.
fn congestion_controller_factory(&self) -> Arc<dyn quinn::congestion::ControllerFactory + Send + Sync + 'static> {
let iw = self.opts.initial_window;
match self.opts.congestion_control {
// `quinn-congestions` provides a single BBR implementation; both the
// `bbr` and `bbr3` config aliases map to it.
CongestionControl::Bbr | CongestionControl::Bbr3 => {
let mut cfg = quinn_congestions::bbr::BbrConfig::default();
cfg.initial_window(iw);
Arc::new(cfg)
}
CongestionControl::Cubic => {
let mut cfg = quinn::congestion::CubicConfig::default();
cfg.initial_window(iw);
Arc::new(cfg)
}
CongestionControl::NewReno => {
let mut cfg = quinn::congestion::NewRenoConfig::default();
cfg.initial_window(iw);
Arc::new(cfg)
}
}
}
}

impl AbstractInbound for TuicInbound {
Expand Down
Loading