Ferrit Explore
中文·繁體·EN·日本語 Sign in Register
cielxl / veld / src / main.rs
//! veld: a high-performance HTTP server written in Rust, inspired by nginx.

pub mod config;
pub mod core;
pub mod handler;
pub mod http;
pub mod log;
pub mod proxy;
pub mod tls;
pub mod util;

use std::path::{Path, PathBuf};
use std::process::ExitCode;
use std::sync::Arc;

use clap::{Parser, Subcommand};
use tokio::sync::broadcast;
use tracing::{debug, error, info, warn};

use crate::config::directives::ParsedConfig;
use crate::util::signal::{ServerSignal, SignalHandler};

/// Server identification string, `"<crate name>/<crate version>"`, assembled
/// from Cargo package metadata at compile time (logged by `print_banner`).
const VERSION: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
/// Configuration file used when `-c/--config` is not given; like any relative
/// config path it is resolved against the prefix directory in `main`.
const DEFAULT_CONFIG_PATH: &str = "conf/veld.conf";

// Command-line arguments. clap's derive renders the `///` comments on the
// option fields below as their `--help` text, so keep those user-facing.
#[derive(Parser, Debug)]
#[command(
    name = "veld",
    version,
    about = "A high-performance HTTP server inspired by nginx"
)]
struct Cli {
    // Subcommand to run; `main` falls back to `start` when none is given.
    #[command(subcommand)]
    command: Option<Command>,

    /// Configuration file (a relative path is resolved against the prefix)
    #[arg(short = 'c', long = "config", default_value = DEFAULT_CONFIG_PATH)]
    config: PathBuf,

    /// Prefix directory for relative paths and `logs/` [default: current directory]
    #[arg(short = 'p', long = "prefix")]
    prefix: Option<PathBuf>,

    /// Worker tasks per listen port; overrides `worker_processes` (0 = use config)
    #[arg(long = "workers")]
    workers: Option<usize>,
}

// Subcommands. clap renders each variant's `///` comment as its `--help` text.
#[derive(Subcommand, Debug, Clone, PartialEq, Eq)]
enum Command {
    /// Start the server (the default when no subcommand is given)
    Start,
    /// Load the configuration file, report whether it is valid, and exit
    Test,
    /// Send a reload request to the running server
    Reload,
}

