//! TCP listener/acceptor with production-grade socket options.
//!
//! The [`Acceptor`] wraps a standard-library [`TcpListener`] but configures
//! the underlying socket with options that are critical for a high-performance
//! server:
//!
//! - **`SO_REUSEADDR`** -- always enabled, allowing rapid restarts without
//! waiting for `TIME_WAIT` sockets to drain.
//! - **`SO_REUSEPORT`** -- enabled on Linux (and other BSD-derived kernels)
//! so that multiple worker threads / processes can bind to the same port
//! and let the kernel distribute incoming connections.
//! - **`TCP_NODELAY`** -- disables Nagle's algorithm on accepted connections
//! to minimise latency for small HTTP responses.
//! - **Non-blocking mode** -- the listener is placed in non-blocking mode
//! so it integrates cleanly with `tokio::net::TcpStream::from_std`.
//!
//! The implementation relies on the [`socket2`] crate for portable access to
//! low-level socket options and works on both IPv4 and IPv6 addresses.
use std::io;
use std::net::{SocketAddr, TcpListener as StdTcpListener, ToSocketAddrs};
use socket2::{Domain, Protocol, Socket, Type};
use tokio::net::TcpStream;
// ---------------------------------------------------------------------------
// Acceptor
// ---------------------------------------------------------------------------
/// A TCP listener that binds to an address with optimised socket options and
/// accepts connections one at a time.
///
/// # Examples
///
/// ```no_run
/// use veld::core::acceptor::Acceptor;
///
/// # fn main() -> std::io::Result<()> {
/// let acceptor = Acceptor::bind("0.0.0.0:8080")?;
/// println!("listening on {}", acceptor.local_addr());
///
/// // In an async context:
/// // let stream = acceptor.accept().await?;
/// # Ok(())
/// # }
/// ```
pub struct Acceptor {
/// The tokio-wrapped listener used for async `accept` calls.
listener: tokio::net::TcpListener,
/// Cached local address for fast retrieval without a syscall.
local_addr: SocketAddr,
}
impl Acceptor {
// ------------------------------------------------------------------
// Construction
// ------------------------------------------------------------------
/// Bind to the given address and return a configured [`Acceptor`].
///
/// The address can be any type that implements [`ToSocketAddrs`], for
/// example `"0.0.0.0:8080"`, `"127.0.0.1:0"`, or `[::1]:8080` for IPv6.
///
/// The socket is configured with:
/// 1. `SO_REUSEADDR` (all platforms)
/// 2. `SO_REUSEPORT` (Linux / BSD -- ignored on unsupported platforms)
/// 3. Non-blocking mode
///
/// After binding, the listener calls `listen(1024)` to set the backlog
/// depth. The value 1024 is a sensible default that avoids dropped
/// connections under burst traffic while remaining within typical kernel
/// limits.
pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
// Resolve the address. We take the first entry; callers that need
// multi-homed binding should resolve externally and pass a concrete
// `SocketAddr`.
let addr = addr
.to_socket_addrs()?
.next()
.ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "no addresses resolved"))?;
let domain = if addr.is_ipv4() {
Domain::IPV4
} else {
Domain::IPV6
};
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;
// -- Socket options -------------------------------------------------
// SO_REUSEADDR: avoid EADDRINUSE when restarting the server while
// connections are still in TIME_WAIT.
socket.set_reuse_address(true)?;
// SO_REUSEPORT: on supporting kernels, allows multiple sockets (and
// thus multiple worker threads/processes) to bind to the same port.
// The kernel performs load-balancing across them.
//
// This is a no-op on platforms that lack the option (e.g. older
// Windows versions).
Self::set_reuse_port(&socket)?;
// IPv6-only: when binding to `::` we typically want to accept IPv4
// connections as well (dual-stack). However, if the caller
// explicitly binds to an IPv6 address that is *not* `::`, we leave
// the default kernel behaviour intact. Disabling `IPV6_V6ONLY`
// on a dual-stack stack lets us serve both protocols from a single
// listener.
if domain == Domain::IPV6 {
// Best-effort: ignore errors on platforms where the option does
// not exist.
let _ = socket.set_only_v6(false);
}
// -- Bind and listen -----------------------------------------------
socket.bind(&addr.into())?;
socket.listen(1024)?;
// -- Convert to async listener -------------------------------------
// `socket2::Socket` -> `std::net::TcpListener` (zero-cost).
let std_listener: StdTcpListener = socket.into();
// Place into non-blocking mode so that `tokio::net::TcpListener`
// can register it with the reactor.
std_listener.set_nonblocking(true)?;
let local_addr = std_listener.local_addr()?;
let listener = tokio::net::TcpListener::from_std(std_listener)?;
Ok(Self {
listener,
local_addr,
})
}
// ------------------------------------------------------------------
// Accept
// ------------------------------------------------------------------
/// Accept a single incoming TCP connection.
///
/// The returned [`TcpStream`] has `TCP_NODELAY` enabled so that HTTP
/// responses consisting of small buffers are dispatched immediately
/// rather than being coalesced by Nagle's algorithm.
///
/// This method is `async` -- it yields to the tokio runtime when no
/// connection is pending.
pub async fn accept(&self) -> io::Result<TcpStream> {
let (stream, _peer_addr) = self.listener.accept().await?;
// Disable Nagle's algorithm: for a request/response protocol like
// HTTP the extra latency is unacceptable. Individual handlers can
// re-enable it for large streaming responses if needed.
stream.set_nodelay(true)?;
Ok(stream)
}
/// Accept a connection and return both the stream and the peer address.
///
/// This is a convenience wrapper around [`accept`](Self::accept) for
/// call sites that need the peer address for logging or access control
/// without calling `peer_addr()` on the stream.
pub async fn accept_with_addr(&self) -> io::Result<(TcpStream, SocketAddr)> {
let (stream, peer_addr) = self.listener.accept().await?;
stream.set_nodelay(true)?;
Ok((stream, peer_addr))
}
// ------------------------------------------------------------------
// Accessors
// ------------------------------------------------------------------
/// Return the local address that this listener is bound to.
///
/// Useful for logging and for discovering the ephemeral port when
/// binding to port `0` (e.g. in tests).
pub fn local_addr(&self) -> SocketAddr {
self.local_addr
}
// ------------------------------------------------------------------
// Private helpers
// ------------------------------------------------------------------
/// Attempt to set `SO_REUSEPORT` on the socket.
///
/// On platforms that support the option this allows multiple processes
/// (or threads) to bind to the same address+port pair. The kernel
/// distributes incoming connections across them.
///
/// On platforms that do *not* support the option (e.g. some older
/// Windows builds) the call is silently ignored.
fn set_reuse_port(_socket: &Socket) -> io::Result<()> {
// socket2 only exposes `set_reuse_port` on Unix-like platforms.
// On Windows, `SO_REUSEADDR` (already set above) provides
// equivalent port-sharing behaviour for our use case.
#[cfg(all(unix, not(target_os = "solaris")))]
{
use std::os::unix::io::AsRawFd;
let fd = _socket.as_raw_fd();
unsafe {
let optval: libc::c_int = 1;
libc::setsockopt(
fd,
libc::SOL_SOCKET,
libc::SO_REUSEPORT,
&optval as *const _ as *const libc::c_void,
std::mem::size_of::<libc::c_int>() as libc::socklen_t,
);
}
}
Ok(())
}
}
impl std::fmt::Debug for Acceptor {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Acceptor")
.field("local_addr", &self.local_addr)
.finish_non_exhaustive()
}
}
// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
/// Bind to an ephemeral port and verify the accessor.
#[tokio::test]
async fn bind_ephemeral_port() {
let acceptor = Acceptor::bind("127.0.0.1:0").expect("bind to 127.0.0.1:0");
let addr = acceptor.local_addr();
assert_eq!(addr.ip(), "127.0.0.1".parse::<std::net::IpAddr>().unwrap());
assert_ne!(addr.port(), 0, "OS should have assigned a real port");
}
/// IPv6 loopback binding.
#[tokio::test]
async fn bind_ipv6_loopback() {
let acceptor = Acceptor::bind("[::1]:0").expect("bind to [::1]:0");
let addr = acceptor.local_addr();
assert!(addr.is_ipv6());
assert_ne!(addr.port(), 0);
}
/// Verify that we can accept a connection and that TCP_NODELAY is set.
#[tokio::test]
async fn accept_sets_nodelay() {
let acceptor = Acceptor::bind("127.0.0.1:0").unwrap();
let addr = acceptor.local_addr();
// Spawn a client that connects immediately.
let connect_handle =
tokio::spawn(async move { tokio::net::TcpStream::connect(addr).await.unwrap() });
let server_stream = acceptor.accept().await.unwrap();
let client_stream = connect_handle.await.unwrap();
assert!(
server_stream.nodelay().unwrap(),
"accepted stream should have TCP_NODELAY enabled"
);
// The client stream should NOT have NODELAY set by us.
// (The OS default is Nagle enabled, i.e. nodelay == false.)
// Note: some systems may have a different default, so we only
// check the server side definitively.
drop(server_stream);
drop(client_stream);
}
/// Verify `accept_with_addr` returns a plausible peer address.
#[tokio::test]
async fn accept_with_addr_returns_peer() {
let acceptor = Acceptor::bind("127.0.0.1:0").unwrap();
let addr = acceptor.local_addr();
let connect_handle =
tokio::spawn(async move { tokio::net::TcpStream::connect(addr).await.unwrap() });
let (_stream, peer) = acceptor.accept_with_addr().await.unwrap();
assert_eq!(peer.ip(), "127.0.0.1".parse::<std::net::IpAddr>().unwrap());
let _ = connect_handle.await;
}
/// Two listeners on the same port thanks to SO_REUSEPORT (Unix only).
///
/// On Linux, SO_REUSEPORT allows multiple listeners on the same port.
/// On Windows, SO_REUSEADDR has different semantics that don't
/// guarantee dual-bind success, so we skip this test there.
#[cfg(unix)]
#[tokio::test]
async fn reuse_port_allows_double_bind() {
let a1 = Acceptor::bind("127.0.0.1:0").unwrap();
let port = a1.local_addr().port();
// Attempt a second bind to the same port. This should succeed on
// Linux (SO_REUSEPORT) and may succeed on other platforms depending
// on their SO_REUSEADDR semantics.
let a2 = Acceptor::bind(("127.0.0.1", port));
// We assert *success* here -- if your platform rejects this, skip
// or conditionally compile the test.
assert!(a2.is_ok(), "second bind to the same port should succeed");
}
/// Debug output includes the local address.
#[tokio::test]
async fn debug_fmt_shows_addr() {
let acceptor = Acceptor::bind("127.0.0.1:0").unwrap();
let dbg = format!("{:?}", acceptor);
assert!(
dbg.contains("local_addr"),
"Debug output should include local_addr"
);
}
}