//! veld: a high-performance HTTP server written in Rust, inspired by nginx.
pub mod config;
pub mod core;
pub mod handler;
pub mod http;
pub mod log;
pub mod proxy;
pub mod tls;
pub mod util;
use std::path::{Path, PathBuf};
use std::process::ExitCode;
use std::sync::Arc;
use clap::{Parser, Subcommand};
use tokio::sync::broadcast;
use tracing::{debug, error, info, warn};
use crate::config::directives::ParsedConfig;
use crate::util::signal::{ServerSignal, SignalHandler};
const VERSION: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
const DEFAULT_CONFIG_PATH: &str = "conf/veld.conf";
#[derive(Parser, Debug)]
#[command(
name = "veld",
version,
about = "A high-performance HTTP server inspired by nginx"
)]
struct Cli {
#[command(subcommand)]
command: Option<Command>,
#[arg(short = 'c', long = "config", default_value = DEFAULT_CONFIG_PATH)]
config: PathBuf,
#[arg(short = 'p', long = "prefix")]
prefix: Option<PathBuf>,
#[arg(long = "workers")]
workers: Option<usize>,
}
#[derive(Subcommand, Debug, Clone, PartialEq, Eq)]
enum Command {
Start,
Test,
Reload,
}
#[tokio::main]
async fn main() -> ExitCode {
let cli = Cli::parse();
init_logging();
let prefix = cli
.prefix
.clone()
.unwrap_or_else(|| std::env::current_dir().unwrap_or_else(|_| PathBuf::from(".")));
let config_path = if cli.config.is_absolute() {
cli.config.clone()
} else {
prefix.join(&cli.config)
};
let subcommand = cli.command.unwrap_or(Command::Start);
match subcommand {
Command::Test => {
if let Err(e) = run_config_test(&config_path) {
error!("configuration test failed: {e}");
return ExitCode::FAILURE;
}
println!(
"configuration file {} test is successful",
config_path.display()
);
ExitCode::SUCCESS
}
Command::Reload => {
if let Err(e) = run_reload(&prefix) {
error!("reload failed: {e}");
return ExitCode::FAILURE;
}
ExitCode::SUCCESS
}
Command::Start => {
print_banner();
let worker_count = cli.workers.unwrap_or(0);
if let Err(e) = run_server(&config_path, &prefix, worker_count).await {
error!("server exited with error: {e}");
return ExitCode::FAILURE;
}
ExitCode::SUCCESS
}
}
}
fn run_config_test(path: &Path) -> Result<(), Box<dyn std::error::Error>> {
info!("testing configuration file: {}", path.display());
let _cfg = config::load_config(path)?;
info!("configuration syntax is ok");
Ok(())
}
fn run_reload(prefix: &Path) -> Result<(), Box<dyn std::error::Error>> {
#[cfg(unix)]
{
let pid_path = prefix.join("logs").join("veld.pid");
let pid_str = std::fs::read_to_string(&pid_path)?;
let pid: i32 = pid_str.trim().parse()?;
unsafe {
libc::kill(pid, libc::SIGHUP);
}
info!("sent SIGHUP to process {pid}");
}
#[cfg(windows)]
{
let control_path = prefix.join("logs").join("veld.reload");
std::fs::write(&control_path, "reload")?;
info!("wrote reload control file");
eprintln!("reload signal sent");
}
Ok(())
}
async fn run_server(
config_path: &Path,
prefix: &Path,
worker_override: usize,
) -> Result<(), Box<dyn std::error::Error>> {
info!("loading configuration from {}", config_path.display());
let cfg = config::load_config(config_path)?;
info!("configuration loaded successfully");
// Debug: print parsed config
for server in &cfg.http.servers {
for listen in &server.listen {
info!(
" parsed listen: addr='{}', port={}, ssl={}",
listen.addr, listen.port, listen.ssl
);
}
info!(" server_name: {:?}", server.server_name);
info!(" root: {:?}", server.root);
}
let effective_workers = if worker_override != 0 {
worker_override
} else {
let w = cfg.worker_processes;
if w == 0 {
util::num_cpus()
} else {
w
}
};
info!("worker count: {effective_workers}");
// Collect the distinct listen endpoints across every server block. A
// single port is bound once; it is treated as TLS if any `listen` on it
// carried the `ssl` flag.
let endpoints = collect_endpoints(&cfg);
if endpoints.is_empty() {
return Err("no listen directives found in configuration".into());
}
// Build one SNI-aware TLS acceptor covering every HTTPS server block.
let tls_acceptor = if endpoints.iter().any(|e| e.ssl) {
Some(Arc::new(build_tls_acceptor(&cfg)?))
} else {
None
};
// Write PID file
let pid_dir = prefix.join("logs");
std::fs::create_dir_all(&pid_dir)?;
let pid_path = pid_dir.join("veld.pid");
std::fs::write(&pid_path, std::process::id().to_string())?;
// Signal handling
let signal_handler = SignalHandler::new();
let mut shutdown_rx = signal_handler.subscribe();
tokio::spawn(async move {
signal_handler.run().await;
});
// One shared pipeline for all workers/listeners.
let pipeline = Arc::new(crate::core::pipeline::PipelineProcessor::new(Arc::new(
cfg.clone(),
)));
// On Linux each worker gets its own SO_REUSEPORT listener; elsewhere a
// single listener per endpoint is shared across workers.
let reuseport = cfg!(target_os = "linux");
let mut worker_handles = Vec::new();
for endpoint in &endpoints {
let addr = format!("0.0.0.0:{}", endpoint.port);
info!(
"listening on {addr} ({})",
if endpoint.ssl { "https" } else { "http" }
);
let shared_listener = if reuseport {
None
} else {
Some(Arc::new(build_listener(&addr)?))
};
for worker_id in 0..effective_workers {
let listener = match &shared_listener {
Some(l) => l.clone(),
None => Arc::new(build_listener(&addr)?),
};
let mut worker_shutdown = shutdown_rx.resubscribe();
let pipeline = pipeline.clone();
let tls = if endpoint.ssl {
tls_acceptor.clone()
} else {
None
};
let ssl = endpoint.ssl;
let port = endpoint.port;
let handle = tokio::spawn(async move {
run_worker(
worker_id,
listener,
&mut worker_shutdown,
pipeline,
ssl,
port,
tls,
)
.await;
});
worker_handles.push(handle);
}
}
// Wait for shutdown
wait_for_shutdown(&mut shutdown_rx).await;
info!("shutdown signal received, draining workers...");
for handle in worker_handles {
let _ = handle.await;
}
let _ = std::fs::remove_file(&pid_path);
info!("server stopped");
Ok(())
}
/// A distinct address/port the server binds, plus whether it terminates TLS.
#[derive(Debug, Clone)]
struct Endpoint {
port: u16,
ssl: bool,
}
/// Collapse every server's `listen` directives into one entry per port.
fn collect_endpoints(cfg: &ParsedConfig) -> Vec<Endpoint> {
use std::collections::BTreeMap;
let mut ports: BTreeMap<u16, bool> = BTreeMap::new();
for server in &cfg.http.servers {
for l in &server.listen {
let entry = ports.entry(l.port).or_insert(false);
*entry = *entry || l.ssl;
}
}
ports
.into_iter()
.map(|(port, ssl)| Endpoint { port, ssl })
.collect()
}
/// Build the SNI TLS acceptor from every server block that has an
/// `ssl_certificate`.
fn build_tls_acceptor(
cfg: &ParsedConfig,
) -> Result<tokio_rustls::TlsAcceptor, Box<dyn std::error::Error>> {
let mut mgr = crate::tls::TlsManager::new();
for server in &cfg.http.servers {
if let Some(ssl) = &server.ssl {
if ssl.certificate.as_os_str().is_empty() {
continue;
}
mgr.add_cert(&server.server_name, &ssl.certificate, &ssl.certificate_key)
.map_err(|e| format!("loading cert for {:?}: {e}", server.server_name))?;
info!("loaded TLS cert for {:?}", server.server_name);
}
}
if !mgr.has_certs() {
return Err("ssl listener configured but no ssl_certificate found".into());
}
Ok(mgr.build_acceptor()?)
}
/// Build a listening socket.
///
/// Sets `SO_REUSEADDR`, and on Unix `SO_REUSEPORT`, so multiple workers can
/// each bind the same address and have the kernel distribute connections.
/// A generous backlog reduces connection drops under bursty load.
fn build_listener(addr: &str) -> Result<tokio::net::TcpListener, std::io::Error> {
use socket2::{Domain, Protocol, Socket, Type};
let sockaddr: std::net::SocketAddr = addr.parse().map_err(|_| {
std::io::Error::new(std::io::ErrorKind::InvalidInput, "invalid listen address")
})?;
let domain = if sockaddr.is_ipv6() {
Domain::IPV6
} else {
Domain::IPV4
};
let socket = Socket::new(domain, Type::STREAM, Some(Protocol::TCP))?;
socket.set_reuse_address(true)?;
#[cfg(all(unix, not(target_os = "solaris"), not(target_os = "illumos")))]
socket.set_reuse_port(true)?;
socket.set_nonblocking(true)?;
socket.bind(&sockaddr.into())?;
socket.listen(1024)?;
tokio::net::TcpListener::from_std(socket.into())
}
async fn run_worker(
id: usize,
listener: Arc<tokio::net::TcpListener>,
shutdown_rx: &mut broadcast::Receiver<ServerSignal>,
pipeline: Arc<crate::core::pipeline::PipelineProcessor>,
ssl: bool,
port: u16,
tls: Option<Arc<tokio_rustls::TlsAcceptor>>,
) {
debug!("worker {id}: started (ssl={ssl}, port={port})");
loop {
tokio::select! {
biased;
result = shutdown_rx.recv() => {
match result {
Ok(ServerSignal::Shutdown) => {
info!("worker {id}: received shutdown signal");
break;
}
Ok(ServerSignal::Reload) => {
info!("worker {id}: received reload signal");
}
Ok(ServerSignal::ReopenLogs) => {
info!("worker {id}: received reopen-logs signal");
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("worker {id}: missed {n} signal(s)");
}
Err(broadcast::error::RecvError::Closed) => {
break;
}
}
}
result = listener.accept() => {
match result {
Ok((stream, addr)) => {
debug!("worker {id}: accepted connection from {addr}");
let _ = stream.set_nodelay(true);
#[cfg(unix)]
{
let sref = socket2::SockRef::from(&stream);
let _ = sref.set_send_buffer_size(512 * 1024);
}
let pipeline = pipeline.clone();
let tls = tls.clone();
tokio::spawn(async move {
if let Some(acceptor) = tls {
// TLS listener: perform the handshake, then run
// the connection over the encrypted stream.
match acceptor.accept(stream).await {
Ok(tls_stream) => {
handle_connection_generic(tls_stream, pipeline, "https", port).await;
}
Err(e) => debug!("worker {id}: TLS handshake failed: {e}"),
}
} else {
handle_connection(stream, pipeline, port).await;
}
});
}
Err(e) => {
warn!("worker {id}: accept error: {e}");
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
}
}
}
}
debug!("worker {id}: exited");
}
/// Plain-HTTP connection handler. Keeps the zero-copy `sendfile` fast path
/// for static file bodies on Linux.
async fn handle_connection(
mut stream: tokio::net::TcpStream,
pipeline: Arc<crate::core::pipeline::PipelineProcessor>,
port: u16,
) {
let mut buf = crate::util::buffer::get_buffer(8192);
buf.resize(8192, 0);
let mut parser = crate::http::parser::HttpParser::new();
let _ = connection_loop(&mut stream, &pipeline, &mut buf, &mut parser, port).await;
crate::util::buffer::return_buffer(buf);
}
/// Inner plain-HTTP connection loop (sendfile fast path).
async fn connection_loop(
stream: &mut tokio::net::TcpStream,
pipeline: &Arc<crate::core::pipeline::PipelineProcessor>,
buf: &mut [u8],
parser: &mut crate::http::parser::HttpParser,
port: u16,
) -> Result<(), ()> {
use tokio::io::{AsyncReadExt, AsyncWriteExt};
loop {
let n = match tokio::time::timeout(std::time::Duration::from_secs(60), stream.read(buf))
.await
{
Ok(Ok(0)) => return Ok(()),
Ok(Ok(n)) => n,
Ok(Err(_)) | Err(_) => return Ok(()),
};
match parser.feed(&buf[..n]) {
crate::http::parser::ParseResult::Complete(_) => {
if let Some(request) = parser.take_request() {
// WebSocket / upgrade: splice the raw socket to upstream.
if let Some(target) = pipeline.upgrade_target(&request, port) {
let head = request.to_bytes();
let _ = crate::proxy::websocket_tunnel(stream, &head, &target).await;
return Ok(());
}
let keep_alive = request.is_keep_alive();
let response = pipeline.process(&request, "http", port).await;
if send_response(stream, &response).await.is_err() {
return Err(());
}
if !keep_alive {
return Ok(());
}
parser.reset();
}
}
crate::http::parser::ParseResult::NeedMore => continue,
crate::http::parser::ParseResult::Error(_) => {
let response = crate::http::response::Response::bad_request();
let _ = stream.write_all(&response.to_bytes()).await;
return Err(());
}
}
}
}
/// Generic connection handler over any byte stream (used for TLS, where
/// `sendfile` is unavailable so file bodies are written through userspace).
async fn handle_connection_generic<S>(
mut stream: S,
pipeline: Arc<crate::core::pipeline::PipelineProcessor>,
scheme: &'static str,
port: u16,
) where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
use tokio::io::{AsyncReadExt, AsyncWriteExt};
let mut buf = vec![0u8; 8192];
let mut parser = crate::http::parser::HttpParser::new();
loop {
let n =
match tokio::time::timeout(std::time::Duration::from_secs(60), stream.read(&mut buf))
.await
{
Ok(Ok(0)) => return,
Ok(Ok(n)) => n,
Ok(Err(_)) | Err(_) => return,
};
match parser.feed(&buf[..n]) {
crate::http::parser::ParseResult::Complete(_) => {
if let Some(request) = parser.take_request() {
if let Some(target) = pipeline.upgrade_target(&request, port) {
let head = request.to_bytes();
let _ = crate::proxy::websocket_tunnel(&mut stream, &head, &target).await;
return;
}
let keep_alive = request.is_keep_alive();
let response = pipeline.process(&request, scheme, port).await;
if write_response_generic(&mut stream, &response)
.await
.is_err()
{
return;
}
if !keep_alive {
return;
}
parser.reset();
}
}
crate::http::parser::ParseResult::NeedMore => continue,
crate::http::parser::ParseResult::Error(_) => {
let response = crate::http::response::Response::bad_request();
let _ = stream.write_all(&response.to_bytes()).await;
return;
}
}
}
}
/// Write a response over a generic (non-`TcpStream`) transport, reading file
/// bodies through userspace since `sendfile` cannot target a TLS socket.
async fn write_response_generic<S>(
stream: &mut S,
response: &crate::http::response::Response,
) -> Result<(), std::io::Error>
where
S: tokio::io::AsyncWrite + Unpin,
{
use crate::http::response::Body;
use tokio::io::AsyncWriteExt;
let extra = &response.headers;
match &response.body {
Body::Cached(resp, head) => {
if *head {
stream
.write_all(&inject_headers(&resp.header_block, extra))
.await?;
} else if let Some(full) = &resp.full {
if extra.is_empty() {
stream.write_all(full).await?;
} else {
stream.write_all(&inject_headers(full, extra)).await?;
}
} else {
stream
.write_all(&inject_headers(&resp.header_block, extra))
.await?;
let data = read_file_arc(resp.file.clone(), resp.size).await?;
stream.write_all(&data).await?;
}
}
Body::FileFd(file, size) => {
stream.write_all(&response.to_header_bytes()).await?;
let data = read_file_arc(file.clone(), *size).await?;
stream.write_all(&data).await?;
}
Body::File(path, _size) => {
stream.write_all(&response.to_header_bytes()).await?;
let data = tokio::fs::read(path).await?;
stream.write_all(&data).await?;
}
_ => {
stream.write_all(&response.to_bytes()).await?;
}
}
stream.flush().await
}
/// Splice extra response headers into a pre-serialized header block.
///
/// Static responses are serialized once into a cached byte block (status line,
/// headers, CRLF, and optional body). When a location adds `add_header` /
/// `expires`, those land in `response.headers`; this inserts them just before
/// the blank line so they appear on the wire without rebuilding the cached
/// response.
fn inject_headers(buf: &[u8], extra: &crate::http::headers::HeaderMap) -> Vec<u8> {
if extra.is_empty() {
return buf.to_vec();
}
match buf.windows(4).position(|w| w == b"\r\n\r\n") {
Some(pos) => {
let mut out = Vec::with_capacity(buf.len() + 96);
out.extend_from_slice(&buf[..pos + 2]); // up to & incl last header CRLF
extra.write_to(&mut out); // "Name: value\r\n" for each extra header
out.extend_from_slice(&buf[pos + 2..]); // blank line + body
out
}
None => buf.to_vec(),
}
}
/// Read up to `size` bytes from a shared file descriptor on a blocking thread.
async fn read_file_arc(file: std::sync::Arc<std::fs::File>, size: u64) -> std::io::Result<Vec<u8>> {
tokio::task::spawn_blocking(move || -> std::io::Result<Vec<u8>> {
use std::io::Read;
let mut buf = Vec::with_capacity(size as usize);
(&*file).take(size).read_to_end(&mut buf)?;
Ok(buf)
})
.await
.map_err(std::io::Error::other)?
}
/// Send an HTTP response over the TCP stream.
///
/// For file-backed bodies the headers are written first and the body is then
/// transferred with the `sendfile(2)` syscall on Linux for true zero-copy
/// I/O (the file contents never enter user space). `Body::FileFd` reuses a
/// cached open descriptor and avoids a per-request `open(2)`. For
/// `Body::Bytes` (and as a cross-platform fallback) the full serialized
/// response is written in one go.
async fn send_response(
stream: &mut tokio::net::TcpStream,
response: &crate::http::response::Response,
) -> Result<(), std::io::Error> {
use crate::http::response::Body;
use tokio::io::AsyncWriteExt;
let extra = &response.headers;
match &response.body {
Body::Cached(resp, head) => {
if *head {
// HEAD: headers only.
if extra.is_empty() {
stream.write_all(&resp.header_block).await
} else {
stream
.write_all(&inject_headers(&resp.header_block, extra))
.await
}
} else if let Some(full) = &resp.full {
// Small file: the entire response is one contiguous buffer.
if extra.is_empty() {
stream.write_all(full).await
} else {
stream.write_all(&inject_headers(full, extra)).await
}
} else {
// Large file: stream the body zero-copy with sendfile. The
// header and body are flushed within a single readiness event
// (one reactor interaction in the common case); the autotuned
// send buffer absorbs the transfer in one or two calls and
// TCP_NODELAY prevents header stalls. (A blocking-thread
// poll/sendfile loop was tried and reverted: it collapsed
// under high concurrency as hundreds of blocking threads
// thrashed the scheduler.)
#[cfg(target_os = "linux")]
{
if extra.is_empty() {
send_header_and_file(stream, &resp.header_block, &resp.file, resp.size)
.await
} else {
let hb = inject_headers(&resp.header_block, extra);
send_header_and_file(stream, &hb, &resp.file, resp.size).await
}
}
#[cfg(not(target_os = "linux"))]
{
if extra.is_empty() {
stream.write_all(&resp.header_block).await?;
} else {
stream
.write_all(&inject_headers(&resp.header_block, extra))
.await?;
}
send_file_body(stream, &resp.file, resp.size).await
}
}
}
Body::FileFd(file, size) => {
let headers = response.to_header_bytes();
stream.write_all(&headers).await?;
send_file_body(stream, file, *size).await
}
Body::File(path, size) => {
let headers = response.to_header_bytes();
stream.write_all(&headers).await?;
let file = std::sync::Arc::new(std::fs::File::open(path)?);
send_file_body(stream, &file, *size).await
}
_ => {
// Body::Bytes or Body::Empty -- serialize everything together.
let bytes = response.to_bytes();
stream.write_all(&bytes).await
}
}
}
/// Toggle `TCP_CORK` on the socket (Linux only; no-op elsewhere).
///
/// Corking holds back small writes so the response header and the
/// subsequent `sendfile` body are merged into full-sized TCP segments,
/// mirroring nginx's `tcp_nopush`. Uncorking flushes any held data.
#[cfg(target_os = "linux")]
fn set_cork(stream: &tokio::net::TcpStream, on: bool) {
use std::os::unix::io::AsRawFd;
let val: libc::c_int = if on { 1 } else { 0 };
unsafe {
libc::setsockopt(
stream.as_raw_fd(),
libc::IPPROTO_TCP,
libc::TCP_CORK,
&val as *const _ as *const libc::c_void,
std::mem::size_of::<libc::c_int>() as libc::socklen_t,
);
}
}
#[cfg(not(target_os = "linux"))]
#[allow(dead_code)] // TCP_CORK is Linux-only; this is a no-op stub elsewhere
fn set_cork(_stream: &tokio::net::TcpStream, _on: bool) {}
/// Transfer a file body to the socket, using zero-copy `sendfile(2)` on Linux
/// and a buffered fallback elsewhere.
async fn send_file_body(
stream: &mut tokio::net::TcpStream,
file: &std::sync::Arc<std::fs::File>,
size: u64,
) -> Result<(), std::io::Error> {
#[cfg(target_os = "linux")]
{
sendfile_zero_copy(stream, file, size).await
}
#[cfg(not(target_os = "linux"))]
{
use tokio::io::AsyncWriteExt;
let f = file.clone();
let want = size as usize;
let data = tokio::task::spawn_blocking(move || -> std::io::Result<Vec<u8>> {
use std::io::Read;
let mut buf = Vec::with_capacity(want);
(&*f).take(want as u64).read_to_end(&mut buf)?;
Ok(buf)
})
.await
.map_err(std::io::Error::other)??;
stream.write_all(&data).await
}
}
/// Send a response header followed by a file body within a single socket
/// readiness event on Linux.
///
/// The header (a small in-memory buffer) is flushed with `send(2)` and the
/// body is streamed with zero-copy `sendfile(2)`, both inside one
/// `async_io` closure. In the common case (warm send buffer) the whole
/// response leaves in a single reactor interaction, avoiding the extra
/// poll cycle a separate `write_all` for the header would cost.
#[cfg(target_os = "linux")]
async fn send_header_and_file(
stream: &mut tokio::net::TcpStream,
header: &[u8],
file: &std::fs::File,
size: u64,
) -> Result<(), std::io::Error> {
use std::os::unix::io::AsRawFd;
use tokio::io::Interest;
let in_fd = file.as_raw_fd();
let out_fd = stream.as_raw_fd();
let total = size as libc::off_t;
let mut hdr_off = 0usize;
let mut offset: libc::off_t = 0;
stream
.async_io(Interest::WRITABLE, || {
// 1. Flush the header (MSG_NOSIGNAL avoids SIGPIPE on a closed peer).
while hdr_off < header.len() {
let n = unsafe {
libc::send(
out_fd,
header[hdr_off..].as_ptr() as *const libc::c_void,
header.len() - hdr_off,
libc::MSG_NOSIGNAL,
)
};
if n < 0 {
return Err(std::io::Error::last_os_error());
}
hdr_off += n as usize;
}
// 2. Stream the body zero-copy.
while offset < total {
let count = ((total - offset) as usize).min(0x7fff_f000);
let n = unsafe { libc::sendfile(out_fd, in_fd, &mut offset, count) };
if n < 0 {
return Err(std::io::Error::last_os_error());
}
if n == 0 {
break;
}
}
Ok(())
})
.await
}
/// Zero-copy file-to-socket transfer using the Linux `sendfile(2)` syscall.
///
/// The socket is non-blocking (tokio), so `sendfile` may return `EAGAIN`
/// when the socket send buffer is full; [`TcpStream::async_io`] parks the
/// task until the socket is writable again and retries. The kernel copies
/// directly from the page cache to the socket, never touching user space.
#[cfg(target_os = "linux")]
async fn sendfile_zero_copy(
stream: &mut tokio::net::TcpStream,
file: &std::fs::File,
size: u64,
) -> Result<(), std::io::Error> {
use std::os::unix::io::AsRawFd;
use tokio::io::Interest;
let in_fd = file.as_raw_fd();
let out_fd = stream.as_raw_fd();
let total = size as libc::off_t;
let mut offset: libc::off_t = 0;
// Drain as much as the kernel will accept on each writability event.
// The inner loop keeps issuing sendfile while bytes flow; it only
// returns to the async runtime when the socket would block (EAGAIN),
// which avoids per-chunk reactor/future overhead and matches the tight
// sendfile loop nginx runs. `offset` lives across retries; sendfile
// advances it in place and never touches the fd's own offset, so a
// shared cached descriptor is safe.
stream
.async_io(Interest::WRITABLE, || {
while offset < total {
let count = ((total - offset) as usize).min(0x7fff_f000);
let n = unsafe { libc::sendfile(out_fd, in_fd, &mut offset, count) };
if n < 0 {
// EAGAIN -> park until writable; any other errno -> fail.
return Err(std::io::Error::last_os_error());
}
if n == 0 {
// Premature EOF (file shorter than advertised); stop.
break;
}
}
Ok(())
})
.await
}
async fn wait_for_shutdown(rx: &mut broadcast::Receiver<ServerSignal>) {
loop {
match rx.recv().await {
Ok(ServerSignal::Shutdown) => return,
Ok(ServerSignal::Reload) => {
info!("master: reload requested");
}
Ok(ServerSignal::ReopenLogs) => {
info!("master: reopen-logs requested");
}
Err(broadcast::error::RecvError::Lagged(n)) => {
warn!("master: missed {n} signal(s)");
}
Err(broadcast::error::RecvError::Closed) => {
return;
}
}
}
}
fn init_logging() {
use tracing_subscriber::EnvFilter;
let filter =
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("veld=info,warn"));
tracing_subscriber::fmt()
.with_env_filter(filter)
.with_target(true)
.compact()
.init();
}
fn print_banner() {
info!("{VERSION} starting");
info!("targeting {}", std::env::consts::OS);
}