#[tokio::main]
async fn main() -> ExitCode {
    let cli = Cli::parse();
    init_logging();

    let prefix = cli
        .prefix
        .clone()
        .unwrap_or_else(|| std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")));

    let config_path = if cli.config.is_absolute() {
        cli.config.clone()
    } else {
        prefix.join(&cli.config)
    };

    let subcommand = cli.command.unwrap_or(Command::Start);

    match subcommand {
        Command::Test => {
            if let Err(e) = run_config_test(&config_path) {
                error!("configuration test failed: {e}");
                return ExitCode::FAILURE;
            }
            println!(
                "configuration file {} test is successful",
                config_path.display()
            );
            ExitCode::SUCCESS
        }
        Command::Reload => {
            if let Err(e) = run_reload(&prefix) {
                error!("reload failed: {e}");
                return ExitCode::FAILURE;
            }
            ExitCode::SUCCESS
        }
        Command::Start => {
            print_banner();
            let worker_count = cli.workers.unwrap_or(0);
            if let Err(e) = run_server(&config_path, &prefix, worker_count).await {
                error!("server exited with error: {e}");
                return ExitCode::FAILURE;
            }
            ExitCode::SUCCESS
        }
    }
}

/// Load and validate the configuration at `path` without starting anything.
///
/// # Errors
/// Whatever `config::load_config` reports (I/O or parse failure).
fn run_config_test(path: &Path) -> Result<(), Box<dyn std::error::Error>> {
    info!("testing configuration file: {}", path.display());
    match config::load_config(path) {
        Ok(_parsed) => {
            info!("configuration syntax is ok");
            Ok(())
        }
        Err(err) => Err(err.into()),
    }
}

/// Ask an already-running server to reload.
///
/// * Unix: reads the PID from `<prefix>/logs/veld.pid` and sends it `SIGHUP`.
/// * Windows: writes the control file `<prefix>/logs/veld.reload`.
///
/// # Errors
/// On Unix, fails in any of these cases:
/// * the PID file cannot be read;
/// * it does not contain an integer;
/// * it holds a non-positive PID;
/// * `kill(2)` fails, e.g. `ESRCH` for a stale PID file or `EPERM` without
///   permission.
///
/// On Windows, fails if the control file cannot be written.
fn run_reload(prefix: &Path) -> Result<(), Box<dyn std::error::Error>> {
    #[cfg(unix)]
    {
        let pid_path = prefix.join("logs").join("veld.pid");
        let pid_str = std::fs::read_to_string(&pid_path)?;
        let pid: i32 = pid_str.trim().parse()?;
        // kill(2) treats 0 as "every process in our process group" and -1 as
        // "every process we may signal"; a corrupt PID file must never turn a
        // reload into a broadcast SIGHUP.
        if pid <= 0 {
            return Err(format!("invalid pid {pid} in {}", pid_path.display()).into());
        }
        // SAFETY: kill(2) takes only integer arguments and accesses no memory
        // owned by this process.
        let rc = unsafe { libc::kill(pid, libc::SIGHUP) };
        if rc != 0 {
            // A stale PID file (ESRCH) or missing permission (EPERM) must be
            // reported instead of being logged as a successful reload.
            let err = std::io::Error::last_os_error();
            return Err(format!("failed to send SIGHUP to process {pid}: {err}").into());
        }
        info!("sent SIGHUP to process {pid}");
    }

    #[cfg(windows)]
    {
        let control_path = prefix.join("logs").join("veld.reload");
        std::fs::write(&control_path, "reload")?;
        info!("wrote reload control file");
        eprintln!("reload signal sent");
    }

    Ok(())
}

async fn run_server(
    config_path: &Path,
    prefix: &Path,
    worker_override: usize,
) -> Result<(), Box<dyn std::error::Error>> {
    info!("loading configuration from {}", config_path.display());
    let cfg = config::load_config(config_path)?;
    info!("configuration loaded successfully");

    // Debug: print parsed config
    for server in &cfg.http.servers {
        for listen in &server.listen {
            info!(
                "  parsed listen: addr='{}', port={}, ssl={}",
                listen.addr, listen.port, listen.ssl
            );
        }
        info!("  server_name: {:?}", server.server_name);
        info!("  root: {:?}", server.root);
    }

    let effective_workers = if worker_override != 0 {
        worker_override
    } else {
        let w = cfg.worker_processes;
        if w == 0 {
            util::num_cpus()
        } else {
            w
        }
    };
    info!("worker count: {effective_workers}");

    // Collect the distinct listen endpoints across every server block.  A
    // single port is bound once; it is treated as TLS if any `listen` on it
    // carried the `ssl` flag.
    let endpoints = collect_endpoints(&cfg);
    if endpoints.is_empty() {
        return Err("no listen directives found in configuration".into());
    }

    // Build one SNI-aware TLS acceptor covering every HTTPS server block.
    let tls_acceptor = if endpoints.iter().any(|e| e.ssl) {
        Some(Arc::new(build_tls_acceptor(&cfg)?))
    } else {
        None
    };

    // Write PID file
    let pid_dir = prefix.join("logs");
    std::fs::create_dir_all(&pid_dir)?;
    let pid_path = pid_dir.join("veld.pid");
    std::fs::write(&pid_path, std::process::id().to_string())?;

    // Signal handling
    let signal_handler = SignalHandler::new();
    let mut shutdown_rx = signal_handler.subscribe();
    tokio::spawn(async move {
        signal_handler.run().await;
    });

    // One shared pipeline for all workers/listeners.
    let pipeline = Arc::new(crate::core::pipeline::PipelineProcessor::new(Arc::new(
        cfg.clone(),
    )));

    // On Linux each worker gets its own SO_REUSEPORT listener; elsewhere a
    // single listener per endpoint is shared across workers.
    let reuseport = cfg!(target_os = "linux");

    let mut worker_handles = Vec::new();
    for endpoint in &endpoints {
        let addr = format!("0.0.0.0:{}", endpoint.port);
        info!(
            "listening on {addr} ({})",
            if endpoint.ssl { "https" } else { "http" }
        );

        let shared_listener = if reuseport {
            None
        } else {
            Some(Arc::new(build_listener(&addr)?))
        };

        for worker_id in 0..effective_workers {
            let listener = match &shared_listener {
                Some(l) => l.clone(),
                None => Arc::new(build_listener(&addr)?),
            };
            let mut worker_shutdown = shutdown_rx.resubscribe();
            let pipeline = pipeline.clone();
            let tls = if endpoint.ssl {
                tls_acceptor.clone()
            } else {
                None
            };
            let ssl = endpoint.ssl;
            let port = endpoint.port;

            let handle = tokio::spawn(async move {
                run_worker(
                    worker_id,
                    listener,
                    &mut worker_shutdown,
                    pipeline,
                    ssl,
                    port,
                    tls,
                )
                .await;
            });
            worker_handles.push(handle);
        }
    }

    // Wait for shutdown
    wait_for_shutdown(&mut shutdown_rx).await;
    info!("shutdown signal received, draining workers...");

    for handle in worker_handles {
        let _ = handle.await;
    }

    let _ = std::fs::remove_file(&pid_path);
    info!("server stopped");
    Ok(())
}

/// One listening port to bind, plus whether it terminates TLS.
///
/// Produced by `collect_endpoints`, which merges every `listen` directive on
/// the same port; the port is TLS if any of those directives had `ssl`.
/// NOTE(review): the `listen` address is not recorded here and the listener
/// is bound on `0.0.0.0`, so e.g. `listen 127.0.0.1:8080` still accepts on
/// all IPv4 interfaces — confirm this is intended.
#[derive(Debug, Clone)]
struct Endpoint {
    // TCP port to bind.
    port: u16,
    // Whether connections on this port go through the TLS acceptor.
    ssl: bool,
}

/// Merge the `listen` directives of all server blocks into one [`Endpoint`]
/// per port, in ascending port order. A port is TLS if any `listen` on it
/// carries `ssl`.
fn collect_endpoints(cfg: &ParsedConfig) -> Vec<Endpoint> {
    let mut tls_by_port = std::collections::BTreeMap::<u16, bool>::new();

    let listens = cfg.http.servers.iter().flat_map(|server| &server.listen);
    for listen in listens {
        *tls_by_port.entry(listen.port).or_default() |= listen.ssl;
    }

    tls_by_port
        .into_iter()
        .map(|(port, ssl)| Endpoint { port, ssl })
        .collect()
}

/// Build one SNI-aware TLS acceptor from every server block that names an
/// `ssl_certificate`.
///
/// Blocks without an `ssl` section, or whose certificate path is empty, are
/// skipped.
///
/// # Errors
/// Fails in any of these cases:
/// * a certificate/key pair cannot be loaded (the message names the
///   offending `server_name`);
/// * no block supplied a certificate;
/// * the acceptor cannot be built.
fn build_tls_acceptor(
    cfg: &ParsedConfig,
) -> Result<tokio_rustls::TlsAcceptor, Box<dyn std::error::Error>> {
    let mut manager = crate::tls::TlsManager::new();

    for server in &cfg.http.servers {
        let Some(ssl) = &server.ssl else {
            continue;
        };
        if ssl.certificate.as_os_str().is_empty() {
            continue;
        }
        let names = &server.server_name;
        if let Err(e) = manager.add_cert(names, &ssl.certificate, &ssl.certificate_key) {
            return Err(format!("loading cert for {:?}: {e}", names).into());
        }
        info!("loaded TLS cert for {:?}", names);
    }

    if manager.has_certs() {
        Ok(manager.build_acceptor()?)
    } else {
        Err("ssl listener configured but no ssl_certificate found".into())
    }
}

/// Build a non-blocking listening socket bound to `addr` (an `ip:port` string).
///
/// On Unix the socket gets two options:
/// * `SO_REUSEADDR`, so a restarted server can rebind while old connections
///   linger in `TIME_WAIT`;
/// * `SO_REUSEPORT` (except on Solaris/illumos), so several workers can each
///   bind the same address and the kernel distributes connections.
///
/// `SO_REUSEADDR` is deliberately *not* set on Windows. There it lets a
/// socket bind a port that another socket is already listening on, so a
/// second instance would silently share or steal the port instead of failing
/// with "address in use". The 1024-entry backlog reduces drops under bursty
/// load.
///
/// # Errors
/// `InvalidInput` if `addr` is not a socket address. Otherwise, any error
/// from creating, configuring, binding, or listening on the socket.
fn build_listener(addr: &str) -> Result<tokio::net::TcpListener, std::io::Error> {
    use socket2::{Domain, Protocol, Socket, Type};

    let sockaddr: std::net::SocketAddr = addr.parse().map_err(|_| {
        std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid listen address")
    })?;
    let domain = if sockaddr.is_ipv6() {
        Domain::IPV6
    } else {
        Domain::IPV4
    };

    let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;
    #[cfg(unix)]
    socket.set_reuse_address(true)?;
    #[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))]
    socket.set_reuse_port(true)?;
    socket.set_nonblocking(true)?;
    socket.bind(&sockaddr.into())?;
    socket.listen(1024)?;

    tokio::net::TcpListener::from_std(socket.into())
}

/// Accept loop for one worker on one listening socket.
///
/// Runs until a `Shutdown` signal arrives or the signal channel closes;
/// `Reload` and `ReopenLogs` are only logged. Shutdown is polled first on
/// every iteration (`biased`).
///
/// Each accepted connection gets `TCP_NODELAY`, plus a 512 KiB send buffer
/// on Unix. It is then served on its own task: through the TLS acceptor
/// when `tls` is set, as plain HTTP otherwise. After an accept error the
/// worker backs off for 50 ms.
///
/// The TLS handshake is bounded by `TLS_HANDSHAKE_TIMEOUT`. Without it, a
/// client that connects and never completes the handshake would pin the
/// socket and its task indefinitely, because the 60 s read timeout only
/// applies once HTTP processing starts.
async fn run_worker(
    id: usize,
    listener: Arc<tokio::net::TcpListener>,
    shutdown_rx: &mut broadcast::Receiver<ServerSignal>,
    pipeline: Arc<crate::core::pipeline::PipelineProcessor>,
    ssl: bool,
    port: u16,
    tls: Option<Arc<tokio_rustls::TlsAcceptor>>,
) {
    const TLS_HANDSHAKE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);

    debug!("worker {id}: started (ssl={ssl}, port={port})");

    loop {
        tokio::select! {
            biased;
            result = shutdown_rx.recv() => {
                match result {
                    Ok(ServerSignal::Shutdown) => {
                        info!("worker {id}: received shutdown signal");
                        break;
                    }
                    Ok(ServerSignal::Reload) => {
                        info!("worker {id}: received reload signal");
                    }
                    Ok(ServerSignal::ReopenLogs) => {
                        info!("worker {id}: received reopen-logs signal");
                    }
                    Err(broadcast::error::RecvError::Lagged(n)) => {
                        warn!("worker {id}: missed {n} signal(s)");
                    }
                    Err(broadcast::error::RecvError::Closed) => {
                        break;
                    }
                }
            }
            result = listener.accept() => {
                match result {
                    Ok((stream, addr)) => {
                        debug!("worker {id}: accepted connection from {addr}");
                        let _ = stream.set_nodelay(true);
                        #[cfg(unix)]
                        {
                            let sref = socket2::SockRef::from(&stream);
                            let _ = sref.set_send_buffer_size(512 * 1024);
                        }
                        let pipeline = pipeline.clone();
                        let tls = tls.clone();
                        tokio::spawn(async move {
                            if let Some(acceptor) = tls {
                                // TLS listener: perform a bounded handshake, then
                                // run the connection over the encrypted stream.
                                let handshake = tokio::time::timeout(
                                    TLS_HANDSHAKE_TIMEOUT,
                                    acceptor.accept(stream),
                                );
                                match handshake.await {
                                    Ok(Ok(tls_stream)) => {
                                        handle_connection_generic(tls_stream, pipeline, "https", port).await;
                                    }
                                    Ok(Err(e)) => debug!("worker {id}: TLS handshake failed: {e}"),
                                    Err(_) => debug!("worker {id}: TLS handshake timed out"),
                                }
                            } else {
                                handle_connection(stream, pipeline, port).await;
                            }
                        });
                    }
                    Err(e) => {
                        warn!("worker {id}: accept error: {e}");
                        tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                    }
                }
            }
        }
    }
    debug!("worker {id}: exited");
}

