mod rotctl; use anyhow::Result; use fern::colors::{Color, ColoredLevelConfig}; use log::{debug, error, info, warn}; use std::io; use tokio::{ self, io::{AsyncBufReadExt, AsyncWriteExt, BufStream}, net::{TcpListener, TcpStream}, sync::{self, mpsc, watch}, time, }; use rotctl::Command; async fn process_socket( socket: TcpStream, cmd_tx: mpsc::Sender, mut pos_rx: watch::Receiver<(f32, f32)>, ) { let mut stream = BufStream::new(socket); let mut line = String::new(); loop { if let Ok(n) = stream.read_line(&mut line).await { if n == 0 { return; } debug!("Received: {}", line.strip_suffix("\n").unwrap()); match rotctl::parse_command(&line) { Ok(cmd) => match cmd { Command::GetPos => { let (az, el) = pos_rx.borrow().clone(); stream .write_all(format!("{}\n{}\n", az, el).as_bytes()) .await .unwrap(); stream.flush().await.unwrap(); } Command::Exit => { stream.write_all("RPRT 0\n".as_bytes()).await.unwrap(); stream.flush().await.unwrap(); return; } cmd => { cmd_tx.send(cmd).await.unwrap(); stream.write_all("RPRT 0\n".as_bytes()).await.unwrap(); stream.flush().await.unwrap(); } }, Err(msg) => { error!("Unable to parse input:\n{}", msg); stream.write_all("RPRT 6\n".as_bytes()).await.unwrap(); stream.flush().await.unwrap(); } } line.clear(); } else { return; } } } async fn control_rotor(mut rx_cmd: mpsc::Receiver, pos_tx: watch::Sender<(f32, f32)>) { let mut actual_az = 0.0; let mut actual_el = 0.0; let mut target_az = 0.0; let mut target_el = 0.0; loop { tokio::select! { Some(command) = rx_cmd.recv() => { match command { Command::SetPos(az, el) => { info!("Received set pos {} {}", az, el); target_az = az; target_el = el; } _ => {} } }, _ = time::sleep(time::Duration::from_millis(100)) => { if target_az < actual_az { actual_az -= 1.0; } else if target_az > actual_az { actual_az += 1.0; } if target_el < actual_el { actual_el -= 1.0; } else if target_el > actual_el { actual_el += 1.0; } pos_tx.send((actual_az, actual_el)).unwrap(); }, else => return }; } } fn setup_logger() -> Result<()> { let colors = ColoredLevelConfig::new() .info(Color::Green) .error(Color::Red) .warn(Color::Yellow) .debug(Color::Blue); fern::Dispatch::new() // Perform allocation-free log formatting .format(move |out, message, record| { out.finish(format_args!( "[{} {} {}] {}", humantime::format_rfc3339_millis(std::time::SystemTime::now()), colors.color(record.level()), record.target(), message )) }) // Add blanket level filter - .level(log::LevelFilter::Debug) // - and per-module overrides .chain(std::io::stdout()) .chain(fern::log_file("output.log")?) // Apply globally .apply()?; Ok(()) } #[tokio::main] async fn main() -> Result<()> { setup_logger()?; let (cmd_tx, cmd_rx) = mpsc::channel::(16); let (pos_tx, pos_rx) = watch::channel::<(f32, f32)>((0.0, 0.0)); tokio::spawn(async move { control_rotor(cmd_rx, pos_tx).await; }); let listener = TcpListener::bind("127.0.0.1:8080").await?; loop { let (socket, _) = listener.accept().await?; let cmd_tx = cmd_tx.clone(); let pos_rx = pos_rx.clone(); tokio::spawn(async move { process_socket(socket, cmd_tx, pos_rx).await; }); } }