use anyhow::Result; use bytes::{BufMut, BytesMut}; use futures::{stream::StreamExt, SinkExt}; use log::{debug, error, info, warn}; use postcard::{from_bytes_cobs, to_stdvec_cobs}; use radomctl_protocol::{HostMessage, PositionTarget, RadomMessage}; use std::{env, io, str, time::Duration}; use tokio::time::sleep; use tokio::{ self, io::{AsyncBufReadExt, AsyncWriteExt, BufStream}, net::{TcpListener, TcpStream}, sync::{self, mpsc, watch}, time, }; use tokio_serial::SerialPortBuilderExt; use tokio_util::codec::{Decoder, Encoder}; use crate::rotctlprotocol::{parse_command, Command}; struct ProtocolCodec; impl Decoder for ProtocolCodec { type Item = RadomMessage; type Error = io::Error; fn decode(&mut self, src: &mut BytesMut) -> Result, Self::Error> { let frame_end = src.as_ref().iter().position(|b| *b == 0); if let Some(n) = frame_end { let mut frame = src.split_to(n + 1); let host_msg = from_bytes_cobs::(&mut frame).unwrap(); return Ok(Some(host_msg)); } Ok(None) } } impl Encoder for ProtocolCodec { type Error = io::Error; fn encode(&mut self, item: HostMessage, dst: &mut BytesMut) -> Result<(), Self::Error> { let msg_bytes = to_stdvec_cobs(&item).unwrap(); dst.put(msg_bytes.as_slice()); dst.put_u8(0); Ok(()) } } pub async fn control_rotor( mut rx_cmd: mpsc::Receiver, pos_tx: watch::Sender<(f32, f32)>, radom_port: String, ) -> Result<()> { let port = tokio_serial::new(radom_port, 115_200) .timeout(Duration::from_millis(10)) .open_native_async() .expect("Failed to open port"); let (mut port_writer, mut port_reader) = ProtocolCodec.framed(port).split(); loop { tokio::select! { Some(command) = rx_cmd.recv() => { match command { Command::SetPos(az, el) => { //info!("Received set pos {} {}", az, el); port_writer.send(HostMessage::SetTarget(PositionTarget { az, el })).await?; } _ => {} } }, _ = time::sleep(time::Duration::from_millis(100)) => { //info!("Requesting status"); port_writer.send(HostMessage::RequestStatus).await?; }, msg = port_reader.next() => { match msg { Some(Ok(msg)) => { match msg { RadomMessage::Status(status) => { //info!("Received status {:?}", status); pos_tx.send((status.position.az, status.position.el)).unwrap(); } _ => {} } } _ => {} } } else => return Ok(()) }; } }