/// Serve one plain-HTTP connection, keeping the zero-copy `sendfile` fast
/// path for static file bodies on Linux.
///
/// Borrows an 8 KiB read buffer from the shared buffer pool for the life of
/// the connection and hands it back afterwards.
async fn handle_connection(
    mut stream: tokio::net::TcpStream,
    pipeline: Arc<crate::core::pipeline::PipelineProcessor>,
    port: u16,
) {
    use crate::util::buffer::{get_buffer, return_buffer};

    const READ_BUF_LEN: usize = 8192;

    // Make the whole pooled buffer addressable so reads can fill it.
    let mut scratch = get_buffer(READ_BUF_LEN);
    scratch.resize(READ_BUF_LEN, 0);
    let mut http_parser = crate::http::parser::HttpParser::new();

    // Outcome deliberately ignored: every exit path simply closes the socket.
    let _ = connection_loop(&mut stream, &pipeline, &mut scratch, &mut http_parser, port).await;

    return_buffer(scratch);
}

/// Serve HTTP/1.x requests on a plain TCP connection until it ends.
///
/// Reads into `buf`, waiting at most 60 s per read, and feeds the bytes to
/// `parser`. Upgrade requests are handed to `crate::proxy::websocket_tunnel`,
/// which ends the loop. All other requests go through `pipeline` and are
/// written with `send_response` (the sendfile fast path).
///
/// Returns `Ok(())` on EOF, read error, idle timeout, upgrade, or a
/// non-keep-alive response. Returns `Err(())` when writing a response fails,
/// or when the request cannot be parsed (after a best-effort
/// `400 Bad Request`).
async fn connection_loop(
    stream: &mut tokio::net::TcpStream,
    pipeline: &Arc<crate::core::pipeline::PipelineProcessor>,
    buf: &mut [u8],
    parser: &mut crate::http::parser::HttpParser,
    port: u16,
) -> Result<(), ()> {
    use crate::http::parser::ParseResult;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    const IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);

    loop {
        let filled = match tokio::time::timeout(IDLE_TIMEOUT, stream.read(buf)).await {
            Ok(Ok(n)) if n != 0 => n,
            // Orderly EOF, read error, or idle timeout: close quietly.
            _ => return Ok(()),
        };

        match parser.feed(&buf[..filled]) {
            ParseResult::NeedMore => {}
            ParseResult::Error(_) => {
                let reply = crate::http::response::Response::bad_request();
                let _ = stream.write_all(&reply.to_bytes()).await;
                return Err(());
            }
            ParseResult::Complete(_) => {
                let Some(request) = parser.take_request() else {
                    continue;
                };

                // WebSocket / upgrade: splice the raw socket to upstream.
                if let Some(target) = pipeline.upgrade_target(&request, port) {
                    let head = request.to_bytes();
                    let _ = crate::proxy::websocket_tunnel(stream, &head, &target).await;
                    return Ok(());
                }

                let keep_alive = request.is_keep_alive();
                let response = pipeline.process(&request, "http", port).await;
                send_response(stream, &response).await.map_err(|_| ())?;
                if !keep_alive {
                    return Ok(());
                }
                parser.reset();
            }
        }
    }
}

