Ferrit Explore
中文·繁體·EN·日本語 Sign in Register
cielxl / veld / src / core / connection.rs
//! Connection state machine.
//!
//! Each accepted TCP (or TLS) connection is represented by a [`Connection`]
//! that drives the full request lifecycle: read, parse, dispatch, write,
//! and optional keep-alive looping.  The state machine is intentionally
//! linear -- there are no concurrent sub-operations within a single
//! connection, which keeps the hot path simple and predictable.
//!
//! # Design notes
//!
//! * **Transport abstraction** -- [`ConnectionStream`] wraps either a plain
//!   [`TcpStream`] or a [`TlsStream<TcpStream>`] behind a unified
//!   `AsyncRead + AsyncWrite` interface so that the rest of the pipeline
//!   is transport-agnostic.
//! * **Timeout integration** -- every I/O operation is guarded by the
//!   [`ConnectionTimer`], which tracks independent read, write, and idle
//!   deadlines.  The timer is checked before each I/O call and after each
//!   state transition.
//! * **Keep-alive** -- HTTP/1.1 connections are keep-alive by default.
//!   After a complete request/response cycle the parser and timer are
//!   reset and the state machine loops back to `Reading`.
//! * **Graceful close** -- [`Connection::close`] performs a clean TLS
//!   shutdown (when applicable) followed by a TCP half-close so that
//!   in-flight data is flushed to the peer.

use std::fmt;
use std::io;
use std::net::SocketAddr;
use std::path::Path;
use std::time::Duration;

use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::net::TcpStream;
use tokio_rustls::server::TlsStream;
use tracing::{debug, error, info, warn};

use crate::http::parser::{HttpParser, ParseResult};
use crate::http::request::Request;
use crate::http::response::{Body, Response};
use crate::http::status::HttpStatusCode;

use super::timer::ConnectionTimer;

// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------

/// Default read buffer size (8 KiB).  Matches the initial capacity of
/// [`HttpParser`]'s internal buffer, keeping the first read allocation-free.
const READ_BUF_SIZE: usize = 8192;

/// File streaming buffer size (64 KiB).  Used when writing [`Body::File`]
/// responses to the wire.
const FILE_BUF_SIZE: usize = 65536;

/// Default timeout durations when none are supplied by the caller.
const DEFAULT_READ_TIMEOUT: Duration = Duration::from_secs(30);
const DEFAULT_WRITE_TIMEOUT: Duration = Duration::from_secs(30);
const DEFAULT_IDLE_TIMEOUT: Duration = Duration::from_secs(60);

/// Maximum number of requests served on a single keep-alive connection.
/// This prevents a single client from holding a connection indefinitely.
const MAX_KEEP_ALIVE_REQUESTS: u64 = 1000;

// ---------------------------------------------------------------------------
// ConnectionState
// ---------------------------------------------------------------------------

/// Phases of a single connection's lifetime.
///
/// The state machine always progresses forward except for the keep-alive
/// loop that returns from `Writing` back to `Reading`.
///
/// ```text
///   Reading --> Processing --> Writing --+--> Closing --> Closed
///                  ^                    |
///                  |   (keep-alive)     |
///                  +--------------------+
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ConnectionState {
    /// Waiting for data from the client.
    Reading,
    /// A complete request has been received; the handler is executing.
    Processing,
    /// The response is being written to the client.
    Writing,
    /// A graceful shutdown is in progress (TLS close_notify, TCP half-close).
    Closing,
    /// The connection has been fully torn down.
    Closed,
}

impl fmt::Display for ConnectionState {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ConnectionState::Reading => f.write_str("Reading"),
            ConnectionState::Processing => f.write_str("Processing"),
            ConnectionState::Writing => f.write_str("Writing"),
            ConnectionState::Closing => f.write_str("Closing"),
            ConnectionState::Closed => f.write_str("Closed"),
        }
    }
}

// ---------------------------------------------------------------------------
// ConnectionStream -- transport abstraction
// ---------------------------------------------------------------------------

/// Unified wrapper over plain TCP and TLS streams.
///
/// Both variants implement [`AsyncRead`] + [`AsyncWrite`] so the connection
/// state machine can use them interchangeably.
// Exactly one stream exists per connection, so the size difference between the
// plain and TLS variants is irrelevant; boxing the TLS state would only add an
// allocation and indirection on the encrypted hot path.
#[allow(clippy::large_enum_variant)]
pub enum ConnectionStream {
    /// Unencrypted TCP connection.
    Tcp(TcpStream),
    /// TLS-encrypted connection (server-side).
    Tls(TlsStream<TcpStream>),
}

