pub mod acceptor;
pub mod connection;
pub mod pipeline;
pub mod timer;
pub mod worker;
pub use connection::{
Connection, ConnectionError, ConnectionState, ConnectionStream, RequestHandler,
};
use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::broadcast;
use tracing::{info, warn};
use crate::config::ParsedConfig;
use crate::core::worker::Worker;
use crate::util::signal::ServerSignal;
/// Main server that manages workers and handles signals
pub struct Server {
config: Arc<ParsedConfig>,
shutdown_tx: broadcast::Sender<ServerSignal>,
reload_tx: broadcast::Sender<ServerSignal>,
}
impl Server {
pub fn new(config: ParsedConfig) -> Self {
let (shutdown_tx, _) = broadcast::channel(16);
let (reload_tx, _) = broadcast::channel(16);
Self {
config: Arc::new(config),
shutdown_tx,
reload_tx,
}
}
/// Start the server with the given configuration
pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
let config = &self.config;
// Determine listen address
let listen_addr = config
.http
.servers
.first()
.and_then(|s| s.listen.first())
.map(|l| format!("{}:{}", l.addr, l.port))
.unwrap_or_else(|| "0.0.0.0:80".to_string());
// Create TCP listener
let listener = TcpListener::bind(&listen_addr).await?;
let listener = Arc::new(listener);
info!("Listening on {}", listen_addr);
// Determine number of workers
let num_workers = if config.worker_processes == 0 {
num_cpus::get()
} else {
config.worker_processes
};
info!("Starting {} worker(s)", num_workers);
// Spawn workers
let mut handles = Vec::new();
for i in 0..num_workers {
let listener = listener.clone();
let config = self.config.clone();
let shutdown_rx = self.shutdown_tx.subscribe();
let reload_rx = self.reload_tx.subscribe();
let handle = tokio::spawn(async move {
let worker = Worker::new(i, config);
worker.run(listener, shutdown_rx, reload_rx).await;
});
handles.push(handle);
}
// Set up signal handler
let signal_handler = crate::util::signal::SignalHandler::new();
let mut signal_rx = signal_handler.subscribe();
// Spawn signal listener
let shutdown_tx = self.shutdown_tx.clone();
let reload_tx = self.reload_tx.clone();
tokio::spawn(async move {
signal_handler.run().await;
});
// Wait for signals
loop {
match signal_rx.recv().await {
Ok(ServerSignal::Shutdown) => {
info!("Initiating graceful shutdown");
let _ = shutdown_tx.send(ServerSignal::Shutdown);
break;
}
Ok(ServerSignal::Reload) => {
info!("Reloading configuration");
let _ = reload_tx.send(ServerSignal::Reload);
}
Ok(ServerSignal::ReopenLogs) => {
info!("Reopening log files");
}
Err(e) => {
warn!("Signal receiver error: {}", e);
break;
}
}
}
// Wait for workers to finish
for handle in handles {
let _ = handle.await;
}
info!("Server shutdown complete");
Ok(())
}
/// Graceful shutdown
pub fn shutdown(&self) {
let _ = self.shutdown_tx.send(ServerSignal::Shutdown);
}
/// Reload configuration
pub fn reload(&self, _new_config: ParsedConfig) {
let _ = self.reload_tx.send(ServerSignal::Reload);
}
}