/// Serve HTTP/1.x over any byte stream. Used for TLS connections, where
/// `sendfile` is unavailable, so file bodies are written through userspace.
///
/// Each read waits at most 60 s; EOF, a read error, or the timeout closes the
/// connection. Upgrade requests are tunnelled with
/// `crate::proxy::websocket_tunnel`. Other requests are processed by
/// `pipeline` under `scheme` and written with `write_response_generic`.
/// A parse error gets a best-effort `400 Bad Request`, which is flushed
/// before the stream is dropped.
async fn handle_connection_generic<S>(
    mut stream: S,
    pipeline: Arc<crate::core::pipeline::PipelineProcessor>,
    scheme: &'static str,
    port: u16,
) where
    S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
    use crate::http::parser::ParseResult;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    const IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);

    let mut buf = vec![0u8; 8192];
    let mut parser = crate::http::parser::HttpParser::new();

    loop {
        let n = match tokio::time::timeout(IDLE_TIMEOUT, stream.read(&mut buf)).await {
            Ok(Ok(n)) if n != 0 => n,
            // Orderly EOF, read error, or idle timeout: close quietly.
            _ => return,
        };

        match parser.feed(&buf[..n]) {
            ParseResult::NeedMore => continue,
            ParseResult::Error(_) => {
                let response = crate::http::response::Response::bad_request();
                // Flush so the 400 actually reaches the peer: a buffering
                // stream (such as a TLS wrapper) may still hold the bytes when
                // it is dropped. `write_response_generic` flushes likewise.
                if stream.write_all(&response.to_bytes()).await.is_ok() {
                    let _ = stream.flush().await;
                }
                return;
            }
            ParseResult::Complete(_) => {
                let Some(request) = parser.take_request() else {
                    continue;
                };

                if let Some(target) = pipeline.upgrade_target(&request, port) {
                    let head = request.to_bytes();
                    let _ = crate::proxy::websocket_tunnel(&mut stream, &head, &target).await;
                    return;
                }

                let keep_alive = request.is_keep_alive();
                let response = pipeline.process(&request, scheme, port).await;

                if write_response_generic(&mut stream, &response)
                    .await
                    .is_err()
                    || !keep_alive
                {
                    return;
                }
                parser.reset();
            }
        }
    }
}

