Ferrit Explore
中文·繁體·EN·日本語 Sign in Register
cielxl / veld / src / core / acceptor.rs
//! TCP listener/acceptor with production-grade socket options.
//!
//! The [`Acceptor`] wraps a standard-library [`TcpListener`] but configures
//! the underlying socket with options that are critical for a high-performance
//! server:
//!
//!   - **`SO_REUSEADDR`** -- always enabled, allowing rapid restarts without
//!     waiting for `TIME_WAIT` sockets to drain.
//!   - **`SO_REUSEPORT`** -- enabled on Linux (and other BSD-derived kernels)
//!     so that multiple worker threads / processes can bind to the same port
//!     and let the kernel distribute incoming connections.
//!   - **`TCP_NODELAY`** -- disables Nagle's algorithm on accepted connections
//!     to minimise latency for small HTTP responses.
//!   - **Non-blocking mode** -- the listener is placed in non-blocking mode
//!     so it integrates cleanly with `tokio::net::TcpStream::from_std`.
//!
//! The implementation relies on the [`socket2`] crate for portable access to
//! low-level socket options and works on both IPv4 and IPv6 addresses.

use std::io;
use std::net::{SocketAddr, TcpListener as StdTcpListener, ToSocketAddrs};

use socket2::{Domain, Protocol, Socket, Type};
use tokio::net::TcpStream;

// ---------------------------------------------------------------------------
// Acceptor
// ---------------------------------------------------------------------------

/// A TCP listener that binds to an address with optimised socket options and
/// accepts connections one at a time.
///
/// # Examples
///
/// ```no_run
/// use veld::core::acceptor::Acceptor;
///
/// # fn main() -> std::io::Result<()> {
/// let acceptor = Acceptor::bind("0.0.0.0:8080")?;
/// println!("listening on {}", acceptor.local_addr());
///
/// // In an async context:
/// // let stream = acceptor.accept().await?;
/// # Ok(())
/// # }
/// ```
pub struct Acceptor {
    /// The tokio-wrapped listener used for async `accept` calls.
    listener: tokio::net::TcpListener,

    /// Cached local address for fast retrieval without a syscall.
    local_addr: SocketAddr,
}

impl Acceptor {
    // ------------------------------------------------------------------
    // Construction
    // ------------------------------------------------------------------

    /// Bind to the given address and return a configured [`Acceptor`].
    ///
    /// The address can be any type that implements [`ToSocketAddrs`], for
    /// example `"0.0.0.0:8080"`, `"127.0.0.1:0"`, or `[::1]:8080` for IPv6.
    ///
    /// The socket is configured with:
    ///   1. `SO_REUSEADDR` (all platforms)
    ///   2. `SO_REUSEPORT` (Linux / BSD -- ignored on unsupported platforms)
    ///   3. Non-blocking mode
    ///
    /// After binding, the listener calls `listen(1024)` to set the backlog
    /// depth.  The value 1024 is a sensible default that avoids dropped
    /// connections under burst traffic while remaining within typical kernel
    /// limits.
    pub fn bind<A: ToSocketAddrs>(addr: A) -> io::Result<Self> {
        // Resolve the address.  We take the first entry; callers that need
        // multi-homed binding should resolve externally and pass a concrete
        // `SocketAddr`.
        let addr = addr
            .to_socket_addrs()?
            .next()
            .ok_or_else(|| io::Error::new(io::ErrorKind::InvalidInput, "no addresses resolved"))?;

        let domain = if addr.is_ipv4() {
            Domain::IPV4
        } else {
            Domain::IPV6
        };

        let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;

        // -- Socket options -------------------------------------------------

        // SO_REUSEADDR: avoid EADDRINUSE when restarting the server while
        // connections are still in TIME_WAIT.
        socket.set_reuse_address(true)?;

        // SO_REUSEPORT: on supporting kernels, allows multiple sockets (and
        // thus multiple worker threads/processes) to bind to the same port.
        // The kernel performs load-balancing across them.
        //
        // This is a no-op on platforms that lack the option (e.g. older
        // Windows versions).
        Self::set_reuse_port(&socket)?;

        // IPv6-only: when binding to `::` we typically want to accept IPv4
        // connections as well (dual-stack).  However, if the caller
        // explicitly binds to an IPv6 address that is *not* `::`, we leave
        // the default kernel behaviour intact.  Disabling `IPV6_V6ONLY`
        // on a dual-stack stack lets us serve both protocols from a single
        // listener.
        if domain == Domain::IPV6 {
            // Best-effort: ignore errors on platforms where the option does
            // not exist.
            let _ = socket.set_only_v6(false);
        }

        // -- Bind and listen -----------------------------------------------

        socket.bind(&addr.into())?;
        socket.listen(1024)?;

        // -- Convert to async listener -------------------------------------

        // `socket2::Socket` -> `std::net::TcpListener` (zero-cost).
        let std_listener: StdTcpListener = socket.into();

        // Place into non-blocking mode so that `tokio::net::TcpListener`
        // can register it with the reactor.
        std_listener.set_nonblocking(true)?;

        let local_addr = std_listener.local_addr()?;

        let listener = tokio::net::TcpListener::from_std(std_listener)?;

        Ok(Self {
            listener,
            local_addr,
        })
    }