impl ConnectionStream {
    /// Returns the peer address of the underlying TCP stream.
    pub fn peer_addr(&self) -> io::Result<SocketAddr> {
        match self {
            ConnectionStream::Tcp(s) => s.peer_addr(),
            ConnectionStream::Tls(s) => s.get_ref().0.peer_addr(),
        }
    }

    /// Returns `true` if this stream is TLS-encrypted.
    pub fn is_tls(&self) -> bool {
        matches!(self, ConnectionStream::Tls(_))
    }

    /// Perform a graceful shutdown.
    ///
    /// For TLS streams this sends a `close_notify` alert before shutting
    /// down the underlying TCP socket.  For plain TCP this delegates to
    /// the standard `TcpStream::shutdown`.
    async fn shutdown(&mut self) -> io::Result<()> {
        match self {
            ConnectionStream::Tcp(s) => s.shutdown().await,
            ConnectionStream::Tls(s) => s.shutdown().await,
        }
    }
}

impl AsyncRead for ConnectionStream {
    fn poll_read(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &mut tokio::io::ReadBuf<'_>,
    ) -> std::task::Poll<io::Result<()>> {
        match self.get_mut() {
            ConnectionStream::Tcp(s) => std::pin::Pin::new(s).poll_read(cx, buf),
            ConnectionStream::Tls(s) => std::pin::Pin::new(s).poll_read(cx, buf),
        }
    }
}

impl AsyncWrite for ConnectionStream {
    fn poll_write(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
        buf: &[u8],
    ) -> std::task::Poll<io::Result<usize>> {
        match self.get_mut() {
            ConnectionStream::Tcp(s) => std::pin::Pin::new(s).poll_write(cx, buf),
            ConnectionStream::Tls(s) => std::pin::Pin::new(s).poll_write(cx, buf),
        }
    }

    fn poll_flush(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<io::Result<()>> {
        match self.get_mut() {
            ConnectionStream::Tcp(s) => std::pin::Pin::new(s).poll_flush(cx),
            ConnectionStream::Tls(s) => std::pin::Pin::new(s).poll_flush(cx),
        }
    }

    fn poll_shutdown(
        self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<io::Result<()>> {
        match self.get_mut() {
            ConnectionStream::Tcp(s) => std::pin::Pin::new(s).poll_shutdown(cx),
            ConnectionStream::Tls(s) => std::pin::Pin::new(s).poll_shutdown(cx),
        }
    }
}

// ---------------------------------------------------------------------------
// ConnectionError
// ---------------------------------------------------------------------------

/// Errors that can arise during connection processing.
#[derive(Debug)]
pub enum ConnectionError {
    /// An I/O error on the underlying stream.
    Io(io::Error),
    /// The HTTP parser encountered a protocol-level error.
    Parse(String),
    /// An armed deadline on the [`ConnectionTimer`] has expired.
    Timeout(&'static str),
    /// The peer closed the connection before a complete request arrived.
    ClientGone,
    /// The connection was explicitly closed by the server (e.g. graceful
    /// shutdown signal).
    ServerShutdown,
}

impl fmt::Display for ConnectionError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ConnectionError::Io(e) => write!(f, "I/O error: {e}"),
            ConnectionError::Parse(msg) => write!(f, "parse error: {msg}"),
            ConnectionError::Timeout(kind) => write!(f, "{kind} timeout"),
            ConnectionError::ClientGone => write!(f, "client disconnected"),
            ConnectionError::ServerShutdown => write!(f, "server shutting down"),
        }
    }
}

impl std::error::Error for ConnectionError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            ConnectionError::Io(e) => Some(e),
            _ => None,
        }
    }
}

impl From<io::Error> for ConnectionError {
    fn from(e: io::Error) -> Self {
        ConnectionError::Io(e)
    }
}

// ---------------------------------------------------------------------------
// RequestHandler trait
// ---------------------------------------------------------------------------

/// Trait implemented by request handlers (static files, proxy, etc.).
///
/// The handler receives a reference to the parsed [`Request`] and returns
/// a [`Response`].  Implementations should be stateless or carry only
/// configuration -- per-connection mutable state lives in [`Connection`].
pub trait RequestHandler: Send + Sync + fmt::Debug {
    fn handle(&self, request: &Request) -> Response;
}