/// Write `response` over a generic (non-`TcpStream`) transport such as TLS.
///
/// `sendfile` cannot target an encrypted stream, so file-backed bodies are
/// read into memory and written through userspace. Cached descriptors are
/// read with `read_file_arc`. Extra per-location headers in
/// `response.headers` are spliced into cached header blocks with
/// `inject_headers`. The stream is flushed before returning.
///
/// For `Body::File`, at most `size` bytes are sent, matching the `sendfile`
/// path in `send_response`. Reading the whole file would overrun the body
/// length advertised in the headers if the file grew in the meantime,
/// corrupting keep-alive framing. (Assumes `to_header_bytes` derives
/// Content-Length from `size` — TODO confirm.)
///
/// # Errors
/// Any I/O error from opening/reading the file or writing/flushing `stream`.
async fn write_response_generic<S>(
    stream: &mut S,
    response: &crate::http::response::Response,
) -> Result<(), std::io::Error>
where
    S: tokio::io::AsyncWrite + Unpin,
{
    use crate::http::response::Body;
    use tokio::io::{AsyncReadExt, AsyncWriteExt};

    let extra = &response.headers;
    match &response.body {
        Body::Cached(resp, head) => {
            if *head {
                // HEAD: header block only.
                stream
                    .write_all(&inject_headers(&resp.header_block, extra))
                    .await?;
            } else if let Some(full) = &resp.full {
                // Small file: headers and body are one pre-serialised buffer.
                if extra.is_empty() {
                    stream.write_all(full).await?;
                } else {
                    stream.write_all(&inject_headers(full, extra)).await?;
                }
            } else {
                // Large file: header block, then the body from the cached fd.
                stream
                    .write_all(&inject_headers(&resp.header_block, extra))
                    .await?;
                let data = read_file_arc(resp.file.clone(), resp.size).await?;
                stream.write_all(&data).await?;
            }
        }
        Body::FileFd(file, size) => {
            stream.write_all(&response.to_header_bytes()).await?;
            let data = read_file_arc(file.clone(), *size).await?;
            stream.write_all(&data).await?;
        }
        Body::File(path, size) => {
            stream.write_all(&response.to_header_bytes()).await?;
            let mut data = Vec::new();
            tokio::fs::File::open(path)
                .await?
                .take(*size)
                .read_to_end(&mut data)
                .await?;
            stream.write_all(&data).await?;
        }
        _ => {
            stream.write_all(&response.to_bytes()).await?;
        }
    }
    stream.flush().await
}

/// Return a copy of the pre-serialised response `buf` with the `extra`
/// headers spliced in just before the blank line that ends the header section.
///
/// Static responses are serialised once into a cached byte block (status
/// line, headers, CRLF, optional body). Per-location headers such as
/// `add_header` / `expires` arrive separately in `response.headers`; splicing
/// puts them on the wire without rebuilding the cached response. If `extra`
/// is empty, or `buf` contains no `\r\n\r\n`, `buf` is copied unchanged.
fn inject_headers(buf: &[u8], extra: &crate::http::headers::HeaderMap) -> Vec<u8> {
    if extra.is_empty() {
        return buf.to_vec();
    }
    let Some(blank) = buf.windows(4).position(|w| w == b"\r\n\r\n") else {
        return buf.to_vec();
    };

    // Split right after the last header's CRLF; `tail` starts with the blank line.
    let (head, tail) = buf.split_at(blank + 2);
    let mut out = Vec::with_capacity(buf.len() + 96);
    out.extend_from_slice(head);
    extra.write_to(&mut out); // one "Name: value\r\n" line per extra header
    out.extend_from_slice(tail);
    out
}

