From 9bb7cc7a2ceb822eed20fee6382153d4c46f9445 Mon Sep 17 00:00:00 2001 From: Sebastian Date: Sun, 5 May 2024 17:38:28 +0200 Subject: [PATCH] Split tcp handler and rotocontrol into seperate tasks --- src/main.rs | 81 +++++++++++++++++++++++++++++++++++++++++---------- src/rotctl.rs | 1 - 2 files changed, 65 insertions(+), 17 deletions(-) diff --git a/src/main.rs b/src/main.rs index 1947a3c..ab3ffab 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,20 +4,23 @@ use tokio::{ self, io::{AsyncBufReadExt, AsyncWriteExt, BufStream}, net::{TcpListener, TcpStream}, + sync::{self, mpsc, watch}, + time, }; use rotctl::Command; use std::io; -async fn process_socket(socket: TcpStream) { +async fn process_socket( + socket: TcpStream, + cmd_tx: mpsc::Sender, + mut pos_rx: watch::Receiver<(f32, f32)>, +) { let mut stream = BufStream::new(socket); let mut line = String::new(); - let mut az = 0.0; - let mut el = 0.0; - loop { if let Ok(n) = stream.read_line(&mut line).await { if n == 0 { @@ -29,28 +32,24 @@ async fn process_socket(socket: TcpStream) { match rotctl::parse_command(&line) { Ok(cmd) => match cmd { Command::GetPos => { + let (az, el) = pos_rx.borrow().clone(); + stream .write_all(format!("{}\n{}\n", az, el).as_bytes()) .await .unwrap(); stream.flush().await.unwrap(); } - Command::SetPos(new_az, new_el) => { - az = new_az; - el = new_el; - stream.write_all("RPRT 0\n".as_bytes()).await.unwrap(); - stream.flush().await.unwrap(); - } - Command::Stop => { - stream.write_all("RPRT 0\n".as_bytes()).await.unwrap(); - stream.flush().await.unwrap(); - } Command::Exit => { stream.write_all("RPRT 0\n".as_bytes()).await.unwrap(); stream.flush().await.unwrap(); return; } - _ => {} + cmd => { + cmd_tx.send(cmd).await.unwrap(); + stream.write_all("RPRT 0\n".as_bytes()).await.unwrap(); + stream.flush().await.unwrap(); + } }, Err(msg) => { stream.write_all(msg.as_bytes()).await.unwrap(); @@ -65,14 +64,64 @@ async fn process_socket(socket: TcpStream) { } } +async fn control_rotor(mut rx_cmd: mpsc::Receiver, pos_tx: watch::Sender<(f32, f32)>) { + let mut actual_az = 0.0; + let mut actual_el = 0.0; + + let mut target_az = 0.0; + let mut target_el = 0.0; + + loop { + tokio::select! { + Some(command) = rx_cmd.recv() => { + match command { + Command::SetPos(az, el) => { + println!("Received set pos {} {}", az, el); + target_az = az; + target_el = el; + } + _ => {} + } + }, + _ = time::sleep(time::Duration::from_millis(100)) => { + if target_az < actual_az { + actual_az -= 1.0; + } else if target_az > actual_az { + actual_az += 1.0; + } + + if target_el < actual_el { + actual_el -= 1.0; + } else if target_el > actual_el { + actual_el += 1.0; + } + + pos_tx.send((actual_az, actual_el)).unwrap(); + }, + else => return + }; + } +} + #[tokio::main] async fn main() -> io::Result<()> { + let (cmd_tx, cmd_rx) = mpsc::channel::(16); + + let (pos_tx, pos_rx) = watch::channel::<(f32, f32)>((0.0, 0.0)); + + tokio::spawn(async move { + control_rotor(cmd_rx, pos_tx).await; + }); + let listener = TcpListener::bind("127.0.0.1:8080").await?; loop { let (socket, _) = listener.accept().await?; + + let cmd_tx = cmd_tx.clone(); + let pos_rx = pos_rx.clone(); tokio::spawn(async move { - process_socket(socket).await; + process_socket(socket, cmd_tx, pos_rx).await; }); } } diff --git a/src/rotctl.rs b/src/rotctl.rs index 1bc36f3..fc70ab3 100644 --- a/src/rotctl.rs +++ b/src/rotctl.rs @@ -197,7 +197,6 @@ fn line(input: &str) -> IResult<&str, Command, VerboseError<&str>> { pub fn parse_command(input: &str) -> Result { let result = line(input); - println!("Input: {} Result: {:?}", input, result); match result { Ok(("", cmd)) => Ok(cmd), Ok((rest, _)) => Err("Unable to parse rest".to_owned()),