use bytes::{Bytes, BytesMut};
use std::cell::RefCell;
/// A reusable pool of [`BytesMut`] buffers for reducing allocations.
///
/// Buffers obtained from [`BufferPool::get`] are always empty and are created
/// with `buf_size` bytes of capacity. Handing them back through
/// [`BufferPool::put`] makes the allocation available to later callers.
pub struct BufferPool {
    /// Idle buffers, guarded by a mutex so the pool can be used through `&self`.
    buffers: parking_lot::Mutex<Vec<BytesMut>>,
    /// Capacity requested for new buffers and minimum capacity accepted by `put`.
    buf_size: usize,
    /// Maximum number of idle buffers retained at any time.
    max_pool_size: usize,
}

impl BufferPool {
    /// Creates a pool that allocates buffers of `buf_size` bytes and keeps at
    /// most `max_pool_size` idle buffers around for reuse.
    pub fn new(buf_size: usize, max_pool_size: usize) -> Self {
        Self {
            buffers: parking_lot::Mutex::new(Vec::with_capacity(max_pool_size)),
            buf_size,
            max_pool_size,
        }
    }

    /// Returns an empty buffer, reusing a pooled one when available and
    /// allocating a new one with `buf_size` capacity otherwise.
    pub fn get(&self) -> BytesMut {
        // The guard is a temporary of this statement, so the lock is released
        // before the (potentially slow) fallback allocation below.
        let recycled = self.buffers.lock().pop();
        recycled.unwrap_or_else(|| BytesMut::with_capacity(self.buf_size))
    }

    /// Hands a buffer back to the pool.
    ///
    /// The buffer is dropped instead of pooled when its capacity is below
    /// `buf_size` or when the pool already holds `max_pool_size` buffers.
    pub fn put(&self, mut buf: BytesMut) {
        if buf.capacity() < self.buf_size {
            return;
        }
        // Discard leftover contents so the next `get` never observes bytes
        // written by a previous user; the allocation itself is kept.
        buf.clear();
        let mut pool = self.buffers.lock();
        if pool.len() < self.max_pool_size {
            pool.push(buf);
        }
    }
}
/// Zero-copy buffer for HTTP parsing.
///
/// Wraps a [`Bytes`] value together with a read cursor. Lines and delimited
/// fields are returned as slices of the wrapped buffer and the cursor moves
/// past whatever was consumed.
pub struct ParseBuffer {
    /// The complete input being parsed.
    data: Bytes,
    /// Offset of the next unread byte; never exceeds `data.len()`.
    pos: usize,
}

impl ParseBuffer {
    /// Wraps `data` with the cursor positioned at its first byte.
    pub fn new(data: Bytes) -> Self {
        Self { data, pos: 0 }
    }

    /// Returns the bytes that have not been consumed yet.
    pub fn remaining(&self) -> &[u8] {
        &self.data[self.pos..]
    }

    /// Moves the cursor forward by `n` bytes, stopping at the end of the data.
    pub fn advance(&mut self, n: usize) {
        // Saturate so a huge `n` clamps to the end instead of overflowing.
        self.pos = self.pos.saturating_add(n).min(self.data.len());
    }

    /// Returns the cursor offset, measured from the start of the data.
    pub fn position(&self) -> usize {
        self.pos
    }

    /// Returns the total length of the underlying data, including bytes that
    /// were already consumed.
    pub fn len(&self) -> usize {
        self.data.len()
    }

    /// Returns `true` once every byte has been consumed.
    pub fn is_empty(&self) -> bool {
        self.pos >= self.data.len()
    }

    /// Find a byte pattern starting from the current position.
    ///
    /// On a match, returns the offset of the pattern measured from the start
    /// of the data (not from the cursor). An empty pattern matches
    /// immediately at the current position.
    pub fn find(&self, pattern: &[u8]) -> Option<usize> {
        // `windows(0)` panics, so the empty needle is answered up front.
        if pattern.is_empty() {
            return Some(self.pos);
        }
        self.remaining()
            .windows(pattern.len())
            .position(|window| window == pattern)
            .map(|offset| self.pos + offset)
    }

    /// Read until a delimiter byte, returning the data before it.
    ///
    /// The delimiter itself is consumed but not returned. Returns `None`,
    /// leaving the cursor untouched, when the delimiter does not occur in the
    /// remaining data.
    pub fn read_until(&mut self, delim: u8) -> Option<Bytes> {
        let offset = self.remaining().iter().position(|&byte| byte == delim)?;
        let start = self.pos;
        self.pos = start + offset + 1; // skip delimiter
        Some(self.data.slice(start..start + offset))
    }