    // ------------------------------------------------------------------
    // Accept
    // ------------------------------------------------------------------

    /// Accept a single incoming TCP connection.
    ///
    /// The returned [`TcpStream`] has `TCP_NODELAY` enabled so that HTTP
    /// responses consisting of small buffers are dispatched immediately
    /// rather than being coalesced by Nagle's algorithm.
    ///
    /// This method is `async` -- it yields to the tokio runtime when no
    /// connection is pending.
    pub async fn accept(&self) -> io::Result<TcpStream> {
        let (stream, _peer_addr) = self.listener.accept().await?;

        // Disable Nagle's algorithm: for a request/response protocol like
        // HTTP the extra latency is unacceptable.  Individual handlers can
        // re-enable it for large streaming responses if needed.
        stream.set_nodelay(true)?;

        Ok(stream)
    }

    /// Accept a connection and return both the stream and the peer address.
    ///
    /// This is a convenience wrapper around [`accept`](Self::accept) for
    /// call sites that need the peer address for logging or access control
    /// without calling `peer_addr()` on the stream.
    pub async fn accept_with_addr(&self) -> io::Result<(TcpStream, SocketAddr)> {
        let (stream, peer_addr) = self.listener.accept().await?;
        stream.set_nodelay(true)?;
        Ok((stream, peer_addr))
    }

    // ------------------------------------------------------------------
    // Accessors
    // ------------------------------------------------------------------

    /// Return the local address that this listener is bound to.
    ///
    /// Useful for logging and for discovering the ephemeral port when
    /// binding to port `0` (e.g. in tests).
    pub fn local_addr(&self) -> SocketAddr {
        self.local_addr
    }

    // ------------------------------------------------------------------
    // Private helpers
    // ------------------------------------------------------------------

    /// Attempt to set `SO_REUSEPORT` on the socket.
    ///
    /// On platforms that support the option this allows multiple processes
    /// (or threads) to bind to the same address+port pair.  The kernel
    /// distributes incoming connections across them.
    ///
    /// On platforms that do *not* support the option (e.g. some older
    /// Windows builds) the call is silently ignored.
    fn set_reuse_port(_socket: &Socket) -> io::Result<()> {
        // socket2 only exposes `set_reuse_port` on Unix-like platforms.
        // On Windows, `SO_REUSEADDR` (already set above) provides
        // equivalent port-sharing behaviour for our use case.
        #[cfg(all(unix, not(target_os = "solaris")))]
        {
            use std::os::unix::io::AsRawFd;
            let fd = _socket.as_raw_fd();
            unsafe {
                let optval: libc::c_int = 1;
                libc::setsockopt(
                    fd,
                    libc::SOL_SOCKET,
                    libc::SO_REUSEPORT,
                    &optval as *const _ as *const libc::c_void,
                    std::mem::size_of::<libc::c_int>() as libc::socklen_t,
                );
            }
        }

        Ok(())
    }
}

