From 72bf4b53c6aa5a7f1bdc7af6027016bc38ee8f68 Mon Sep 17 00:00:00 2001 From: geoffsee <> Date: Thu, 3 Jul 2025 12:24:25 -0400 Subject: [PATCH] Introduce core modules: device management, bus communication, and discovery protocol. Adds system device interface, virtual hardware bus, and device discovery logic. Includes tests for all components. --- crates/hardware/Cargo.toml | 16 + crates/hardware/README.md | 366 ++++++++++++++ crates/hardware/src/bus.rs | 338 +++++++++++++ crates/hardware/src/device.rs | 430 +++++++++++++++++ crates/hardware/src/discovery_protocol.rs | 561 ++++++++++++++++++++++ crates/hardware/src/error.rs | 72 +++ crates/hardware/src/lib.rs | 27 ++ 7 files changed, 1810 insertions(+) create mode 100644 crates/hardware/Cargo.toml create mode 100644 crates/hardware/README.md create mode 100644 crates/hardware/src/bus.rs create mode 100644 crates/hardware/src/device.rs create mode 100644 crates/hardware/src/discovery_protocol.rs create mode 100644 crates/hardware/src/error.rs create mode 100644 crates/hardware/src/lib.rs diff --git a/crates/hardware/Cargo.toml b/crates/hardware/Cargo.toml new file mode 100644 index 0000000..9602ffd --- /dev/null +++ b/crates/hardware/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "hardware" +version = "0.1.0" +edition = "2021" + +[dependencies] +async-trait = "0.1" +serde = { version = "1.0", features = ["derive"] } +serde_json = "1.0" +thiserror = "1.0" +tokio = { version = "1.0", features = ["full"] } +uuid = { version = "1.0", features = ["v4"] } +tracing = "0.1" + +[dev-dependencies] +tokio-test = "0.4" diff --git a/crates/hardware/README.md b/crates/hardware/README.md new file mode 100644 index 0000000..dffb3bb --- /dev/null +++ b/crates/hardware/README.md @@ -0,0 +1,366 @@ +# Virtual Hardware Abstraction Layer - Integration Guide + +This document provides detailed instructions on how to integrate the virtual hardware abstraction layer into yachtpit systems. + +## Overview + +The virtual hardware abstraction layer consists of three main components: + +1. **Hardware Bus** - Communication infrastructure for virtual devices +2. **System Device** - Interface and base implementation for virtual hardware devices +3. **Discovery Protocol** - Device discovery and capability advertisement + +## Architecture + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Yachtpit Application │ +├─────────────────────────────────────────────────────────────┤ +│ Systems Crate │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ +│ │ GPS System │ │Radar System │ │ AIS System │ │ +│ └─────────────┘ └─────────────┘ └─────────────┘ │ +├─────────────────────────────────────────────────────────────┤ +│ Hardware Abstraction Layer │ +│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │ +│ │Hardware Bus │ │System Device│ │Discovery │ │ +│ │ │ │Interface │ │Protocol │ │ +│ └─────────────┘ └─────────────┘ └─────────────┘ │ +└─────────────────────────────────────────────────────────────┘ +``` + +## Integration Steps + +### Step 1: Add Hardware Dependency + +Add the hardware crate as a dependency to the systems crate: + +```toml +# crates/systems/Cargo.toml +[dependencies] +hardware = { path = "../hardware" } +``` + +### Step 2: Create Hardware-Aware System Implementations + +Modify existing systems to implement the `SystemDevice` trait: + +```rust +// crates/systems/src/gps/gps_system.rs +use hardware::prelude::*; + +pub struct GpsSystemDevice { + base: BaseSystemDevice, + // GPS-specific fields + position: Option, + satellites: u8, +} + +#[async_trait::async_trait] +impl SystemDevice for GpsSystemDevice { + async fn initialize(&mut self) -> Result<()> { + self.base.initialize().await?; + // GPS-specific initialization + self.satellites = 0; + Ok(()) + } + + async fn process(&mut self) -> Result> { + // Generate GPS data messages + let mut messages = Vec::new(); + + if let Some(position) = &self.position { + let payload = serde_json::to_vec(&position)?; + let message = BusMessage::Data { + from: self.base.info.address.clone(), + to: BusAddress::new("navigation_system"), // Example target + payload, + message_id: Uuid::new_v4(), + }; + messages.push(message); + } + + Ok(messages) + } + + // Implement other required methods... +} +``` + +### Step 3: Set Up Hardware Bus + +Create a central hardware bus manager: + +```rust +// crates/systems/src/hardware_manager.rs +use hardware::prelude::*; +use std::sync::Arc; + +pub struct HardwareManager { + bus: Arc, + device_manager: DeviceManager, + discovery_protocol: DiscoveryProtocol, +} + +impl HardwareManager { + pub async fn new() -> Result { + let bus = Arc::new(HardwareBus::new()); + let device_manager = DeviceManager::new(); + + // Create discovery protocol for the manager itself + let manager_info = DeviceInfo { + address: BusAddress::new("hardware_manager"), + config: DeviceConfig { + name: "Hardware Manager".to_string(), + capabilities: vec![DeviceCapability::Communication], + ..Default::default() + }, + status: DeviceStatus::Online, + last_seen: SystemTime::now(), + version: "1.0.0".to_string(), + manufacturer: "Yachtpit".to_string(), + }; + + let discovery_protocol = DiscoveryProtocol::new( + manager_info, + DiscoveryConfig::default(), + ); + + Ok(Self { + bus, + device_manager, + discovery_protocol, + }) + } + + pub async fn add_system_device(&mut self, device: Box) -> Result<()> { + let address = device.get_info().address.clone(); + + // Connect device to bus + let connection = self.bus.connect_device(address.clone()).await?; + + // Add to device manager + self.device_manager.add_device(device); + + Ok(()) + } + + pub async fn start_all_systems(&mut self) -> Result<()> { + self.device_manager.start_all().await?; + self.discovery_protocol.start().await?; + Ok(()) + } +} +``` + +### Step 4: Integrate with Existing Systems + +Modify the existing vessel systems to use the hardware abstraction: + +```rust +// crates/systems/src/vessel/vessel_systems.rs +use crate::hardware_manager::HardwareManager; + +pub async fn create_vessel_systems_with_hardware() -> Result { + let mut hardware_manager = HardwareManager::new().await?; + + // Create GPS system + let gps_config = DeviceConfig { + name: "GPS System".to_string(), + capabilities: vec![DeviceCapability::Gps], + update_interval_ms: 1000, + ..Default::default() + }; + let gps_device = Box::new(GpsSystemDevice::new(gps_config)); + hardware_manager.add_system_device(gps_device).await?; + + // Create Radar system + let radar_config = DeviceConfig { + name: "Radar System".to_string(), + capabilities: vec![DeviceCapability::Radar], + update_interval_ms: 500, + ..Default::default() + }; + let radar_device = Box::new(RadarSystemDevice::new(radar_config)); + hardware_manager.add_system_device(radar_device).await?; + + // Create AIS system + let ais_config = DeviceConfig { + name: "AIS System".to_string(), + capabilities: vec![DeviceCapability::Ais], + update_interval_ms: 2000, + ..Default::default() + }; + let ais_device = Box::new(AisSystemDevice::new(ais_config)); + hardware_manager.add_system_device(ais_device).await?; + + hardware_manager.start_all_systems().await?; + + Ok(hardware_manager) +} +``` + +### Step 5: Update Main Application + +Integrate the hardware manager into the main yachtpit application: + +```rust +// crates/yachtpit/src/core/system_manager.rs +use systems::vessel::vessel_systems::create_vessel_systems_with_hardware; + +pub struct SystemManager { + hardware_manager: Option, +} + +impl SystemManager { + pub async fn initialize_with_hardware(&mut self) -> Result<()> { + let hardware_manager = create_vessel_systems_with_hardware().await?; + self.hardware_manager = Some(hardware_manager); + Ok(()) + } + + pub async fn discover_devices(&self) -> Result> { + if let Some(ref manager) = self.hardware_manager { + // Use discovery protocol to find devices + manager.discovery_protocol.discover_devices(None).await?; + tokio::time::sleep(Duration::from_millis(100)).await; // Wait for responses + Ok(manager.discovery_protocol.get_known_devices().await) + } else { + Ok(vec![]) + } + } +} +``` + +## Message Flow Examples + +### GPS Data Flow +``` +GPS Device → Hardware Bus → Navigation System + → Discovery Protocol (heartbeat) + → Other interested devices +``` + +### Device Discovery Flow +``` +New Device → Announce Message → Hardware Bus → All Devices +Discovery Request → Hardware Bus → Matching Devices → Response +``` + +## Configuration + +### Device Configuration +Each device can be configured with: +- Update intervals +- Capabilities +- Custom configuration parameters +- Message queue sizes + +### Discovery Configuration +- Heartbeat intervals +- Device timeout periods +- Cleanup intervals +- Maximum tracked devices + +## Testing Integration + +### Unit Tests +Run tests for individual components: +```bash +cargo test -p hardware +cargo test -p systems +``` + +### Integration Tests +Create integration tests that verify the complete flow: + +```rust +#[tokio::test] +async fn test_complete_hardware_integration() { + let mut hardware_manager = HardwareManager::new().await.unwrap(); + + // Add test devices + let gps_device = Box::new(create_test_gps_device()); + hardware_manager.add_system_device(gps_device).await.unwrap(); + + // Start systems + hardware_manager.start_all_systems().await.unwrap(); + + // Verify device discovery + let devices = hardware_manager.discovery_protocol.get_known_devices().await; + assert!(!devices.is_empty()); + + // Test message passing + // ... additional test logic +} +``` + +## Performance Considerations + +1. **Message Throughput**: The hardware bus uses unbounded channels for high throughput +2. **Device Limits**: Configure maximum device limits based on system resources +3. **Update Intervals**: Balance between data freshness and system load +4. **Memory Usage**: Monitor device registry size and message history + +## Error Handling + +The hardware abstraction layer provides comprehensive error handling: + +- **Device Errors**: Automatic retry and fallback mechanisms +- **Bus Errors**: Connection recovery and message queuing +- **Discovery Errors**: Timeout handling and device cleanup + +## Migration Strategy + +### Phase 1: Parallel Implementation +- Keep existing systems running +- Implement hardware abstraction alongside +- Gradual migration of individual systems + +### Phase 2: Feature Parity +- Ensure all existing functionality is available +- Add comprehensive testing +- Performance validation + +### Phase 3: Full Migration +- Switch to hardware abstraction as primary +- Remove legacy system implementations +- Optimize performance + +## Troubleshooting + +### Common Issues + +1. **Device Not Found**: Check device registration and bus connection +2. **Message Delivery Failures**: Verify device addresses and bus connectivity +3. **Discovery Timeouts**: Adjust discovery configuration parameters +4. **Performance Issues**: Monitor message queue sizes and update intervals + +### Debugging Tools + +```rust +// Enable debug logging +use tracing::{info, debug, warn}; + +// Check device status +// let device_info = hardware_manager.get_device_info(&address).await; +// debug!("Device status: {:?}", device_info.status); + +// Monitor message history +// let messages = hardware_bus.get_message_history().await; +// info!("Recent messages: {}", messages.len()); +``` + +## Future Enhancements + +1. **Network Discovery**: Extend discovery protocol to work across network boundaries +2. **Device Simulation**: Add comprehensive device simulators for testing +3. **Hot-Plugging**: Support for dynamic device addition/removal +4. **Load Balancing**: Distribute device processing across multiple threads +5. **Persistence**: Save and restore device configurations and state + +## Conclusion + +The virtual hardware abstraction layer provides a robust foundation for managing yacht systems. By following this integration guide, you can gradually migrate existing systems while maintaining full functionality and adding new capabilities for device discovery and communication. + +For questions or issues during integration, refer to the individual module documentation in the hardware crate or create an issue in the project repository. diff --git a/crates/hardware/src/bus.rs b/crates/hardware/src/bus.rs new file mode 100644 index 0000000..c195c10 --- /dev/null +++ b/crates/hardware/src/bus.rs @@ -0,0 +1,338 @@ +//! Virtual Hardware Bus Module +//! +//! Provides a communication infrastructure for virtual hardware devices + +use crate::{HardwareError, Result}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::{mpsc, RwLock}; +use tracing::{debug, error, info, warn}; +use uuid::Uuid; + +/// Unique address for devices on the hardware bus +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct BusAddress { + pub id: Uuid, + pub name: String, +} + +impl BusAddress { + /// Create a new bus address with a generated UUID + pub fn new(name: impl Into) -> Self { + Self { + id: Uuid::new_v4(), + name: name.into(), + } + } + + /// Create a bus address with a specific UUID + pub fn with_id(id: Uuid, name: impl Into) -> Self { + Self { + id, + name: name.into(), + } + } +} + +/// Message types that can be sent over the hardware bus +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum BusMessage { + /// Data message with payload + Data { + from: BusAddress, + to: BusAddress, + payload: Vec, + message_id: Uuid, + }, + /// Control message for bus management + Control { + from: BusAddress, + command: ControlCommand, + message_id: Uuid, + }, + /// Broadcast message to all devices + Broadcast { + from: BusAddress, + payload: Vec, + message_id: Uuid, + }, + /// Acknowledgment message + Ack { + to: BusAddress, + original_message_id: Uuid, + message_id: Uuid, + }, +} + +/// Control commands for bus management +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum ControlCommand { + /// Register a device on the bus + Register { address: BusAddress }, + /// Unregister a device from the bus + Unregister { address: BusAddress }, + /// Ping a device + Ping { target: BusAddress }, + /// Pong response to ping + Pong { from: BusAddress }, + /// Request device list + ListDevices, + /// Response with device list + DeviceList { devices: Vec }, +} + +impl BusMessage { + /// Get the message ID + pub fn message_id(&self) -> Uuid { + match self { + BusMessage::Data { message_id, .. } => *message_id, + BusMessage::Control { message_id, .. } => *message_id, + BusMessage::Broadcast { message_id, .. } => *message_id, + BusMessage::Ack { message_id, .. } => *message_id, + } + } + + /// Get the sender address if available + pub fn from(&self) -> Option<&BusAddress> { + match self { + BusMessage::Data { from, .. } => Some(from), + BusMessage::Control { from, .. } => Some(from), + BusMessage::Broadcast { from, .. } => Some(from), + BusMessage::Ack { .. } => None, + } + } +} + +/// Device connection handle for the hardware bus +pub struct DeviceConnection { + pub address: BusAddress, + pub sender: mpsc::UnboundedSender, + pub receiver: mpsc::UnboundedReceiver, +} + +/// Virtual Hardware Bus implementation +pub struct HardwareBus { + devices: Arc>>>, + message_log: Arc>>, +} + +impl Default for HardwareBus { + fn default() -> Self { + Self::new() + } +} + +impl HardwareBus { + /// Create a new hardware bus + pub fn new() -> Self { + Self { + devices: Arc::new(RwLock::new(HashMap::new())), + message_log: Arc::new(RwLock::new(Vec::new())), + } + } + + /// Connect a device to the bus + pub async fn connect_device(&self, address: BusAddress) -> Result { + let (tx, rx) = mpsc::unbounded_channel(); + + { + let mut devices = self.devices.write().await; + if devices.contains_key(&address) { + return Err(HardwareError::generic(format!( + "Device {} already connected", address.name + ))); + } + devices.insert(address.clone(), tx.clone()); + } + + info!("Device {} connected to bus", address.name); + + // Send registration message to all other devices + let register_msg = BusMessage::Control { + from: address.clone(), + command: ControlCommand::Register { + address: address.clone(), + }, + message_id: Uuid::new_v4(), + }; + + self.broadcast_message(register_msg).await?; + + Ok(DeviceConnection { + address, + sender: tx, + receiver: rx, + }) + } + + /// Disconnect a device from the bus + pub async fn disconnect_device(&self, address: &BusAddress) -> Result<()> { + { + let mut devices = self.devices.write().await; + devices.remove(address); + } + + info!("Device {} disconnected from bus", address.name); + + // Send unregistration message to all other devices + let unregister_msg = BusMessage::Control { + from: address.clone(), + command: ControlCommand::Unregister { + address: address.clone(), + }, + message_id: Uuid::new_v4(), + }; + + self.broadcast_message(unregister_msg).await?; + + Ok(()) + } + + /// Send a message to a specific device + pub async fn send_message(&self, message: BusMessage) -> Result<()> { + // Log the message + { + let mut log = self.message_log.write().await; + log.push(message.clone()); + } + + match &message { + BusMessage::Data { to, .. } => { + let devices = self.devices.read().await; + if let Some(sender) = devices.get(to) { + sender.send(message).map_err(|_| { + HardwareError::bus_communication("Failed to send message to device") + })?; + } else { + return Err(HardwareError::device_not_found(&to.name)); + } + } + BusMessage::Broadcast { .. } => { + self.broadcast_message(message).await?; + } + BusMessage::Control { .. } => { + self.broadcast_message(message).await?; + } + BusMessage::Ack { to, .. } => { + let devices = self.devices.read().await; + if let Some(sender) = devices.get(to) { + sender.send(message).map_err(|_| { + HardwareError::bus_communication("Failed to send ACK to device") + })?; + } else { + warn!("Attempted to send ACK to unknown device: {}", to.name); + } + } + } + + Ok(()) + } + + /// Broadcast a message to all connected devices + async fn broadcast_message(&self, message: BusMessage) -> Result<()> { + let devices = self.devices.read().await; + let sender_address = message.from(); + + for (address, sender) in devices.iter() { + // Don't send message back to sender + if let Some(from) = sender_address { + if address == from { + continue; + } + } + + if let Err(_) = sender.send(message.clone()) { + error!("Failed to broadcast message to device: {}", address.name); + } + } + + Ok(()) + } + + /// Get list of connected devices + pub async fn get_connected_devices(&self) -> Vec { + let devices = self.devices.read().await; + devices.keys().cloned().collect() + } + + /// Get message history + pub async fn get_message_history(&self) -> Vec { + let log = self.message_log.read().await; + log.clone() + } + + /// Clear message history + pub async fn clear_message_history(&self) { + let mut log = self.message_log.write().await; + log.clear(); + } + + /// Check if a device is connected + pub async fn is_device_connected(&self, address: &BusAddress) -> bool { + let devices = self.devices.read().await; + devices.contains_key(address) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tokio_test; + + #[tokio::test] + async fn test_bus_creation() { + let bus = HardwareBus::new(); + assert_eq!(bus.get_connected_devices().await.len(), 0); + } + + #[tokio::test] + async fn test_device_connection() { + let bus = HardwareBus::new(); + let address = BusAddress::new("test_device"); + + let connection = bus.connect_device(address.clone()).await.unwrap(); + assert_eq!(connection.address, address); + assert!(bus.is_device_connected(&address).await); + } + + #[tokio::test] + async fn test_device_disconnection() { + let bus = HardwareBus::new(); + let address = BusAddress::new("test_device"); + + let _connection = bus.connect_device(address.clone()).await.unwrap(); + assert!(bus.is_device_connected(&address).await); + + bus.disconnect_device(&address).await.unwrap(); + assert!(!bus.is_device_connected(&address).await); + } + + #[tokio::test] + async fn test_message_sending() { + let bus = HardwareBus::new(); + let addr1 = BusAddress::new("device1"); + let addr2 = BusAddress::new("device2"); + + let mut conn1 = bus.connect_device(addr1.clone()).await.unwrap(); + let _conn2 = bus.connect_device(addr2.clone()).await.unwrap(); + + let message = BusMessage::Data { + from: addr2.clone(), + to: addr1.clone(), + payload: b"test data".to_vec(), + message_id: Uuid::new_v4(), + }; + + bus.send_message(message.clone()).await.unwrap(); + + // Check if message was received + let received = conn1.receiver.recv().await.unwrap(); + match received { + BusMessage::Data { payload, .. } => { + assert_eq!(payload, b"test data"); + } + _ => panic!("Expected data message"), + } + } +} \ No newline at end of file diff --git a/crates/hardware/src/device.rs b/crates/hardware/src/device.rs new file mode 100644 index 0000000..21cb826 --- /dev/null +++ b/crates/hardware/src/device.rs @@ -0,0 +1,430 @@ +//! System Device Module +//! +//! Defines the interface and behavior for virtual hardware devices + +use crate::{BusAddress, BusMessage, HardwareError, Result}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::time::{Duration, SystemTime}; +use tokio::sync::mpsc; +use tracing::{debug, info, warn}; +use uuid::Uuid; + +/// Device capabilities that can be advertised +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub enum DeviceCapability { + /// GPS positioning capability + Gps, + /// Radar detection capability + Radar, + /// AIS (Automatic Identification System) capability + Ais, + /// Engine monitoring capability + Engine, + /// Navigation capability + Navigation, + /// Communication capability + Communication, + /// Sensor data capability + Sensor, + /// Custom capability with name + Custom(String), +} + +impl DeviceCapability { + /// Get the capability name as a string + pub fn name(&self) -> &str { + match self { + DeviceCapability::Gps => "GPS", + DeviceCapability::Radar => "Radar", + DeviceCapability::Ais => "AIS", + DeviceCapability::Engine => "Engine", + DeviceCapability::Navigation => "Navigation", + DeviceCapability::Communication => "Communication", + DeviceCapability::Sensor => "Sensor", + DeviceCapability::Custom(name) => name, + } + } +} + +/// Current status of a device +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum DeviceStatus { + /// Device is initializing + Initializing, + /// Device is online and operational + Online, + /// Device is offline + Offline, + /// Device has encountered an error + Error { message: String }, + /// Device is in maintenance mode + Maintenance, +} + +/// Device configuration parameters +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeviceConfig { + /// Device name + pub name: String, + /// Device capabilities + pub capabilities: Vec, + /// Update interval in milliseconds + pub update_interval_ms: u64, + /// Maximum message queue size + pub max_queue_size: usize, + /// Device-specific configuration + pub custom_config: HashMap, +} + +impl Default for DeviceConfig { + fn default() -> Self { + Self { + name: "Unknown Device".to_string(), + capabilities: vec![], + update_interval_ms: 1000, + max_queue_size: 100, + custom_config: HashMap::new(), + } + } +} + +/// Device information for discovery and identification +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DeviceInfo { + /// Device address + pub address: BusAddress, + /// Device configuration + pub config: DeviceConfig, + /// Current status + pub status: DeviceStatus, + /// Last seen timestamp + pub last_seen: SystemTime, + /// Device version + pub version: String, + /// Manufacturer information + pub manufacturer: String, +} + +/// Trait for implementing system devices +#[async_trait::async_trait] +pub trait SystemDevice: Send + Sync { + /// Initialize the device + async fn initialize(&mut self) -> Result<()>; + + /// Start the device operation + async fn start(&mut self) -> Result<()>; + + /// Stop the device operation + async fn stop(&mut self) -> Result<()>; + + /// Get device information + fn get_info(&self) -> DeviceInfo; + + /// Get current device status + fn get_status(&self) -> DeviceStatus; + + /// Handle incoming bus message + async fn handle_message(&mut self, message: BusMessage) -> Result>; + + /// Process device-specific logic (called periodically) + async fn process(&mut self) -> Result>; + + /// Get device capabilities + fn get_capabilities(&self) -> Vec; + + /// Update device configuration + async fn update_config(&mut self, config: DeviceConfig) -> Result<()>; +} + +/// Base implementation for system devices +pub struct BaseSystemDevice { + pub info: DeviceInfo, + pub message_sender: Option>, + pub message_receiver: Option>, + pub is_running: bool, +} + +impl BaseSystemDevice { + /// Create a new base system device + pub fn new(config: DeviceConfig) -> Self { + let address = BusAddress::new(&config.name); + let info = DeviceInfo { + address, + config, + status: DeviceStatus::Initializing, + last_seen: SystemTime::now(), + version: "1.0.0".to_string(), + manufacturer: "Virtual Hardware".to_string(), + }; + + Self { + info, + message_sender: None, + message_receiver: None, + is_running: false, + } + } + + /// Set the message channels + pub fn set_message_channels( + &mut self, + sender: mpsc::UnboundedSender, + receiver: mpsc::UnboundedReceiver, + ) { + self.message_sender = Some(sender); + self.message_receiver = Some(receiver); + } + + /// Send a message through the bus + pub async fn send_message(&self, message: BusMessage) -> Result<()> { + if let Some(sender) = &self.message_sender { + sender.send(message).map_err(|_| { + HardwareError::bus_communication("Failed to send message from device") + })?; + } else { + return Err(HardwareError::generic("Device not connected to bus")); + } + Ok(()) + } + + /// Update device status + pub fn set_status(&mut self, status: DeviceStatus) { + self.info.status = status; + self.info.last_seen = SystemTime::now(); + } + + /// Check if device is running + pub fn is_running(&self) -> bool { + self.is_running + } +} + +#[async_trait::async_trait] +impl SystemDevice for BaseSystemDevice { + async fn initialize(&mut self) -> Result<()> { + info!("Initializing device: {}", self.info.config.name); + self.set_status(DeviceStatus::Initializing); + + // Simulate initialization delay + tokio::time::sleep(Duration::from_millis(100)).await; + + self.set_status(DeviceStatus::Online); + Ok(()) + } + + async fn start(&mut self) -> Result<()> { + info!("Starting device: {}", self.info.config.name); + self.is_running = true; + self.set_status(DeviceStatus::Online); + Ok(()) + } + + async fn stop(&mut self) -> Result<()> { + info!("Stopping device: {}", self.info.config.name); + self.is_running = false; + self.set_status(DeviceStatus::Offline); + Ok(()) + } + + fn get_info(&self) -> DeviceInfo { + self.info.clone() + } + + fn get_status(&self) -> DeviceStatus { + self.info.status.clone() + } + + async fn handle_message(&mut self, message: BusMessage) -> Result> { + debug!("Device {} received message: {:?}", self.info.config.name, message); + + match message { + BusMessage::Control { command, .. } => { + match command { + crate::bus::ControlCommand::Ping { target } => { + if target == self.info.address { + let pong = BusMessage::Control { + from: self.info.address.clone(), + command: crate::bus::ControlCommand::Pong { + from: self.info.address.clone(), + }, + message_id: Uuid::new_v4(), + }; + return Ok(Some(pong)); + } + } + _ => {} + } + } + _ => {} + } + + Ok(None) + } + + async fn process(&mut self) -> Result> { + // Base implementation does nothing + Ok(vec![]) + } + + fn get_capabilities(&self) -> Vec { + self.info.config.capabilities.clone() + } + + async fn update_config(&mut self, config: DeviceConfig) -> Result<()> { + info!("Updating config for device: {}", self.info.config.name); + self.info.config = config; + Ok(()) + } +} + +/// Device manager for handling multiple devices +pub struct DeviceManager { + devices: HashMap>, +} + +impl DeviceManager { + /// Create a new device manager + pub fn new() -> Self { + Self { + devices: HashMap::new(), + } + } + + /// Add a device to the manager + pub fn add_device(&mut self, device: Box) { + let address = device.get_info().address.clone(); + self.devices.insert(address, device); + } + + /// Remove a device from the manager + pub fn remove_device(&mut self, address: &BusAddress) -> Option> { + self.devices.remove(address) + } + + /// Get a device by address + pub fn get_device(&self, address: &BusAddress) -> Option<&dyn SystemDevice> { + self.devices.get(address).map(|d| d.as_ref()) + } + + /// Get a mutable device by address + pub fn get_device_mut(&mut self, address: &BusAddress) -> Option<&mut Box> { + self.devices.get_mut(address) + } + + /// Initialize all devices + pub async fn initialize_all(&mut self) -> Result<()> { + for device in self.devices.values_mut() { + device.initialize().await?; + } + Ok(()) + } + + /// Start all devices + pub async fn start_all(&mut self) -> Result<()> { + for device in self.devices.values_mut() { + device.start().await?; + } + Ok(()) + } + + /// Stop all devices + pub async fn stop_all(&mut self) -> Result<()> { + for device in self.devices.values_mut() { + device.stop().await?; + } + Ok(()) + } + + /// Get all device information + pub fn get_all_device_info(&self) -> Vec { + self.devices.values().map(|d| d.get_info()).collect() + } + + /// Process all devices + pub async fn process_all(&mut self) -> Result> { + let mut messages = Vec::new(); + for device in self.devices.values_mut() { + let device_messages = device.process().await?; + messages.extend(device_messages); + } + Ok(messages) + } +} + +impl Default for DeviceManager { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_device_creation() { + let config = DeviceConfig { + name: "Test Device".to_string(), + capabilities: vec![DeviceCapability::Gps], + ..Default::default() + }; + + let device = BaseSystemDevice::new(config); + assert_eq!(device.info.config.name, "Test Device"); + assert_eq!(device.info.status, DeviceStatus::Initializing); + } + + #[tokio::test] + async fn test_device_initialization() { + let config = DeviceConfig { + name: "Test Device".to_string(), + ..Default::default() + }; + + let mut device = BaseSystemDevice::new(config); + device.initialize().await.unwrap(); + assert_eq!(device.get_status(), DeviceStatus::Online); + } + + #[tokio::test] + async fn test_device_start_stop() { + let config = DeviceConfig { + name: "Test Device".to_string(), + ..Default::default() + }; + + let mut device = BaseSystemDevice::new(config); + + device.start().await.unwrap(); + assert!(device.is_running()); + assert_eq!(device.get_status(), DeviceStatus::Online); + + device.stop().await.unwrap(); + assert!(!device.is_running()); + assert_eq!(device.get_status(), DeviceStatus::Offline); + } + + #[tokio::test] + async fn test_device_manager() { + let mut manager = DeviceManager::new(); + + let config = DeviceConfig { + name: "Test Device".to_string(), + ..Default::default() + }; + + let device = Box::new(BaseSystemDevice::new(config)); + let address = device.get_info().address.clone(); + + manager.add_device(device); + assert!(manager.get_device(&address).is_some()); + + manager.initialize_all().await.unwrap(); + manager.start_all().await.unwrap(); + + let info = manager.get_all_device_info(); + assert_eq!(info.len(), 1); + assert_eq!(info[0].status, DeviceStatus::Online); + } +} diff --git a/crates/hardware/src/discovery_protocol.rs b/crates/hardware/src/discovery_protocol.rs new file mode 100644 index 0000000..20e7565 --- /dev/null +++ b/crates/hardware/src/discovery_protocol.rs @@ -0,0 +1,561 @@ +//! Discovery Protocol Module +//! +//! Provides device discovery and capability advertisement functionality + +use crate::{BusAddress, BusMessage, DeviceCapability, DeviceInfo, HardwareError, Result}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use std::time::{Duration, SystemTime}; +use tokio::sync::{mpsc, RwLock}; +use tracing::{debug, info, warn}; +use uuid::Uuid; +use std::sync::Arc; + +/// Discovery message types +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum DiscoveryMessage { + /// Announce device presence and capabilities + Announce { + device_info: DeviceInfo, + timestamp: SystemTime, + }, + /// Request device information + Discover { + requester: BusAddress, + filter: Option, + timestamp: SystemTime, + }, + /// Response to discovery request + DiscoverResponse { + devices: Vec, + responder: BusAddress, + timestamp: SystemTime, + }, + /// Heartbeat to maintain presence + Heartbeat { + device: BusAddress, + timestamp: SystemTime, + }, + /// Device going offline notification + Goodbye { + device: BusAddress, + timestamp: SystemTime, + }, +} + +/// Filter criteria for device discovery +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DiscoveryFilter { + /// Filter by device capabilities + pub capabilities: Option>, + /// Filter by device name pattern + pub name_pattern: Option, + /// Filter by manufacturer + pub manufacturer: Option, + /// Filter by minimum version + pub min_version: Option, +} + +impl DiscoveryFilter { + /// Create a new empty filter + pub fn new() -> Self { + Self { + capabilities: None, + name_pattern: None, + manufacturer: None, + min_version: None, + } + } + + /// Filter by capabilities + pub fn with_capabilities(mut self, capabilities: Vec) -> Self { + self.capabilities = Some(capabilities); + self + } + + /// Filter by name pattern + pub fn with_name_pattern(mut self, pattern: impl Into) -> Self { + self.name_pattern = Some(pattern.into()); + self + } + + /// Filter by manufacturer + pub fn with_manufacturer(mut self, manufacturer: impl Into) -> Self { + self.manufacturer = Some(manufacturer.into()); + self + } + + /// Check if device matches this filter + pub fn matches(&self, device_info: &DeviceInfo) -> bool { + // Check capabilities + if let Some(required_caps) = &self.capabilities { + let device_caps = &device_info.config.capabilities; + if !required_caps.iter().all(|cap| device_caps.contains(cap)) { + return false; + } + } + + // Check name pattern (simple substring match) + if let Some(pattern) = &self.name_pattern { + if !device_info.config.name.contains(pattern) { + return false; + } + } + + // Check manufacturer + if let Some(manufacturer) = &self.manufacturer { + if device_info.manufacturer != *manufacturer { + return false; + } + } + + // Check version (simple string comparison for now) + if let Some(min_version) = &self.min_version { + if device_info.version < *min_version { + return false; + } + } + + true + } +} + +impl Default for DiscoveryFilter { + fn default() -> Self { + Self::new() + } +} + +/// Discovery protocol configuration +#[derive(Debug, Clone)] +pub struct DiscoveryConfig { + /// How often to send heartbeat messages (in seconds) + pub heartbeat_interval: Duration, + /// How long to wait before considering a device offline (in seconds) + pub device_timeout: Duration, + /// How often to clean up expired devices (in seconds) + pub cleanup_interval: Duration, + /// Maximum number of devices to track + pub max_devices: usize, +} + +impl Default for DiscoveryConfig { + fn default() -> Self { + Self { + heartbeat_interval: Duration::from_secs(30), + device_timeout: Duration::from_secs(90), + cleanup_interval: Duration::from_secs(60), + max_devices: 1000, + } + } +} + +/// Discovery protocol implementation +pub struct DiscoveryProtocol { + /// Local device information + local_device: DeviceInfo, + /// Known devices registry + known_devices: Arc>>, + /// Configuration + config: DiscoveryConfig, + /// Message sender for bus communication + message_sender: Option>, + /// Discovery message receiver + discovery_receiver: Option>, + /// Running state + is_running: bool, +} + +impl DiscoveryProtocol { + /// Create a new discovery protocol instance + pub fn new(local_device: DeviceInfo, config: DiscoveryConfig) -> Self { + Self { + local_device, + known_devices: Arc::new(RwLock::new(HashMap::new())), + config, + message_sender: None, + discovery_receiver: None, + is_running: false, + } + } + + /// Set the message sender for bus communication + pub fn set_message_sender(&mut self, sender: mpsc::UnboundedSender) { + self.message_sender = Some(sender); + } + + /// Set the discovery message receiver + pub fn set_discovery_receiver(&mut self, receiver: mpsc::UnboundedReceiver) { + self.discovery_receiver = Some(receiver); + } + + /// Start the discovery protocol + pub async fn start(&mut self) -> Result<()> { + info!("Starting discovery protocol for device: {}", self.local_device.config.name); + self.is_running = true; + + // Send initial announcement + self.announce_device().await?; + + Ok(()) + } + + /// Stop the discovery protocol + pub async fn stop(&mut self) -> Result<()> { + info!("Stopping discovery protocol for device: {}", self.local_device.config.name); + self.is_running = false; + + // Send goodbye message + self.send_goodbye().await?; + + Ok(()) + } + + /// Announce this device to the network + pub async fn announce_device(&self) -> Result<()> { + let announcement = DiscoveryMessage::Announce { + device_info: self.local_device.clone(), + timestamp: SystemTime::now(), + }; + + self.send_discovery_message(announcement).await + } + + /// Send heartbeat to maintain presence + pub async fn send_heartbeat(&self) -> Result<()> { + let heartbeat = DiscoveryMessage::Heartbeat { + device: self.local_device.address.clone(), + timestamp: SystemTime::now(), + }; + + self.send_discovery_message(heartbeat).await + } + + /// Send goodbye message when going offline + pub async fn send_goodbye(&self) -> Result<()> { + let goodbye = DiscoveryMessage::Goodbye { + device: self.local_device.address.clone(), + timestamp: SystemTime::now(), + }; + + self.send_discovery_message(goodbye).await + } + + /// Discover devices on the network + pub async fn discover_devices(&self, filter: Option) -> Result<()> { + let discover_msg = DiscoveryMessage::Discover { + requester: self.local_device.address.clone(), + filter, + timestamp: SystemTime::now(), + }; + + self.send_discovery_message(discover_msg).await + } + + /// Get all known devices + pub async fn get_known_devices(&self) -> Vec { + let devices = self.known_devices.read().await; + devices.values().cloned().collect() + } + + /// Get devices matching a filter + pub async fn get_devices_by_filter(&self, filter: &DiscoveryFilter) -> Vec { + let devices = self.known_devices.read().await; + devices + .values() + .filter(|device| filter.matches(device)) + .cloned() + .collect() + } + + /// Get device by address + pub async fn get_device(&self, address: &BusAddress) -> Option { + let devices = self.known_devices.read().await; + devices.get(address).cloned() + } + + /// Handle incoming discovery message + pub async fn handle_discovery_message(&self, message: DiscoveryMessage) -> Result<()> { + match message { + DiscoveryMessage::Announce { device_info, .. } => { + self.handle_device_announcement(device_info).await + } + DiscoveryMessage::Discover { requester, filter, .. } => { + self.handle_discovery_request(requester, filter).await + } + DiscoveryMessage::DiscoverResponse { devices, .. } => { + self.handle_discovery_response(devices).await + } + DiscoveryMessage::Heartbeat { device, timestamp } => { + self.handle_heartbeat(device, timestamp).await + } + DiscoveryMessage::Goodbye { device, .. } => { + self.handle_goodbye(device).await + } + } + } + + /// Handle device announcement + async fn handle_device_announcement(&self, device_info: DeviceInfo) -> Result<()> { + info!("Device announced: {}", device_info.config.name); + + let mut devices = self.known_devices.write().await; + devices.insert(device_info.address.clone(), device_info); + + Ok(()) + } + + /// Handle discovery request + async fn handle_discovery_request( + &self, + requester: BusAddress, + filter: Option, + ) -> Result<()> { + debug!("Discovery request from: {}", requester.name); + + let devices = self.known_devices.read().await; + let mut matching_devices = vec![self.local_device.clone()]; // Include self + + // Add matching known devices + for device in devices.values() { + if let Some(ref filter) = filter { + if filter.matches(device) { + matching_devices.push(device.clone()); + } + } else { + matching_devices.push(device.clone()); + } + } + + drop(devices); // Release the lock + + let response = DiscoveryMessage::DiscoverResponse { + devices: matching_devices, + responder: self.local_device.address.clone(), + timestamp: SystemTime::now(), + }; + + self.send_discovery_message(response).await + } + + /// Handle discovery response + async fn handle_discovery_response(&self, devices: Vec) -> Result<()> { + debug!("Received discovery response with {} devices", devices.len()); + + let mut known_devices = self.known_devices.write().await; + for device in devices { + // Don't add ourselves + if device.address != self.local_device.address { + known_devices.insert(device.address.clone(), device); + } + } + + Ok(()) + } + + /// Handle heartbeat message + async fn handle_heartbeat(&self, device: BusAddress, timestamp: SystemTime) -> Result<()> { + debug!("Heartbeat from device: {}", device.name); + + let mut devices = self.known_devices.write().await; + if let Some(device_info) = devices.get_mut(&device) { + device_info.last_seen = timestamp; + } + + Ok(()) + } + + /// Handle goodbye message + async fn handle_goodbye(&self, device: BusAddress) -> Result<()> { + info!("Device going offline: {}", device.name); + + let mut devices = self.known_devices.write().await; + devices.remove(&device); + + Ok(()) + } + + /// Clean up expired devices + pub async fn cleanup_expired_devices(&self) -> Result<()> { + let now = SystemTime::now(); + let timeout = self.config.device_timeout; + + let mut devices = self.known_devices.write().await; + let mut expired_devices = Vec::new(); + + for (address, device_info) in devices.iter() { + if let Ok(elapsed) = now.duration_since(device_info.last_seen) { + if elapsed > timeout { + expired_devices.push(address.clone()); + } + } + } + + for address in expired_devices { + warn!("Removing expired device: {}", address.name); + devices.remove(&address); + } + + Ok(()) + } + + /// Send discovery message over the bus + async fn send_discovery_message(&self, discovery_msg: DiscoveryMessage) -> Result<()> { + if let Some(sender) = &self.message_sender { + let payload = serde_json::to_vec(&discovery_msg)?; + + let bus_message = BusMessage::Broadcast { + from: self.local_device.address.clone(), + payload, + message_id: Uuid::new_v4(), + }; + + sender.send(bus_message).map_err(|_| { + HardwareError::bus_communication("Failed to send discovery message") + })?; + } else { + return Err(HardwareError::generic("Discovery protocol not connected to bus")); + } + + Ok(()) + } + + /// Run the discovery protocol main loop + pub async fn run(&mut self) -> Result<()> { + let mut heartbeat_timer = tokio::time::interval(self.config.heartbeat_interval); + let mut cleanup_timer = tokio::time::interval(self.config.cleanup_interval); + + while self.is_running { + tokio::select! { + _ = heartbeat_timer.tick() => { + if let Err(e) = self.send_heartbeat().await { + warn!("Failed to send heartbeat: {}", e); + } + } + _ = cleanup_timer.tick() => { + if let Err(e) = self.cleanup_expired_devices().await { + warn!("Failed to cleanup expired devices: {}", e); + } + } + // Handle incoming discovery messages if receiver is set + msg = async { + if let Some(ref mut receiver) = self.discovery_receiver { + receiver.recv().await + } else { + std::future::pending().await + } + } => { + if let Some(discovery_msg) = msg { + if let Err(e) = self.handle_discovery_message(discovery_msg).await { + warn!("Failed to handle discovery message: {}", e); + } + } + } + } + } + + Ok(()) + } + + /// Check if the protocol is running + pub fn is_running(&self) -> bool { + self.is_running + } + + /// Get the number of known devices + pub async fn device_count(&self) -> usize { + let devices = self.known_devices.read().await; + devices.len() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::{DeviceConfig, DeviceStatus}; + + fn create_test_device_info(name: &str) -> DeviceInfo { + DeviceInfo { + address: BusAddress::new(name), + config: DeviceConfig { + name: name.to_string(), + capabilities: vec![DeviceCapability::Gps], + ..Default::default() + }, + status: DeviceStatus::Online, + last_seen: SystemTime::now(), + version: "1.0.0".to_string(), + manufacturer: "Test Manufacturer".to_string(), + } + } + + #[tokio::test] + async fn test_discovery_protocol_creation() { + let device_info = create_test_device_info("test_device"); + let config = DiscoveryConfig::default(); + let protocol = DiscoveryProtocol::new(device_info, config); + + assert!(!protocol.is_running()); + assert_eq!(protocol.device_count().await, 0); + } + + #[tokio::test] + async fn test_device_announcement() { + let device_info = create_test_device_info("test_device"); + let config = DiscoveryConfig::default(); + let protocol = DiscoveryProtocol::new(device_info.clone(), config); + + let other_device = create_test_device_info("other_device"); + protocol.handle_device_announcement(other_device.clone()).await.unwrap(); + + let known_devices = protocol.get_known_devices().await; + assert_eq!(known_devices.len(), 1); + assert_eq!(known_devices[0].config.name, "other_device"); + } + + #[tokio::test] + async fn test_discovery_filter() { + let filter = DiscoveryFilter::new() + .with_capabilities(vec![DeviceCapability::Gps]) + .with_name_pattern("test"); + + let device_info = create_test_device_info("test_device"); + assert!(filter.matches(&device_info)); + + let other_device = DeviceInfo { + address: BusAddress::new("other"), + config: DeviceConfig { + name: "other".to_string(), + capabilities: vec![DeviceCapability::Radar], + ..Default::default() + }, + status: DeviceStatus::Online, + last_seen: SystemTime::now(), + version: "1.0.0".to_string(), + manufacturer: "Test".to_string(), + }; + assert!(!filter.matches(&other_device)); + } + + #[tokio::test] + async fn test_device_cleanup() { + let device_info = create_test_device_info("test_device"); + let mut config = DiscoveryConfig::default(); + config.device_timeout = Duration::from_millis(100); + + let protocol = DiscoveryProtocol::new(device_info, config); + + // Add a device with old timestamp + let mut old_device = create_test_device_info("old_device"); + old_device.last_seen = SystemTime::now() - Duration::from_secs(200); + + protocol.handle_device_announcement(old_device).await.unwrap(); + assert_eq!(protocol.device_count().await, 1); + + // Wait and cleanup + tokio::time::sleep(Duration::from_millis(150)).await; + protocol.cleanup_expired_devices().await.unwrap(); + + assert_eq!(protocol.device_count().await, 0); + } +} \ No newline at end of file diff --git a/crates/hardware/src/error.rs b/crates/hardware/src/error.rs new file mode 100644 index 0000000..50eb773 --- /dev/null +++ b/crates/hardware/src/error.rs @@ -0,0 +1,72 @@ +//! Error types for the hardware abstraction layer + +use thiserror::Error; + +/// Result type alias for hardware operations +pub type Result = std::result::Result; + +/// Common error types for hardware operations +#[derive(Error, Debug)] +pub enum HardwareError { + /// Device not found on the bus + #[error("Device not found: {device_id}")] + DeviceNotFound { device_id: String }, + + /// Bus communication error + #[error("Bus communication error: {message}")] + BusCommunicationError { message: String }, + + /// Device is not responding + #[error("Device not responding: {device_id}")] + DeviceNotResponding { device_id: String }, + + /// Invalid device capability + #[error("Invalid device capability: {capability}")] + InvalidCapability { capability: String }, + + /// Discovery protocol error + #[error("Discovery protocol error: {message}")] + DiscoveryError { message: String }, + + /// Device initialization error + #[error("Device initialization failed: {device_id}, reason: {reason}")] + InitializationError { device_id: String, reason: String }, + + /// Serialization/Deserialization error + #[error("Serialization error: {0}")] + SerializationError(#[from] serde_json::Error), + + /// Generic hardware error + #[error("Hardware error: {message}")] + Generic { message: String }, +} + +impl HardwareError { + /// Create a new generic hardware error + pub fn generic(message: impl Into) -> Self { + Self::Generic { + message: message.into(), + } + } + + /// Create a new bus communication error + pub fn bus_communication(message: impl Into) -> Self { + Self::BusCommunicationError { + message: message.into(), + } + } + + /// Create a new device not found error + pub fn device_not_found(device_id: impl Into) -> Self { + Self::DeviceNotFound { + device_id: device_id.into(), + } + } + + /// Create a new discovery error + pub fn discovery_error(message: impl Into) -> Self { + Self::DiscoveryError { + message: message.into(), + } + } +} diff --git a/crates/hardware/src/lib.rs b/crates/hardware/src/lib.rs new file mode 100644 index 0000000..854eaf0 --- /dev/null +++ b/crates/hardware/src/lib.rs @@ -0,0 +1,27 @@ +//! Virtual Hardware Abstraction Layer +//! +//! This crate provides a common abstraction for virtual hardware components +//! including a hardware bus, system devices, and discovery protocols. + +#![allow(clippy::type_complexity)] + +pub mod bus; +pub mod device; +pub mod discovery_protocol; +pub mod error; + +// Re-export main types +pub use bus::{HardwareBus, BusMessage, BusAddress}; +pub use device::{SystemDevice, DeviceCapability, DeviceStatus, DeviceInfo, DeviceConfig}; +pub use discovery_protocol::{DiscoveryProtocol, DiscoveryMessage}; +pub use error::{HardwareError, Result}; + +/// Common traits and types used throughout the hardware abstraction layer +pub mod prelude { + pub use crate::{ + HardwareBus, BusMessage, BusAddress, + SystemDevice, DeviceCapability, DeviceStatus, DeviceInfo, DeviceConfig, + DiscoveryProtocol, DiscoveryMessage, + HardwareError, Result, + }; +}