mirror of
https://github.com/seemueller-io/yachtpit.git
synced 2025-09-08 22:46:45 +00:00
Add GPS and AIS data link providers with support for multiple data sources
This commit is contained in:
519
crates/datalink-provider/src/ais/mod.rs
Normal file
519
crates/datalink-provider/src/ais/mod.rs
Normal file
@@ -0,0 +1,519 @@
|
|||||||
|
use std::collections::VecDeque;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::time::{Duration, SystemTime};
|
||||||
|
use log::{error, info, warn};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use tokio::io::{AsyncBufReadExt, BufReader};
|
||||||
|
use tokio::net::{TcpStream, UdpSocket};
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use tokio_serial::SerialPortBuilderExt;
|
||||||
|
use datalink::{DataLinkConfig, DataLinkError, DataLinkReceiver, DataLinkResult, DataLinkStatus, DataLinkTransmitter, DataMessage};
|
||||||
|
|
||||||
|
/// Configuration for different types of AIS data sources
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub enum AisSourceConfig {
|
||||||
|
/// Serial port configuration
|
||||||
|
Serial {
|
||||||
|
port: String,
|
||||||
|
baud_rate: u32,
|
||||||
|
},
|
||||||
|
/// TCP connection configuration
|
||||||
|
Tcp {
|
||||||
|
host: String,
|
||||||
|
port: u16,
|
||||||
|
},
|
||||||
|
/// UDP connection configuration
|
||||||
|
Udp {
|
||||||
|
bind_addr: String,
|
||||||
|
port: u16,
|
||||||
|
},
|
||||||
|
/// File replay configuration
|
||||||
|
File {
|
||||||
|
path: String,
|
||||||
|
replay_speed: f64, // 1.0 = real-time, 2.0 = 2x speed, etc.
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Real AIS Datalink Provider
|
||||||
|
pub struct AisDataLinkProvider {
|
||||||
|
status: DataLinkStatus,
|
||||||
|
config: Option<DataLinkConfig>,
|
||||||
|
source_config: Option<AisSourceConfig>,
|
||||||
|
message_queue: Arc<Mutex<VecDeque<DataMessage>>>,
|
||||||
|
receiver_handle: Option<tokio::task::JoinHandle<()>>,
|
||||||
|
shutdown_tx: Option<mpsc::Sender<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AisDataLinkProvider {
|
||||||
|
/// Create a new AIS datalink provider
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
status: DataLinkStatus::Disconnected,
|
||||||
|
config: None,
|
||||||
|
source_config: None,
|
||||||
|
message_queue: Arc::new(Mutex::new(VecDeque::new())),
|
||||||
|
receiver_handle: None,
|
||||||
|
shutdown_tx: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Parse AIS source configuration from DataLinkConfig
|
||||||
|
pub fn parse_source_config(config: &DataLinkConfig) -> DataLinkResult<AisSourceConfig> {
|
||||||
|
let connection_type = config.parameters.get("connection_type")
|
||||||
|
.ok_or_else(|| DataLinkError::InvalidConfig("Missing connection_type".to_string()))?;
|
||||||
|
|
||||||
|
match connection_type.as_str() {
|
||||||
|
"serial" => {
|
||||||
|
let port = config.parameters.get("port")
|
||||||
|
.ok_or_else(|| DataLinkError::InvalidConfig("Missing port for serial connection".to_string()))?;
|
||||||
|
let baud_rate = config.parameters.get("baud_rate")
|
||||||
|
.unwrap_or(&"4800".to_string())
|
||||||
|
.parse::<u32>()
|
||||||
|
.map_err(|_| DataLinkError::InvalidConfig("Invalid baud_rate".to_string()))?;
|
||||||
|
|
||||||
|
Ok(AisSourceConfig::Serial {
|
||||||
|
port: port.clone(),
|
||||||
|
baud_rate,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
"tcp" => {
|
||||||
|
let host = config.parameters.get("host")
|
||||||
|
.ok_or_else(|| DataLinkError::InvalidConfig("Missing host for TCP connection".to_string()))?;
|
||||||
|
let port = config.parameters.get("port")
|
||||||
|
.ok_or_else(|| DataLinkError::InvalidConfig("Missing port for TCP connection".to_string()))?
|
||||||
|
.parse::<u16>()
|
||||||
|
.map_err(|_| DataLinkError::InvalidConfig("Invalid port number".to_string()))?;
|
||||||
|
|
||||||
|
Ok(AisSourceConfig::Tcp {
|
||||||
|
host: host.clone(),
|
||||||
|
port,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
"udp" => {
|
||||||
|
let bind_addr = config.parameters.get("bind_addr")
|
||||||
|
.unwrap_or(&"0.0.0.0".to_string())
|
||||||
|
.clone();
|
||||||
|
let port = config.parameters.get("port")
|
||||||
|
.ok_or_else(|| DataLinkError::InvalidConfig("Missing port for UDP connection".to_string()))?
|
||||||
|
.parse::<u16>()
|
||||||
|
.map_err(|_| DataLinkError::InvalidConfig("Invalid port number".to_string()))?;
|
||||||
|
|
||||||
|
Ok(AisSourceConfig::Udp {
|
||||||
|
bind_addr,
|
||||||
|
port,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
"file" => {
|
||||||
|
let path = config.parameters.get("path")
|
||||||
|
.ok_or_else(|| DataLinkError::InvalidConfig("Missing path for file replay".to_string()))?;
|
||||||
|
let replay_speed = config.parameters.get("replay_speed")
|
||||||
|
.unwrap_or(&"1.0".to_string())
|
||||||
|
.parse::<f64>()
|
||||||
|
.map_err(|_| DataLinkError::InvalidConfig("Invalid replay_speed".to_string()))?;
|
||||||
|
|
||||||
|
Ok(AisSourceConfig::File {
|
||||||
|
path: path.clone(),
|
||||||
|
replay_speed,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
_ => Err(DataLinkError::InvalidConfig(format!("Unsupported connection type: {}", connection_type))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Start the data receiver task based on the source configuration
|
||||||
|
async fn start_receiver(&mut self) -> DataLinkResult<()> {
|
||||||
|
let source_config = self.source_config.as_ref()
|
||||||
|
.ok_or_else(|| DataLinkError::InvalidConfig("No source configuration".to_string()))?;
|
||||||
|
|
||||||
|
let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
|
||||||
|
let message_queue = Arc::clone(&self.message_queue);
|
||||||
|
|
||||||
|
let receiver_handle = match source_config {
|
||||||
|
AisSourceConfig::Serial { port, baud_rate } => {
|
||||||
|
let port = port.clone();
|
||||||
|
let baud_rate = *baud_rate;
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = Self::serial_receiver(port, baud_rate, message_queue, &mut shutdown_rx).await {
|
||||||
|
error!("Serial receiver error: {}", e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
AisSourceConfig::Tcp { host, port } => {
|
||||||
|
let host = host.clone();
|
||||||
|
let port = *port;
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = Self::tcp_receiver(host, port, message_queue, &mut shutdown_rx).await {
|
||||||
|
error!("TCP receiver error: {}", e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
AisSourceConfig::Udp { bind_addr, port } => {
|
||||||
|
let bind_addr = bind_addr.clone();
|
||||||
|
let port = *port;
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = Self::udp_receiver(bind_addr, port, message_queue, &mut shutdown_rx).await {
|
||||||
|
error!("UDP receiver error: {}", e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
AisSourceConfig::File { path, replay_speed } => {
|
||||||
|
let path = path.clone();
|
||||||
|
let replay_speed = *replay_speed;
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = Self::file_receiver(path, replay_speed, message_queue, &mut shutdown_rx).await {
|
||||||
|
error!("File receiver error: {}", e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
self.receiver_handle = Some(receiver_handle);
|
||||||
|
self.shutdown_tx = Some(shutdown_tx);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Serial port receiver implementation
|
||||||
|
async fn serial_receiver(
|
||||||
|
port: String,
|
||||||
|
baud_rate: u32,
|
||||||
|
message_queue: Arc<Mutex<VecDeque<DataMessage>>>,
|
||||||
|
shutdown_rx: &mut mpsc::Receiver<()>,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
info!("Starting serial receiver on port {} at {} baud", port, baud_rate);
|
||||||
|
|
||||||
|
let serial_port = tokio_serial::new(&port, baud_rate)
|
||||||
|
.open_native_async()?;
|
||||||
|
|
||||||
|
let mut reader = BufReader::new(serial_port);
|
||||||
|
let mut line = String::new();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = shutdown_rx.recv() => {
|
||||||
|
info!("Serial receiver shutdown requested");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
result = reader.read_line(&mut line) => {
|
||||||
|
match result {
|
||||||
|
Ok(0) => {
|
||||||
|
warn!("Serial port closed");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Ok(_) => {
|
||||||
|
if let Some(message) = Self::parse_ais_sentence(&line.trim()) {
|
||||||
|
if let Ok(mut queue) = message_queue.lock() {
|
||||||
|
queue.push_back(message);
|
||||||
|
// Limit queue size to prevent memory issues
|
||||||
|
if queue.len() > 1000 {
|
||||||
|
queue.pop_front();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
line.clear();
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("Serial read error: {}", e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// TCP receiver implementation
|
||||||
|
async fn tcp_receiver(
|
||||||
|
host: String,
|
||||||
|
port: u16,
|
||||||
|
message_queue: Arc<Mutex<VecDeque<DataMessage>>>,
|
||||||
|
shutdown_rx: &mut mpsc::Receiver<()>,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
info!("Starting TCP receiver connecting to {}:{}", host, port);
|
||||||
|
|
||||||
|
let stream = TcpStream::connect(format!("{}:{}", host, port)).await?;
|
||||||
|
let mut reader = BufReader::new(stream);
|
||||||
|
let mut line = String::new();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = shutdown_rx.recv() => {
|
||||||
|
info!("TCP receiver shutdown requested");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
result = reader.read_line(&mut line) => {
|
||||||
|
match result {
|
||||||
|
Ok(0) => {
|
||||||
|
warn!("TCP connection closed");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Ok(_) => {
|
||||||
|
if let Some(message) = Self::parse_ais_sentence(&line.trim()) {
|
||||||
|
if let Ok(mut queue) = message_queue.lock() {
|
||||||
|
queue.push_back(message);
|
||||||
|
if queue.len() > 1000 {
|
||||||
|
queue.pop_front();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
line.clear();
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("TCP read error: {}", e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// UDP receiver implementation
|
||||||
|
async fn udp_receiver(
|
||||||
|
bind_addr: String,
|
||||||
|
port: u16,
|
||||||
|
message_queue: Arc<Mutex<VecDeque<DataMessage>>>,
|
||||||
|
shutdown_rx: &mut mpsc::Receiver<()>,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
info!("Starting UDP receiver on {}:{}", bind_addr, port);
|
||||||
|
|
||||||
|
let socket = UdpSocket::bind(format!("{}:{}", bind_addr, port)).await?;
|
||||||
|
let mut buf = [0; 1024];
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = shutdown_rx.recv() => {
|
||||||
|
info!("UDP receiver shutdown requested");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
result = socket.recv(&mut buf) => {
|
||||||
|
match result {
|
||||||
|
Ok(len) => {
|
||||||
|
let data = String::from_utf8_lossy(&buf[..len]);
|
||||||
|
for line in data.lines() {
|
||||||
|
if let Some(message) = Self::parse_ais_sentence(line.trim()) {
|
||||||
|
if let Ok(mut queue) = message_queue.lock() {
|
||||||
|
queue.push_back(message);
|
||||||
|
if queue.len() > 1000 {
|
||||||
|
queue.pop_front();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("UDP receive error: {}", e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// File receiver implementation for replaying AIS data
|
||||||
|
async fn file_receiver(
|
||||||
|
path: String,
|
||||||
|
replay_speed: f64,
|
||||||
|
message_queue: Arc<Mutex<VecDeque<DataMessage>>>,
|
||||||
|
shutdown_rx: &mut mpsc::Receiver<()>,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
info!("Starting file receiver for {} at {}x speed", path, replay_speed);
|
||||||
|
|
||||||
|
let file = tokio::fs::File::open(&path).await?;
|
||||||
|
let reader = BufReader::new(file);
|
||||||
|
let mut lines = reader.lines();
|
||||||
|
|
||||||
|
let delay_duration = Duration::from_millis((1000.0 / replay_speed) as u64);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = shutdown_rx.recv() => {
|
||||||
|
info!("File receiver shutdown requested");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
result = lines.next_line() => {
|
||||||
|
match result {
|
||||||
|
Ok(Some(line)) => {
|
||||||
|
if let Some(message) = Self::parse_ais_sentence(&line.trim()) {
|
||||||
|
if let Ok(mut queue) = message_queue.lock() {
|
||||||
|
queue.push_back(message);
|
||||||
|
if queue.len() > 1000 {
|
||||||
|
queue.pop_front();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tokio::time::sleep(delay_duration).await;
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
info!("End of file reached");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("File read error: {}", e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Parse an AIS sentence into a DataMessage
|
||||||
|
pub fn parse_ais_sentence(sentence: &str) -> Option<DataMessage> {
|
||||||
|
if !sentence.starts_with('!') && !sentence.starts_with('$') {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Basic NMEA sentence validation
|
||||||
|
let parts: Vec<&str> = sentence.split(',').collect();
|
||||||
|
if parts.len() < 6 {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract basic information from AIS sentence
|
||||||
|
let sentence_type = parts[0];
|
||||||
|
if !sentence_type.contains("AIVDM") && !sentence_type.contains("AIVDO") {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a DataMessage from the AIS sentence
|
||||||
|
let mut message = DataMessage::new(
|
||||||
|
"AIS_SENTENCE".to_string(),
|
||||||
|
"AIS_RECEIVER".to_string(),
|
||||||
|
sentence.as_bytes().to_vec(),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Add parsed data if available
|
||||||
|
if parts.len() >= 6 {
|
||||||
|
message = message.with_data("sentence_type".to_string(), sentence_type.to_string());
|
||||||
|
message = message.with_data("fragment_count".to_string(), parts[1].to_string());
|
||||||
|
message = message.with_data("fragment_number".to_string(), parts[2].to_string());
|
||||||
|
message = message.with_data("message_id".to_string(), parts[3].to_string());
|
||||||
|
message = message.with_data("channel".to_string(), parts[4].to_string());
|
||||||
|
message = message.with_data("payload".to_string(), parts[5].to_string());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add timestamp
|
||||||
|
message = message.with_data(
|
||||||
|
"timestamp".to_string(),
|
||||||
|
SystemTime::now()
|
||||||
|
.duration_since(SystemTime::UNIX_EPOCH)
|
||||||
|
.unwrap_or_default()
|
||||||
|
.as_secs()
|
||||||
|
.to_string(),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Set signal quality based on sentence completeness
|
||||||
|
let quality = if sentence.contains('*') { 90 } else { 70 };
|
||||||
|
message = message.with_signal_quality(quality);
|
||||||
|
|
||||||
|
Some(message)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Stop the receiver task
|
||||||
|
async fn stop_receiver(&mut self) {
|
||||||
|
if let Some(shutdown_tx) = self.shutdown_tx.take() {
|
||||||
|
let _ = shutdown_tx.send(()).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(handle) = self.receiver_handle.take() {
|
||||||
|
let _ = handle.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for AisDataLinkProvider {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DataLinkReceiver for AisDataLinkProvider {
|
||||||
|
fn status(&self) -> DataLinkStatus {
|
||||||
|
self.status.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn receive_message(&mut self) -> DataLinkResult<Option<DataMessage>> {
|
||||||
|
if let Ok(mut queue) = self.message_queue.lock() {
|
||||||
|
Ok(queue.pop_front())
|
||||||
|
} else {
|
||||||
|
Err(DataLinkError::TransportError("Failed to access message queue".to_string()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn connect(&mut self, config: &DataLinkConfig) -> DataLinkResult<()> {
|
||||||
|
info!("Connecting AIS datalink provider");
|
||||||
|
|
||||||
|
self.status = DataLinkStatus::Connecting;
|
||||||
|
self.config = Some(config.clone());
|
||||||
|
|
||||||
|
// Parse source configuration
|
||||||
|
self.source_config = Some(Self::parse_source_config(config)?);
|
||||||
|
|
||||||
|
// Start the receiver in a blocking context
|
||||||
|
let rt = tokio::runtime::Runtime::new()
|
||||||
|
.map_err(|e| DataLinkError::ConnectionFailed(format!("Failed to create runtime: {}", e)))?;
|
||||||
|
|
||||||
|
rt.block_on(async {
|
||||||
|
self.start_receiver().await
|
||||||
|
})?;
|
||||||
|
|
||||||
|
self.status = DataLinkStatus::Connected;
|
||||||
|
info!("AIS datalink provider connected successfully");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn disconnect(&mut self) -> DataLinkResult<()> {
|
||||||
|
info!("Disconnecting AIS datalink provider");
|
||||||
|
|
||||||
|
let rt = tokio::runtime::Runtime::new()
|
||||||
|
.map_err(|e| DataLinkError::TransportError(format!("Failed to create runtime: {}", e)))?;
|
||||||
|
|
||||||
|
rt.block_on(async {
|
||||||
|
self.stop_receiver().await;
|
||||||
|
});
|
||||||
|
|
||||||
|
self.status = DataLinkStatus::Disconnected;
|
||||||
|
self.config = None;
|
||||||
|
self.source_config = None;
|
||||||
|
|
||||||
|
info!("AIS datalink provider disconnected");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DataLinkTransmitter for AisDataLinkProvider {
|
||||||
|
fn status(&self) -> DataLinkStatus {
|
||||||
|
self.status.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_message(&mut self, _message: &DataMessage) -> DataLinkResult<()> {
|
||||||
|
// For now, AIS transmission is not implemented as it requires special equipment
|
||||||
|
// and licensing. This could be extended in the future for AIS transponders.
|
||||||
|
Err(DataLinkError::TransportError("AIS transmission not supported".to_string()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn connect(&mut self, config: &DataLinkConfig) -> DataLinkResult<()> {
|
||||||
|
// Use the same connection logic as receiver
|
||||||
|
DataLinkReceiver::connect(self, config)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn disconnect(&mut self) -> DataLinkResult<()> {
|
||||||
|
// Use the same disconnection logic as receiver
|
||||||
|
DataLinkReceiver::disconnect(self)
|
||||||
|
}
|
||||||
|
}
|
572
crates/datalink-provider/src/gps/mod.rs
Normal file
572
crates/datalink-provider/src/gps/mod.rs
Normal file
@@ -0,0 +1,572 @@
|
|||||||
|
use std::collections::VecDeque;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::time::{Duration, SystemTime};
|
||||||
|
use log::{error, info, warn};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use tokio::io::{AsyncBufReadExt, BufReader};
|
||||||
|
use tokio::net::{TcpStream, UdpSocket};
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use tokio_serial::SerialPortBuilderExt;
|
||||||
|
use datalink::{DataLinkConfig, DataLinkError, DataLinkReceiver, DataLinkResult, DataLinkStatus, DataLinkTransmitter, DataMessage};
|
||||||
|
|
||||||
|
/// Configuration for different types of GPS data sources
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub enum GpsSourceConfig {
|
||||||
|
/// Serial port configuration
|
||||||
|
Serial {
|
||||||
|
port: String,
|
||||||
|
baud_rate: u32,
|
||||||
|
},
|
||||||
|
/// TCP connection configuration
|
||||||
|
Tcp {
|
||||||
|
host: String,
|
||||||
|
port: u16,
|
||||||
|
},
|
||||||
|
/// UDP connection configuration
|
||||||
|
Udp {
|
||||||
|
bind_addr: String,
|
||||||
|
port: u16,
|
||||||
|
},
|
||||||
|
/// File replay configuration
|
||||||
|
File {
|
||||||
|
path: String,
|
||||||
|
replay_speed: f64, // 1.0 = real-time, 2.0 = 2x speed, etc.
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Real GPS Datalink Provider
|
||||||
|
pub struct GpsDataLinkProvider {
|
||||||
|
status: DataLinkStatus,
|
||||||
|
config: Option<DataLinkConfig>,
|
||||||
|
source_config: Option<GpsSourceConfig>,
|
||||||
|
message_queue: Arc<Mutex<VecDeque<DataMessage>>>,
|
||||||
|
receiver_handle: Option<tokio::task::JoinHandle<()>>,
|
||||||
|
shutdown_tx: Option<mpsc::Sender<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GpsDataLinkProvider {
|
||||||
|
/// Create a new GPS datalink provider
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
status: DataLinkStatus::Disconnected,
|
||||||
|
config: None,
|
||||||
|
source_config: None,
|
||||||
|
message_queue: Arc::new(Mutex::new(VecDeque::new())),
|
||||||
|
receiver_handle: None,
|
||||||
|
shutdown_tx: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Parse GPS source configuration from DataLinkConfig
|
||||||
|
pub fn parse_source_config(config: &DataLinkConfig) -> DataLinkResult<GpsSourceConfig> {
|
||||||
|
let connection_type = config.parameters.get("connection_type")
|
||||||
|
.ok_or_else(|| DataLinkError::InvalidConfig("Missing connection_type".to_string()))?;
|
||||||
|
|
||||||
|
match connection_type.as_str() {
|
||||||
|
"serial" => {
|
||||||
|
let port = config.parameters.get("port")
|
||||||
|
.ok_or_else(|| DataLinkError::InvalidConfig("Missing port for serial connection".to_string()))?;
|
||||||
|
let baud_rate = config.parameters.get("baud_rate")
|
||||||
|
.unwrap_or(&"4800".to_string())
|
||||||
|
.parse::<u32>()
|
||||||
|
.map_err(|_| DataLinkError::InvalidConfig("Invalid baud_rate".to_string()))?;
|
||||||
|
|
||||||
|
Ok(GpsSourceConfig::Serial {
|
||||||
|
port: port.clone(),
|
||||||
|
baud_rate,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
"tcp" => {
|
||||||
|
let host = config.parameters.get("host")
|
||||||
|
.ok_or_else(|| DataLinkError::InvalidConfig("Missing host for TCP connection".to_string()))?;
|
||||||
|
let port = config.parameters.get("port")
|
||||||
|
.ok_or_else(|| DataLinkError::InvalidConfig("Missing port for TCP connection".to_string()))?
|
||||||
|
.parse::<u16>()
|
||||||
|
.map_err(|_| DataLinkError::InvalidConfig("Invalid port number".to_string()))?;
|
||||||
|
|
||||||
|
Ok(GpsSourceConfig::Tcp {
|
||||||
|
host: host.clone(),
|
||||||
|
port,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
"udp" => {
|
||||||
|
let bind_addr = config.parameters.get("bind_addr")
|
||||||
|
.unwrap_or(&"0.0.0.0".to_string())
|
||||||
|
.clone();
|
||||||
|
let port = config.parameters.get("port")
|
||||||
|
.ok_or_else(|| DataLinkError::InvalidConfig("Missing port for UDP connection".to_string()))?
|
||||||
|
.parse::<u16>()
|
||||||
|
.map_err(|_| DataLinkError::InvalidConfig("Invalid port number".to_string()))?;
|
||||||
|
|
||||||
|
Ok(GpsSourceConfig::Udp {
|
||||||
|
bind_addr,
|
||||||
|
port,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
"file" => {
|
||||||
|
let path = config.parameters.get("path")
|
||||||
|
.ok_or_else(|| DataLinkError::InvalidConfig("Missing path for file replay".to_string()))?;
|
||||||
|
let replay_speed = config.parameters.get("replay_speed")
|
||||||
|
.unwrap_or(&"1.0".to_string())
|
||||||
|
.parse::<f64>()
|
||||||
|
.map_err(|_| DataLinkError::InvalidConfig("Invalid replay_speed".to_string()))?;
|
||||||
|
|
||||||
|
Ok(GpsSourceConfig::File {
|
||||||
|
path: path.clone(),
|
||||||
|
replay_speed,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
_ => Err(DataLinkError::InvalidConfig(format!("Unsupported connection type: {}", connection_type))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Start the data receiver task based on the source configuration
|
||||||
|
async fn start_receiver(&mut self) -> DataLinkResult<()> {
|
||||||
|
let source_config = self.source_config.as_ref()
|
||||||
|
.ok_or_else(|| DataLinkError::InvalidConfig("No source configuration".to_string()))?;
|
||||||
|
|
||||||
|
let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
|
||||||
|
let message_queue = Arc::clone(&self.message_queue);
|
||||||
|
|
||||||
|
let receiver_handle = match source_config {
|
||||||
|
GpsSourceConfig::Serial { port, baud_rate } => {
|
||||||
|
let port = port.clone();
|
||||||
|
let baud_rate = *baud_rate;
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = Self::serial_receiver(port, baud_rate, message_queue, &mut shutdown_rx).await {
|
||||||
|
error!("GPS Serial receiver error: {}", e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
GpsSourceConfig::Tcp { host, port } => {
|
||||||
|
let host = host.clone();
|
||||||
|
let port = *port;
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = Self::tcp_receiver(host, port, message_queue, &mut shutdown_rx).await {
|
||||||
|
error!("GPS TCP receiver error: {}", e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
GpsSourceConfig::Udp { bind_addr, port } => {
|
||||||
|
let bind_addr = bind_addr.clone();
|
||||||
|
let port = *port;
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = Self::udp_receiver(bind_addr, port, message_queue, &mut shutdown_rx).await {
|
||||||
|
error!("GPS UDP receiver error: {}", e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
GpsSourceConfig::File { path, replay_speed } => {
|
||||||
|
let path = path.clone();
|
||||||
|
let replay_speed = *replay_speed;
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = Self::file_receiver(path, replay_speed, message_queue, &mut shutdown_rx).await {
|
||||||
|
error!("GPS File receiver error: {}", e);
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
self.receiver_handle = Some(receiver_handle);
|
||||||
|
self.shutdown_tx = Some(shutdown_tx);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Serial port receiver implementation
|
||||||
|
async fn serial_receiver(
|
||||||
|
port: String,
|
||||||
|
baud_rate: u32,
|
||||||
|
message_queue: Arc<Mutex<VecDeque<DataMessage>>>,
|
||||||
|
shutdown_rx: &mut mpsc::Receiver<()>,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
info!("Starting GPS serial receiver on port {} at {} baud", port, baud_rate);
|
||||||
|
|
||||||
|
let serial_port = tokio_serial::new(&port, baud_rate)
|
||||||
|
.open_native_async()?;
|
||||||
|
|
||||||
|
let mut reader = BufReader::new(serial_port);
|
||||||
|
let mut line = String::new();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = shutdown_rx.recv() => {
|
||||||
|
info!("GPS Serial receiver shutdown requested");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
result = reader.read_line(&mut line) => {
|
||||||
|
match result {
|
||||||
|
Ok(0) => {
|
||||||
|
warn!("GPS Serial port closed");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Ok(_) => {
|
||||||
|
if let Some(message) = Self::parse_gps_sentence(&line.trim()) {
|
||||||
|
if let Ok(mut queue) = message_queue.lock() {
|
||||||
|
queue.push_back(message);
|
||||||
|
// Limit queue size to prevent memory issues
|
||||||
|
if queue.len() > 1000 {
|
||||||
|
queue.pop_front();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
line.clear();
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("GPS Serial read error: {}", e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// TCP receiver implementation
|
||||||
|
async fn tcp_receiver(
|
||||||
|
host: String,
|
||||||
|
port: u16,
|
||||||
|
message_queue: Arc<Mutex<VecDeque<DataMessage>>>,
|
||||||
|
shutdown_rx: &mut mpsc::Receiver<()>,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
info!("Starting GPS TCP receiver connecting to {}:{}", host, port);
|
||||||
|
|
||||||
|
let stream = TcpStream::connect(format!("{}:{}", host, port)).await?;
|
||||||
|
let mut reader = BufReader::new(stream);
|
||||||
|
let mut line = String::new();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = shutdown_rx.recv() => {
|
||||||
|
info!("GPS TCP receiver shutdown requested");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
result = reader.read_line(&mut line) => {
|
||||||
|
match result {
|
||||||
|
Ok(0) => {
|
||||||
|
warn!("GPS TCP connection closed");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Ok(_) => {
|
||||||
|
if let Some(message) = Self::parse_gps_sentence(&line.trim()) {
|
||||||
|
if let Ok(mut queue) = message_queue.lock() {
|
||||||
|
queue.push_back(message);
|
||||||
|
if queue.len() > 1000 {
|
||||||
|
queue.pop_front();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
line.clear();
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("GPS TCP read error: {}", e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// UDP receiver implementation
|
||||||
|
async fn udp_receiver(
|
||||||
|
bind_addr: String,
|
||||||
|
port: u16,
|
||||||
|
message_queue: Arc<Mutex<VecDeque<DataMessage>>>,
|
||||||
|
shutdown_rx: &mut mpsc::Receiver<()>,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
info!("Starting GPS UDP receiver on {}:{}", bind_addr, port);
|
||||||
|
|
||||||
|
let socket = UdpSocket::bind(format!("{}:{}", bind_addr, port)).await?;
|
||||||
|
let mut buf = [0; 1024];
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = shutdown_rx.recv() => {
|
||||||
|
info!("GPS UDP receiver shutdown requested");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
result = socket.recv(&mut buf) => {
|
||||||
|
match result {
|
||||||
|
Ok(len) => {
|
||||||
|
let data = String::from_utf8_lossy(&buf[..len]);
|
||||||
|
for line in data.lines() {
|
||||||
|
if let Some(message) = Self::parse_gps_sentence(line.trim()) {
|
||||||
|
if let Ok(mut queue) = message_queue.lock() {
|
||||||
|
queue.push_back(message);
|
||||||
|
if queue.len() > 1000 {
|
||||||
|
queue.pop_front();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("GPS UDP receive error: {}", e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// File receiver implementation for replaying GPS data
|
||||||
|
async fn file_receiver(
|
||||||
|
path: String,
|
||||||
|
replay_speed: f64,
|
||||||
|
message_queue: Arc<Mutex<VecDeque<DataMessage>>>,
|
||||||
|
shutdown_rx: &mut mpsc::Receiver<()>,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||||
|
info!("Starting GPS file receiver for {} at {}x speed", path, replay_speed);
|
||||||
|
|
||||||
|
let file = tokio::fs::File::open(&path).await?;
|
||||||
|
let reader = BufReader::new(file);
|
||||||
|
let mut lines = reader.lines();
|
||||||
|
|
||||||
|
let delay_duration = Duration::from_millis((1000.0 / replay_speed) as u64);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = shutdown_rx.recv() => {
|
||||||
|
info!("GPS File receiver shutdown requested");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
result = lines.next_line() => {
|
||||||
|
match result {
|
||||||
|
Ok(Some(line)) => {
|
||||||
|
if let Some(message) = Self::parse_gps_sentence(&line.trim()) {
|
||||||
|
if let Ok(mut queue) = message_queue.lock() {
|
||||||
|
queue.push_back(message);
|
||||||
|
if queue.len() > 1000 {
|
||||||
|
queue.pop_front();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tokio::time::sleep(delay_duration).await;
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
info!("GPS End of file reached");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("GPS File read error: {}", e);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Parse a GPS NMEA sentence into a DataMessage
|
||||||
|
pub fn parse_gps_sentence(sentence: &str) -> Option<DataMessage> {
|
||||||
|
if !sentence.starts_with('$') {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Basic NMEA sentence validation
|
||||||
|
let parts: Vec<&str> = sentence.split(',').collect();
|
||||||
|
if parts.len() < 3 {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract sentence type (first part after $)
|
||||||
|
let sentence_type = parts[0];
|
||||||
|
|
||||||
|
// Check for common GPS sentence types
|
||||||
|
if !sentence_type.contains("GPGGA") &&
|
||||||
|
!sentence_type.contains("GPRMC") &&
|
||||||
|
!sentence_type.contains("GPGLL") &&
|
||||||
|
!sentence_type.contains("GPVTG") &&
|
||||||
|
!sentence_type.contains("GPGSA") &&
|
||||||
|
!sentence_type.contains("GPGSV") &&
|
||||||
|
!sentence_type.contains("GNRMC") &&
|
||||||
|
!sentence_type.contains("GNGGA") &&
|
||||||
|
!sentence_type.contains("GNGLL") {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a DataMessage from the GPS sentence
|
||||||
|
let mut message = DataMessage::new(
|
||||||
|
"GPS_SENTENCE".to_string(),
|
||||||
|
"GPS_RECEIVER".to_string(),
|
||||||
|
sentence.as_bytes().to_vec(),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Add parsed data based on sentence type
|
||||||
|
message = message.with_data("sentence_type".to_string(), sentence_type.to_string());
|
||||||
|
|
||||||
|
// Parse specific GPS sentence types
|
||||||
|
match sentence_type {
|
||||||
|
s if s.contains("GPGGA") || s.contains("GNGGA") => {
|
||||||
|
// Global Positioning System Fix Data
|
||||||
|
if parts.len() >= 15 {
|
||||||
|
message = message.with_data("time".to_string(), parts[1].to_string());
|
||||||
|
message = message.with_data("latitude".to_string(), parts[2].to_string());
|
||||||
|
message = message.with_data("lat_direction".to_string(), parts[3].to_string());
|
||||||
|
message = message.with_data("longitude".to_string(), parts[4].to_string());
|
||||||
|
message = message.with_data("lon_direction".to_string(), parts[5].to_string());
|
||||||
|
message = message.with_data("fix_quality".to_string(), parts[6].to_string());
|
||||||
|
message = message.with_data("satellites".to_string(), parts[7].to_string());
|
||||||
|
message = message.with_data("hdop".to_string(), parts[8].to_string());
|
||||||
|
message = message.with_data("altitude".to_string(), parts[9].to_string());
|
||||||
|
message = message.with_data("altitude_unit".to_string(), parts[10].to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s if s.contains("GPRMC") || s.contains("GNRMC") => {
|
||||||
|
// Recommended Minimum Course
|
||||||
|
if parts.len() >= 12 {
|
||||||
|
message = message.with_data("time".to_string(), parts[1].to_string());
|
||||||
|
message = message.with_data("status".to_string(), parts[2].to_string());
|
||||||
|
message = message.with_data("latitude".to_string(), parts[3].to_string());
|
||||||
|
message = message.with_data("lat_direction".to_string(), parts[4].to_string());
|
||||||
|
message = message.with_data("longitude".to_string(), parts[5].to_string());
|
||||||
|
message = message.with_data("lon_direction".to_string(), parts[6].to_string());
|
||||||
|
message = message.with_data("speed".to_string(), parts[7].to_string());
|
||||||
|
message = message.with_data("course".to_string(), parts[8].to_string());
|
||||||
|
message = message.with_data("date".to_string(), parts[9].to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s if s.contains("GPGLL") || s.contains("GNGLL") => {
|
||||||
|
// Geographic Position - Latitude/Longitude
|
||||||
|
if parts.len() >= 7 {
|
||||||
|
message = message.with_data("latitude".to_string(), parts[1].to_string());
|
||||||
|
message = message.with_data("lat_direction".to_string(), parts[2].to_string());
|
||||||
|
message = message.with_data("longitude".to_string(), parts[3].to_string());
|
||||||
|
message = message.with_data("lon_direction".to_string(), parts[4].to_string());
|
||||||
|
message = message.with_data("time".to_string(), parts[5].to_string());
|
||||||
|
message = message.with_data("status".to_string(), parts[6].to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
// For other sentence types, just store the raw parts
|
||||||
|
for (i, part) in parts.iter().enumerate() {
|
||||||
|
message = message.with_data(format!("field_{}", i), part.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add timestamp
|
||||||
|
message = message.with_data(
|
||||||
|
"timestamp".to_string(),
|
||||||
|
SystemTime::now()
|
||||||
|
.duration_since(SystemTime::UNIX_EPOCH)
|
||||||
|
.unwrap_or_default()
|
||||||
|
.as_secs()
|
||||||
|
.to_string(),
|
||||||
|
);
|
||||||
|
|
||||||
|
// Set signal quality based on sentence completeness and checksum
|
||||||
|
let quality = if sentence.contains('*') { 95 } else { 75 };
|
||||||
|
message = message.with_signal_quality(quality);
|
||||||
|
|
||||||
|
Some(message)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Stop the receiver task
|
||||||
|
async fn stop_receiver(&mut self) {
|
||||||
|
if let Some(shutdown_tx) = self.shutdown_tx.take() {
|
||||||
|
let _ = shutdown_tx.send(()).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(handle) = self.receiver_handle.take() {
|
||||||
|
let _ = handle.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for GpsDataLinkProvider {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DataLinkReceiver for GpsDataLinkProvider {
|
||||||
|
fn status(&self) -> DataLinkStatus {
|
||||||
|
self.status.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn receive_message(&mut self) -> DataLinkResult<Option<DataMessage>> {
|
||||||
|
if let Ok(mut queue) = self.message_queue.lock() {
|
||||||
|
Ok(queue.pop_front())
|
||||||
|
} else {
|
||||||
|
Err(DataLinkError::TransportError("Failed to access message queue".to_string()))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn connect(&mut self, config: &DataLinkConfig) -> DataLinkResult<()> {
|
||||||
|
info!("Connecting GPS datalink provider");
|
||||||
|
|
||||||
|
self.status = DataLinkStatus::Connecting;
|
||||||
|
self.config = Some(config.clone());
|
||||||
|
|
||||||
|
// Parse source configuration
|
||||||
|
self.source_config = Some(Self::parse_source_config(config)?);
|
||||||
|
|
||||||
|
// Start the receiver in a blocking context
|
||||||
|
let rt = tokio::runtime::Runtime::new()
|
||||||
|
.map_err(|e| DataLinkError::ConnectionFailed(format!("Failed to create runtime: {}", e)))?;
|
||||||
|
|
||||||
|
rt.block_on(async {
|
||||||
|
self.start_receiver().await
|
||||||
|
})?;
|
||||||
|
|
||||||
|
self.status = DataLinkStatus::Connected;
|
||||||
|
info!("GPS datalink provider connected successfully");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn disconnect(&mut self) -> DataLinkResult<()> {
|
||||||
|
info!("Disconnecting GPS datalink provider");
|
||||||
|
|
||||||
|
let rt = tokio::runtime::Runtime::new()
|
||||||
|
.map_err(|e| DataLinkError::TransportError(format!("Failed to create runtime: {}", e)))?;
|
||||||
|
|
||||||
|
rt.block_on(async {
|
||||||
|
self.stop_receiver().await;
|
||||||
|
});
|
||||||
|
|
||||||
|
self.status = DataLinkStatus::Disconnected;
|
||||||
|
self.config = None;
|
||||||
|
self.source_config = None;
|
||||||
|
|
||||||
|
info!("GPS datalink provider disconnected");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DataLinkTransmitter for GpsDataLinkProvider {
|
||||||
|
fn status(&self) -> DataLinkStatus {
|
||||||
|
self.status.clone()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_message(&mut self, _message: &DataMessage) -> DataLinkResult<()> {
|
||||||
|
// GPS transmission is typically not supported for consumer devices
|
||||||
|
// This could be extended in the future for specialized GPS equipment
|
||||||
|
Err(DataLinkError::TransportError("GPS transmission not supported".to_string()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn connect(&mut self, config: &DataLinkConfig) -> DataLinkResult<()> {
|
||||||
|
// Use the same connection logic as receiver
|
||||||
|
DataLinkReceiver::connect(self, config)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn disconnect(&mut self) -> DataLinkResult<()> {
|
||||||
|
// Use the same disconnection logic as receiver
|
||||||
|
DataLinkReceiver::disconnect(self)
|
||||||
|
}
|
||||||
|
}
|
@@ -1,538 +1,29 @@
|
|||||||
//! Real AIS Datalink Provider
|
//! Real AIS and GPS Datalink Providers
|
||||||
//!
|
//!
|
||||||
//! This crate provides real-world implementations of AIS datalink providers
|
//! This crate provides real-world implementations of AIS and GPS datalink providers
|
||||||
//! that can connect to actual AIS data sources such as:
|
//! that can connect to actual data sources such as:
|
||||||
//! - Serial ports (for direct AIS receiver connections)
|
//! - Serial ports (for direct AIS/GPS receiver connections)
|
||||||
//! - TCP/UDP network connections (for networked AIS data)
|
//! - TCP/UDP network connections (for networked AIS/GPS data)
|
||||||
//! - File-based AIS data replay
|
//! - File-based AIS/GPS data replay
|
||||||
|
|
||||||
use datalink::{
|
mod ais;
|
||||||
DataLinkConfig, DataLinkError, DataLinkReceiver, DataLinkResult,
|
mod gps;
|
||||||
DataLinkStatus, DataLinkTransmitter, DataMessage,
|
|
||||||
};
|
|
||||||
use log::{error, info, warn};
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use std::collections::VecDeque;
|
|
||||||
use std::sync::{Arc, Mutex};
|
|
||||||
use std::time::{Duration, SystemTime};
|
|
||||||
use tokio::io::{AsyncBufReadExt, BufReader};
|
|
||||||
use tokio::net::{TcpStream, UdpSocket};
|
|
||||||
use tokio::sync::mpsc;
|
|
||||||
use tokio_serial::SerialPortBuilderExt;
|
|
||||||
|
|
||||||
/// Configuration for different types of AIS data sources
|
// Re-export the main types for external use
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
pub use ais::{AisDataLinkProvider, AisSourceConfig};
|
||||||
pub enum AisSourceConfig {
|
pub use gps::{GpsDataLinkProvider, GpsSourceConfig};
|
||||||
/// Serial port configuration
|
|
||||||
Serial {
|
|
||||||
port: String,
|
|
||||||
baud_rate: u32,
|
|
||||||
},
|
|
||||||
/// TCP connection configuration
|
|
||||||
Tcp {
|
|
||||||
host: String,
|
|
||||||
port: u16,
|
|
||||||
},
|
|
||||||
/// UDP connection configuration
|
|
||||||
Udp {
|
|
||||||
bind_addr: String,
|
|
||||||
port: u16,
|
|
||||||
},
|
|
||||||
/// File replay configuration
|
|
||||||
File {
|
|
||||||
path: String,
|
|
||||||
replay_speed: f64, // 1.0 = real-time, 2.0 = 2x speed, etc.
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Real AIS Datalink Provider
|
use datalink::{DataLinkConfig, DataLinkReceiver, DataLinkStatus};
|
||||||
pub struct AisDataLinkProvider {
|
|
||||||
status: DataLinkStatus,
|
|
||||||
config: Option<DataLinkConfig>,
|
|
||||||
source_config: Option<AisSourceConfig>,
|
|
||||||
message_queue: Arc<Mutex<VecDeque<DataMessage>>>,
|
|
||||||
receiver_handle: Option<tokio::task::JoinHandle<()>>,
|
|
||||||
shutdown_tx: Option<mpsc::Sender<()>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AisDataLinkProvider {
|
|
||||||
/// Create a new AIS datalink provider
|
|
||||||
pub fn new() -> Self {
|
|
||||||
Self {
|
|
||||||
status: DataLinkStatus::Disconnected,
|
|
||||||
config: None,
|
|
||||||
source_config: None,
|
|
||||||
message_queue: Arc::new(Mutex::new(VecDeque::new())),
|
|
||||||
receiver_handle: None,
|
|
||||||
shutdown_tx: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Parse AIS source configuration from DataLinkConfig
|
|
||||||
fn parse_source_config(config: &DataLinkConfig) -> DataLinkResult<AisSourceConfig> {
|
|
||||||
let connection_type = config.parameters.get("connection_type")
|
|
||||||
.ok_or_else(|| DataLinkError::InvalidConfig("Missing connection_type".to_string()))?;
|
|
||||||
|
|
||||||
match connection_type.as_str() {
|
|
||||||
"serial" => {
|
|
||||||
let port = config.parameters.get("port")
|
|
||||||
.ok_or_else(|| DataLinkError::InvalidConfig("Missing port for serial connection".to_string()))?;
|
|
||||||
let baud_rate = config.parameters.get("baud_rate")
|
|
||||||
.unwrap_or(&"4800".to_string())
|
|
||||||
.parse::<u32>()
|
|
||||||
.map_err(|_| DataLinkError::InvalidConfig("Invalid baud_rate".to_string()))?;
|
|
||||||
|
|
||||||
Ok(AisSourceConfig::Serial {
|
|
||||||
port: port.clone(),
|
|
||||||
baud_rate,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
"tcp" => {
|
|
||||||
let host = config.parameters.get("host")
|
|
||||||
.ok_or_else(|| DataLinkError::InvalidConfig("Missing host for TCP connection".to_string()))?;
|
|
||||||
let port = config.parameters.get("port")
|
|
||||||
.ok_or_else(|| DataLinkError::InvalidConfig("Missing port for TCP connection".to_string()))?
|
|
||||||
.parse::<u16>()
|
|
||||||
.map_err(|_| DataLinkError::InvalidConfig("Invalid port number".to_string()))?;
|
|
||||||
|
|
||||||
Ok(AisSourceConfig::Tcp {
|
|
||||||
host: host.clone(),
|
|
||||||
port,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
"udp" => {
|
|
||||||
let bind_addr = config.parameters.get("bind_addr")
|
|
||||||
.unwrap_or(&"0.0.0.0".to_string())
|
|
||||||
.clone();
|
|
||||||
let port = config.parameters.get("port")
|
|
||||||
.ok_or_else(|| DataLinkError::InvalidConfig("Missing port for UDP connection".to_string()))?
|
|
||||||
.parse::<u16>()
|
|
||||||
.map_err(|_| DataLinkError::InvalidConfig("Invalid port number".to_string()))?;
|
|
||||||
|
|
||||||
Ok(AisSourceConfig::Udp {
|
|
||||||
bind_addr,
|
|
||||||
port,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
"file" => {
|
|
||||||
let path = config.parameters.get("path")
|
|
||||||
.ok_or_else(|| DataLinkError::InvalidConfig("Missing path for file replay".to_string()))?;
|
|
||||||
let replay_speed = config.parameters.get("replay_speed")
|
|
||||||
.unwrap_or(&"1.0".to_string())
|
|
||||||
.parse::<f64>()
|
|
||||||
.map_err(|_| DataLinkError::InvalidConfig("Invalid replay_speed".to_string()))?;
|
|
||||||
|
|
||||||
Ok(AisSourceConfig::File {
|
|
||||||
path: path.clone(),
|
|
||||||
replay_speed,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
_ => Err(DataLinkError::InvalidConfig(format!("Unsupported connection type: {}", connection_type))),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Start the data receiver task based on the source configuration
|
|
||||||
async fn start_receiver(&mut self) -> DataLinkResult<()> {
|
|
||||||
let source_config = self.source_config.as_ref()
|
|
||||||
.ok_or_else(|| DataLinkError::InvalidConfig("No source configuration".to_string()))?;
|
|
||||||
|
|
||||||
let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<()>(1);
|
|
||||||
let message_queue = Arc::clone(&self.message_queue);
|
|
||||||
|
|
||||||
let receiver_handle = match source_config {
|
|
||||||
AisSourceConfig::Serial { port, baud_rate } => {
|
|
||||||
let port = port.clone();
|
|
||||||
let baud_rate = *baud_rate;
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Err(e) = Self::serial_receiver(port, baud_rate, message_queue, &mut shutdown_rx).await {
|
|
||||||
error!("Serial receiver error: {}", e);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
AisSourceConfig::Tcp { host, port } => {
|
|
||||||
let host = host.clone();
|
|
||||||
let port = *port;
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Err(e) = Self::tcp_receiver(host, port, message_queue, &mut shutdown_rx).await {
|
|
||||||
error!("TCP receiver error: {}", e);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
AisSourceConfig::Udp { bind_addr, port } => {
|
|
||||||
let bind_addr = bind_addr.clone();
|
|
||||||
let port = *port;
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Err(e) = Self::udp_receiver(bind_addr, port, message_queue, &mut shutdown_rx).await {
|
|
||||||
error!("UDP receiver error: {}", e);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
AisSourceConfig::File { path, replay_speed } => {
|
|
||||||
let path = path.clone();
|
|
||||||
let replay_speed = *replay_speed;
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Err(e) = Self::file_receiver(path, replay_speed, message_queue, &mut shutdown_rx).await {
|
|
||||||
error!("File receiver error: {}", e);
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
self.receiver_handle = Some(receiver_handle);
|
|
||||||
self.shutdown_tx = Some(shutdown_tx);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Serial port receiver implementation
|
|
||||||
async fn serial_receiver(
|
|
||||||
port: String,
|
|
||||||
baud_rate: u32,
|
|
||||||
message_queue: Arc<Mutex<VecDeque<DataMessage>>>,
|
|
||||||
shutdown_rx: &mut mpsc::Receiver<()>,
|
|
||||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|
||||||
info!("Starting serial receiver on port {} at {} baud", port, baud_rate);
|
|
||||||
|
|
||||||
let serial_port = tokio_serial::new(&port, baud_rate)
|
|
||||||
.open_native_async()?;
|
|
||||||
|
|
||||||
let mut reader = BufReader::new(serial_port);
|
|
||||||
let mut line = String::new();
|
|
||||||
|
|
||||||
loop {
|
|
||||||
tokio::select! {
|
|
||||||
_ = shutdown_rx.recv() => {
|
|
||||||
info!("Serial receiver shutdown requested");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
result = reader.read_line(&mut line) => {
|
|
||||||
match result {
|
|
||||||
Ok(0) => {
|
|
||||||
warn!("Serial port closed");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Ok(_) => {
|
|
||||||
if let Some(message) = Self::parse_ais_sentence(&line.trim()) {
|
|
||||||
if let Ok(mut queue) = message_queue.lock() {
|
|
||||||
queue.push_back(message);
|
|
||||||
// Limit queue size to prevent memory issues
|
|
||||||
if queue.len() > 1000 {
|
|
||||||
queue.pop_front();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
line.clear();
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
error!("Serial read error: {}", e);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// TCP receiver implementation
|
|
||||||
async fn tcp_receiver(
|
|
||||||
host: String,
|
|
||||||
port: u16,
|
|
||||||
message_queue: Arc<Mutex<VecDeque<DataMessage>>>,
|
|
||||||
shutdown_rx: &mut mpsc::Receiver<()>,
|
|
||||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|
||||||
info!("Starting TCP receiver connecting to {}:{}", host, port);
|
|
||||||
|
|
||||||
let stream = TcpStream::connect(format!("{}:{}", host, port)).await?;
|
|
||||||
let mut reader = BufReader::new(stream);
|
|
||||||
let mut line = String::new();
|
|
||||||
|
|
||||||
loop {
|
|
||||||
tokio::select! {
|
|
||||||
_ = shutdown_rx.recv() => {
|
|
||||||
info!("TCP receiver shutdown requested");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
result = reader.read_line(&mut line) => {
|
|
||||||
match result {
|
|
||||||
Ok(0) => {
|
|
||||||
warn!("TCP connection closed");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Ok(_) => {
|
|
||||||
if let Some(message) = Self::parse_ais_sentence(&line.trim()) {
|
|
||||||
if let Ok(mut queue) = message_queue.lock() {
|
|
||||||
queue.push_back(message);
|
|
||||||
if queue.len() > 1000 {
|
|
||||||
queue.pop_front();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
line.clear();
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
error!("TCP read error: {}", e);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// UDP receiver implementation
|
|
||||||
async fn udp_receiver(
|
|
||||||
bind_addr: String,
|
|
||||||
port: u16,
|
|
||||||
message_queue: Arc<Mutex<VecDeque<DataMessage>>>,
|
|
||||||
shutdown_rx: &mut mpsc::Receiver<()>,
|
|
||||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|
||||||
info!("Starting UDP receiver on {}:{}", bind_addr, port);
|
|
||||||
|
|
||||||
let socket = UdpSocket::bind(format!("{}:{}", bind_addr, port)).await?;
|
|
||||||
let mut buf = [0; 1024];
|
|
||||||
|
|
||||||
loop {
|
|
||||||
tokio::select! {
|
|
||||||
_ = shutdown_rx.recv() => {
|
|
||||||
info!("UDP receiver shutdown requested");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
result = socket.recv(&mut buf) => {
|
|
||||||
match result {
|
|
||||||
Ok(len) => {
|
|
||||||
let data = String::from_utf8_lossy(&buf[..len]);
|
|
||||||
for line in data.lines() {
|
|
||||||
if let Some(message) = Self::parse_ais_sentence(line.trim()) {
|
|
||||||
if let Ok(mut queue) = message_queue.lock() {
|
|
||||||
queue.push_back(message);
|
|
||||||
if queue.len() > 1000 {
|
|
||||||
queue.pop_front();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
error!("UDP receive error: {}", e);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// File receiver implementation for replaying AIS data
|
|
||||||
async fn file_receiver(
|
|
||||||
path: String,
|
|
||||||
replay_speed: f64,
|
|
||||||
message_queue: Arc<Mutex<VecDeque<DataMessage>>>,
|
|
||||||
shutdown_rx: &mut mpsc::Receiver<()>,
|
|
||||||
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|
||||||
info!("Starting file receiver for {} at {}x speed", path, replay_speed);
|
|
||||||
|
|
||||||
let file = tokio::fs::File::open(&path).await?;
|
|
||||||
let reader = BufReader::new(file);
|
|
||||||
let mut lines = reader.lines();
|
|
||||||
|
|
||||||
let delay_duration = Duration::from_millis((1000.0 / replay_speed) as u64);
|
|
||||||
|
|
||||||
loop {
|
|
||||||
tokio::select! {
|
|
||||||
_ = shutdown_rx.recv() => {
|
|
||||||
info!("File receiver shutdown requested");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
result = lines.next_line() => {
|
|
||||||
match result {
|
|
||||||
Ok(Some(line)) => {
|
|
||||||
if let Some(message) = Self::parse_ais_sentence(&line.trim()) {
|
|
||||||
if let Ok(mut queue) = message_queue.lock() {
|
|
||||||
queue.push_back(message);
|
|
||||||
if queue.len() > 1000 {
|
|
||||||
queue.pop_front();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
tokio::time::sleep(delay_duration).await;
|
|
||||||
}
|
|
||||||
Ok(None) => {
|
|
||||||
info!("End of file reached");
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
error!("File read error: {}", e);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Parse an AIS sentence into a DataMessage
|
|
||||||
fn parse_ais_sentence(sentence: &str) -> Option<DataMessage> {
|
|
||||||
if !sentence.starts_with('!') && !sentence.starts_with('$') {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Basic NMEA sentence validation
|
|
||||||
let parts: Vec<&str> = sentence.split(',').collect();
|
|
||||||
if parts.len() < 6 {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Extract basic information from AIS sentence
|
|
||||||
let sentence_type = parts[0];
|
|
||||||
if !sentence_type.contains("AIVDM") && !sentence_type.contains("AIVDO") {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a DataMessage from the AIS sentence
|
|
||||||
let mut message = DataMessage::new(
|
|
||||||
"AIS_SENTENCE".to_string(),
|
|
||||||
"AIS_RECEIVER".to_string(),
|
|
||||||
sentence.as_bytes().to_vec(),
|
|
||||||
);
|
|
||||||
|
|
||||||
// Add parsed data if available
|
|
||||||
if parts.len() >= 6 {
|
|
||||||
message = message.with_data("sentence_type".to_string(), sentence_type.to_string());
|
|
||||||
message = message.with_data("fragment_count".to_string(), parts[1].to_string());
|
|
||||||
message = message.with_data("fragment_number".to_string(), parts[2].to_string());
|
|
||||||
message = message.with_data("message_id".to_string(), parts[3].to_string());
|
|
||||||
message = message.with_data("channel".to_string(), parts[4].to_string());
|
|
||||||
message = message.with_data("payload".to_string(), parts[5].to_string());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add timestamp
|
|
||||||
message = message.with_data(
|
|
||||||
"timestamp".to_string(),
|
|
||||||
SystemTime::now()
|
|
||||||
.duration_since(SystemTime::UNIX_EPOCH)
|
|
||||||
.unwrap_or_default()
|
|
||||||
.as_secs()
|
|
||||||
.to_string(),
|
|
||||||
);
|
|
||||||
|
|
||||||
// Set signal quality based on sentence completeness
|
|
||||||
let quality = if sentence.contains('*') { 90 } else { 70 };
|
|
||||||
message = message.with_signal_quality(quality);
|
|
||||||
|
|
||||||
Some(message)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Stop the receiver task
|
|
||||||
async fn stop_receiver(&mut self) {
|
|
||||||
if let Some(shutdown_tx) = self.shutdown_tx.take() {
|
|
||||||
let _ = shutdown_tx.send(()).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(handle) = self.receiver_handle.take() {
|
|
||||||
let _ = handle.await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for AisDataLinkProvider {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self::new()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl DataLinkReceiver for AisDataLinkProvider {
|
|
||||||
fn status(&self) -> DataLinkStatus {
|
|
||||||
self.status.clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn receive_message(&mut self) -> DataLinkResult<Option<DataMessage>> {
|
|
||||||
if let Ok(mut queue) = self.message_queue.lock() {
|
|
||||||
Ok(queue.pop_front())
|
|
||||||
} else {
|
|
||||||
Err(DataLinkError::TransportError("Failed to access message queue".to_string()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn connect(&mut self, config: &DataLinkConfig) -> DataLinkResult<()> {
|
|
||||||
info!("Connecting AIS datalink provider");
|
|
||||||
|
|
||||||
self.status = DataLinkStatus::Connecting;
|
|
||||||
self.config = Some(config.clone());
|
|
||||||
|
|
||||||
// Parse source configuration
|
|
||||||
self.source_config = Some(Self::parse_source_config(config)?);
|
|
||||||
|
|
||||||
// Start the receiver in a blocking context
|
|
||||||
let rt = tokio::runtime::Runtime::new()
|
|
||||||
.map_err(|e| DataLinkError::ConnectionFailed(format!("Failed to create runtime: {}", e)))?;
|
|
||||||
|
|
||||||
rt.block_on(async {
|
|
||||||
self.start_receiver().await
|
|
||||||
})?;
|
|
||||||
|
|
||||||
self.status = DataLinkStatus::Connected;
|
|
||||||
info!("AIS datalink provider connected successfully");
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn disconnect(&mut self) -> DataLinkResult<()> {
|
|
||||||
info!("Disconnecting AIS datalink provider");
|
|
||||||
|
|
||||||
let rt = tokio::runtime::Runtime::new()
|
|
||||||
.map_err(|e| DataLinkError::TransportError(format!("Failed to create runtime: {}", e)))?;
|
|
||||||
|
|
||||||
rt.block_on(async {
|
|
||||||
self.stop_receiver().await;
|
|
||||||
});
|
|
||||||
|
|
||||||
self.status = DataLinkStatus::Disconnected;
|
|
||||||
self.config = None;
|
|
||||||
self.source_config = None;
|
|
||||||
|
|
||||||
info!("AIS datalink provider disconnected");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl DataLinkTransmitter for AisDataLinkProvider {
|
|
||||||
fn status(&self) -> DataLinkStatus {
|
|
||||||
self.status.clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn send_message(&mut self, _message: &DataMessage) -> DataLinkResult<()> {
|
|
||||||
// For now, AIS transmission is not implemented as it requires special equipment
|
|
||||||
// and licensing. This could be extended in the future for AIS transponders.
|
|
||||||
Err(DataLinkError::TransportError("AIS transmission not supported".to_string()))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn connect(&mut self, config: &DataLinkConfig) -> DataLinkResult<()> {
|
|
||||||
// Use the same connection logic as receiver
|
|
||||||
DataLinkReceiver::connect(self, config)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn disconnect(&mut self) -> DataLinkResult<()> {
|
|
||||||
// Use the same disconnection logic as receiver
|
|
||||||
DataLinkReceiver::disconnect(self)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
use datalink::DataLinkConfig;
|
use datalink::DataLinkConfig;
|
||||||
|
use crate::ais::{AisDataLinkProvider, AisSourceConfig};
|
||||||
|
use crate::gps::{GpsDataLinkProvider, GpsSourceConfig};
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_ais_provider_creation() {
|
fn test_ais_provider_creation() {
|
||||||
@@ -593,4 +84,130 @@ mod tests {
|
|||||||
let message = AisDataLinkProvider::parse_ais_sentence(sentence);
|
let message = AisDataLinkProvider::parse_ais_sentence(sentence);
|
||||||
assert!(message.is_none());
|
assert!(message.is_none());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// GPS Provider Tests
|
||||||
|
#[test]
|
||||||
|
fn test_gps_provider_creation() {
|
||||||
|
let provider = GpsDataLinkProvider::new();
|
||||||
|
assert!(matches!(DataLinkReceiver::status(&provider), DataLinkStatus::Disconnected));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_parse_gps_source_config_serial() {
|
||||||
|
let config = DataLinkConfig::new("serial".to_string())
|
||||||
|
.with_parameter("connection_type".to_string(), "serial".to_string())
|
||||||
|
.with_parameter("port".to_string(), "/dev/ttyUSB0".to_string())
|
||||||
|
.with_parameter("baud_rate".to_string(), "9600".to_string());
|
||||||
|
|
||||||
|
let source_config = GpsDataLinkProvider::parse_source_config(&config).unwrap();
|
||||||
|
|
||||||
|
match source_config {
|
||||||
|
GpsSourceConfig::Serial { port, baud_rate } => {
|
||||||
|
assert_eq!(port, "/dev/ttyUSB0");
|
||||||
|
assert_eq!(baud_rate, 9600);
|
||||||
|
}
|
||||||
|
_ => panic!("Expected Serial configuration"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_parse_gps_source_config_tcp() {
|
||||||
|
let config = DataLinkConfig::new("tcp".to_string())
|
||||||
|
.with_parameter("connection_type".to_string(), "tcp".to_string())
|
||||||
|
.with_parameter("host".to_string(), "gps.example.com".to_string())
|
||||||
|
.with_parameter("port".to_string(), "2947".to_string());
|
||||||
|
|
||||||
|
let source_config = GpsDataLinkProvider::parse_source_config(&config).unwrap();
|
||||||
|
|
||||||
|
match source_config {
|
||||||
|
GpsSourceConfig::Tcp { host, port } => {
|
||||||
|
assert_eq!(host, "gps.example.com");
|
||||||
|
assert_eq!(port, 2947);
|
||||||
|
}
|
||||||
|
_ => panic!("Expected TCP configuration"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_parse_gps_gga_sentence() {
|
||||||
|
let sentence = "$GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47";
|
||||||
|
let message = GpsDataLinkProvider::parse_gps_sentence(sentence).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(message.message_type, "GPS_SENTENCE");
|
||||||
|
assert_eq!(message.source_id, "GPS_RECEIVER");
|
||||||
|
assert_eq!(message.get_data("sentence_type"), Some(&"$GPGGA".to_string()));
|
||||||
|
assert_eq!(message.get_data("time"), Some(&"123519".to_string()));
|
||||||
|
assert_eq!(message.get_data("latitude"), Some(&"4807.038".to_string()));
|
||||||
|
assert_eq!(message.get_data("lat_direction"), Some(&"N".to_string()));
|
||||||
|
assert_eq!(message.get_data("longitude"), Some(&"01131.000".to_string()));
|
||||||
|
assert_eq!(message.get_data("lon_direction"), Some(&"E".to_string()));
|
||||||
|
assert_eq!(message.get_data("fix_quality"), Some(&"1".to_string()));
|
||||||
|
assert_eq!(message.get_data("satellites"), Some(&"08".to_string()));
|
||||||
|
assert_eq!(message.get_data("hdop"), Some(&"0.9".to_string()));
|
||||||
|
assert_eq!(message.get_data("altitude"), Some(&"545.4".to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_parse_gps_rmc_sentence() {
|
||||||
|
let sentence = "$GPRMC,123519,A,4807.038,N,01131.000,E,022.4,084.4,230394,003.1,W*6A";
|
||||||
|
let message = GpsDataLinkProvider::parse_gps_sentence(sentence).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(message.message_type, "GPS_SENTENCE");
|
||||||
|
assert_eq!(message.source_id, "GPS_RECEIVER");
|
||||||
|
assert_eq!(message.get_data("sentence_type"), Some(&"$GPRMC".to_string()));
|
||||||
|
assert_eq!(message.get_data("time"), Some(&"123519".to_string()));
|
||||||
|
assert_eq!(message.get_data("status"), Some(&"A".to_string()));
|
||||||
|
assert_eq!(message.get_data("latitude"), Some(&"4807.038".to_string()));
|
||||||
|
assert_eq!(message.get_data("speed"), Some(&"022.4".to_string()));
|
||||||
|
assert_eq!(message.get_data("course"), Some(&"084.4".to_string()));
|
||||||
|
assert_eq!(message.get_data("date"), Some(&"230394".to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_parse_gps_gll_sentence() {
|
||||||
|
let sentence = "$GPGLL,4916.45,N,12311.12,W,225444,A,*1D";
|
||||||
|
let message = GpsDataLinkProvider::parse_gps_sentence(sentence).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(message.message_type, "GPS_SENTENCE");
|
||||||
|
assert_eq!(message.source_id, "GPS_RECEIVER");
|
||||||
|
assert_eq!(message.get_data("sentence_type"), Some(&"$GPGLL".to_string()));
|
||||||
|
assert_eq!(message.get_data("latitude"), Some(&"4916.45".to_string()));
|
||||||
|
assert_eq!(message.get_data("lat_direction"), Some(&"N".to_string()));
|
||||||
|
assert_eq!(message.get_data("longitude"), Some(&"12311.12".to_string()));
|
||||||
|
assert_eq!(message.get_data("lon_direction"), Some(&"W".to_string()));
|
||||||
|
assert_eq!(message.get_data("time"), Some(&"225444".to_string()));
|
||||||
|
assert_eq!(message.get_data("status"), Some(&"A".to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_parse_gnss_sentence() {
|
||||||
|
let sentence = "$GNGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47";
|
||||||
|
let message = GpsDataLinkProvider::parse_gps_sentence(sentence).unwrap();
|
||||||
|
|
||||||
|
assert_eq!(message.message_type, "GPS_SENTENCE");
|
||||||
|
assert_eq!(message.source_id, "GPS_RECEIVER");
|
||||||
|
assert_eq!(message.get_data("sentence_type"), Some(&"$GNGGA".to_string()));
|
||||||
|
assert_eq!(message.get_data("latitude"), Some(&"4807.038".to_string()));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_invalid_gps_sentence() {
|
||||||
|
let sentence = "This is not a GPS sentence";
|
||||||
|
let message = GpsDataLinkProvider::parse_gps_sentence(sentence);
|
||||||
|
assert!(message.is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_invalid_gps_sentence_no_dollar() {
|
||||||
|
let sentence = "GPGGA,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47";
|
||||||
|
let message = GpsDataLinkProvider::parse_gps_sentence(sentence);
|
||||||
|
assert!(message.is_none());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_unsupported_gps_sentence() {
|
||||||
|
let sentence = "$GPXXX,123519,4807.038,N,01131.000,E,1,08,0.9,545.4,M,46.9,M,,*47";
|
||||||
|
let message = GpsDataLinkProvider::parse_gps_sentence(sentence);
|
||||||
|
assert!(message.is_none());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user