impl std::fmt::Debug for Acceptor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Acceptor")
            .field("local_addr", &self.local_addr)
            .finish_non_exhaustive()
    }
}

// ---------------------------------------------------------------------------
// Tests
// ---------------------------------------------------------------------------

#[cfg(test)]
mod tests {
    use super::*;

    /// Bind to an ephemeral port and verify the accessor.
    #[tokio::test]
    async fn bind_ephemeral_port() {
        let acceptor = Acceptor::bind("127.0.0.1:0").expect("bind to 127.0.0.1:0");
        let addr = acceptor.local_addr();
        assert_eq!(addr.ip(), "127.0.0.1".parse::<std::net::IpAddr>().unwrap());
        assert_ne!(addr.port(), 0, "OS should have assigned a real port");
    }

    /// IPv6 loopback binding.
    #[tokio::test]
    async fn bind_ipv6_loopback() {
        let acceptor = Acceptor::bind("[::1]:0").expect("bind to [::1]:0");
        let addr = acceptor.local_addr();
        assert!(addr.is_ipv6());
        assert_ne!(addr.port(), 0);
    }

    /// Verify that we can accept a connection and that TCP_NODELAY is set.
    #[tokio::test]
    async fn accept_sets_nodelay() {
        let acceptor = Acceptor::bind("127.0.0.1:0").unwrap();
        let addr = acceptor.local_addr();

        // Spawn a client that connects immediately.
        let connect_handle =
            tokio::spawn(async move { tokio::net::TcpStream::connect(addr).await.unwrap() });

        let server_stream = acceptor.accept().await.unwrap();
        let client_stream = connect_handle.await.unwrap();

        assert!(
            server_stream.nodelay().unwrap(),
            "accepted stream should have TCP_NODELAY enabled"
        );

        // The client stream should NOT have NODELAY set by us.
        // (The OS default is Nagle enabled, i.e. nodelay == false.)
        // Note: some systems may have a different default, so we only
        // check the server side definitively.

        drop(server_stream);
        drop(client_stream);
    }

    /// Verify `accept_with_addr` returns a plausible peer address.
    #[tokio::test]
    async fn accept_with_addr_returns_peer() {
        let acceptor = Acceptor::bind("127.0.0.1:0").unwrap();
        let addr = acceptor.local_addr();

        let connect_handle =
            tokio::spawn(async move { tokio::net::TcpStream::connect(addr).await.unwrap() });

        let (_stream, peer) = acceptor.accept_with_addr().await.unwrap();
        assert_eq!(peer.ip(), "127.0.0.1".parse::<std::net::IpAddr>().unwrap());

        let _ = connect_handle.await;
    }

    /// Two listeners on the same port thanks to SO_REUSEPORT (Unix only).
    ///
    /// On Linux, SO_REUSEPORT allows multiple listeners on the same port.
    /// On Windows, SO_REUSEADDR has different semantics that don't
    /// guarantee dual-bind success, so we skip this test there.
    #[cfg(unix)]
    #[tokio::test]
    async fn reuse_port_allows_double_bind() {
        let a1 = Acceptor::bind("127.0.0.1:0").unwrap();
        let port = a1.local_addr().port();

        // Attempt a second bind to the same port.  This should succeed on
        // Linux (SO_REUSEPORT) and may succeed on other platforms depending
        // on their SO_REUSEADDR semantics.
        let a2 = Acceptor::bind(("127.0.0.1", port));
        // We assert *success* here -- if your platform rejects this, skip
        // or conditionally compile the test.
        assert!(a2.is_ok(), "second bind to the same port should succeed");
    }

    /// Debug output includes the local address.
    #[tokio::test]
    async fn debug_fmt_shows_addr() {
        let acceptor = Acceptor::bind("127.0.0.1:0").unwrap();
        let dbg = format!("{:?}", acceptor);
        assert!(
            dbg.contains("local_addr"),
            "Debug output should include local_addr"
        );
    }
}