// ---------------------------------------------------------------------------
// Connection
// ---------------------------------------------------------------------------

/// Per-connection state machine.
///
/// Owns the transport stream, the HTTP parser, the per-connection timer,
/// and all mutable state needed to drive the read-parse-dispatch-write
/// lifecycle.  A `Connection` is created for each accepted TCP (or TLS)
/// connection and consumed when the connection is closed.
pub struct Connection {
    /// Monotonically increasing identifier, unique within a worker.
    pub id: u64,

    /// Underlying transport (TCP or TLS).
    stream: ConnectionStream,

    /// Current phase of the connection lifecycle.
    state: ConnectionState,

    /// Per-connection timeout tracker.
    timer: ConnectionTimer,

    /// Remote peer address (cached from the initial accept).
    remote_addr: SocketAddr,

    /// Streaming HTTP/1.1 parser.
    parser: HttpParser,

    /// Whether this connection should be kept alive after the current
    /// request/response cycle.
    keep_alive: bool,

    /// Counter of requests successfully served on this connection.
    requests_handled: u64,

    /// Stash for the response between the Processing and Writing states.
    pending_response: Option<Response>,
}

impl Connection {
    // ------------------------------------------------------------------
    // Construction
    // ------------------------------------------------------------------

    /// Create a new connection for a plain TCP stream.
    ///
    /// Starts in the [`ConnectionState::Reading`] state with default
    /// timeout durations.
    pub fn new(stream: TcpStream, id: u64, remote_addr: SocketAddr) -> Self {
        Self::with_stream(
            ConnectionStream::Tcp(stream),
            id,
            remote_addr,
            DEFAULT_READ_TIMEOUT,
            DEFAULT_WRITE_TIMEOUT,
            DEFAULT_IDLE_TIMEOUT,
        )
    }

    /// Create a new connection for a TLS stream.
    pub fn new_tls(stream: TlsStream<TcpStream>, id: u64, remote_addr: SocketAddr) -> Self {
        Self::with_stream(
            ConnectionStream::Tls(stream),
            id,
            remote_addr,
            DEFAULT_READ_TIMEOUT,
            DEFAULT_WRITE_TIMEOUT,
            DEFAULT_IDLE_TIMEOUT,
        )
    }

    /// Create a new connection over a pre-wrapped [`ConnectionStream`] with
    /// explicit timeout durations.
    pub fn with_stream(
        stream: ConnectionStream,
        id: u64,
        remote_addr: SocketAddr,
        read_timeout: Duration,
        write_timeout: Duration,
        idle_timeout: Duration,
    ) -> Self {
        let mut timer = ConnectionTimer::new(read_timeout, write_timeout, idle_timeout);
        timer.reset_read();
        timer.reset_idle();

        Self {
            id,
            stream,
            state: ConnectionState::Reading,
            timer,
            remote_addr,
            parser: HttpParser::new(),
            keep_alive: true,
            requests_handled: 0,
            pending_response: None,
        }
    }

    // ------------------------------------------------------------------
    // Accessors
    // ------------------------------------------------------------------

    /// The unique connection identifier.
    pub fn id(&self) -> u64 {
        self.id
    }

    /// Current state of the connection.
    pub fn state(&self) -> ConnectionState {
        self.state
    }

    /// Remote peer address.
    pub fn remote_addr(&self) -> SocketAddr {
        self.remote_addr
    }

    /// Number of requests successfully served on this connection.
    pub fn requests_handled(&self) -> u64 {
        self.requests_handled
    }

    /// Whether the underlying transport is TLS-encrypted.
    pub fn is_tls(&self) -> bool {
        self.stream.is_tls()
    }

    /// Returns a reference to the per-connection timer.
    pub fn timer(&self) -> &ConnectionTimer {
        &self.timer
    }

    // ------------------------------------------------------------------
    // Main processing loop
    // ------------------------------------------------------------------