/// Read bytes `0..size` of a shared file descriptor on a blocking thread.
///
/// Uses positional reads: `pread` on Unix, `seek_read` on Windows. A plain
/// `read` would advance the descriptor's cursor, which every clone of the
/// `Arc<File>` shares. Once one request had read the cached file to EOF,
/// every later request on that descriptor would get an empty body, and
/// concurrent requests would interleave. Fewer than `size` bytes are
/// returned only if the file is shorter than `size`.
///
/// # Errors
/// `InvalidInput` if `size` does not fit in `usize`; any read error other
/// than `Interrupted` (which is retried); a panic of the blocking task
/// surfaces as an `io::Error`.
async fn read_file_arc(file: std::sync::Arc<std::fs::File>, size: u64) -> std::io::Result<Vec<u8>> {
    // Read into `buf` starting at byte `offset`, independent of the cursor.
    #[cfg(unix)]
    fn read_at_offset(file: &std::fs::File, buf: &mut [u8], offset: u64) -> std::io::Result<usize> {
        std::os::unix::fs::FileExt::read_at(file, buf, offset)
    }
    #[cfg(windows)]
    fn read_at_offset(file: &std::fs::File, buf: &mut [u8], offset: u64) -> std::io::Result<usize> {
        std::os::windows::fs::FileExt::seek_read(file, buf, offset)
    }
    // No positional-read API on this target: fall back to seek + read.
    #[cfg(not(any(unix, windows)))]
    fn read_at_offset(file: &std::fs::File, buf: &mut [u8], offset: u64) -> std::io::Result<usize> {
        use std::io::{Read, Seek, SeekFrom};
        let mut handle = file;
        handle.seek(SeekFrom::Start(offset))?;
        handle.read(buf)
    }

    // Fill a buffer with bytes `0..size`, stopping early at end of file.
    fn read_prefix(file: &std::fs::File, size: u64) -> std::io::Result<Vec<u8>> {
        let len = usize::try_from(size).map_err(|_| {
            std::io::Error::new(std::io::ErrorKind::InvalidInput, "file too large to buffer")
        })?;
        let mut buf = vec![0u8; len];
        let mut filled = 0usize;
        while filled < len {
            match read_at_offset(file, &mut buf[filled..], filled as u64) {
                Ok(0) => break,
                Ok(n) => filled += n,
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                Err(e) => return Err(e),
            }
        }
        buf.truncate(filled);
        Ok(buf)
    }

    tokio::task::spawn_blocking(move || read_prefix(&file, size))
        .await
        .map_err(std::io::Error::other)?
}

/// Write `response` to a plain TCP stream, using `sendfile(2)` for file
/// bodies on Linux.
///
/// * `Body::Cached` has three cases:
///   * HEAD writes the header block only;
///   * a small file is one pre-serialised buffer;
///   * a large file sends its header block, then the body from the cached
///     descriptor. On Linux, header and body go out within one readiness
///     event via `send_header_and_file`.
/// * `Body::FileFd` sends the headers, then the body from an already-open
///   descriptor via `send_file_body`.
/// * `Body::File` sends the headers, then opens the file and sends it via
///   `send_file_body`.
/// * Anything else (`Body::Bytes`, `Body::Empty`) is serialised and written
///   in one go.
///
/// Extra headers in `response.headers` are spliced into cached blocks with
/// `inject_headers`; with none, cached bytes are written without copying.
/// The header + sendfile design relies on `TCP_NODELAY` to avoid header
/// stalls. A blocking-thread poll/sendfile loop was tried and reverted: it
/// collapsed under high concurrency as hundreds of blocking threads
/// thrashed the scheduler.
async fn send_response(
    stream: &mut tokio::net::TcpStream,
    response: &crate::http::response::Response,
) -> Result<(), std::io::Error> {
    use crate::http::response::Body;
    use tokio::io::AsyncWriteExt;

    let extra = &response.headers;
    match &response.body {
        Body::Cached(cached, true) => {
            if extra.is_empty() {
                stream.write_all(&cached.header_block).await
            } else {
                stream
                    .write_all(&inject_headers(&cached.header_block, extra))
                    .await
            }
        }
        Body::Cached(cached, false) => match &cached.full {
            Some(full) if extra.is_empty() => stream.write_all(full).await,
            Some(full) => stream.write_all(&inject_headers(full, extra)).await,
            None => {
                // Borrow the cached header block unless extra headers force a copy.
                let spliced;
                let header: &[u8] = if extra.is_empty() {
                    &cached.header_block
                } else {
                    spliced = inject_headers(&cached.header_block, extra);
                    &spliced
                };

                #[cfg(target_os = "linux")]
                {
                    send_header_and_file(stream, header, &cached.file, cached.size).await
                }
                #[cfg(not(target_os = "linux"))]
                {
                    stream.write_all(header).await?;
                    send_file_body(stream, &cached.file, cached.size).await
                }
            }
        },
        Body::FileFd(file, size) => {
            stream.write_all(&response.to_header_bytes()).await?;
            send_file_body(stream, file, *size).await
        }
        Body::File(path, size) => {
            stream.write_all(&response.to_header_bytes()).await?;
            let opened = std::sync::Arc::new(std::fs::File::open(path)?);
            send_file_body(stream, &opened, *size).await
        }
        _ => stream.write_all(&response.to_bytes()).await,
    }
}

