Ferrit Explore
中文·繁體·EN·日本語 Sign in Register
cielxl / veld / src / util / buffer.rs
use bytes::{Bytes, BytesMut};
use std::cell::RefCell;

/// A reusable buffer pool for reducing allocations
pub struct BufferPool {
    buffers: parking_lot::Mutex<Vec<BytesMut>>,
    buf_size: usize,
    max_pool_size: usize,
}

impl BufferPool {
    pub fn new(buf_size: usize, max_pool_size: usize) -> Self {
        Self {
            buffers: parking_lot::Mutex::new(Vec::with_capacity(max_pool_size)),
            buf_size,
            max_pool_size,
        }
    }

    pub fn get(&self) -> BytesMut {
        let mut pool = self.buffers.lock();
        pool.pop()
            .unwrap_or_else(|| BytesMut::with_capacity(self.buf_size))
    }

    pub fn put(&self, buf: BytesMut) {
        if buf.capacity() >= self.buf_size {
            let mut pool = self.buffers.lock();
            if pool.len() < self.max_pool_size {
                pool.push(buf);
            }
        }
    }
}

/// Zero-copy buffer for HTTP parsing
pub struct ParseBuffer {
    data: Bytes,
    pos: usize,
}

impl ParseBuffer {
    pub fn new(data: Bytes) -> Self {
        Self { data, pos: 0 }
    }

    pub fn remaining(&self) -> &[u8] {
        &self.data[self.pos..]
    }

    pub fn advance(&mut self, n: usize) {
        self.pos = (self.pos + n).min(self.data.len());
    }

    pub fn position(&self) -> usize {
        self.pos
    }

    pub fn len(&self) -> usize {
        self.data.len()
    }

    pub fn is_empty(&self) -> bool {
        self.pos >= self.data.len()
    }

    /// Find a byte pattern starting from current position
    pub fn find(&self, pattern: &[u8]) -> Option<usize> {
        let remaining = self.remaining();
        remaining
            .windows(pattern.len())
            .position(|w| w == pattern)
            .map(|p| self.pos + p)
    }

    /// Read until a delimiter byte, returning the data before it
    pub fn read_until(&mut self, delim: u8) -> Option<Bytes> {
        let remaining = self.remaining();
        if let Some(pos) = remaining.iter().position(|&b| b == delim) {
            let start = self.pos;
            self.pos += pos + 1; // skip delimiter
            Some(self.data.slice(start..start + pos))
        } else {
            None
        }
    }

    /// Read a line (until \r\n or \n)
    pub fn read_line(&mut self) -> Option<Bytes> {
        let remaining = &self.data[self.pos..];
        // Look for \r\n
        if let Some(pos) = remaining.windows(2).position(|w| w == b"\r\n") {
            let start = self.pos;
            self.pos += pos + 2;
            return Some(self.data.slice(start..start + pos));
        }
        // Fallback: look for \n
        if let Some(pos) = remaining.iter().position(|&b| b == b'\n') {
            let start = self.pos;
            let cr_adjust = if pos > 0 && remaining[pos - 1] == b'\r' {
                1
            } else {
                0
            };
            self.pos += pos + 1;
            return Some(self.data.slice(start..start + pos - cr_adjust));
        }
        None
    }
}

// ---------------------------------------------------------------------------
// Thread-local buffer pool for Vec<u8>
// ---------------------------------------------------------------------------
//
// This provides a simple per-thread pool of `Vec<u8>` buffers that can be
// reused across HTTP request/response cycles.  Each thread maintains its
// own pool (no cross-thread synchronization overhead), capped at 32
// buffers to bound memory usage.

/// Maximum number of buffers kept per thread.
const MAX_POOL_SIZE: usize = 32;

thread_local! {
    static BUFFER_POOL: RefCell<Vec<Vec<u8>>> = RefCell::new(Vec::with_capacity(16));
}

/// Obtain a buffer from the thread-local pool, or allocate a new one.
///
/// If the pool contains a buffer whose capacity is at least `size_hint`,
/// it is reused (and cleared).  Otherwise a fresh `Vec` is allocated.
pub fn get_buffer(size_hint: usize) -> Vec<u8> {
    BUFFER_POOL.with(|pool| {
        let mut pool = pool.borrow_mut();
        if let Some(buf) = pool.pop() {
            if buf.capacity() >= size_hint {
                // Reuse: clear contents but keep the allocation.
                let mut buf = buf;
                buf.clear();
                return buf;
            }
            // Too small -- drop it and allocate a new one below.
        }
        Vec::with_capacity(size_hint)
    })
}

/// Return a buffer to the thread-local pool for reuse.
///
/// Buffers are only pooled if the pool has room (max [`MAX_POOL_SIZE`]
/// entries per thread).  Excess buffers are dropped normally.
pub fn return_buffer(buf: Vec<u8>) {
    BUFFER_POOL.with(|pool| {
        let mut pool = pool.borrow_mut();
        if pool.len() < MAX_POOL_SIZE {
            pool.push(buf);
        }
        // else: drop buf implicitly
    });
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_parse_buffer_read_line() {
        let data = Bytes::from("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n");
        let mut buf = ParseBuffer::new(data);

        let line = buf.read_line().unwrap();
        assert_eq!(&line[..], b"GET / HTTP/1.1");

        let line = buf.read_line().unwrap();
        assert_eq!(&line[..], b"Host: example.com");

        let line = buf.read_line().unwrap();
        assert_eq!(&line[..], b"");
    }

    #[test]
    fn test_buffer_pool() {
        let pool = BufferPool::new(4096, 10);
        let buf = pool.get();
        assert!(buf.capacity() >= 4096);
        pool.put(buf);
        let buf2 = pool.get();
        assert!(buf2.capacity() >= 4096);
    }

    #[test]
    fn test_thread_local_buffer_pool_get_and_return() {
        let buf = get_buffer(4096);
        assert!(buf.capacity() >= 4096);
        assert!(buf.is_empty());
        return_buffer(buf);

        // Getting another buffer should reuse the returned one.
        let buf2 = get_buffer(4096);
        assert!(buf2.capacity() >= 4096);
        assert!(buf2.is_empty());
    }

    #[test]
    fn test_thread_local_buffer_pool_size_hint_upsize() {
        let buf = get_buffer(1024);
        assert!(buf.capacity() >= 1024);
        return_buffer(buf);

        // Requesting a larger size should allocate a new buffer.
        let buf2 = get_buffer(8192);
        assert!(buf2.capacity() >= 8192);
    }

    #[test]
    fn test_thread_local_buffer_pool_cap() {
        // Fill the pool to MAX_POOL_SIZE, then return one more.
        let mut bufs = Vec::new();
        for _ in 0..MAX_POOL_SIZE {
            bufs.push(get_buffer(64));
        }
        for buf in bufs {
            return_buffer(buf);
        }
        // Pool is now full.  Returning another should not panic.
        return_buffer(get_buffer(64));
    }
}