    /// Drive the connection through its full lifecycle.
    ///
    /// This is the main entry point called by the worker task after
    /// accepting a connection.  It loops through the
    /// read -> parse -> dispatch -> write cycle until the client closes
    /// the connection, a timeout fires, an error occurs, or keep-alive
    /// is disabled.
    pub async fn process(&mut self, handler: &dyn RequestHandler) -> Result<(), ConnectionError> {
        loop {
            // -- Check timeouts before each state transition ----------
            if let Some(kind) = self.timer.check_expired() {
                warn!(
                    conn = self.id,
                    state = %self.state,
                    remote = %self.remote_addr,
                    timeout = ?kind,
                    "connection timed out"
                );
                self.close().await;
                return Err(ConnectionError::Timeout(match kind {
                    super::timer::TimeoutKind::Read => "read",
                    super::timer::TimeoutKind::Write => "write",
                    super::timer::TimeoutKind::Idle => "idle",
                }));
            }

            match self.state {
                ConnectionState::Reading => {
                    self.do_read().await?;
                }
                ConnectionState::Processing => {
                    self.do_process(handler);
                }
                ConnectionState::Writing => {
                    self.do_write().await?;
                }
                ConnectionState::Closing => {
                    self.close().await;
                    return Ok(());
                }
                ConnectionState::Closed => {
                    return Ok(());
                }
            }
        }
    }

    // ------------------------------------------------------------------
    // State: Reading
    // ------------------------------------------------------------------

    /// Read data from the stream and feed it to the HTTP parser.
    ///
    /// Transitions to [`ConnectionState::Processing`] once the parser
    /// signals a complete request.  On EOF (zero bytes read) the client
    /// has disconnected -- the connection transitions to `Closing`.
    async fn do_read(&mut self) -> Result<(), ConnectionError> {
        self.timer.reset_read();

        let mut buf = vec![0u8; READ_BUF_SIZE];
        let n = match self.stream.read(&mut buf).await {
            Ok(0) => {
                debug!(
                    conn = self.id,
                    remote = %self.remote_addr,
                    "client disconnected (EOF)"
                );
                self.state = ConnectionState::Closing;
                return Ok(());
            }
            Ok(n) => n,
            Err(e) => {
                if e.kind() == io::ErrorKind::ConnectionReset {
                    debug!(
                        conn = self.id,
                        remote = %self.remote_addr,
                        "connection reset by peer"
                    );
                    self.state = ConnectionState::Closing;
                    return Ok(());
                }
                return Err(ConnectionError::Io(e));
            }
        };

        self.timer.clear_read();

        match self.parser.feed(&buf[..n]) {
            ParseResult::NeedMore => {
                debug!(
                    conn = self.id,
                    bytes = n,
                    "partial parse, waiting for more data"
                );
                Ok(())
            }
            ParseResult::Complete(_consumed) => {
                debug!(
                    conn = self.id,
                    remote = %self.remote_addr,
                    "request parse complete"
                );
                self.state = ConnectionState::Processing;
                Ok(())
            }
            ParseResult::Error(e) => {
                warn!(
                    conn = self.id,
                    remote = %self.remote_addr,
                    error = %e,
                    "HTTP parse error"
                );
                self.send_error_response(HttpStatusCode::BAD_REQUEST).await;
                self.state = ConnectionState::Closing;
                Err(ConnectionError::Parse(e.message))
            }
        }
    }

    // ------------------------------------------------------------------
    // State: Processing
    // ------------------------------------------------------------------

    /// Extract the parsed request, dispatch it to the handler, stash the
    /// response, and transition to the Writing state.
    fn do_process(&mut self, handler: &dyn RequestHandler) {
        let mut request = match self.parser.take_request() {
            Some(r) => r,
            None => {
                error!(
                    conn = self.id,
                    "parser reported complete but no request available"
                );
                self.state = ConnectionState::Closing;
                return;
            }
        };

        // Annotate the request with connection metadata.
        request.remote_addr = Some(self.remote_addr);
        request.connection_id = self.id;

        // Determine keep-alive policy from the request.
        self.keep_alive = request.is_keep_alive();

        debug!(
            conn = self.id,
            method = %request.method,
            uri = %request.uri,
            keep_alive = self.keep_alive,
            "dispatching request to handler"
        );

        let mut response = handler.handle(&request);

        // Respect the response's close_connection flag.
        if response.close_connection {
            self.keep_alive = false;
        }

        // Ensure the response carries a Content-Length header for bytes
        // bodies.  File bodies are sent with the length already set by
        // the handler.
        if let Body::Bytes(ref body) = response.body {
            if !response.headers.contains("content-length") {
                response
                    .headers
                    .insert("Content-Length", body.len().to_string());
            }
        }

        self.pending_response = Some(response);
        self.requests_handled += 1;
        self.state = ConnectionState::Writing;
    }

    // ------------------------------------------------------------------
    // State: Writing
    // ------------------------------------------------------------------

