diff options
author | Keuin <[email protected]> | 2023-09-04 01:57:22 +0800 |
---|---|---|
committer | Keuin <[email protected]> | 2023-09-04 02:03:15 +0800 |
commit | 143014a91e695106d8383ed173c482b3b4519663 (patch) | |
tree | 717d8d34ce9a5857b0293f7fcf7ea9ba13199da7 /src/client.rs |
initial versionv0.1.0
Diffstat (limited to 'src/client.rs')
-rw-r--r-- | src/client.rs | 112 |
1 files changed, 112 insertions, 0 deletions
diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..1e2edc4 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,112 @@ +use std::io; +use std::net::ToSocketAddrs; + +use tokio::io::{copy_bidirectional, AsyncReadExt, AsyncWriteExt}; +use tokio::net::{TcpSocket, TcpStream}; +use tracing::{error, info, warn}; + +pub async fn handle_client( + mut sock: TcpStream, + upstream: &String, + service_name: &String, +) -> io::Result<()> { + let remote = match upstream.to_socket_addrs() { + Ok(mut addrs) => match addrs.next() { + None => { + error!("Cannot resolve addr: {}", &upstream); + return Ok(()); + } + Some(sa) => sa, + }, + Err(why) => { + error!("Failed to resolve addr: {}", why); + return Ok(()); + } + }; + let remote_sock; + if remote.is_ipv4() { + remote_sock = TcpSocket::new_v4(); + } else { + remote_sock = TcpSocket::new_v6(); + } + let remote_sock = match remote_sock { + Ok(rs) => rs, + Err(why) => { + error!("Failed to create socket: {}", why); + return Ok(()); + } + }; + let mut remote_sock = match remote_sock.connect(remote).await { + Ok(s) => s, + Err(why) => { + error!("Failed to connect to upstream: {}", why); + return Ok(()); + } + }; + send_upstream(&mut remote_sock, service_name.as_bytes()).await?; + send_upstream(&mut remote_sock, b"\r\n").await?; + remote_sock.flush().await?; + let status = remote_sock.read_u8().await?; + let mut msg = vec![0_u8; 1024]; + let mut i = 0; + let mut prev_is_cr = false; + loop { + let b = remote_sock.read_u8().await?; + if i < msg.len() { + msg[i] = b; + } + if b == b'\n' && prev_is_cr { + if i < msg.len() { + i -= 2; // remove CRLF from reported message string + } + break; + } + i += 1; + prev_is_cr = b == b'\r'; + } + let msg = match std::str::from_utf8(&msg[..i]) { + Ok(s) => s, + Err(why) => { + warn!( + "Failed to decode server message as UTF-8 string, ignore: {}", + why + ); + "???" + } + }; + match status { + b'+' => { + info!("Upstream service selected successfully: {}", msg); + } + b'-' => { + error!("Upstream responded with negative status: {}", msg); + return Ok(()); + } + b => { + error!("Invalid status returned from upstream, abort: {:x?}", b); + return Ok(()); + } + } + match copy_bidirectional(&mut sock, &mut remote_sock).await { + Ok((to_right, to_left)) => { + info!( + "Proxy session finished. Bytes: client to upstream: {}, upstream to client: {}", + to_right, to_left + ); + } + Err(why) => { + error!("Proxy connection was closed abnormally: {}", why); + } + }; + Ok(()) +} + +async fn send_upstream(remote: &mut TcpStream, data: &[u8]) -> io::Result<()> { + match remote.write_all(data).await { + Ok(_) => Ok(()), + Err(why) => { + error!("Failed to send bytes to upstream: {}", why); + Err(why) + } + } +} |