summaryrefslogtreecommitdiff
path: root/src/client.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/client.rs')
-rw-r--r--src/client.rs66
1 files changed, 50 insertions, 16 deletions
diff --git a/src/client.rs b/src/client.rs
index 3fcf78f..13def31 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -5,11 +5,32 @@ use tokio::io::{copy_bidirectional, AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpSocket, TcpStream};
use tracing::{error, info, warn};
+use crate::client::Error::{IOError, InvalidProtocol, InvalidService};
+
+use crate::protocol::ServiceName;
+
+pub struct SharedConfig {
+ pub no_ack: bool,
+}
+
+pub enum Error {
+ IOError(io::Error),
+ InvalidService,
+ InvalidProtocol,
+}
+
+impl From<io::Error> for Error {
+ fn from(value: io::Error) -> Self {
+ IOError(value)
+ }
+}
+
pub async fn handle_client(
+ cfg: &SharedConfig,
mut sock: TcpStream,
upstream: &String,
service_name: &String,
-) -> io::Result<()> {
+) -> Result<(), Error> {
let remote = match upstream.to_socket_addrs() {
Ok(mut addrs) => match addrs.next() {
None => {
@@ -43,9 +64,33 @@ pub async fn handle_client(
return Ok(());
}
};
- send_upstream(&mut remote_sock, service_name.as_bytes()).await?;
+ let service_name = ServiceName {
+ service_name,
+ no_ack: cfg.no_ack,
+ };
+ send_upstream(&mut remote_sock, service_name.to_string().as_bytes()).await?;
send_upstream(&mut remote_sock, b"\r\n").await?;
remote_sock.flush().await?;
+
+ if !cfg.no_ack {
+ read_status(&mut remote_sock).await?;
+ }
+
+ match copy_bidirectional(&mut sock, &mut remote_sock).await {
+ Ok((to_right, to_left)) => {
+ info!(
+ "Proxy session finished. Bytes: client to upstream: {}, upstream to client: {}",
+ to_right, to_left
+ );
+ }
+ Err(why) => {
+ error!("Proxy connection was closed abnormally: {}", why);
+ }
+ };
+ Ok(())
+}
+
+async fn read_status(remote_sock: &mut TcpStream) -> Result<(), Error> {
let status = remote_sock.read_u8().await?;
let mut msg = vec![0_u8; 1024];
let mut i = 0;
@@ -78,28 +123,17 @@ pub async fn handle_client(
match status {
b'+' => {
info!("Upstream service selected successfully: {}", msg);
+ Ok(())
}
b'-' => {
error!("Upstream responded with negative status: {}", msg);
- return Ok(());
+ Err(InvalidService)
}
b => {
error!("Invalid status returned from upstream, abort: {:x?}", b);
- return Ok(());
+ Err(InvalidProtocol)
}
}
- match copy_bidirectional(&mut sock, &mut remote_sock).await {
- Ok((to_right, to_left)) => {
- info!(
- "Proxy session finished. Bytes: client to upstream: {}, upstream to client: {}",
- to_right, to_left
- );
- }
- Err(why) => {
- error!("Proxy connection was closed abnormally: {}", why);
- }
- };
- Ok(())
}
async fn send_upstream(remote: &mut TcpStream, data: &[u8]) -> io::Result<()> {