    /// Serialise and write the response to the client.
    ///
    /// After a successful write the connection either resets for the
    /// next keep-alive request or transitions to `Closing`.
    async fn do_write(&mut self) -> Result<(), ConnectionError> {
        let response = match self.pending_response.take() {
            Some(r) => r,
            None => {
                error!(conn = self.id, "no pending response in Writing state");
                self.state = ConnectionState::Closing;
                return Ok(());
            }
        };

        self.timer.reset_write();

        // Serialise status line + headers (+ bytes body if present).
        let response_bytes = response.to_bytes();

        if let Err(e) = self.stream.write_all(&response_bytes).await {
            warn!(
                conn = self.id,
                remote = %self.remote_addr,
                error = %e,
                "failed to write response"
            );
            self.state = ConnectionState::Closing;
            return Err(ConnectionError::Io(e));
        }

        // For file bodies, stream the file contents after the headers.
        if let Body::File(ref path, _size) = response.body {
            if let Err(e) = self.send_file(path).await {
                warn!(
                    conn = self.id,
                    remote = %self.remote_addr,
                    error = %e,
                    "failed to stream file body"
                );
                self.state = ConnectionState::Closing;
                return Err(ConnectionError::Io(e));
            }
        }

        // Flush to ensure the peer receives the data promptly.
        if let Err(e) = self.stream.flush().await {
            warn!(conn = self.id, error = %e, "failed to flush stream");
            self.state = ConnectionState::Closing;
            return Err(ConnectionError::Io(e));
        }

        self.timer.clear_write();

        debug!(
            conn = self.id,
            requests = self.requests_handled,
            keep_alive = self.keep_alive,
            "response written"
        );

        // Decide what to do next.
        if self.should_keep_alive() {
            self.handle_keep_alive();
        } else {
            debug!(conn = self.id, "keep-alive disabled, closing connection");
            self.state = ConnectionState::Closing;
        }

        Ok(())
    }

    // ------------------------------------------------------------------
    // Keep-alive
    // ------------------------------------------------------------------

    /// Reset the connection for the next request (keep-alive).
    ///
    /// Clears the parser, resets the idle timer, and transitions back
    /// to [`ConnectionState::Reading`].
    pub fn handle_keep_alive(&mut self) {
        debug!(
            conn = self.id,
            requests = self.requests_handled,
            "resetting for next keep-alive request"
        );

        self.parser.reset();
        self.timer.reset_read();
        self.timer.reset_idle();
        self.keep_alive = true;
        self.pending_response = None;
        self.state = ConnectionState::Reading;
    }

    /// Returns `true` if the connection should be kept alive for another
    /// request cycle.
    fn should_keep_alive(&self) -> bool {
        if !self.keep_alive {
            return false;
        }

        // Enforce the maximum request limit to prevent resource exhaustion.
        if self.requests_handled >= MAX_KEEP_ALIVE_REQUESTS {
            info!(
                conn = self.id,
                requests = self.requests_handled,
                limit = MAX_KEEP_ALIVE_REQUESTS,
                "keep-alive request limit reached"
            );
            return false;
        }

        true
    }

    // ------------------------------------------------------------------
    // Graceful close
    // ------------------------------------------------------------------

    /// Perform a graceful connection shutdown.
    ///
    /// Sends a TLS `close_notify` (for TLS streams), half-closes the
    /// TCP socket, and transitions to [`ConnectionState::Closed`].
    pub async fn close(&mut self) {
        if self.state == ConnectionState::Closed {
            return;
        }

        self.state = ConnectionState::Closing;

        debug!(
            conn = self.id,
            remote = %self.remote_addr,
            requests = self.requests_handled,
            "closing connection"
        );

        if let Err(e) = self.stream.shutdown().await {
            debug!(
                conn = self.id,
                error = %e,
                "stream shutdown returned error (non-fatal)"
            );
        }

        self.state = ConnectionState::Closed;
    }

    // ------------------------------------------------------------------
    // Helpers
    // ------------------------------------------------------------------

    /// Stream a file body to the client in chunks.
    async fn send_file(&mut self, path: &Path) -> Result<(), io::Error> {
        let mut file = tokio::fs::File::open(path).await?;
        let mut buf = vec![0u8; FILE_BUF_SIZE];

        loop {
            let n = file.read(&mut buf).await?;
            if n == 0 {
                break;
            }
            self.stream.write_all(&buf[..n]).await?;
        }

        Ok(())
    }

