From 50dbc034090614d004d097c7a45b0a28a3bbb80b Mon Sep 17 00:00:00 2001 From: Keuin Date: Tue, 5 Sep 2023 01:52:56 +0800 Subject: feature: 0-rtt connection phase extension --- src/client.rs | 66 ++++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 50 insertions(+), 16 deletions(-) (limited to 'src/client.rs') diff --git a/src/client.rs b/src/client.rs index 3fcf78f..13def31 100644 --- a/src/client.rs +++ b/src/client.rs @@ -5,11 +5,32 @@ use tokio::io::{copy_bidirectional, AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpSocket, TcpStream}; use tracing::{error, info, warn}; +use crate::client::Error::{IOError, InvalidProtocol, InvalidService}; + +use crate::protocol::ServiceName; + +pub struct SharedConfig { + pub no_ack: bool, +} + +pub enum Error { + IOError(io::Error), + InvalidService, + InvalidProtocol, +} + +impl From for Error { + fn from(value: io::Error) -> Self { + IOError(value) + } +} + pub async fn handle_client( + cfg: &SharedConfig, mut sock: TcpStream, upstream: &String, service_name: &String, -) -> io::Result<()> { +) -> Result<(), Error> { let remote = match upstream.to_socket_addrs() { Ok(mut addrs) => match addrs.next() { None => { @@ -43,9 +64,33 @@ pub async fn handle_client( return Ok(()); } }; - send_upstream(&mut remote_sock, service_name.as_bytes()).await?; + let service_name = ServiceName { + service_name, + no_ack: cfg.no_ack, + }; + send_upstream(&mut remote_sock, service_name.to_string().as_bytes()).await?; send_upstream(&mut remote_sock, b"\r\n").await?; remote_sock.flush().await?; + + if !cfg.no_ack { + read_status(&mut remote_sock).await?; + } + + match copy_bidirectional(&mut sock, &mut remote_sock).await { + Ok((to_right, to_left)) => { + info!( + "Proxy session finished. Bytes: client to upstream: {}, upstream to client: {}", + to_right, to_left + ); + } + Err(why) => { + error!("Proxy connection was closed abnormally: {}", why); + } + }; + Ok(()) +} + +async fn read_status(remote_sock: &mut TcpStream) -> Result<(), Error> { let status = remote_sock.read_u8().await?; let mut msg = vec![0_u8; 1024]; let mut i = 0; @@ -78,28 +123,17 @@ pub async fn handle_client( match status { b'+' => { info!("Upstream service selected successfully: {}", msg); + Ok(()) } b'-' => { error!("Upstream responded with negative status: {}", msg); - return Ok(()); + Err(InvalidService) } b => { error!("Invalid status returned from upstream, abort: {:x?}", b); - return Ok(()); + Err(InvalidProtocol) } } - match copy_bidirectional(&mut sock, &mut remote_sock).await { - Ok((to_right, to_left)) => { - info!( - "Proxy session finished. Bytes: client to upstream: {}, upstream to client: {}", - to_right, to_left - ); - } - Err(why) => { - error!("Proxy connection was closed abnormally: {}", why); - } - }; - Ok(()) } async fn send_upstream(remote: &mut TcpStream, data: &[u8]) -> io::Result<()> { -- cgit v1.2.3