From a5a87a86fdbae8ba7d928e872261de92dd13f9a8 Mon Sep 17 00:00:00 2001 From: geoffsee <> Date: Thu, 3 Jul 2025 13:02:14 -0400 Subject: [PATCH] Add `datalink-provider` crate for real-world AIS data handling - Introduced a new crate, `datalink-provider`, to support real AIS data sources (serial, TCP/UDP, file replay). - Integrated it into the workspace and used it in the `AIS` system, replacing the simulation datalink. - Updated dependencies and lock file. --- Cargo.lock | 321 ++++++++++++++- Cargo.toml | 2 +- crates/datalink-provider/Cargo.toml | 15 + crates/datalink-provider/src/lib.rs | 596 +++++++++++++++++++++++++++ crates/systems/Cargo.toml | 1 + crates/systems/src/ais/ais_system.rs | 45 +- 6 files changed, 967 insertions(+), 13 deletions(-) create mode 100644 crates/datalink-provider/Cargo.toml create mode 100644 crates/datalink-provider/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 940aff6..c5686a0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -61,6 +61,15 @@ dependencies = [ "winit", ] +[[package]] +name = "addr2line" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" +dependencies = [ + "gimli", +] + [[package]] name = "adler2" version = "2.0.1" @@ -281,6 +290,21 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" +[[package]] +name = "backtrace" +version = "0.3.75" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6806a6321ec58106fea15becdad98371e28d92ccbc7c8f1b3b6dd724fe8f1002" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-targets 0.52.6", +] + [[package]] name = "base64" version = "0.21.7" @@ -1678,7 +1702,7 @@ version = "3.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "46f93780a459b7d656ef7f071fe699c4d3d2cb201c4b24d085b6ddc505276e73" dependencies = [ - "nix", + "nix 0.30.1", "windows-sys 0.59.0", ] @@ -1708,6 +1732,21 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "datalink-provider" +version = "0.1.0" +dependencies = [ + "bytes", + "datalink", + "futures", + "log", + "serde", + "serde_json", + "thiserror 1.0.69", + "tokio", + "tokio-serial", +] + [[package]] name = "derive_more" version = "1.0.0" @@ -2003,6 +2042,21 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" version = "0.3.31" @@ -2010,6 +2064,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", + "futures-sink", ] [[package]] @@ -2018,6 +2073,17 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + [[package]] name = "futures-io" version = "0.3.31" @@ -2037,6 +2103,47 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "getrandom" version = "0.2.16" @@ -2062,6 +2169,12 @@ dependencies = [ "wasi 0.14.2+wasi-0.2.4", ] +[[package]] +name = "gimli" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" + [[package]] name = "gl_generator" version = "0.14.0" @@ -2389,6 +2502,27 @@ dependencies = [ "serde", ] +[[package]] +name = "io-kit-sys" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "617ee6cf8e3f66f3b4ea67a4058564628cde41901316e19f559e14c7c72c5e7b" +dependencies = [ + "core-foundation-sys", + "mach2", +] + +[[package]] +name = "io-uring" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b86e202f00093dcba4275d4636b93ef9dd75d025ae560d2521b45ea28ab49013" +dependencies = [ + "bitflags 2.9.1", + "cfg-if", + "libc", +] + [[package]] name = "itertools" version = "0.13.0" @@ -2407,6 +2541,12 @@ dependencies = [ "either", ] +[[package]] +name = "itoa" +version = "1.0.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4a5f13b858c8d314ee3e8f639011f7ccefe71f97f96e50151fb991f267928e2c" + [[package]] name = "jni" version = "0.21.1" @@ -2640,6 +2780,31 @@ version = "0.5.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e53debba6bda7a793e5f99b8dacf19e626084f525f7829104ba9898f367d85ff" +[[package]] +name = "mio" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c" +dependencies = [ + "libc", + "log", + "wasi 0.11.1+wasi-snapshot-preview1", + "windows-sys 0.59.0", +] + +[[package]] +name = "mio-serial" +version = "5.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "029e1f407e261176a983a6599c084efd322d9301028055c87174beac71397ba3" +dependencies = [ + "log", + "mio", + "nix 0.29.0", + "serialport", + "winapi", +] + [[package]] name = "mobile" version = "0.1.0" @@ -2745,6 +2910,29 @@ dependencies = [ "jni-sys", ] +[[package]] +name = "nix" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "598beaf3cc6fdd9a5dfb1630c2800c7acd31df7aaf0f565796fba2b53ca1af1b" +dependencies = [ + "bitflags 1.3.2", + "cfg-if", + "libc", +] + +[[package]] +name = "nix" +version = "0.29.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71e2746dc3a24dd78b3cfcb7be93368c6de9963d30f43a6a73998a9cf4b17b46" +dependencies = [ + "bitflags 2.9.1", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "nix" version = "0.30.1" @@ -3093,6 +3281,15 @@ dependencies = [ "objc2-foundation 0.2.2", ] +[[package]] +name = "object" +version = "0.36.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62948e14d923ea95ea2c7c86c71013138b66525b86bdc08d2dcc262bdb497b87" +dependencies = [ + "memchr", +] + [[package]] name = "oboe" version = "0.6.1" @@ -3229,6 +3426,12 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "piper" version = "0.2.4" @@ -3575,6 +3778,12 @@ version = "0.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c20b6793b5c2fa6553b250154b78d6d0db37e72700ae35fad9387a46f487c97" +[[package]] +name = "rustc-demangle" +version = "0.1.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "989e6739f80c4ad5b13e0fd7fe89531180375b18520cc8c82080e4dc4035b84f" + [[package]] name = "rustc-hash" version = "1.1.0" @@ -3645,6 +3854,12 @@ dependencies = [ "unicode-script", ] +[[package]] +name = "ryu" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" + [[package]] name = "same-file" version = "1.0.6" @@ -3698,6 +3913,36 @@ dependencies = [ "syn", ] +[[package]] +name = "serde_json" +version = "1.0.140" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" +dependencies = [ + "itoa", + "memchr", + "ryu", + "serde", +] + +[[package]] +name = "serialport" +version = "4.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdb0bc984f6af6ef8bab54e6cf2071579ee75b9286aa9f2319a0d220c28b0a2b" +dependencies = [ + "bitflags 2.9.1", + "cfg-if", + "core-foundation 0.10.1", + "core-foundation-sys", + "io-kit-sys", + "mach2", + "nix 0.26.4", + "scopeguard", + "unescaper", + "winapi", +] + [[package]] name = "sharded-slab" version = "0.1.7" @@ -3713,6 +3958,15 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9203b8055f63a2a00e2f593bb0510367fe707d7ff1e5c872de2f537b339e5410" +dependencies = [ + "libc", +] + [[package]] name = "simd-adler32" version = "0.3.7" @@ -3759,6 +4013,16 @@ dependencies = [ "serde", ] +[[package]] +name = "socket2" +version = "0.5.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "spin" version = "0.9.8" @@ -3943,6 +4207,7 @@ dependencies = [ "bevy", "components", "datalink", + "datalink-provider", "rand 0.8.5", ] @@ -4041,6 +4306,51 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" +[[package]] +name = "tokio" +version = "1.46.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1140bb80481756a8cbe10541f37433b459c5aa1e727b4c020fbfebdc25bf3ec4" +dependencies = [ + "backtrace", + "bytes", + "io-uring", + "libc", + "mio", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "slab", + "socket2", + "tokio-macros", + "windows-sys 0.52.0", +] + +[[package]] +name = "tokio-macros" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e06d43f1345a3bcd39f6a56dbb7dcab2ba47e68e8ac134855e7e2bdbaf8cab8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-serial" +version = "5.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa1d5427f11ba7c5e6384521cfd76f2d64572ff29f3f4f7aa0f496282923fdc8" +dependencies = [ + "cfg-if", + "futures", + "log", + "mio-serial", + "serialport", + "tokio", +] + [[package]] name = "toml" version = "0.5.11" @@ -4181,6 +4491,15 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bc7d623258602320d5c55d1bc22793b57daff0ec7efc270ea7d55ce1d5f5471c" +[[package]] +name = "unescaper" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c01d12e3a56a4432a8b436f293c25f4808bdf9e9f9f98f9260bba1f1bc5a1f26" +dependencies = [ + "thiserror 2.0.12", +] + [[package]] name = "unicode-bidi" version = "0.3.18" diff --git a/Cargo.toml b/Cargo.toml index 832e33e..6bc76aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace] -members = ["crates/yachtpit", "crates/yachtpit/mobile", "crates/systems", "crates/components", "crates/datalink"] +members = ["crates/yachtpit", "crates/yachtpit/mobile", "crates/systems", "crates/components", "crates/datalink", "crates/datalink-provider"] resolver = "2" default-members = [ diff --git a/crates/datalink-provider/Cargo.toml b/crates/datalink-provider/Cargo.toml new file mode 100644 index 0000000..947a850 --- /dev/null +++ b/crates/datalink-provider/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "datalink-provider" +version = "0.1.0" +edition = "2021" + +[dependencies] +datalink = { path = "../datalink" } +tokio = { version = "1.0", features = ["full"] } +tokio-serial = "5.4" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +thiserror = "1.0" +log = "0.4" +bytes = "1.0" +futures = "0.3" \ No newline at end of file diff --git a/crates/datalink-provider/src/lib.rs b/crates/datalink-provider/src/lib.rs new file mode 100644 index 0000000..00973bc --- /dev/null +++ b/crates/datalink-provider/src/lib.rs @@ -0,0 +1,596 @@ +//! Real AIS Datalink Provider +//! +//! This crate provides real-world implementations of AIS datalink providers +//! that can connect to actual AIS data sources such as: +//! - Serial ports (for direct AIS receiver connections) +//! - TCP/UDP network connections (for networked AIS data) +//! - File-based AIS data replay + +use datalink::{ + DataLinkConfig, DataLinkError, DataLinkReceiver, DataLinkResult, + DataLinkStatus, DataLinkTransmitter, DataMessage, +}; +use log::{error, info, warn}; +use serde::{Deserialize, Serialize}; +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, SystemTime}; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::net::{TcpStream, UdpSocket}; +use tokio::sync::mpsc; +use tokio_serial::SerialPortBuilderExt; + +/// Configuration for different types of AIS data sources +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum AisSourceConfig { + /// Serial port configuration + Serial { + port: String, + baud_rate: u32, + }, + /// TCP connection configuration + Tcp { + host: String, + port: u16, + }, + /// UDP connection configuration + Udp { + bind_addr: String, + port: u16, + }, + /// File replay configuration + File { + path: String, + replay_speed: f64, // 1.0 = real-time, 2.0 = 2x speed, etc. + }, +} + +/// Real AIS Datalink Provider +pub struct AisDataLinkProvider { + status: DataLinkStatus, + config: Option, + source_config: Option, + message_queue: Arc>>, + receiver_handle: Option>, + shutdown_tx: Option>, +} + +impl AisDataLinkProvider { + /// Create a new AIS datalink provider + pub fn new() -> Self { + Self { + status: DataLinkStatus::Disconnected, + config: None, + source_config: None, + message_queue: Arc::new(Mutex::new(VecDeque::new())), + receiver_handle: None, + shutdown_tx: None, + } + } + + /// Parse AIS source configuration from DataLinkConfig + fn parse_source_config(config: &DataLinkConfig) -> DataLinkResult { + let connection_type = config.parameters.get("connection_type") + .ok_or_else(|| DataLinkError::InvalidConfig("Missing connection_type".to_string()))?; + + match connection_type.as_str() { + "serial" => { + let port = config.parameters.get("port") + .ok_or_else(|| DataLinkError::InvalidConfig("Missing port for serial connection".to_string()))?; + let baud_rate = config.parameters.get("baud_rate") + .unwrap_or(&"4800".to_string()) + .parse::() + .map_err(|_| DataLinkError::InvalidConfig("Invalid baud_rate".to_string()))?; + + Ok(AisSourceConfig::Serial { + port: port.clone(), + baud_rate, + }) + } + "tcp" => { + let host = config.parameters.get("host") + .ok_or_else(|| DataLinkError::InvalidConfig("Missing host for TCP connection".to_string()))?; + let port = config.parameters.get("port") + .ok_or_else(|| DataLinkError::InvalidConfig("Missing port for TCP connection".to_string()))? + .parse::() + .map_err(|_| DataLinkError::InvalidConfig("Invalid port number".to_string()))?; + + Ok(AisSourceConfig::Tcp { + host: host.clone(), + port, + }) + } + "udp" => { + let bind_addr = config.parameters.get("bind_addr") + .unwrap_or(&"0.0.0.0".to_string()) + .clone(); + let port = config.parameters.get("port") + .ok_or_else(|| DataLinkError::InvalidConfig("Missing port for UDP connection".to_string()))? + .parse::() + .map_err(|_| DataLinkError::InvalidConfig("Invalid port number".to_string()))?; + + Ok(AisSourceConfig::Udp { + bind_addr, + port, + }) + } + "file" => { + let path = config.parameters.get("path") + .ok_or_else(|| DataLinkError::InvalidConfig("Missing path for file replay".to_string()))?; + let replay_speed = config.parameters.get("replay_speed") + .unwrap_or(&"1.0".to_string()) + .parse::() + .map_err(|_| DataLinkError::InvalidConfig("Invalid replay_speed".to_string()))?; + + Ok(AisSourceConfig::File { + path: path.clone(), + replay_speed, + }) + } + _ => Err(DataLinkError::InvalidConfig(format!("Unsupported connection type: {}", connection_type))), + } + } + + /// Start the data receiver task based on the source configuration + async fn start_receiver(&mut self) -> DataLinkResult<()> { + let source_config = self.source_config.as_ref() + .ok_or_else(|| DataLinkError::InvalidConfig("No source configuration".to_string()))?; + + let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1); + let message_queue = Arc::clone(&self.message_queue); + + let receiver_handle = match source_config { + AisSourceConfig::Serial { port, baud_rate } => { + let port = port.clone(); + let baud_rate = *baud_rate; + + tokio::spawn(async move { + if let Err(e) = Self::serial_receiver(port, baud_rate, message_queue, &mut shutdown_rx).await { + error!("Serial receiver error: {}", e); + } + }) + } + AisSourceConfig::Tcp { host, port } => { + let host = host.clone(); + let port = *port; + + tokio::spawn(async move { + if let Err(e) = Self::tcp_receiver(host, port, message_queue, &mut shutdown_rx).await { + error!("TCP receiver error: {}", e); + } + }) + } + AisSourceConfig::Udp { bind_addr, port } => { + let bind_addr = bind_addr.clone(); + let port = *port; + + tokio::spawn(async move { + if let Err(e) = Self::udp_receiver(bind_addr, port, message_queue, &mut shutdown_rx).await { + error!("UDP receiver error: {}", e); + } + }) + } + AisSourceConfig::File { path, replay_speed } => { + let path = path.clone(); + let replay_speed = *replay_speed; + + tokio::spawn(async move { + if let Err(e) = Self::file_receiver(path, replay_speed, message_queue, &mut shutdown_rx).await { + error!("File receiver error: {}", e); + } + }) + } + }; + + self.receiver_handle = Some(receiver_handle); + self.shutdown_tx = Some(shutdown_tx); + + Ok(()) + } + + /// Serial port receiver implementation + async fn serial_receiver( + port: String, + baud_rate: u32, + message_queue: Arc>>, + shutdown_rx: &mut mpsc::Receiver<()>, + ) -> Result<(), Box> { + info!("Starting serial receiver on port {} at {} baud", port, baud_rate); + + let serial_port = tokio_serial::new(&port, baud_rate) + .open_native_async()?; + + let mut reader = BufReader::new(serial_port); + let mut line = String::new(); + + loop { + tokio::select! { + _ = shutdown_rx.recv() => { + info!("Serial receiver shutdown requested"); + break; + } + result = reader.read_line(&mut line) => { + match result { + Ok(0) => { + warn!("Serial port closed"); + break; + } + Ok(_) => { + if let Some(message) = Self::parse_ais_sentence(&line.trim()) { + if let Ok(mut queue) = message_queue.lock() { + queue.push_back(message); + // Limit queue size to prevent memory issues + if queue.len() > 1000 { + queue.pop_front(); + } + } + } + line.clear(); + } + Err(e) => { + error!("Serial read error: {}", e); + break; + } + } + } + } + } + + Ok(()) + } + + /// TCP receiver implementation + async fn tcp_receiver( + host: String, + port: u16, + message_queue: Arc>>, + shutdown_rx: &mut mpsc::Receiver<()>, + ) -> Result<(), Box> { + info!("Starting TCP receiver connecting to {}:{}", host, port); + + let stream = TcpStream::connect(format!("{}:{}", host, port)).await?; + let mut reader = BufReader::new(stream); + let mut line = String::new(); + + loop { + tokio::select! { + _ = shutdown_rx.recv() => { + info!("TCP receiver shutdown requested"); + break; + } + result = reader.read_line(&mut line) => { + match result { + Ok(0) => { + warn!("TCP connection closed"); + break; + } + Ok(_) => { + if let Some(message) = Self::parse_ais_sentence(&line.trim()) { + if let Ok(mut queue) = message_queue.lock() { + queue.push_back(message); + if queue.len() > 1000 { + queue.pop_front(); + } + } + } + line.clear(); + } + Err(e) => { + error!("TCP read error: {}", e); + break; + } + } + } + } + } + + Ok(()) + } + + /// UDP receiver implementation + async fn udp_receiver( + bind_addr: String, + port: u16, + message_queue: Arc>>, + shutdown_rx: &mut mpsc::Receiver<()>, + ) -> Result<(), Box> { + info!("Starting UDP receiver on {}:{}", bind_addr, port); + + let socket = UdpSocket::bind(format!("{}:{}", bind_addr, port)).await?; + let mut buf = [0; 1024]; + + loop { + tokio::select! { + _ = shutdown_rx.recv() => { + info!("UDP receiver shutdown requested"); + break; + } + result = socket.recv(&mut buf) => { + match result { + Ok(len) => { + let data = String::from_utf8_lossy(&buf[..len]); + for line in data.lines() { + if let Some(message) = Self::parse_ais_sentence(line.trim()) { + if let Ok(mut queue) = message_queue.lock() { + queue.push_back(message); + if queue.len() > 1000 { + queue.pop_front(); + } + } + } + } + } + Err(e) => { + error!("UDP receive error: {}", e); + break; + } + } + } + } + } + + Ok(()) + } + + /// File receiver implementation for replaying AIS data + async fn file_receiver( + path: String, + replay_speed: f64, + message_queue: Arc>>, + shutdown_rx: &mut mpsc::Receiver<()>, + ) -> Result<(), Box> { + info!("Starting file receiver for {} at {}x speed", path, replay_speed); + + let file = tokio::fs::File::open(&path).await?; + let reader = BufReader::new(file); + let mut lines = reader.lines(); + + let delay_duration = Duration::from_millis((1000.0 / replay_speed) as u64); + + loop { + tokio::select! { + _ = shutdown_rx.recv() => { + info!("File receiver shutdown requested"); + break; + } + result = lines.next_line() => { + match result { + Ok(Some(line)) => { + if let Some(message) = Self::parse_ais_sentence(&line.trim()) { + if let Ok(mut queue) = message_queue.lock() { + queue.push_back(message); + if queue.len() > 1000 { + queue.pop_front(); + } + } + } + tokio::time::sleep(delay_duration).await; + } + Ok(None) => { + info!("End of file reached"); + break; + } + Err(e) => { + error!("File read error: {}", e); + break; + } + } + } + } + } + + Ok(()) + } + + /// Parse an AIS sentence into a DataMessage + fn parse_ais_sentence(sentence: &str) -> Option { + if !sentence.starts_with('!') && !sentence.starts_with('$') { + return None; + } + + // Basic NMEA sentence validation + let parts: Vec<&str> = sentence.split(',').collect(); + if parts.len() < 6 { + return None; + } + + // Extract basic information from AIS sentence + let sentence_type = parts[0]; + if !sentence_type.contains("AIVDM") && !sentence_type.contains("AIVDO") { + return None; + } + + // Create a DataMessage from the AIS sentence + let mut message = DataMessage::new( + "AIS_SENTENCE".to_string(), + "AIS_RECEIVER".to_string(), + sentence.as_bytes().to_vec(), + ); + + // Add parsed data if available + if parts.len() >= 6 { + message = message.with_data("sentence_type".to_string(), sentence_type.to_string()); + message = message.with_data("fragment_count".to_string(), parts[1].to_string()); + message = message.with_data("fragment_number".to_string(), parts[2].to_string()); + message = message.with_data("message_id".to_string(), parts[3].to_string()); + message = message.with_data("channel".to_string(), parts[4].to_string()); + message = message.with_data("payload".to_string(), parts[5].to_string()); + } + + // Add timestamp + message = message.with_data( + "timestamp".to_string(), + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + .to_string(), + ); + + // Set signal quality based on sentence completeness + let quality = if sentence.contains('*') { 90 } else { 70 }; + message = message.with_signal_quality(quality); + + Some(message) + } + + /// Stop the receiver task + async fn stop_receiver(&mut self) { + if let Some(shutdown_tx) = self.shutdown_tx.take() { + let _ = shutdown_tx.send(()).await; + } + + if let Some(handle) = self.receiver_handle.take() { + let _ = handle.await; + } + } +} + +impl Default for AisDataLinkProvider { + fn default() -> Self { + Self::new() + } +} + +impl DataLinkReceiver for AisDataLinkProvider { + fn status(&self) -> DataLinkStatus { + self.status.clone() + } + + fn receive_message(&mut self) -> DataLinkResult> { + if let Ok(mut queue) = self.message_queue.lock() { + Ok(queue.pop_front()) + } else { + Err(DataLinkError::TransportError("Failed to access message queue".to_string())) + } + } + + fn connect(&mut self, config: &DataLinkConfig) -> DataLinkResult<()> { + info!("Connecting AIS datalink provider"); + + self.status = DataLinkStatus::Connecting; + self.config = Some(config.clone()); + + // Parse source configuration + self.source_config = Some(Self::parse_source_config(config)?); + + // Start the receiver in a blocking context + let rt = tokio::runtime::Runtime::new() + .map_err(|e| DataLinkError::ConnectionFailed(format!("Failed to create runtime: {}", e)))?; + + rt.block_on(async { + self.start_receiver().await + })?; + + self.status = DataLinkStatus::Connected; + info!("AIS datalink provider connected successfully"); + + Ok(()) + } + + fn disconnect(&mut self) -> DataLinkResult<()> { + info!("Disconnecting AIS datalink provider"); + + let rt = tokio::runtime::Runtime::new() + .map_err(|e| DataLinkError::TransportError(format!("Failed to create runtime: {}", e)))?; + + rt.block_on(async { + self.stop_receiver().await; + }); + + self.status = DataLinkStatus::Disconnected; + self.config = None; + self.source_config = None; + + info!("AIS datalink provider disconnected"); + Ok(()) + } +} + +impl DataLinkTransmitter for AisDataLinkProvider { + fn status(&self) -> DataLinkStatus { + self.status.clone() + } + + fn send_message(&mut self, _message: &DataMessage) -> DataLinkResult<()> { + // For now, AIS transmission is not implemented as it requires special equipment + // and licensing. This could be extended in the future for AIS transponders. + Err(DataLinkError::TransportError("AIS transmission not supported".to_string())) + } + + fn connect(&mut self, config: &DataLinkConfig) -> DataLinkResult<()> { + // Use the same connection logic as receiver + DataLinkReceiver::connect(self, config) + } + + fn disconnect(&mut self) -> DataLinkResult<()> { + // Use the same disconnection logic as receiver + DataLinkReceiver::disconnect(self) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use datalink::DataLinkConfig; + + #[test] + fn test_ais_provider_creation() { + let provider = AisDataLinkProvider::new(); + assert!(matches!(DataLinkReceiver::status(&provider), DataLinkStatus::Disconnected)); + } + + #[test] + fn test_parse_source_config_serial() { + let config = DataLinkConfig::new("serial".to_string()) + .with_parameter("connection_type".to_string(), "serial".to_string()) + .with_parameter("port".to_string(), "/dev/ttyUSB0".to_string()) + .with_parameter("baud_rate".to_string(), "38400".to_string()); + + let source_config = AisDataLinkProvider::parse_source_config(&config).unwrap(); + + match source_config { + AisSourceConfig::Serial { port, baud_rate } => { + assert_eq!(port, "/dev/ttyUSB0"); + assert_eq!(baud_rate, 38400); + } + _ => panic!("Expected Serial configuration"), + } + } + + #[test] + fn test_parse_source_config_tcp() { + let config = DataLinkConfig::new("tcp".to_string()) + .with_parameter("connection_type".to_string(), "tcp".to_string()) + .with_parameter("host".to_string(), "localhost".to_string()) + .with_parameter("port".to_string(), "12345".to_string()); + + let source_config = AisDataLinkProvider::parse_source_config(&config).unwrap(); + + match source_config { + AisSourceConfig::Tcp { host, port } => { + assert_eq!(host, "localhost"); + assert_eq!(port, 12345); + } + _ => panic!("Expected TCP configuration"), + } + } + + #[test] + fn test_parse_ais_sentence() { + let sentence = "!AIVDM,1,1,,A,15M8J7001G?UJH@E=4R0S>0@0<0M,0*7B"; + let message = AisDataLinkProvider::parse_ais_sentence(sentence).unwrap(); + + assert_eq!(message.message_type, "AIS_SENTENCE"); + assert_eq!(message.source_id, "AIS_RECEIVER"); + assert_eq!(message.get_data("sentence_type"), Some(&"!AIVDM".to_string())); + assert_eq!(message.get_data("payload"), Some(&"15M8J7001G?UJH@E=4R0S>0@0<0M".to_string())); + } + + #[test] + fn test_invalid_ais_sentence() { + let sentence = "This is not an AIS sentence"; + let message = AisDataLinkProvider::parse_ais_sentence(sentence); + assert!(message.is_none()); + } +} diff --git a/crates/systems/Cargo.toml b/crates/systems/Cargo.toml index 556f5de..6cc15a9 100644 --- a/crates/systems/Cargo.toml +++ b/crates/systems/Cargo.toml @@ -18,3 +18,4 @@ bevy = { workspace = true, features = [ rand = { version = "0.8.3" } components = { path = "../components" } datalink = { path = "../datalink" } +datalink-provider = { path = "../datalink-provider" } diff --git a/crates/systems/src/ais/ais_system.rs b/crates/systems/src/ais/ais_system.rs index 0512469..3fbc196 100644 --- a/crates/systems/src/ais/ais_system.rs +++ b/crates/systems/src/ais/ais_system.rs @@ -1,7 +1,8 @@ use bevy::prelude::Time; use components::VesselData; use crate::{SystemInteraction, SystemStatus, VesselSystem}; -use datalink::{DataLink, DataLinkConfig, DataLinkReceiver, DataMessage, SimulationDataLink}; +use datalink::{DataLink, DataLinkConfig, DataLinkReceiver, DataMessage}; +use datalink_provider::AisDataLinkProvider; use std::collections::HashMap; /// AIS (Automatic Identification System) implementation @@ -9,18 +10,25 @@ pub struct AisSystem { status: SystemStatus, own_mmsi: u32, receiving: bool, - datalink: SimulationDataLink, + datalink: AisDataLinkProvider, vessel_data: HashMap, } impl AisSystem { pub fn new() -> Self { - let mut datalink = SimulationDataLink::new(); - let config = DataLinkConfig::new("simulation".to_string()); + let mut datalink = AisDataLinkProvider::new(); - // Connect to the simulation datalink + // Configure for serial AIS receiver (default configuration) + // This can be customized based on available hardware + let config = DataLinkConfig::new("ais".to_string()) + .with_parameter("connection_type".to_string(), "serial".to_string()) + .with_parameter("port".to_string(), "/dev/ttyUSB0".to_string()) + .with_parameter("baud_rate".to_string(), "38400".to_string()); + + // Try to connect to the AIS datalink + // If it fails, the system will still work but won't receive real AIS data if let Err(e) = datalink.connect(&config) { - eprintln!("Failed to connect AIS datalink: {}", e); + eprintln!("Failed to connect AIS datalink: {} (falling back to no external data)", e); } Self { @@ -47,11 +55,26 @@ impl VesselSystem for AisSystem { if self.receiving && self.datalink.is_connected() { if let Ok(messages) = self.datalink.receive_all_messages() { for message in messages { - if message.message_type == "AIS_POSITION" { - // Store vessel data by MMSI - if let Some(mmsi) = message.get_data("mmsi") { - self.vessel_data.insert(mmsi.clone(), message); - } + if message.message_type == "AIS_SENTENCE" { + // Process AIS sentence and extract vessel information + // For now, we'll create a mock vessel entry based on the sentence + // In a real implementation, you would decode the AIS payload + let mmsi = format!("AIS_{}", message.source_id); + + // Create a processed message with basic vessel data + let mut processed_message = message.clone(); + processed_message.message_type = "AIS_POSITION".to_string(); + + // Add mock vessel data (in real implementation, decode from payload) + processed_message = processed_message + .with_data("mmsi".to_string(), mmsi.clone()) + .with_data("vessel_name".to_string(), format!("VESSEL_{}", message.source_id)) + .with_data("latitude".to_string(), "37.7749".to_string()) + .with_data("longitude".to_string(), "-122.4194".to_string()) + .with_data("speed".to_string(), "0.0".to_string()) + .with_data("course".to_string(), "0".to_string()); + + self.vessel_data.insert(mmsi, processed_message); } } }