    /// Build and send a minimal error response, then transition to
    /// `Closing`.  Used for protocol errors where we want to inform the
    /// client before disconnecting.  This is best-effort -- write errors
    /// are silently ignored.
    async fn send_error_response(&mut self, status: HttpStatusCode) {
        let response = Response::builder()
            .status(status)
            .header("Content-Type", "text/html; charset=utf-8")
            .header("Connection", "close")
            .body_str(&format!(
                "<html><body><h1>{} {}</h1></body></html>",
                status.as_u16(),
                status.reason_phrase(),
            ));

        let bytes = response.to_bytes();
        let _ = self.stream.write_all(&bytes).await;
        let _ = self.stream.flush().await;
    }
}

impl fmt::Debug for Connection {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Connection")
            .field("id", &self.id)
            .field("state", &self.state)
            .field("remote_addr", &self.remote_addr)
            .field("keep_alive", &self.keep_alive)
            .field("requests_handled", &self.requests_handled)
            .field("is_tls", &self.stream.is_tls())
            .finish_non_exhaustive()
    }
}

// ===========================================================================
// Tests
// ===========================================================================

#[cfg(test)]
mod tests {
    use super::*;
    use std::error::Error;

    // -- ConnectionState --------------------------------------------------

    #[test]
    fn connection_state_display() {
        assert_eq!(ConnectionState::Reading.to_string(), "Reading");
        assert_eq!(ConnectionState::Processing.to_string(), "Processing");
        assert_eq!(ConnectionState::Writing.to_string(), "Writing");
        assert_eq!(ConnectionState::Closing.to_string(), "Closing");
        assert_eq!(ConnectionState::Closed.to_string(), "Closed");
    }

    #[test]
    fn connection_state_is_eq() {
        assert_eq!(ConnectionState::Reading, ConnectionState::Reading);
        assert_ne!(ConnectionState::Reading, ConnectionState::Closed);
    }

    // -- ConnectionError --------------------------------------------------

    #[test]
    fn connection_error_display_io() {
        let err = ConnectionError::Io(io::Error::new(io::ErrorKind::BrokenPipe, "test"));
        assert!(err.to_string().contains("I/O error"));
    }

    #[test]
    fn connection_error_display_parse() {
        let err = ConnectionError::Parse("bad request line".into());
        assert!(err.to_string().contains("parse error"));
    }

    #[test]
    fn connection_error_display_timeout() {
        let err = ConnectionError::Timeout("read");
        assert!(err.to_string().contains("read timeout"));
    }

    #[test]
    fn connection_error_display_client_gone() {
        let err = ConnectionError::ClientGone;
        assert!(err.to_string().contains("disconnected"));
    }

    #[test]
    fn connection_error_display_server_shutdown() {
        let err = ConnectionError::ServerShutdown;
        assert!(err.to_string().contains("server shutting down"));
    }

    #[test]
    fn connection_error_from_io() {
        let io_err = io::Error::other("test");
        let conn_err: ConnectionError = io_err.into();
        assert!(matches!(conn_err, ConnectionError::Io(_)));
    }

    #[test]
    fn connection_error_source_io() {
        let err = ConnectionError::Io(io::Error::other("test"));
        assert!(err.source().is_some());
    }

    #[test]
    fn connection_error_source_parse() {
        let err = ConnectionError::Parse("test".into());
        assert!(err.source().is_none());
    }

    // -- Constants --------------------------------------------------------

    #[test]
    fn default_timeouts() {
        assert_eq!(DEFAULT_READ_TIMEOUT, Duration::from_secs(30));
        assert_eq!(DEFAULT_WRITE_TIMEOUT, Duration::from_secs(30));
        assert_eq!(DEFAULT_IDLE_TIMEOUT, Duration::from_secs(60));
    }

    #[test]
    fn max_keep_alive_requests() {
        assert_eq!(MAX_KEEP_ALIVE_REQUESTS, 1000);
    }

    #[test]
    fn read_buf_size() {
        assert_eq!(READ_BUF_SIZE, 8192);
    }

    #[test]
    fn file_buf_size() {
        assert_eq!(FILE_BUF_SIZE, 65536);
    }

    // -- ConnectionStream -------------------------------------------------

    #[test]
    fn connection_stream_is_tls_false_for_tcp() {
        // We cannot construct a real TcpStream outside tokio, but we can
        // verify the enum discriminant logic via is_tls().
        // This is tested implicitly through integration tests.
    }
}