/// Enable or disable `TCP_CORK` on `stream` (Linux; a no-op stub elsewhere).
///
/// While corked, the kernel holds back partial segments so a response header
/// and the `sendfile` body that follows can leave in full-sized TCP segments,
/// the same idea as nginx's `tcp_nopush`. Clearing the option flushes any
/// held data. The `setsockopt` result is ignored (best effort).
#[cfg(target_os = "linux")]
fn set_cork(stream: &tokio::net::TcpStream, on: bool) {
    use std::os::unix::io::AsRawFd;

    let enabled = libc::c_int::from(on);
    let optlen = std::mem::size_of_val(&enabled) as libc::socklen_t;
    // SAFETY: the fd belongs to `stream`, which is borrowed for this call, and
    // `optval` points at an initialised c_int of `optlen` bytes that outlives it.
    unsafe {
        libc::setsockopt(
            stream.as_raw_fd(),
            libc::IPPROTO_TCP,
            libc::TCP_CORK,
            (&enabled as *const libc::c_int).cast::<libc::c_void>(),
            optlen,
        );
    }
}

// Non-Linux targets have no TCP_CORK, so the stub does nothing.
#[cfg(not(target_os = "linux"))]
#[allow(dead_code)] // TCP_CORK is Linux-only; this is a no-op stub elsewhere
fn set_cork(_stream: &tokio::net::TcpStream, _on: bool) {}

/// Transfer bytes `0..size` of `file` to the socket: zero-copy `sendfile(2)`
/// on Linux, a buffered positional read elsewhere.
///
/// `file` may be a descriptor shared across requests (cached), so neither
/// path uses or moves the descriptor's own cursor. The non-Linux path
/// formerly read through the cursor. After the first request reached EOF,
/// every later request on the same cached descriptor would have sent an
/// empty body under a non-zero Content-Length. Positional reads fix that.
///
/// # Errors
/// Any read/write error, or `InvalidInput` on non-Linux when `size` does not
/// fit in `usize`.
async fn send_file_body(
    stream: &mut tokio::net::TcpStream,
    file: &std::sync::Arc<std::fs::File>,
    size: u64,
) -> Result<(), std::io::Error> {
    #[cfg(target_os = "linux")]
    {
        sendfile_zero_copy(stream, file, size).await
    }
    #[cfg(not(target_os = "linux"))]
    {
        use tokio::io::AsyncWriteExt;

        // Read into `buf` starting at byte `offset`, independent of the cursor.
        #[cfg(unix)]
        fn read_at_offset(
            file: &std::fs::File,
            buf: &mut [u8],
            offset: u64,
        ) -> std::io::Result<usize> {
            std::os::unix::fs::FileExt::read_at(file, buf, offset)
        }
        #[cfg(windows)]
        fn read_at_offset(
            file: &std::fs::File,
            buf: &mut [u8],
            offset: u64,
        ) -> std::io::Result<usize> {
            std::os::windows::fs::FileExt::seek_read(file, buf, offset)
        }
        // No positional-read API on this target: fall back to seek + read.
        #[cfg(not(any(unix, windows)))]
        fn read_at_offset(
            file: &std::fs::File,
            buf: &mut [u8],
            offset: u64,
        ) -> std::io::Result<usize> {
            use std::io::{Read, Seek, SeekFrom};
            let mut handle = file;
            handle.seek(SeekFrom::Start(offset))?;
            handle.read(buf)
        }

        // Fill a buffer with bytes `0..size`, stopping early at end of file.
        fn read_prefix(file: &std::fs::File, size: u64) -> std::io::Result<Vec<u8>> {
            let len = usize::try_from(size).map_err(|_| {
                std::io::Error::new(std::io::ErrorKind::InvalidInput, "file too large to buffer")
            })?;
            let mut buf = vec![0u8; len];
            let mut filled = 0usize;
            while filled < len {
                match read_at_offset(file, &mut buf[filled..], filled as u64) {
                    Ok(0) => break,
                    Ok(n) => filled += n,
                    Err(e) if e.kind() == std::io::ErrorKind::Interrupted => {}
                    Err(e) => return Err(e),
                }
            }
            buf.truncate(filled);
            Ok(buf)
        }

        let f = file.clone();
        let data = tokio::task::spawn_blocking(move || read_prefix(&f, size))
            .await
            .map_err(std::io::Error::other)??;
        stream.write_all(&data).await
    }
}

