diff --git a/crates/datalink-provider/src/ais/mod.rs b/crates/datalink-provider/src/ais/mod.rs new file mode 100644 index 0000000..14b8ce9 --- /dev/null +++ b/crates/datalink-provider/src/ais/mod.rs @@ -0,0 +1,519 @@ +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, SystemTime}; +use log::{error, info, warn}; +use serde::{Deserialize, Serialize}; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::net::{TcpStream, UdpSocket}; +use tokio::sync::mpsc; +use tokio_serial::SerialPortBuilderExt; +use datalink::{DataLinkConfig, DataLinkError, DataLinkReceiver, DataLinkResult, DataLinkStatus, DataLinkTransmitter, DataMessage}; + +/// Configuration for different types of AIS data sources +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum AisSourceConfig { + /// Serial port configuration + Serial { + port: String, + baud_rate: u32, + }, + /// TCP connection configuration + Tcp { + host: String, + port: u16, + }, + /// UDP connection configuration + Udp { + bind_addr: String, + port: u16, + }, + /// File replay configuration + File { + path: String, + replay_speed: f64, // 1.0 = real-time, 2.0 = 2x speed, etc. + }, +} + +/// Real AIS Datalink Provider +pub struct AisDataLinkProvider { + status: DataLinkStatus, + config: Option, + source_config: Option, + message_queue: Arc>>, + receiver_handle: Option>, + shutdown_tx: Option>, +} + +impl AisDataLinkProvider { + /// Create a new AIS datalink provider + pub fn new() -> Self { + Self { + status: DataLinkStatus::Disconnected, + config: None, + source_config: None, + message_queue: Arc::new(Mutex::new(VecDeque::new())), + receiver_handle: None, + shutdown_tx: None, + } + } + + /// Parse AIS source configuration from DataLinkConfig + pub fn parse_source_config(config: &DataLinkConfig) -> DataLinkResult { + let connection_type = config.parameters.get("connection_type") + .ok_or_else(|| DataLinkError::InvalidConfig("Missing connection_type".to_string()))?; + + match connection_type.as_str() { + "serial" => { + let port = config.parameters.get("port") + .ok_or_else(|| DataLinkError::InvalidConfig("Missing port for serial connection".to_string()))?; + let baud_rate = config.parameters.get("baud_rate") + .unwrap_or(&"4800".to_string()) + .parse::() + .map_err(|_| DataLinkError::InvalidConfig("Invalid baud_rate".to_string()))?; + + Ok(AisSourceConfig::Serial { + port: port.clone(), + baud_rate, + }) + } + "tcp" => { + let host = config.parameters.get("host") + .ok_or_else(|| DataLinkError::InvalidConfig("Missing host for TCP connection".to_string()))?; + let port = config.parameters.get("port") + .ok_or_else(|| DataLinkError::InvalidConfig("Missing port for TCP connection".to_string()))? + .parse::() + .map_err(|_| DataLinkError::InvalidConfig("Invalid port number".to_string()))?; + + Ok(AisSourceConfig::Tcp { + host: host.clone(), + port, + }) + } + "udp" => { + let bind_addr = config.parameters.get("bind_addr") + .unwrap_or(&"0.0.0.0".to_string()) + .clone(); + let port = config.parameters.get("port") + .ok_or_else(|| DataLinkError::InvalidConfig("Missing port for UDP connection".to_string()))? + .parse::() + .map_err(|_| DataLinkError::InvalidConfig("Invalid port number".to_string()))?; + + Ok(AisSourceConfig::Udp { + bind_addr, + port, + }) + } + "file" => { + let path = config.parameters.get("path") + .ok_or_else(|| DataLinkError::InvalidConfig("Missing path for file replay".to_string()))?; + let replay_speed = config.parameters.get("replay_speed") + .unwrap_or(&"1.0".to_string()) + .parse::() + .map_err(|_| DataLinkError::InvalidConfig("Invalid replay_speed".to_string()))?; + + Ok(AisSourceConfig::File { + path: path.clone(), + replay_speed, + }) + } + _ => Err(DataLinkError::InvalidConfig(format!("Unsupported connection type: {}", connection_type))), + } + } + + /// Start the data receiver task based on the source configuration + async fn start_receiver(&mut self) -> DataLinkResult<()> { + let source_config = self.source_config.as_ref() + .ok_or_else(|| DataLinkError::InvalidConfig("No source configuration".to_string()))?; + + let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1); + let message_queue = Arc::clone(&self.message_queue); + + let receiver_handle = match source_config { + AisSourceConfig::Serial { port, baud_rate } => { + let port = port.clone(); + let baud_rate = *baud_rate; + + tokio::spawn(async move { + if let Err(e) = Self::serial_receiver(port, baud_rate, message_queue, &mut shutdown_rx).await { + error!("Serial receiver error: {}", e); + } + }) + } + AisSourceConfig::Tcp { host, port } => { + let host = host.clone(); + let port = *port; + + tokio::spawn(async move { + if let Err(e) = Self::tcp_receiver(host, port, message_queue, &mut shutdown_rx).await { + error!("TCP receiver error: {}", e); + } + }) + } + AisSourceConfig::Udp { bind_addr, port } => { + let bind_addr = bind_addr.clone(); + let port = *port; + + tokio::spawn(async move { + if let Err(e) = Self::udp_receiver(bind_addr, port, message_queue, &mut shutdown_rx).await { + error!("UDP receiver error: {}", e); + } + }) + } + AisSourceConfig::File { path, replay_speed } => { + let path = path.clone(); + let replay_speed = *replay_speed; + + tokio::spawn(async move { + if let Err(e) = Self::file_receiver(path, replay_speed, message_queue, &mut shutdown_rx).await { + error!("File receiver error: {}", e); + } + }) + } + }; + + self.receiver_handle = Some(receiver_handle); + self.shutdown_tx = Some(shutdown_tx); + + Ok(()) + } + + /// Serial port receiver implementation + async fn serial_receiver( + port: String, + baud_rate: u32, + message_queue: Arc>>, + shutdown_rx: &mut mpsc::Receiver<()>, + ) -> Result<(), Box> { + info!("Starting serial receiver on port {} at {} baud", port, baud_rate); + + let serial_port = tokio_serial::new(&port, baud_rate) + .open_native_async()?; + + let mut reader = BufReader::new(serial_port); + let mut line = String::new(); + + loop { + tokio::select! { + _ = shutdown_rx.recv() => { + info!("Serial receiver shutdown requested"); + break; + } + result = reader.read_line(&mut line) => { + match result { + Ok(0) => { + warn!("Serial port closed"); + break; + } + Ok(_) => { + if let Some(message) = Self::parse_ais_sentence(&line.trim()) { + if let Ok(mut queue) = message_queue.lock() { + queue.push_back(message); + // Limit queue size to prevent memory issues + if queue.len() > 1000 { + queue.pop_front(); + } + } + } + line.clear(); + } + Err(e) => { + error!("Serial read error: {}", e); + break; + } + } + } + } + } + + Ok(()) + } + + /// TCP receiver implementation + async fn tcp_receiver( + host: String, + port: u16, + message_queue: Arc>>, + shutdown_rx: &mut mpsc::Receiver<()>, + ) -> Result<(), Box> { + info!("Starting TCP receiver connecting to {}:{}", host, port); + + let stream = TcpStream::connect(format!("{}:{}", host, port)).await?; + let mut reader = BufReader::new(stream); + let mut line = String::new(); + + loop { + tokio::select! { + _ = shutdown_rx.recv() => { + info!("TCP receiver shutdown requested"); + break; + } + result = reader.read_line(&mut line) => { + match result { + Ok(0) => { + warn!("TCP connection closed"); + break; + } + Ok(_) => { + if let Some(message) = Self::parse_ais_sentence(&line.trim()) { + if let Ok(mut queue) = message_queue.lock() { + queue.push_back(message); + if queue.len() > 1000 { + queue.pop_front(); + } + } + } + line.clear(); + } + Err(e) => { + error!("TCP read error: {}", e); + break; + } + } + } + } + } + + Ok(()) + } + + /// UDP receiver implementation + async fn udp_receiver( + bind_addr: String, + port: u16, + message_queue: Arc>>, + shutdown_rx: &mut mpsc::Receiver<()>, + ) -> Result<(), Box> { + info!("Starting UDP receiver on {}:{}", bind_addr, port); + + let socket = UdpSocket::bind(format!("{}:{}", bind_addr, port)).await?; + let mut buf = [0; 1024]; + + loop { + tokio::select! { + _ = shutdown_rx.recv() => { + info!("UDP receiver shutdown requested"); + break; + } + result = socket.recv(&mut buf) => { + match result { + Ok(len) => { + let data = String::from_utf8_lossy(&buf[..len]); + for line in data.lines() { + if let Some(message) = Self::parse_ais_sentence(line.trim()) { + if let Ok(mut queue) = message_queue.lock() { + queue.push_back(message); + if queue.len() > 1000 { + queue.pop_front(); + } + } + } + } + } + Err(e) => { + error!("UDP receive error: {}", e); + break; + } + } + } + } + } + + Ok(()) + } + + /// File receiver implementation for replaying AIS data + async fn file_receiver( + path: String, + replay_speed: f64, + message_queue: Arc>>, + shutdown_rx: &mut mpsc::Receiver<()>, + ) -> Result<(), Box> { + info!("Starting file receiver for {} at {}x speed", path, replay_speed); + + let file = tokio::fs::File::open(&path).await?; + let reader = BufReader::new(file); + let mut lines = reader.lines(); + + let delay_duration = Duration::from_millis((1000.0 / replay_speed) as u64); + + loop { + tokio::select! { + _ = shutdown_rx.recv() => { + info!("File receiver shutdown requested"); + break; + } + result = lines.next_line() => { + match result { + Ok(Some(line)) => { + if let Some(message) = Self::parse_ais_sentence(&line.trim()) { + if let Ok(mut queue) = message_queue.lock() { + queue.push_back(message); + if queue.len() > 1000 { + queue.pop_front(); + } + } + } + tokio::time::sleep(delay_duration).await; + } + Ok(None) => { + info!("End of file reached"); + break; + } + Err(e) => { + error!("File read error: {}", e); + break; + } + } + } + } + } + + Ok(()) + } + + /// Parse an AIS sentence into a DataMessage + pub fn parse_ais_sentence(sentence: &str) -> Option { + if !sentence.starts_with('!') && !sentence.starts_with('$') { + return None; + } + + // Basic NMEA sentence validation + let parts: Vec<&str> = sentence.split(',').collect(); + if parts.len() < 6 { + return None; + } + + // Extract basic information from AIS sentence + let sentence_type = parts[0]; + if !sentence_type.contains("AIVDM") && !sentence_type.contains("AIVDO") { + return None; + } + + // Create a DataMessage from the AIS sentence + let mut message = DataMessage::new( + "AIS_SENTENCE".to_string(), + "AIS_RECEIVER".to_string(), + sentence.as_bytes().to_vec(), + ); + + // Add parsed data if available + if parts.len() >= 6 { + message = message.with_data("sentence_type".to_string(), sentence_type.to_string()); + message = message.with_data("fragment_count".to_string(), parts[1].to_string()); + message = message.with_data("fragment_number".to_string(), parts[2].to_string()); + message = message.with_data("message_id".to_string(), parts[3].to_string()); + message = message.with_data("channel".to_string(), parts[4].to_string()); + message = message.with_data("payload".to_string(), parts[5].to_string()); + } + + // Add timestamp + message = message.with_data( + "timestamp".to_string(), + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + .to_string(), + ); + + // Set signal quality based on sentence completeness + let quality = if sentence.contains('*') { 90 } else { 70 }; + message = message.with_signal_quality(quality); + + Some(message) + } + + /// Stop the receiver task + async fn stop_receiver(&mut self) { + if let Some(shutdown_tx) = self.shutdown_tx.take() { + let _ = shutdown_tx.send(()).await; + } + + if let Some(handle) = self.receiver_handle.take() { + let _ = handle.await; + } + } +} + +impl Default for AisDataLinkProvider { + fn default() -> Self { + Self::new() + } +} + +impl DataLinkReceiver for AisDataLinkProvider { + fn status(&self) -> DataLinkStatus { + self.status.clone() + } + + fn receive_message(&mut self) -> DataLinkResult> { + if let Ok(mut queue) = self.message_queue.lock() { + Ok(queue.pop_front()) + } else { + Err(DataLinkError::TransportError("Failed to access message queue".to_string())) + } + } + + fn connect(&mut self, config: &DataLinkConfig) -> DataLinkResult<()> { + info!("Connecting AIS datalink provider"); + + self.status = DataLinkStatus::Connecting; + self.config = Some(config.clone()); + + // Parse source configuration + self.source_config = Some(Self::parse_source_config(config)?); + + // Start the receiver in a blocking context + let rt = tokio::runtime::Runtime::new() + .map_err(|e| DataLinkError::ConnectionFailed(format!("Failed to create runtime: {}", e)))?; + + rt.block_on(async { + self.start_receiver().await + })?; + + self.status = DataLinkStatus::Connected; + info!("AIS datalink provider connected successfully"); + + Ok(()) + } + + fn disconnect(&mut self) -> DataLinkResult<()> { + info!("Disconnecting AIS datalink provider"); + + let rt = tokio::runtime::Runtime::new() + .map_err(|e| DataLinkError::TransportError(format!("Failed to create runtime: {}", e)))?; + + rt.block_on(async { + self.stop_receiver().await; + }); + + self.status = DataLinkStatus::Disconnected; + self.config = None; + self.source_config = None; + + info!("AIS datalink provider disconnected"); + Ok(()) + } +} + +impl DataLinkTransmitter for AisDataLinkProvider { + fn status(&self) -> DataLinkStatus { + self.status.clone() + } + + fn send_message(&mut self, _message: &DataMessage) -> DataLinkResult<()> { + // For now, AIS transmission is not implemented as it requires special equipment + // and licensing. This could be extended in the future for AIS transponders. + Err(DataLinkError::TransportError("AIS transmission not supported".to_string())) + } + + fn connect(&mut self, config: &DataLinkConfig) -> DataLinkResult<()> { + // Use the same connection logic as receiver + DataLinkReceiver::connect(self, config) + } + + fn disconnect(&mut self) -> DataLinkResult<()> { + // Use the same disconnection logic as receiver + DataLinkReceiver::disconnect(self) + } +} diff --git a/crates/datalink-provider/src/gps/mod.rs b/crates/datalink-provider/src/gps/mod.rs new file mode 100644 index 0000000..3e5ed50 --- /dev/null +++ b/crates/datalink-provider/src/gps/mod.rs @@ -0,0 +1,572 @@ +use std::collections::VecDeque; +use std::sync::{Arc, Mutex}; +use std::time::{Duration, SystemTime}; +use log::{error, info, warn}; +use serde::{Deserialize, Serialize}; +use tokio::io::{AsyncBufReadExt, BufReader}; +use tokio::net::{TcpStream, UdpSocket}; +use tokio::sync::mpsc; +use tokio_serial::SerialPortBuilderExt; +use datalink::{DataLinkConfig, DataLinkError, DataLinkReceiver, DataLinkResult, DataLinkStatus, DataLinkTransmitter, DataMessage}; + +/// Configuration for different types of GPS data sources +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum GpsSourceConfig { + /// Serial port configuration + Serial { + port: String, + baud_rate: u32, + }, + /// TCP connection configuration + Tcp { + host: String, + port: u16, + }, + /// UDP connection configuration + Udp { + bind_addr: String, + port: u16, + }, + /// File replay configuration + File { + path: String, + replay_speed: f64, // 1.0 = real-time, 2.0 = 2x speed, etc. + }, +} + +/// Real GPS Datalink Provider +pub struct GpsDataLinkProvider { + status: DataLinkStatus, + config: Option, + source_config: Option, + message_queue: Arc>>, + receiver_handle: Option>, + shutdown_tx: Option>, +} + +impl GpsDataLinkProvider { + /// Create a new GPS datalink provider + pub fn new() -> Self { + Self { + status: DataLinkStatus::Disconnected, + config: None, + source_config: None, + message_queue: Arc::new(Mutex::new(VecDeque::new())), + receiver_handle: None, + shutdown_tx: None, + } + } + + /// Parse GPS source configuration from DataLinkConfig + pub fn parse_source_config(config: &DataLinkConfig) -> DataLinkResult { + let connection_type = config.parameters.get("connection_type") + .ok_or_else(|| DataLinkError::InvalidConfig("Missing connection_type".to_string()))?; + + match connection_type.as_str() { + "serial" => { + let port = config.parameters.get("port") + .ok_or_else(|| DataLinkError::InvalidConfig("Missing port for serial connection".to_string()))?; + let baud_rate = config.parameters.get("baud_rate") + .unwrap_or(&"4800".to_string()) + .parse::() + .map_err(|_| DataLinkError::InvalidConfig("Invalid baud_rate".to_string()))?; + + Ok(GpsSourceConfig::Serial { + port: port.clone(), + baud_rate, + }) + } + "tcp" => { + let host = config.parameters.get("host") + .ok_or_else(|| DataLinkError::InvalidConfig("Missing host for TCP connection".to_string()))?; + let port = config.parameters.get("port") + .ok_or_else(|| DataLinkError::InvalidConfig("Missing port for TCP connection".to_string()))? + .parse::() + .map_err(|_| DataLinkError::InvalidConfig("Invalid port number".to_string()))?; + + Ok(GpsSourceConfig::Tcp { + host: host.clone(), + port, + }) + } + "udp" => { + let bind_addr = config.parameters.get("bind_addr") + .unwrap_or(&"0.0.0.0".to_string()) + .clone(); + let port = config.parameters.get("port") + .ok_or_else(|| DataLinkError::InvalidConfig("Missing port for UDP connection".to_string()))? + .parse::() + .map_err(|_| DataLinkError::InvalidConfig("Invalid port number".to_string()))?; + + Ok(GpsSourceConfig::Udp { + bind_addr, + port, + }) + } + "file" => { + let path = config.parameters.get("path") + .ok_or_else(|| DataLinkError::InvalidConfig("Missing path for file replay".to_string()))?; + let replay_speed = config.parameters.get("replay_speed") + .unwrap_or(&"1.0".to_string()) + .parse::() + .map_err(|_| DataLinkError::InvalidConfig("Invalid replay_speed".to_string()))?; + + Ok(GpsSourceConfig::File { + path: path.clone(), + replay_speed, + }) + } + _ => Err(DataLinkError::InvalidConfig(format!("Unsupported connection type: {}", connection_type))), + } + } + + /// Start the data receiver task based on the source configuration + async fn start_receiver(&mut self) -> DataLinkResult<()> { + let source_config = self.source_config.as_ref() + .ok_or_else(|| DataLinkError::InvalidConfig("No source configuration".to_string()))?; + + let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1); + let message_queue = Arc::clone(&self.message_queue); + + let receiver_handle = match source_config { + GpsSourceConfig::Serial { port, baud_rate } => { + let port = port.clone(); + let baud_rate = *baud_rate; + + tokio::spawn(async move { + if let Err(e) = Self::serial_receiver(port, baud_rate, message_queue, &mut shutdown_rx).await { + error!("GPS Serial receiver error: {}", e); + } + }) + } + GpsSourceConfig::Tcp { host, port } => { + let host = host.clone(); + let port = *port; + + tokio::spawn(async move { + if let Err(e) = Self::tcp_receiver(host, port, message_queue, &mut shutdown_rx).await { + error!("GPS TCP receiver error: {}", e); + } + }) + } + GpsSourceConfig::Udp { bind_addr, port } => { + let bind_addr = bind_addr.clone(); + let port = *port; + + tokio::spawn(async move { + if let Err(e) = Self::udp_receiver(bind_addr, port, message_queue, &mut shutdown_rx).await { + error!("GPS UDP receiver error: {}", e); + } + }) + } + GpsSourceConfig::File { path, replay_speed } => { + let path = path.clone(); + let replay_speed = *replay_speed; + + tokio::spawn(async move { + if let Err(e) = Self::file_receiver(path, replay_speed, message_queue, &mut shutdown_rx).await { + error!("GPS File receiver error: {}", e); + } + }) + } + }; + + self.receiver_handle = Some(receiver_handle); + self.shutdown_tx = Some(shutdown_tx); + + Ok(()) + } + + /// Serial port receiver implementation + async fn serial_receiver( + port: String, + baud_rate: u32, + message_queue: Arc>>, + shutdown_rx: &mut mpsc::Receiver<()>, + ) -> Result<(), Box> { + info!("Starting GPS serial receiver on port {} at {} baud", port, baud_rate); + + let serial_port = tokio_serial::new(&port, baud_rate) + .open_native_async()?; + + let mut reader = BufReader::new(serial_port); + let mut line = String::new(); + + loop { + tokio::select! { + _ = shutdown_rx.recv() => { + info!("GPS Serial receiver shutdown requested"); + break; + } + result = reader.read_line(&mut line) => { + match result { + Ok(0) => { + warn!("GPS Serial port closed"); + break; + } + Ok(_) => { + if let Some(message) = Self::parse_gps_sentence(&line.trim()) { + if let Ok(mut queue) = message_queue.lock() { + queue.push_back(message); + // Limit queue size to prevent memory issues + if queue.len() > 1000 { + queue.pop_front(); + } + } + } + line.clear(); + } + Err(e) => { + error!("GPS Serial read error: {}", e); + break; + } + } + } + } + } + + Ok(()) + } + + /// TCP receiver implementation + async fn tcp_receiver( + host: String, + port: u16, + message_queue: Arc>>, + shutdown_rx: &mut mpsc::Receiver<()>, + ) -> Result<(), Box> { + info!("Starting GPS TCP receiver connecting to {}:{}", host, port); + + let stream = TcpStream::connect(format!("{}:{}", host, port)).await?; + let mut reader = BufReader::new(stream); + let mut line = String::new(); + + loop { + tokio::select! { + _ = shutdown_rx.recv() => { + info!("GPS TCP receiver shutdown requested"); + break; + } + result = reader.read_line(&mut line) => { + match result { + Ok(0) => { + warn!("GPS TCP connection closed"); + break; + } + Ok(_) => { + if let Some(message) = Self::parse_gps_sentence(&line.trim()) { + if let Ok(mut queue) = message_queue.lock() { + queue.push_back(message); + if queue.len() > 1000 { + queue.pop_front(); + } + } + } + line.clear(); + } + Err(e) => { + error!("GPS TCP read error: {}", e); + break; + } + } + } + } + } + + Ok(()) + } + + /// UDP receiver implementation + async fn udp_receiver( + bind_addr: String, + port: u16, + message_queue: Arc>>, + shutdown_rx: &mut mpsc::Receiver<()>, + ) -> Result<(), Box> { + info!("Starting GPS UDP receiver on {}:{}", bind_addr, port); + + let socket = UdpSocket::bind(format!("{}:{}", bind_addr, port)).await?; + let mut buf = [0; 1024]; + + loop { + tokio::select! { + _ = shutdown_rx.recv() => { + info!("GPS UDP receiver shutdown requested"); + break; + } + result = socket.recv(&mut buf) => { + match result { + Ok(len) => { + let data = String::from_utf8_lossy(&buf[..len]); + for line in data.lines() { + if let Some(message) = Self::parse_gps_sentence(line.trim()) { + if let Ok(mut queue) = message_queue.lock() { + queue.push_back(message); + if queue.len() > 1000 { + queue.pop_front(); + } + } + } + } + } + Err(e) => { + error!("GPS UDP receive error: {}", e); + break; + } + } + } + } + } + + Ok(()) + } + + /// File receiver implementation for replaying GPS data + async fn file_receiver( + path: String, + replay_speed: f64, + message_queue: Arc>>, + shutdown_rx: &mut mpsc::Receiver<()>, + ) -> Result<(), Box> { + info!("Starting GPS file receiver for {} at {}x speed", path, replay_speed); + + let file = tokio::fs::File::open(&path).await?; + let reader = BufReader::new(file); + let mut lines = reader.lines(); + + let delay_duration = Duration::from_millis((1000.0 / replay_speed) as u64); + + loop { + tokio::select! { + _ = shutdown_rx.recv() => { + info!("GPS File receiver shutdown requested"); + break; + } + result = lines.next_line() => { + match result { + Ok(Some(line)) => { + if let Some(message) = Self::parse_gps_sentence(&line.trim()) { + if let Ok(mut queue) = message_queue.lock() { + queue.push_back(message); + if queue.len() > 1000 { + queue.pop_front(); + } + } + } + tokio::time::sleep(delay_duration).await; + } + Ok(None) => { + info!("GPS End of file reached"); + break; + } + Err(e) => { + error!("GPS File read error: {}", e); + break; + } + } + } + } + } + + Ok(()) + } + + /// Parse a GPS NMEA sentence into a DataMessage + pub fn parse_gps_sentence(sentence: &str) -> Option { + if !sentence.starts_with('$') { + return None; + } + + // Basic NMEA sentence validation + let parts: Vec<&str> = sentence.split(',').collect(); + if parts.len() < 3 { + return None; + } + + // Extract sentence type (first part after $) + let sentence_type = parts[0]; + + // Check for common GPS sentence types + if !sentence_type.contains("GPGGA") && + !sentence_type.contains("GPRMC") && + !sentence_type.contains("GPGLL") && + !sentence_type.contains("GPVTG") && + !sentence_type.contains("GPGSA") && + !sentence_type.contains("GPGSV") && + !sentence_type.contains("GNRMC") && + !sentence_type.contains("GNGGA") && + !sentence_type.contains("GNGLL") { + return None; + } + + // Create a DataMessage from the GPS sentence + let mut message = DataMessage::new( + "GPS_SENTENCE".to_string(), + "GPS_RECEIVER".to_string(), + sentence.as_bytes().to_vec(), + ); + + // Add parsed data based on sentence type + message = message.with_data("sentence_type".to_string(), sentence_type.to_string()); + + // Parse specific GPS sentence types + match sentence_type { + s if s.contains("GPGGA") || s.contains("GNGGA") => { + // Global Positioning System Fix Data + if parts.len() >= 15 { + message = message.with_data("time".to_string(), parts[1].to_string()); + message = message.with_data("latitude".to_string(), parts[2].to_string()); + message = message.with_data("lat_direction".to_string(), parts[3].to_string()); + message = message.with_data("longitude".to_string(), parts[4].to_string()); + message = message.with_data("lon_direction".to_string(), parts[5].to_string()); + message = message.with_data("fix_quality".to_string(), parts[6].to_string()); + message = message.with_data("satellites".to_string(), parts[7].to_string()); + message = message.with_data("hdop".to_string(), parts[8].to_string()); + message = message.with_data("altitude".to_string(), parts[9].to_string()); + message = message.with_data("altitude_unit".to_string(), parts[10].to_string()); + } + } + s if s.contains("GPRMC") || s.contains("GNRMC") => { + // Recommended Minimum Course + if parts.len() >= 12 { + message = message.with_data("time".to_string(), parts[1].to_string()); + message = message.with_data("status".to_string(), parts[2].to_string()); + message = message.with_data("latitude".to_string(), parts[3].to_string()); + message = message.with_data("lat_direction".to_string(), parts[4].to_string()); + message = message.with_data("longitude".to_string(), parts[5].to_string()); + message = message.with_data("lon_direction".to_string(), parts[6].to_string()); + message = message.with_data("speed".to_string(), parts[7].to_string()); + message = message.with_data("course".to_string(), parts[8].to_string()); + message = message.with_data("date".to_string(), parts[9].to_string()); + } + } + s if s.contains("GPGLL") || s.contains("GNGLL") => { + // Geographic Position - Latitude/Longitude + if parts.len() >= 7 { + message = message.with_data("latitude".to_string(), parts[1].to_string()); + message = message.with_data("lat_direction".to_string(), parts[2].to_string()); + message = message.with_data("longitude".to_string(), parts[3].to_string()); + message = message.with_data("lon_direction".to_string(), parts[4].to_string()); + message = message.with_data("time".to_string(), parts[5].to_string()); + message = message.with_data("status".to_string(), parts[6].to_string()); + } + } + _ => { + // For other sentence types, just store the raw parts + for (i, part) in parts.iter().enumerate() { + message = message.with_data(format!("field_{}", i), part.to_string()); + } + } + } + + // Add timestamp + message = message.with_data( + "timestamp".to_string(), + SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + .to_string(), + ); + + // Set signal quality based on sentence completeness and checksum + let quality = if sentence.contains('*') { 95 } else { 75 }; + message = message.with_signal_quality(quality); + + Some(message) + } + + /// Stop the receiver task + async fn stop_receiver(&mut self) { + if let Some(shutdown_tx) = self.shutdown_tx.take() { + let _ = shutdown_tx.send(()).await; + } + + if let Some(handle) = self.receiver_handle.take() { + let _ = handle.await; + } + } +} + +impl Default for GpsDataLinkProvider { + fn default() -> Self { + Self::new() + } +} + +impl DataLinkReceiver for GpsDataLinkProvider { + fn status(&self) -> DataLinkStatus { + self.status.clone() + } + + fn receive_message(&mut self) -> DataLinkResult> { + if let Ok(mut queue) = self.message_queue.lock() { + Ok(queue.pop_front()) + } else { + Err(DataLinkError::TransportError("Failed to access message queue".to_string())) + } + } + + fn connect(&mut self, config: &DataLinkConfig) -> DataLinkResult<()> { + info!("Connecting GPS datalink provider"); + + self.status = DataLinkStatus::Connecting; + self.config = Some(config.clone()); + + // Parse source configuration + self.source_config = Some(Self::parse_source_config(config)?); + + // Start the receiver in a blocking context + let rt = tokio::runtime::Runtime::new() + .map_err(|e| DataLinkError::ConnectionFailed(format!("Failed to create runtime: {}", e)))?; + + rt.block_on(async { + self.start_receiver().await + })?; + + self.status = DataLinkStatus::Connected; + info!("GPS datalink provider connected successfully"); + + Ok(()) + } + + fn disconnect(&mut self) -> DataLinkResult<()> { + info!("Disconnecting GPS datalink provider"); + + let rt = tokio::runtime::Runtime::new() + .map_err(|e| DataLinkError::TransportError(format!("Failed to create runtime: {}", e)))?; + + rt.block_on(async { + self.stop_receiver().await; + }); + + self.status = DataLinkStatus::Disconnected; + self.config = None; + self.source_config = None; + + info!("GPS datalink provider disconnected"); + Ok(()) + } +} + +impl DataLinkTransmitter for GpsDataLinkProvider { + fn status(&self) -> DataLinkStatus { + self.status.clone() + } + + fn send_message(&mut self, _message: &DataMessage) -> DataLinkResult<()> { + // GPS transmission is typically not supported for consumer devices + // This could be extended in the future for specialized GPS equipment + Err(DataLinkError::TransportError("GPS transmission not supported".to_string())) + } + + fn connect(&mut self, config: &DataLinkConfig) -> DataLinkResult<()> { + // Use the same connection logic as receiver + DataLinkReceiver::connect(self, config) + } + + fn disconnect(&mut self) -> DataLinkResult<()> { + // Use the same disconnection logic as receiver + DataLinkReceiver::disconnect(self) + } +} diff --git a/crates/datalink-provider/src/lib.rs b/crates/datalink-provider/src/lib.rs index 00973bc..8c15d94 100644 --- a/crates/datalink-provider/src/lib.rs +++ b/crates/datalink-provider/src/lib.rs @@ -1,538 +1,29 @@ -//! Real AIS Datalink Provider +//! Real AIS and GPS Datalink Providers //! -//! This crate provides real-world implementations of AIS datalink providers -//! that can connect to actual AIS data sources such as: -//! - Serial ports (for direct AIS receiver connections) -//! - TCP/UDP network connections (for networked AIS data) -//! - File-based AIS data replay +//! This crate provides real-world implementations of AIS and GPS datalink providers +//! that can connect to actual data sources such as: +//! - Serial ports (for direct AIS/GPS receiver connections) +//! - TCP/UDP network connections (for networked AIS/GPS data) +//! - File-based AIS/GPS data replay -use datalink::{ - DataLinkConfig, DataLinkError, DataLinkReceiver, DataLinkResult, - DataLinkStatus, DataLinkTransmitter, DataMessage, -}; -use log::{error, info, warn}; -use serde::{Deserialize, Serialize}; -use std::collections::VecDeque; -use std::sync::{Arc, Mutex}; -use std::time::{Duration, SystemTime}; -use tokio::io::{AsyncBufReadExt, BufReader}; -use tokio::net::{TcpStream, UdpSocket}; -use tokio::sync::mpsc; -use tokio_serial::SerialPortBuilderExt; +mod ais; +mod gps; -/// Configuration for different types of AIS data sources -#[derive(Debug, Clone, Serialize, Deserialize)] -pub enum AisSourceConfig { - /// Serial port configuration - Serial { - port: String, - baud_rate: u32, - }, - /// TCP connection configuration - Tcp { - host: String, - port: u16, - }, - /// UDP connection configuration - Udp { - bind_addr: String, - port: u16, - }, - /// File replay configuration - File { - path: String, - replay_speed: f64, // 1.0 = real-time, 2.0 = 2x speed, etc. - }, -} +// Re-export the main types for external use +pub use ais::{AisDataLinkProvider, AisSourceConfig}; +pub use gps::{GpsDataLinkProvider, GpsSourceConfig}; -/// Real AIS Datalink Provider -pub struct AisDataLinkProvider { - status: DataLinkStatus, - config: Option, - source_config: Option, - message_queue: Arc>>, - receiver_handle: Option>, - shutdown_tx: Option>, -} +use datalink::{DataLinkConfig, DataLinkReceiver, DataLinkStatus}; -impl AisDataLinkProvider { - /// Create a new AIS datalink provider - pub fn new() -> Self { - Self { - status: DataLinkStatus::Disconnected, - config: None, - source_config: None, - message_queue: Arc::new(Mutex::new(VecDeque::new())), - receiver_handle: None, - shutdown_tx: None, - } - } - /// Parse AIS source configuration from DataLinkConfig - fn parse_source_config(config: &DataLinkConfig) -> DataLinkResult { - let connection_type = config.parameters.get("connection_type") - .ok_or_else(|| DataLinkError::InvalidConfig("Missing connection_type".to_string()))?; - match connection_type.as_str() { - "serial" => { - let port = config.parameters.get("port") - .ok_or_else(|| DataLinkError::InvalidConfig("Missing port for serial connection".to_string()))?; - let baud_rate = config.parameters.get("baud_rate") - .unwrap_or(&"4800".to_string()) - .parse::() - .map_err(|_| DataLinkError::InvalidConfig("Invalid baud_rate".to_string()))?; - - Ok(AisSourceConfig::Serial { - port: port.clone(), - baud_rate, - }) - } - "tcp" => { - let host = config.parameters.get("host") - .ok_or_else(|| DataLinkError::InvalidConfig("Missing host for TCP connection".to_string()))?; - let port = config.parameters.get("port") - .ok_or_else(|| DataLinkError::InvalidConfig("Missing port for TCP connection".to_string()))? - .parse::() - .map_err(|_| DataLinkError::InvalidConfig("Invalid port number".to_string()))?; - - Ok(AisSourceConfig::Tcp { - host: host.clone(), - port, - }) - } - "udp" => { - let bind_addr = config.parameters.get("bind_addr") - .unwrap_or(&"0.0.0.0".to_string()) - .clone(); - let port = config.parameters.get("port") - .ok_or_else(|| DataLinkError::InvalidConfig("Missing port for UDP connection".to_string()))? - .parse::() - .map_err(|_| DataLinkError::InvalidConfig("Invalid port number".to_string()))?; - - Ok(AisSourceConfig::Udp { - bind_addr, - port, - }) - } - "file" => { - let path = config.parameters.get("path") - .ok_or_else(|| DataLinkError::InvalidConfig("Missing path for file replay".to_string()))?; - let replay_speed = config.parameters.get("replay_speed") - .unwrap_or(&"1.0".to_string()) - .parse::() - .map_err(|_| DataLinkError::InvalidConfig("Invalid replay_speed".to_string()))?; - - Ok(AisSourceConfig::File { - path: path.clone(), - replay_speed, - }) - } - _ => Err(DataLinkError::InvalidConfig(format!("Unsupported connection type: {}", connection_type))), - } - } - - /// Start the data receiver task based on the source configuration - async fn start_receiver(&mut self) -> DataLinkResult<()> { - let source_config = self.source_config.as_ref() - .ok_or_else(|| DataLinkError::InvalidConfig("No source configuration".to_string()))?; - - let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1); - let message_queue = Arc::clone(&self.message_queue); - - let receiver_handle = match source_config { - AisSourceConfig::Serial { port, baud_rate } => { - let port = port.clone(); - let baud_rate = *baud_rate; - - tokio::spawn(async move { - if let Err(e) = Self::serial_receiver(port, baud_rate, message_queue, &mut shutdown_rx).await { - error!("Serial receiver error: {}", e); - } - }) - } - AisSourceConfig::Tcp { host, port } => { - let host = host.clone(); - let port = *port; - - tokio::spawn(async move { - if let Err(e) = Self::tcp_receiver(host, port, message_queue, &mut shutdown_rx).await { - error!("TCP receiver error: {}", e); - } - }) - } - AisSourceConfig::Udp { bind_addr, port } => { - let bind_addr = bind_addr.clone(); - let port = *port; - - tokio::spawn(async move { - if let Err(e) = Self::udp_receiver(bind_addr, port, message_queue, &mut shutdown_rx).await { - error!("UDP receiver error: {}", e); - } - }) - } - AisSourceConfig::File { path, replay_speed } => { - let path = path.clone(); - let replay_speed = *replay_speed; - - tokio::spawn(async move { - if let Err(e) = Self::file_receiver(path, replay_speed, message_queue, &mut shutdown_rx).await { - error!("File receiver error: {}", e); - } - }) - } - }; - - self.receiver_handle = Some(receiver_handle); - self.shutdown_tx = Some(shutdown_tx); - - Ok(()) - } - - /// Serial port receiver implementation - async fn serial_receiver( - port: String, - baud_rate: u32, - message_queue: Arc>>, - shutdown_rx: &mut mpsc::Receiver<()>, - ) -> Result<(), Box> { - info!("Starting serial receiver on port {} at {} baud", port, baud_rate); - - let serial_port = tokio_serial::new(&port, baud_rate) - .open_native_async()?; - - let mut reader = BufReader::new(serial_port); - let mut line = String::new(); - - loop { - tokio::select! { - _ = shutdown_rx.recv() => { - info!("Serial receiver shutdown requested"); - break; - } - result = reader.read_line(&mut line) => { - match result { - Ok(0) => { - warn!("Serial port closed"); - break; - } - Ok(_) => { - if let Some(message) = Self::parse_ais_sentence(&line.trim()) { - if let Ok(mut queue) = message_queue.lock() { - queue.push_back(message); - // Limit queue size to prevent memory issues - if queue.len() > 1000 { - queue.pop_front(); - } - } - } - line.clear(); - } - Err(e) => { - error!("Serial read error: {}", e); - break; - } - } - } - } - } - - Ok(()) - } - - /// TCP receiver implementation - async fn tcp_receiver( - host: String, - port: u16, - message_queue: Arc>>, - shutdown_rx: &mut mpsc::Receiver<()>, - ) -> Result<(), Box> { - info!("Starting TCP receiver connecting to {}:{}", host, port); - - let stream = TcpStream::connect(format!("{}:{}", host, port)).await?; - let mut reader = BufReader::new(stream); - let mut line = String::new(); - - loop { - tokio::select! { - _ = shutdown_rx.recv() => { - info!("TCP receiver shutdown requested"); - break; - } - result = reader.read_line(&mut line) => { - match result { - Ok(0) => { - warn!("TCP connection closed"); - break; - } - Ok(_) => { - if let Some(message) = Self::parse_ais_sentence(&line.trim()) { - if let Ok(mut queue) = message_queue.lock() { - queue.push_back(message); - if queue.len() > 1000 { - queue.pop_front(); - } - } - } - line.clear(); - } - Err(e) => { - error!("TCP read error: {}", e); - break; - } - } - } - } - } - - Ok(()) - } - - /// UDP receiver implementation - async fn udp_receiver( - bind_addr: String, - port: u16, - message_queue: Arc>>, - shutdown_rx: &mut mpsc::Receiver<()>, - ) -> Result<(), Box> { - info!("Starting UDP receiver on {}:{}", bind_addr, port); - - let socket = UdpSocket::bind(format!("{}:{}", bind_addr, port)).await?; - let mut buf = [0; 1024]; - - loop { - tokio::select! { - _ = shutdown_rx.recv() => { - info!("UDP receiver shutdown requested"); - break; - } - result = socket.recv(&mut buf) => { - match result { - Ok(len) => { - let data = String::from_utf8_lossy(&buf[..len]); - for line in data.lines() { - if let Some(message) = Self::parse_ais_sentence(line.trim()) { - if let Ok(mut queue) = message_queue.lock() { - queue.push_back(message); - if queue.len() > 1000 { - queue.pop_front(); - } - } - } - } - } - Err(e) => { - error!("UDP receive error: {}", e); - break; - } - } - } - } - } - - Ok(()) - } - - /// File receiver implementation for replaying AIS data - async fn file_receiver( - path: String, - replay_speed: f64, - message_queue: Arc>>, - shutdown_rx: &mut mpsc::Receiver<()>, - ) -> Result<(), Box> { - info!("Starting file receiver for {} at {}x speed", path, replay_speed); - - let file = tokio::fs::File::open(&path).await?; - let reader = BufReader::new(file); - let mut lines = reader.lines(); - - let delay_duration = Duration::from_millis((1000.0 / replay_speed) as u64); - - loop { - tokio::select! { - _ = shutdown_rx.recv() => { - info!("File receiver shutdown requested"); - break; - } - result = lines.next_line() => { - match result { - Ok(Some(line)) => { - if let Some(message) = Self::parse_ais_sentence(&line.trim()) { - if let Ok(mut queue) = message_queue.lock() { - queue.push_back(message); - if queue.len() > 1000 { - queue.pop_front(); - } - } - } - tokio::time::sleep(delay_duration).await; - } - Ok(None) => { - info!("End of file reached"); - break; - } - Err(e) => { - error!("File read error: {}", e); - break; - } - } - } - } - } - - Ok(()) - } - - /// Parse an AIS sentence into a DataMessage - fn parse_ais_sentence(sentence: &str) -> Option { - if !sentence.starts_with('!') && !sentence.starts_with('$') { - return None; - } - - // Basic NMEA sentence validation - let parts: Vec<&str> = sentence.split(',').collect(); - if parts.len() < 6 { - return None; - } - - // Extract basic information from AIS sentence - let sentence_type = parts[0]; - if !sentence_type.contains("AIVDM") && !sentence_type.contains("AIVDO") { - return None; - } - - // Create a DataMessage from the AIS sentence - let mut message = DataMessage::new( - "AIS_SENTENCE".to_string(), - "AIS_RECEIVER".to_string(), - sentence.as_bytes().to_vec(), - ); - - // Add parsed data if available - if parts.len() >= 6 { - message = message.with_data("sentence_type".to_string(), sentence_type.to_string()); - message = message.with_data("fragment_count".to_string(), parts[1].to_string()); - message = message.with_data("fragment_number".to_string(), parts[2].to_string()); - message = message.with_data("message_id".to_string(), parts[3].to_string()); - message = message.with_data("channel".to_string(), parts[4].to_string()); - message = message.with_data("payload".to_string(), parts[5].to_string()); - } - - // Add timestamp - message = message.with_data( - "timestamp".to_string(), - SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or_default() - .as_secs() - .to_string(), - ); - - // Set signal quality based on sentence completeness - let quality = if sentence.contains('*') { 90 } else { 70 }; - message = message.with_signal_quality(quality); - - Some(message) - } - - /// Stop the receiver task - async fn stop_receiver(&mut self) { - if let Some(shutdown_tx) = self.shutdown_tx.take() { - let _ = shutdown_tx.send(()).await; - } - - if let Some(handle) = self.receiver_handle.take() { - let _ = handle.await; - } - } -} - -impl Default for AisDataLinkProvider { - fn default() -> Self { - Self::new() - } -} - -impl DataLinkReceiver for AisDataLinkProvider { - fn status(&self) -> DataLinkStatus { - self.status.clone() - } - - fn receive_message(&mut self) -> DataLinkResult> { - if let Ok(mut queue) = self.message_queue.lock() { - Ok(queue.pop_front()) - } else { - Err(DataLinkError::TransportError("Failed to access message queue".to_string())) - } - } - - fn connect(&mut self, config: &DataLinkConfig) -> DataLinkResult<()> { - info!("Connecting AIS datalink provider"); - - self.status = DataLinkStatus::Connecting; - self.config = Some(config.clone()); - - // Parse source configuration - self.source_config = Some(Self::parse_source_config(config)?); - - // Start the receiver in a blocking context - let rt = tokio::runtime::Runtime::new() - .map_err(|e| DataLinkError::ConnectionFailed(format!("Failed to create runtime: {}", e)))?; - - rt.block_on(async { - self.start_receiver().await - })?; - - self.status = DataLinkStatus::Connected; - info!("AIS datalink provider connected successfully"); - - Ok(()) - } - - fn disconnect(&mut self) -> DataLinkResult<()> { - info!("Disconnecting AIS datalink provider"); - - let rt = tokio::runtime::Runtime::new() - .map_err(|e| DataLinkError::TransportError(format!("Failed to create runtime: {}", e)))?; - - rt.block_on(async { - self.stop_receiver().await; - }); - - self.status = DataLinkStatus::Disconnected; - self.config = None; - self.source_config = None; - - info!("AIS datalink provider disconnected"); - Ok(()) - } -} - -impl DataLinkTransmitter for AisDataLinkProvider { - fn status(&self) -> DataLinkStatus { - self.status.clone() - } - - fn send_message(&mut self, _message: &DataMessage) -> DataLinkResult<()> { - // For now, AIS transmission is not implemented as it requires special equipment - // and licensing. This could be extended in the future for AIS transponders. - Err(DataLinkError::TransportError("AIS transmission not supported".to_string())) - } - - fn connect(&mut self, config: &DataLinkConfig) -> DataLinkResult<()> { - // Use the same connection logic as receiver - DataLinkReceiver::connect(self, config) - } - - fn disconnect(&mut self) -> DataLinkResult<()> { - // Use the same disconnection logic as receiver - DataLinkReceiver::disconnect(self) - } -} #[cfg(test)] mod tests { use super::*; use datalink::DataLinkConfig; + use crate::ais::{AisDataLinkProvider, AisSourceConfig}; + use crate::gps::{GpsDataLinkProvider, GpsSourceConfig}; #[test] fn test_ais_provider_creation() { @@ -593,4 +84,130 @@ mod tests { let message = AisDataLinkProvider::parse_ais_sentence(sentence); assert!(message.is_none()); } + + // GPS Provider Tests + #[test] + fn test_gps_provider_creation() { + let provider = GpsDataLinkProvider::new(); + assert!(matches!(DataLinkReceiver::status(&provider), DataLinkStatus::Disconnected)); + } + + #[test] + fn test_parse_gps_source_config_serial() { + let config = DataLinkConfig::new("serial".to_string()) + .with_parameter("connection_type".to_string(), "serial".to_string()) + .with_parameter("port".to_string(), "/dev/ttyUSB0".to_string()) + .with_parameter("baud_rate".to_string(), "9600".to_string()); + + let source_config = GpsDataLinkProvider::parse_source_config(&config).unwrap(); + + match source_config { + GpsSourceConfig::Serial { port, baud_rate } => { + assert_eq!(port, "/dev/ttyUSB0"); + assert_eq!(baud_rate, 9600); + } + _ => panic!("Expected Serial configuration"), + } + } + + #[test] + fn test_parse_gps_source_config_tcp() { + let config = DataLinkConfig::new("tcp".to_string()) + .with_parameter("connection_type".to_string(), "tcp".to_string()) + .with_parameter("host".to_string(), "gps.example.com".to_string()) + .with_parameter("port".to_string(), "2947".to_string()); + + let source_config = GpsDataLinkProvider::parse_source_config(&config).unwrap(); + + match source_config { + GpsSourceConfig::Tcp { host, port } => { + assert_eq!(host, "gps.example.com"); + assert_eq!(port, 2947); + } + _ => panic!("Expected TCP configuration"), + } + } + + #[test] + fn test_parse_gps_gga_sentence() { + let sentence = "$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47"; + let message = GpsDataLinkProvider::parse_gps_sentence(sentence).unwrap(); + + assert_eq!(message.message_type, "GPS_SENTENCE"); + assert_eq!(message.source_id, "GPS_RECEIVER"); + assert_eq!(message.get_data("sentence_type"), Some(&"$GPGGA".to_string())); + assert_eq!(message.get_data("time"), Some(&"123519".to_string())); + assert_eq!(message.get_data("latitude"), Some(&"4807.038".to_string())); + assert_eq!(message.get_data("lat_direction"), Some(&"N".to_string())); + assert_eq!(message.get_data("longitude"), Some(&"01131.000".to_string())); + assert_eq!(message.get_data("lon_direction"), Some(&"E".to_string())); + assert_eq!(message.get_data("fix_quality"), Some(&"1".to_string())); + assert_eq!(message.get_data("satellites"), Some(&"08".to_string())); + assert_eq!(message.get_data("hdop"), Some(&"0.9".to_string())); + assert_eq!(message.get_data("altitude"), Some(&"545.4".to_string())); + } + + #[test] + fn test_parse_gps_rmc_sentence() { + let sentence = "$GPRMC,123519,A,4807.038,N,01131.000,E,022.4,084.4,230394,003.1,W*6A"; + let message = GpsDataLinkProvider::parse_gps_sentence(sentence).unwrap(); + + assert_eq!(message.message_type, "GPS_SENTENCE"); + assert_eq!(message.source_id, "GPS_RECEIVER"); + assert_eq!(message.get_data("sentence_type"), Some(&"$GPRMC".to_string())); + assert_eq!(message.get_data("time"), Some(&"123519".to_string())); + assert_eq!(message.get_data("status"), Some(&"A".to_string())); + assert_eq!(message.get_data("latitude"), Some(&"4807.038".to_string())); + assert_eq!(message.get_data("speed"), Some(&"022.4".to_string())); + assert_eq!(message.get_data("course"), Some(&"084.4".to_string())); + assert_eq!(message.get_data("date"), Some(&"230394".to_string())); + } + + #[test] + fn test_parse_gps_gll_sentence() { + let sentence = "$GPGLL,4916.45,N,12311.12,W,225444,A,*1D"; + let message = GpsDataLinkProvider::parse_gps_sentence(sentence).unwrap(); + + assert_eq!(message.message_type, "GPS_SENTENCE"); + assert_eq!(message.source_id, "GPS_RECEIVER"); + assert_eq!(message.get_data("sentence_type"), Some(&"$GPGLL".to_string())); + assert_eq!(message.get_data("latitude"), Some(&"4916.45".to_string())); + assert_eq!(message.get_data("lat_direction"), Some(&"N".to_string())); + assert_eq!(message.get_data("longitude"), Some(&"12311.12".to_string())); + assert_eq!(message.get_data("lon_direction"), Some(&"W".to_string())); + assert_eq!(message.get_data("time"), Some(&"225444".to_string())); + assert_eq!(message.get_data("status"), Some(&"A".to_string())); + } + + #[test] + fn test_parse_gnss_sentence() { + let sentence = "$GNGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47"; + let message = GpsDataLinkProvider::parse_gps_sentence(sentence).unwrap(); + + assert_eq!(message.message_type, "GPS_SENTENCE"); + assert_eq!(message.source_id, "GPS_RECEIVER"); + assert_eq!(message.get_data("sentence_type"), Some(&"$GNGGA".to_string())); + assert_eq!(message.get_data("latitude"), Some(&"4807.038".to_string())); + } + + #[test] + fn test_invalid_gps_sentence() { + let sentence = "This is not a GPS sentence"; + let message = GpsDataLinkProvider::parse_gps_sentence(sentence); + assert!(message.is_none()); + } + + #[test] + fn test_invalid_gps_sentence_no_dollar() { + let sentence = "GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47"; + let message = GpsDataLinkProvider::parse_gps_sentence(sentence); + assert!(message.is_none()); + } + + #[test] + fn test_unsupported_gps_sentence() { + let sentence = "$GPXXX,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47"; + let message = GpsDataLinkProvider::parse_gps_sentence(sentence); + assert!(message.is_none()); + } }