Ferrit Explore
中文·繁體·EN·日本語 Sign in Register
cielxl / veld / src / core / mod.rs
pub mod acceptor;
pub mod connection;
pub mod pipeline;
pub mod timer;
pub mod worker;

pub use connection::{
    Connection, ConnectionError, ConnectionState, ConnectionStream, RequestHandler,
};

use std::sync::Arc;
use tokio::net::TcpListener;
use tokio::sync::broadcast;
use tracing::{info, warn};

use crate::config::ParsedConfig;
use crate::core::worker::Worker;
use crate::util::signal::ServerSignal;

/// Main server that manages workers and handles signals
pub struct Server {
    config: Arc<ParsedConfig>,
    shutdown_tx: broadcast::Sender<ServerSignal>,
    reload_tx: broadcast::Sender<ServerSignal>,
}

impl Server {
    pub fn new(config: ParsedConfig) -> Self {
        let (shutdown_tx, _) = broadcast::channel(16);
        let (reload_tx, _) = broadcast::channel(16);
        Self {
            config: Arc::new(config),
            shutdown_tx,
            reload_tx,
        }
    }

    /// Start the server with the given configuration
    pub async fn start(&self) -> Result<(), Box<dyn std::error::Error>> {
        let config = &self.config;

        // Determine listen address
        let listen_addr = config
            .http
            .servers
            .first()
            .and_then(|s| s.listen.first())
            .map(|l| format!("{}:{}", l.addr, l.port))
            .unwrap_or_else(|| "0.0.0.0:80".to_string());

        // Create TCP listener
        let listener = TcpListener::bind(&listen_addr).await?;
        let listener = Arc::new(listener);
        info!("Listening on {}", listen_addr);

        // Determine number of workers
        let num_workers = if config.worker_processes == 0 {
            num_cpus::get()
        } else {
            config.worker_processes
        };

        info!("Starting {} worker(s)", num_workers);

        // Spawn workers
        let mut handles = Vec::new();
        for i in 0..num_workers {
            let listener = listener.clone();
            let config = self.config.clone();
            let shutdown_rx = self.shutdown_tx.subscribe();
            let reload_rx = self.reload_tx.subscribe();

            let handle = tokio::spawn(async move {
                let worker = Worker::new(i, config);
                worker.run(listener, shutdown_rx, reload_rx).await;
            });
            handles.push(handle);
        }

        // Set up signal handler
        let signal_handler = crate::util::signal::SignalHandler::new();
        let mut signal_rx = signal_handler.subscribe();

        // Spawn signal listener
        let shutdown_tx = self.shutdown_tx.clone();
        let reload_tx = self.reload_tx.clone();
        tokio::spawn(async move {
            signal_handler.run().await;
        });

        // Wait for signals
        loop {
            match signal_rx.recv().await {
                Ok(ServerSignal::Shutdown) => {
                    info!("Initiating graceful shutdown");
                    let _ = shutdown_tx.send(ServerSignal::Shutdown);
                    break;
                }
                Ok(ServerSignal::Reload) => {
                    info!("Reloading configuration");
                    let _ = reload_tx.send(ServerSignal::Reload);
                }
                Ok(ServerSignal::ReopenLogs) => {
                    info!("Reopening log files");
                }
                Err(e) => {
                    warn!("Signal receiver error: {}", e);
                    break;
                }
            }
        }

        // Wait for workers to finish
        for handle in handles {
            let _ = handle.await;
        }

        info!("Server shutdown complete");
        Ok(())
    }

    /// Graceful shutdown
    pub fn shutdown(&self) {
        let _ = self.shutdown_tx.send(ServerSignal::Shutdown);
    }

    /// Reload configuration
    pub fn reload(&self, _new_config: ParsedConfig) {
        let _ = self.reload_tx.send(ServerSignal::Reload);
    }
}