/// Send a response header followed by a file body within a single socket
/// readiness event on Linux.
///
/// The header (a small in-memory buffer) is flushed with `send(2)` and the
/// body is streamed with zero-copy `sendfile(2)`, both inside one
/// `async_io` closure.  In the common case (warm send buffer) the whole
/// response leaves in a single reactor interaction, avoiding the extra
/// poll cycle a separate `write_all` for the header would cost.
///
/// Progress (`hdr_off` for the header, `offset` for the body) lives outside
/// the closure. When a syscall fails with `EAGAIN`, `async_io` parks the task
/// and re-runs the closure on the next writability event, so the transfer
/// resumes where it stopped instead of resending bytes. Bytes `0..size` of
/// `file` are sent. `sendfile` is given an explicit offset, so the
/// descriptor's own cursor is never moved. If the file turns out shorter
/// than `size` (`sendfile` returns 0), the function stops early and still
/// returns `Ok(())`.
///
/// # Errors
/// Any `send`/`sendfile` error other than `EAGAIN` (which `async_io` waits
/// out), e.g. when the peer has gone away.
#[cfg(target_os = "linux")]
async fn send_header_and_file(
    stream: &mut tokio::net::TcpStream,
    header: &[u8],
    file: &std::fs::File,
    size: u64,
) -> Result<(), std::io::Error> {
    use std::os::unix::io::AsRawFd;
    use tokio::io::Interest;

    let in_fd = file.as_raw_fd();
    let out_fd = stream.as_raw_fd();
    let total = size as libc::off_t;
    // Transfer progress; kept outside the closure so retries resume from here.
    let mut hdr_off = 0usize;
    let mut offset: libc::off_t = 0;

    stream
        .async_io(Interest::WRITABLE, || {
            // 1. Flush the header (MSG_NOSIGNAL avoids SIGPIPE on a closed peer).
            while hdr_off < header.len() {
                // SAFETY: `out_fd` belongs to `stream`, borrowed for the whole
                // call; the pointer/length pair covers the in-bounds slice
                // `header[hdr_off..]`.
                let n = unsafe {
                    libc::send(
                        out_fd,
                        header[hdr_off..].as_ptr() as *const libc::c_void,
                        header.len() - hdr_off,
                        libc::MSG_NOSIGNAL,
                    )
                };
                if n < 0 {
                    // EAGAIN -> async_io waits for writability and re-runs us.
                    return Err(std::io::Error::last_os_error());
                }
                hdr_off += n as usize;
            }
            // 2. Stream the body zero-copy.
            while offset < total {
                // sendfile(2) transfers at most 0x7ffff000 bytes per call.
                let count = ((total - offset) as usize).min(0x7fff_f000);
                // SAFETY: both fds stay open for the call (`file` and `stream`
                // are borrowed) and `offset` is a valid off_t that sendfile
                // advances in place.
                let n = unsafe { libc::sendfile(out_fd, in_fd, &mut offset, count) };
                if n < 0 {
                    return Err(std::io::Error::last_os_error());
                }
                if n == 0 {
                    // Premature EOF (file shorter than `size`); stop.
                    break;
                }
            }
            Ok(())
        })
        .await
}

/// Zero-copy file-to-socket transfer using the Linux `sendfile(2)` syscall.
///
/// The socket is non-blocking (tokio), so `sendfile` may return `EAGAIN`
/// when the socket send buffer is full; [`TcpStream::async_io`] parks the
/// task until the socket is writable again and retries.  The kernel copies
/// directly from the page cache to the socket, never touching user space.
#[cfg(target_os = "linux")]
async fn sendfile_zero_copy(
    stream: &mut tokio::net::TcpStream,
    file: &std::fs::File,
    size: u64,
) -> Result<(), std::io::Error> {
    use std::os::unix::io::AsRawFd;
    use tokio::io::Interest;

    let in_fd = file.as_raw_fd();
    let out_fd = stream.as_raw_fd();
    let total = size as libc::off_t;
    let mut offset: libc::off_t = 0;

    // Drain as much as the kernel will accept on each writability event.
    // The inner loop keeps issuing sendfile while bytes flow; it only
    // returns to the async runtime when the socket would block (EAGAIN),
    // which avoids per-chunk reactor/future overhead and matches the tight
    // sendfile loop nginx runs.  `offset` lives across retries; sendfile
    // advances it in place and never touches the fd's own offset, so a
    // shared cached descriptor is safe.
    stream
        .async_io(Interest::WRITABLE, || {
            while offset < total {
                let count = ((total - offset) as usize).min(0x7fff_f000);
                let n = unsafe { libc::sendfile(out_fd, in_fd, &mut offset, count) };
                if n < 0 {
                    // EAGAIN -> park until writable; any other errno -> fail.
                    return Err(std::io::Error::last_os_error());
                }
                if n == 0 {
                    // Premature EOF (file shorter than advertised); stop.
                    break;
                }
            }
            Ok(())
        })
        .await
}

async fn wait_for_shutdown(rx: &mut broadcast::Receiver<ServerSignal>) {
    loop {
        match rx.recv().await {
            Ok(ServerSignal::Shutdown) => return,
            Ok(ServerSignal::Reload) => {
                info!("master: reload requested");
            }
            Ok(ServerSignal::ReopenLogs) => {
                info!("master: reopen-logs requested");
            }
            Err(broadcast::error::RecvError::Lagged(n)) => {
                warn!("master: missed {n} signal(s)");
            }
            Err(broadcast::error::RecvError::Closed) => {
                return;
            }
        }
    }
}

/// Install the global `tracing` subscriber (compact format, targets shown).
///
/// The filter comes from `RUST_LOG` when it is set and valid. Otherwise
/// `veld=info,warn` applies: info and above for the `veld` target, warn for
/// everything else. `.init()` installs a process-global subscriber, so call
/// this once.
fn init_logging() {
    use tracing_subscriber::EnvFilter;

    let filter = match EnvFilter::try_from_default_env() {
        Ok(from_env) => from_env,
        Err(_) => EnvFilter::new("veld=info,warn"),
    };

    tracing_subscriber::fmt()
        .with_env_filter(filter)
        .with_target(true)
        .compact()
        .init();
}

/// Log the startup banner: the `name/version` string and the target OS.
fn print_banner() {
    let os = std::env::consts::OS;
    info!("{VERSION} starting");
    info!("targeting {os}");
}