show(), hide(), visible. font-light, normal, medium, semibold, bold, extrabold, black. protocol connection pooling. fetch() with GURT. DNS from HTTP to GURT.
This commit is contained in:
@@ -9,6 +9,8 @@ use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::time::{timeout, Duration};
|
||||
use tokio_rustls::{TlsConnector, rustls::{ClientConfig as TlsClientConfig, RootCertStore, pki_types::ServerName}};
|
||||
use std::sync::Arc;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Mutex;
|
||||
use url::Url;
|
||||
use tracing::debug;
|
||||
|
||||
@@ -19,6 +21,19 @@ pub struct GurtClientConfig {
|
||||
pub handshake_timeout: Duration,
|
||||
pub user_agent: String,
|
||||
pub max_redirects: usize,
|
||||
pub enable_connection_pooling: bool,
|
||||
pub max_connections_per_host: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
|
||||
struct ConnectionKey {
|
||||
host: String,
|
||||
port: u16,
|
||||
}
|
||||
|
||||
struct PooledTlsConnection {
|
||||
connection: tokio_rustls::client::TlsStream<TcpStream>,
|
||||
last_used: std::time::Instant,
|
||||
}
|
||||
|
||||
impl Default for GurtClientConfig {
|
||||
@@ -29,6 +44,8 @@ impl Default for GurtClientConfig {
|
||||
handshake_timeout: Duration::from_secs(DEFAULT_HANDSHAKE_TIMEOUT),
|
||||
user_agent: format!("GURT-Client/{}", crate::GURT_VERSION),
|
||||
max_redirects: 5,
|
||||
enable_connection_pooling: true,
|
||||
max_connections_per_host: 4,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -72,18 +89,69 @@ impl PooledConnection {
|
||||
|
||||
pub struct GurtClient {
|
||||
config: GurtClientConfig,
|
||||
connection_pool: Arc<Mutex<HashMap<ConnectionKey, Vec<PooledTlsConnection>>>>,
|
||||
}
|
||||
|
||||
impl GurtClient {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
config: GurtClientConfig::default(),
|
||||
connection_pool: Arc::new(Mutex::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn with_config(config: GurtClientConfig) -> Self {
|
||||
Self {
|
||||
config,
|
||||
connection_pool: Arc::new(Mutex::new(HashMap::new())),
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_pooled_connection(&self, host: &str, port: u16) -> Result<tokio_rustls::client::TlsStream<TcpStream>> {
|
||||
if !self.config.enable_connection_pooling {
|
||||
return self.perform_handshake(host, port).await;
|
||||
}
|
||||
|
||||
let key = ConnectionKey {
|
||||
host: host.to_string(),
|
||||
port,
|
||||
};
|
||||
|
||||
if let Ok(mut pool) = self.connection_pool.lock() {
|
||||
if let Some(connections) = pool.get_mut(&key) {
|
||||
connections.retain(|conn| conn.last_used.elapsed().as_secs() < 30);
|
||||
|
||||
if let Some(pooled_conn) = connections.pop() {
|
||||
debug!("Reusing pooled connection for {}:{}", host, port);
|
||||
return Ok(pooled_conn.connection);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
debug!("Creating new connection for {}:{}", host, port);
|
||||
self.perform_handshake(host, port).await
|
||||
}
|
||||
|
||||
fn return_connection_to_pool(&self, host: &str, port: u16, connection: tokio_rustls::client::TlsStream<TcpStream>) {
|
||||
if !self.config.enable_connection_pooling {
|
||||
return;
|
||||
}
|
||||
|
||||
let key = ConnectionKey {
|
||||
host: host.to_string(),
|
||||
port,
|
||||
};
|
||||
|
||||
if let Ok(mut pool) = self.connection_pool.lock() {
|
||||
let connections = pool.entry(key).or_insert_with(Vec::new);
|
||||
|
||||
if connections.len() < self.config.max_connections_per_host {
|
||||
connections.push(PooledTlsConnection {
|
||||
connection,
|
||||
last_used: std::time::Instant::now(),
|
||||
});
|
||||
debug!("Returned connection to pool");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -241,19 +309,66 @@ impl GurtClient {
|
||||
async fn send_request_internal(&self, host: &str, port: u16, request: GurtRequest) -> Result<GurtResponse> {
|
||||
debug!("Sending {} {} to {}:{}", request.method, request.path, host, port);
|
||||
|
||||
let tls_stream = self.perform_handshake(host, port).await?;
|
||||
let mut conn = PooledConnection::with_tls(tls_stream);
|
||||
let mut tls_stream = self.get_pooled_connection(host, port).await?;
|
||||
|
||||
let request_data = request.to_string();
|
||||
conn.connection.write_all(request_data.as_bytes()).await?;
|
||||
tls_stream.write_all(request_data.as_bytes()).await
|
||||
.map_err(|e| GurtError::connection(format!("Failed to write request: {}", e)))?;
|
||||
|
||||
let response_bytes = timeout(
|
||||
self.config.request_timeout,
|
||||
self.read_response_data(&mut conn)
|
||||
).await
|
||||
.map_err(|_| GurtError::timeout("Request timeout"))??;
|
||||
let mut buffer = Vec::new();
|
||||
let mut temp_buffer = [0u8; 8192];
|
||||
|
||||
let response = GurtResponse::parse_bytes(&response_bytes)?;
|
||||
let start_time = std::time::Instant::now();
|
||||
let mut headers_parsed = false;
|
||||
let mut expected_body_length: Option<usize> = None;
|
||||
let mut headers_end_pos: Option<usize> = None;
|
||||
|
||||
loop {
|
||||
if start_time.elapsed() > self.config.request_timeout {
|
||||
return Err(GurtError::timeout("Request timeout"));
|
||||
}
|
||||
|
||||
match timeout(Duration::from_millis(100), tls_stream.read(&mut temp_buffer)).await {
|
||||
Ok(Ok(0)) => break, // Connection closed
|
||||
Ok(Ok(n)) => {
|
||||
buffer.extend_from_slice(&temp_buffer[..n]);
|
||||
|
||||
if !headers_parsed {
|
||||
if let Some(pos) = buffer.windows(4).position(|w| w == b"\r\n\r\n") {
|
||||
headers_end_pos = Some(pos + 4);
|
||||
headers_parsed = true;
|
||||
|
||||
let headers_section = std::str::from_utf8(&buffer[..pos])
|
||||
.map_err(|e| GurtError::invalid_message(format!("Invalid UTF-8 in headers: {}", e)))?;
|
||||
|
||||
for line in headers_section.lines().skip(1) {
|
||||
if line.to_lowercase().starts_with("content-length:") {
|
||||
if let Some(length_str) = line.split(':').nth(1) {
|
||||
expected_body_length = length_str.trim().parse().ok();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if headers_parsed {
|
||||
if let (Some(headers_end), Some(expected_len)) = (headers_end_pos, expected_body_length) {
|
||||
if buffer.len() >= headers_end + expected_len {
|
||||
break;
|
||||
}
|
||||
} else if expected_body_length.is_none() && headers_parsed {
|
||||
break;
|
||||
}
|
||||
}
|
||||
},
|
||||
Ok(Err(e)) => return Err(GurtError::connection(format!("Read error: {}", e))),
|
||||
Err(_) => continue,
|
||||
}
|
||||
}
|
||||
|
||||
let response = GurtResponse::parse_bytes(&buffer)?;
|
||||
|
||||
self.return_connection_to_pool(host, port, tls_stream);
|
||||
|
||||
Ok(response)
|
||||
}
|
||||
@@ -410,6 +525,7 @@ impl Clone for GurtClient {
|
||||
fn clone(&self) -> Self {
|
||||
Self {
|
||||
config: self.config.clone(),
|
||||
connection_pool: self.connection_pool.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user