summaryrefslogtreecommitdiff
path: root/src/client.rs
diff options
context:
space:
mode:
authorKeuin <[email protected]>2023-09-04 01:57:22 +0800
committerKeuin <[email protected]>2023-09-04 02:03:15 +0800
commit143014a91e695106d8383ed173c482b3b4519663 (patch)
tree717d8d34ce9a5857b0293f7fcf7ea9ba13199da7 /src/client.rs
initial versionv0.1.0
Diffstat (limited to 'src/client.rs')
-rw-r--r--src/client.rs112
1 files changed, 112 insertions, 0 deletions
diff --git a/src/client.rs b/src/client.rs
new file mode 100644
index 0000000..1e2edc4
--- /dev/null
+++ b/src/client.rs
@@ -0,0 +1,112 @@
+use std::io;
+use std::net::ToSocketAddrs;
+
+use tokio::io::{copy_bidirectional, AsyncReadExt, AsyncWriteExt};
+use tokio::net::{TcpSocket, TcpStream};
+use tracing::{error, info, warn};
+
+pub async fn handle_client(
+ mut sock: TcpStream,
+ upstream: &String,
+ service_name: &String,
+) -> io::Result<()> {
+ let remote = match upstream.to_socket_addrs() {
+ Ok(mut addrs) => match addrs.next() {
+ None => {
+ error!("Cannot resolve addr: {}", &upstream);
+ return Ok(());
+ }
+ Some(sa) => sa,
+ },
+ Err(why) => {
+ error!("Failed to resolve addr: {}", why);
+ return Ok(());
+ }
+ };
+ let remote_sock;
+ if remote.is_ipv4() {
+ remote_sock = TcpSocket::new_v4();
+ } else {
+ remote_sock = TcpSocket::new_v6();
+ }
+ let remote_sock = match remote_sock {
+ Ok(rs) => rs,
+ Err(why) => {
+ error!("Failed to create socket: {}", why);
+ return Ok(());
+ }
+ };
+ let mut remote_sock = match remote_sock.connect(remote).await {
+ Ok(s) => s,
+ Err(why) => {
+ error!("Failed to connect to upstream: {}", why);
+ return Ok(());
+ }
+ };
+ send_upstream(&mut remote_sock, service_name.as_bytes()).await?;
+ send_upstream(&mut remote_sock, b"\r\n").await?;
+ remote_sock.flush().await?;
+ let status = remote_sock.read_u8().await?;
+ let mut msg = vec![0_u8; 1024];
+ let mut i = 0;
+ let mut prev_is_cr = false;
+ loop {
+ let b = remote_sock.read_u8().await?;
+ if i < msg.len() {
+ msg[i] = b;
+ }
+ if b == b'\n' && prev_is_cr {
+ if i < msg.len() {
+ i -= 2; // remove CRLF from reported message string
+ }
+ break;
+ }
+ i += 1;
+ prev_is_cr = b == b'\r';
+ }
+ let msg = match std::str::from_utf8(&msg[..i]) {
+ Ok(s) => s,
+ Err(why) => {
+ warn!(
+ "Failed to decode server message as UTF-8 string, ignore: {}",
+ why
+ );
+ "???"
+ }
+ };
+ match status {
+ b'+' => {
+ info!("Upstream service selected successfully: {}", msg);
+ }
+ b'-' => {
+ error!("Upstream responded with negative status: {}", msg);
+ return Ok(());
+ }
+ b => {
+ error!("Invalid status returned from upstream, abort: {:x?}", b);
+ return Ok(());
+ }
+ }
+ match copy_bidirectional(&mut sock, &mut remote_sock).await {
+ Ok((to_right, to_left)) => {
+ info!(
+ "Proxy session finished. Bytes: client to upstream: {}, upstream to client: {}",
+ to_right, to_left
+ );
+ }
+ Err(why) => {
+ error!("Proxy connection was closed abnormally: {}", why);
+ }
+ };
+ Ok(())
+}
+
+async fn send_upstream(remote: &mut TcpStream, data: &[u8]) -> io::Result<()> {
+ match remote.write_all(data).await {
+ Ok(_) => Ok(()),
+ Err(why) => {
+ error!("Failed to send bytes to upstream: {}", why);
+ Err(why)
+ }
+ }
+}