    /// Read a line (until `\r\n` or `\n`).
    ///
    /// The line ends at the first `\n` in the remaining data; a `\r` directly
    /// before it is treated as part of the terminator. The terminator is
    /// consumed but not returned. Returns `None`, leaving the cursor
    /// untouched, when no `\n` remains.
    pub fn read_line(&mut self) -> Option<Bytes> {
        let remaining = self.remaining();
        // Scanning for the first `\n` (rather than for `\r\n` first) keeps a
        // bare-LF line from being merged with the lines that follow it.
        let newline = remaining.iter().position(|&byte| byte == b'\n')?;
        let line_len = if remaining[..newline].ends_with(b"\r") {
            newline - 1
        } else {
            newline
        };
        let start = self.pos;
        self.pos = start + newline + 1;
        Some(self.data.slice(start..start + line_len))
    }
}
// ---------------------------------------------------------------------------
// Thread-local buffer pool for Vec<u8>
// ---------------------------------------------------------------------------
//
// This provides a simple per-thread pool of `Vec<u8>` buffers that can be
// reused across HTTP request/response cycles. Each thread maintains its
// own pool (no cross-thread synchronization overhead), capped at 32
// buffers to bound memory usage.
/// Maximum number of buffers kept per thread.
///
/// `return_buffer` drops any buffer handed back while the calling thread's
/// pool already holds this many.
const MAX_POOL_SIZE: usize = 32;
// Per-thread stack of idle `Vec<u8>` buffers. The backing vector is created
// with room for 16 entries. The `RefCell` allows the pool to be mutated
// through the shared reference that `BUFFER_POOL.with` hands to its closure.
thread_local! {
static BUFFER_POOL: RefCell<Vec<Vec<u8>>> = RefCell::new(Vec::with_capacity(16));
}
/// Obtain a buffer from the thread-local pool, or allocate a new one.
///
/// If the pool contains a buffer whose capacity is at least `size_hint`,
/// it is reused (and cleared); the most recently returned such buffer is
/// preferred. Otherwise a fresh `Vec` with `size_hint` capacity is
/// allocated, and one undersized buffer is evicted from the pool so that it
/// does not stay filled with buffers too small to be useful.
///
/// The returned buffer is always empty.
pub fn get_buffer(size_hint: usize) -> Vec<u8> {
    BUFFER_POOL.with(|pool| {
        let mut pool = pool.borrow_mut();
        // Search from the top of the stack so recently used buffers win.
        let fitting = pool.iter().rposition(|buf| buf.capacity() >= size_hint);
        match fitting {
            Some(index) => {
                // Order inside the pool carries no meaning, so the O(1)
                // `swap_remove` is fine here.
                let mut buf = pool.swap_remove(index);
                // Reuse: clear contents but keep the allocation.
                buf.clear();
                buf
            }
            None => {
                // Nothing pooled is large enough: discard the newest (too
                // small) buffer, if any, and allocate a new one.
                pool.pop();
                Vec::with_capacity(size_hint)
            }
        }
    })
}
/// Hand a buffer back to the calling thread's pool so it can be reused.
///
/// The buffer is kept only while the pool holds fewer than
/// [`MAX_POOL_SIZE`] entries; once that limit is reached the buffer is
/// simply dropped.
pub fn return_buffer(buf: Vec<u8>) {
    BUFFER_POOL.with(|cell| {
        let mut idle = cell.borrow_mut();
        if idle.len() >= MAX_POOL_SIZE {
            // Pool is full: `buf` is dropped when the closure ends.
            return;
        }
        idle.push(buf);
    });
}
#[cfg(test)]
mod tests {
    use super::*;

    /// CRLF-terminated lines are returned one at a time, without terminators.
    #[test]
    fn test_parse_buffer_read_line() {
        let request = Bytes::from("GET / HTTP/1.1\r\nHost: example.com\r\n\r\n");
        let mut parser = ParseBuffer::new(request);
        let expected: [&[u8]; 3] = [b"GET / HTTP/1.1", b"Host: example.com", b""];
        for want in expected.iter() {
            let got = parser.read_line().unwrap();
            assert_eq!(&got[..], *want);
        }
    }

    /// A buffer taken from the pool, and one taken after a `put`, both have
    /// the configured capacity.
    #[test]
    fn test_buffer_pool() {
        let pool = BufferPool::new(4096, 10);
        let first = pool.get();
        assert!(first.capacity() >= 4096);
        pool.put(first);
        assert!(pool.get().capacity() >= 4096);
    }

    /// Buffers from the thread-local pool are empty and large enough, both
    /// before and after a round trip through `return_buffer`.
    #[test]
    fn test_thread_local_buffer_pool_get_and_return() {
        let fresh = get_buffer(4096);
        assert!(fresh.capacity() >= 4096);
        assert!(fresh.is_empty());
        return_buffer(fresh);

        // A second request of the same size is served after the return.
        let second = get_buffer(4096);
        assert!(second.capacity() >= 4096);
        assert!(second.is_empty());
    }

    /// A request larger than the pooled buffer still yields enough capacity.
    #[test]
    fn test_thread_local_buffer_pool_size_hint_upsize() {
        let small = get_buffer(1024);
        assert!(small.capacity() >= 1024);
        return_buffer(small);

        // The pooled 1 KiB buffer cannot satisfy an 8 KiB request.
        let large = get_buffer(8192);
        assert!(large.capacity() >= 8192);
    }

    /// Returning a buffer to an already full pool must not panic.
    #[test]
    fn test_thread_local_buffer_pool_cap() {
        // Take MAX_POOL_SIZE buffers, then hand all of them back.
        let held: Vec<Vec<u8>> = (0..MAX_POOL_SIZE).map(|_| get_buffer(64)).collect();
        held.into_iter().for_each(return_buffer);

        // One more get/return cycle on top of that.
        return_buffer(get_buffer(64));
    }
}