diff --git a/Cargo.toml b/Cargo.toml index d9ecd0c..f35ecf5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ categories = ["network-programming", "asynchronous"] path = "src/bellande_mesh_sync.rs" [dependencies] -tokio = { version = "1.28", features = ["rt", "net", "time", "sync", "macros"] } +tokio = { version = "1.0", features = ["full"] } hyper-rustls = { version = "0.24", features = ["http1", "http2"] } serde = { version = "1.0", features = ["derive"] } hyper = { version = "0.14", features = ["full"] } @@ -24,3 +24,8 @@ serde_json = "1.0" bincode = "1.3" rand = "0.8" uuid = { version = "1.3", features = ["v4", "serde"] } +rustls = "0.21" +chrono = { version = "0.4", features = ["serde"] } +sled = "0.34" +tokio-util = { version = "0.7", features = ["full"] } +async-trait = "0.1" diff --git a/README.md b/README.md index c86aa59..9a99412 100644 --- a/README.md +++ b/README.md @@ -32,6 +32,38 @@ - Token validation - Node authentication +# Usage +``` +use bellande_mesh_sync::{init, init_with_options, start, stop, MeshOptions, Config}; + +async fn example() -> Result<(), BellandeMeshError> { + // Basic initialization + let config = Config { + listen_address: "127.0.0.1:8000".to_string(), + node_timeout: 300, + }; + let mesh = init(config.clone()).await?; + start(&mesh).await?; + + // Or with custom options + let options = MeshOptions { + dev_mode: true, + metrics_interval: Some(30), + enable_persistence: true, + ..Default::default() + }; + let mesh = init_with_options(config, options).await?; + start(&mesh).await?; + + // Use other functionalities + broadcast(&mesh, b"Hello network!".to_vec()).await?; + let stats = get_stats(&mesh).await?; + let nodes = get_nodes(&mesh).await?; + + stop(&mesh).await?; + Ok(()) +} +``` ## Website Crates - https://crates.io/crates/bellande_mesh_sync diff --git a/src/bellande_mesh_sync.rs b/src/bellande_mesh_sync.rs index 9843fa8..478001b 100644 --- a/src/bellande_mesh_sync.rs +++ b/src/bellande_mesh_sync.rs @@ -13,6 +13,9 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use std::path::PathBuf; +use std::sync::Arc; + mod config; mod data; mod dht; @@ -26,17 +29,152 @@ mod persistence; pub use crate::config::config::Config; pub use crate::error::error::BellandeMeshError; pub use crate::mesh::mesh::BellandeMeshSync; +use crate::mesh::mesh::NetworkStats; +pub use crate::metrics::metrics::MetricsManager; +pub use crate::node::node::{Node, NodeId, PublicKey}; +pub use crate::persistence::persistence::PersistenceManager; -/// Initialize the BellandeMeshSync System -/// This function takes a configuration and sets up the BellandeMeshSync System. -/// It returns a BellandeMeshSync instance that can be used to interact with the mesh network. +/// Configuration options for initializing the BellandeMeshSync system +#[derive(Debug, Clone)] +pub struct MeshOptions { + /// Path to the TLS certificate file + pub cert_path: Option, + /// Path to the TLS private key file + pub key_path: Option, + /// Enable development mode (generates self-signed certificates) + pub dev_mode: bool, + /// Custom metrics collection interval in seconds + pub metrics_interval: Option, + /// Maximum number of concurrent connections + pub max_connections: Option, + /// Enable persistence layer + pub enable_persistence: bool, + /// Custom persistence directory + pub persistence_path: Option, +} + +impl Default for MeshOptions { + fn default() -> Self { + Self { + cert_path: None, + key_path: None, + dev_mode: false, + metrics_interval: Some(60), + max_connections: Some(1000), + enable_persistence: false, + persistence_path: None, + } + } +} + +/// Initialize the BellandeMeshSync System with default options pub async fn init(config: Config) -> Result { - let bellande_mesh = BellandeMeshSync::new(&config)?; + init_with_options(config, MeshOptions::default()).await +} + +/// Initialize the BellandeMeshSync System with custom options +pub async fn init_with_options( + config: Config, + options: MeshOptions, +) -> Result { + // Initialize metrics manager + let metrics = Arc::new(MetricsManager::new()); + + // Initialize persistence if enabled + let persistence_manager = if options.enable_persistence { + let path = options + .persistence_path + .unwrap_or_else(|| PathBuf::from("data")); + Some(PersistenceManager::new(path.to_str().unwrap_or("data"))?) + } else { + None + }; + + // Initialize the mesh network with appropriate certificates + let bellande_mesh = match (options.cert_path, options.key_path, options.dev_mode) { + (Some(cert), Some(key), false) => { + BellandeMeshSync::new_with_certs(&config, cert, key, metrics.clone())? + } + (None, None, true) => { + // Use default certificates for development + let cert = PathBuf::from("certs/server.crt"); + let key = PathBuf::from("certs/server.key"); + BellandeMeshSync::new_with_certs(&config, cert, key, metrics.clone())? + } + (None, None, false) => BellandeMeshSync::new_with_metrics(&config, metrics.clone())?, + _ => return Err(BellandeMeshError::Custom( + "Invalid certificate configuration. Provide both cert and key paths, or enable dev mode, or neither.".into() + )), + }; + + // Restore persisted data if available + if let Some(persistence) = &persistence_manager { + let nodes = persistence.load_nodes()?; + bellande_mesh.restore_nodes(nodes).await?; + + // Load data chunks for each node + for node in nodes { + let chunks = persistence.load_data_chunks(&node.id)?; + bellande_mesh.restore_data_chunks(node.id, chunks).await?; + } + } + + // Configure metrics collection if enabled + if let Some(interval) = options.metrics_interval { + bellande_mesh.start_metrics_collection(interval).await?; + } + + // Set connection limits if specified + if let Some(max_conn) = options.max_connections { + bellande_mesh.set_max_connections(max_conn).await?; + } + Ok(bellande_mesh) } /// Start the BellandeMeshSync System -/// This function takes a BellandeMeshSync instance and starts the mesh network operations. pub async fn start(bellande_mesh: &BellandeMeshSync) -> Result<(), BellandeMeshError> { bellande_mesh.start().await } + +/// Stop the BellandeMeshSync System +pub async fn stop(bellande_mesh: &BellandeMeshSync) -> Result<(), BellandeMeshError> { + bellande_mesh.stop().await +} + +/// Broadcast a message to all nodes in the network +pub async fn broadcast( + bellande_mesh: &BellandeMeshSync, + data: Vec, +) -> Result<(), BellandeMeshError> { + bellande_mesh.broadcast_data(data).await +} + +/// Get the current network statistics +pub async fn get_stats( + bellande_mesh: &BellandeMeshSync, +) -> Result { + bellande_mesh.get_network_stats().await +} + +/// Get the list of currently connected nodes +pub async fn get_nodes(bellande_mesh: &BellandeMeshSync) -> Result, BellandeMeshError> { + bellande_mesh.get_all_nodes().await +} + +/// Check if a specific node is connected to the network +pub async fn is_node_connected( + bellande_mesh: &BellandeMeshSync, + node_id: NodeId, +) -> Result { + bellande_mesh.is_node_connected(&node_id).await +} + +/// Send a message to a specific node +pub async fn send_to_node( + bellande_mesh: &BellandeMeshSync, + node_id: NodeId, + data: Vec, +) -> Result<(), BellandeMeshError> { + bellande_mesh.send_data_to_node(node_id, data).await +} diff --git a/src/data/data.rs b/src/data/data.rs index 09240c1..cecc86b 100644 --- a/src/data/data.rs +++ b/src/data/data.rs @@ -13,18 +13,18 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use crate::node::node::NodeId; use serde::{Deserialize, Serialize}; use std::time::SystemTime; -use uuid::Uuid; #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] pub struct DataChunk { - pub id: Uuid, + pub id: NodeId, pub content: Vec, pub checksum: String, pub version: u64, pub last_modified: SystemTime, - pub author: Uuid, + pub author: NodeId, pub parent_versions: Vec, } @@ -33,17 +33,20 @@ impl DataChunk { content: Vec, checksum: String, version: u64, - author: Uuid, + author: NodeId, parent_versions: Vec, ) -> Self { Self { - id: Uuid::new_v4(), + id: NodeId::new(), content, checksum, version, last_modified: SystemTime::now(), - author, + author: NodeId::new(), parent_versions, } } + pub fn size(&self) -> usize { + std::mem::size_of::() * 2 + self.content.len() + std::mem::size_of::() + } } diff --git a/src/mesh/mesh.rs b/src/mesh/mesh.rs index b48f8aa..28f3a8b 100644 --- a/src/mesh/mesh.rs +++ b/src/mesh/mesh.rs @@ -14,25 +14,41 @@ // along with this program. If not, see . use crate::config::config::Config; +use crate::data::data::DataChunk; use crate::error::error::BellandeMeshError; -use crate::node::node::{DataChunk, Message, Node, NodeId, PublicKey}; +pub use crate::metrics::metrics::MetricsManager; +use crate::node::node::{Message, Node, NodeId, PublicKey}; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Client, Request, Response, Server, StatusCode}; use hyper_rustls::HttpsConnectorBuilder; +use rustls::{Certificate, PrivateKey, ServerConfig}; use serde::Serialize; +use serde_json; use std::convert::Infallible; +use std::fs; use std::io::{Read, Write}; use std::net::{SocketAddr, TcpStream}; +use std::path::Path; use std::sync::{Arc, RwLock}; use std::time::{Duration, SystemTime}; use tokio::net::{TcpListener as TokioTcpListener, UdpSocket as TokioUdpSocket}; -use tokio::sync::mpsc; +use tokio::signal; +use tokio::sync::{mpsc, Mutex}; use tokio::time::sleep; -use tokio_rustls::rustls::{Certificate, PrivateKey, ServerConfig}; -use tokio_rustls::TlsAcceptor; +use tokio_util::sync::CancellationToken; + +// Constants +const UDP_BUFFER_SIZE: usize = 65536; +const HTTP_PORT_OFFSET: u16 = 1; +const HTTPS_PORT_OFFSET: u16 = 2; +const MAX_MESSAGE_SIZE: usize = 1024 * 1024; // 1MB +const CHANNEL_BUFFER_SIZE: usize = 1000; +const SYNC_INTERVAL: Duration = Duration::from_secs(60); +const CLEANUP_INTERVAL: Duration = Duration::from_secs(300); +const PING_INTERVAL: Duration = Duration::from_secs(30); #[derive(Debug, Clone, Serialize)] -struct NetworkStats { +pub struct NetworkStats { tcp_connections: usize, udp_packets_received: usize, http_requests: usize, @@ -66,12 +82,39 @@ pub struct BellandeMeshSync { http_client: Client, https_client: Client>, stats: Arc>, + metrics: Arc, message_tx: mpsc::Sender<(Message, SocketAddr)>, - message_rx: Arc>>, + message_rx: Arc>>, + cancel_token: CancellationToken, +} + +#[async_trait::async_trait] +pub trait MeshTransport: Send + Sync { + async fn start(&self) -> Result<(), BellandeMeshError>; + async fn stop(&self) -> Result<(), BellandeMeshError>; + async fn broadcast_data(&self, data: Vec) -> Result<(), BellandeMeshError>; + async fn get_network_stats(&self) -> Result; + async fn get_all_nodes(&self) -> Result, BellandeMeshError>; + async fn is_node_connected(&self, node_id: &NodeId) -> Result; + async fn send_data_to_node( + &self, + node_id: NodeId, + data: Vec, + ) -> Result<(), BellandeMeshError>; + async fn restore_data_chunks( + &self, + node_id: NodeId, + chunks: Vec, + ) -> Result<(), BellandeMeshError>; + async fn start_metrics_collection(&self, interval: u64) -> Result<(), BellandeMeshError>; + async fn set_max_connections(&self, max_conn: usize) -> Result<(), BellandeMeshError>; } impl BellandeMeshSync { - pub fn new(config: &Config) -> Result { + pub fn new_with_metrics( + config: &Config, + metrics: Arc, + ) -> Result { let tls_config = Self::create_tls_config()?; let http_client = Client::new(); let https_connector = HttpsConnectorBuilder::new() @@ -82,6 +125,7 @@ impl BellandeMeshSync { let https_client = Client::builder().build(https_connector); let (message_tx, message_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE); + let stats = Arc::new(RwLock::new(NetworkStats::default())); Ok(Self { config: Arc::new(config.clone()), @@ -90,16 +134,58 @@ impl BellandeMeshSync { tls_config: Arc::new(tls_config), http_client, https_client, - stats: Arc::new(RwLock::new(NetworkStats::default())), + stats, + metrics, message_tx, - message_rx: Arc::new(tokio::sync::Mutex::new(message_rx)), + message_rx: Arc::new(Mutex::new(message_rx)), + cancel_token: CancellationToken::new(), }) } + pub fn new_with_certs( + config: &Config, + cert_path: std::path::PathBuf, + key_path: std::path::PathBuf, + metrics: Arc, + ) -> Result { + let tls_config = Self::create_tls_config_from_files(&cert_path, &key_path)?; + let http_client = Client::new(); + let https_connector = HttpsConnectorBuilder::new() + .with_native_roots() + .https_only() + .enable_http1() + .build(); + let https_client = Client::builder().build(https_connector); + + let (message_tx, message_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE); + let stats = Arc::new(RwLock::new(NetworkStats::default())); + + Ok(Self { + config: Arc::new(config.clone()), + nodes: Arc::new(RwLock::new(Vec::new())), + running: Arc::new(RwLock::new(true)), + tls_config: Arc::new(tls_config), + http_client, + https_client, + stats, + metrics, + message_tx, + message_rx: Arc::new(Mutex::new(message_rx)), + cancel_token: CancellationToken::new(), + }) + } + + // Default TLS configuration using embedded certificates fn create_tls_config() -> Result { let cert_path = Path::new("certs/server.crt"); let key_path = Path::new("certs/server.key"); + Self::create_tls_config_from_files(cert_path, key_path) + } + fn create_tls_config_from_files( + cert_path: &Path, + key_path: &Path, + ) -> Result { let cert_data = fs::read(cert_path) .map_err(|e| BellandeMeshError::Custom(format!("Failed to read certificate: {}", e)))?; let key_data = fs::read(key_path) @@ -144,7 +230,8 @@ impl BellandeMeshSync { async fn start_message_handler(&self) -> Result<(), BellandeMeshError> { let handler = self.clone(); tokio::spawn(async move { - while let Some((message, addr)) = handler.message_rx.recv().await { + let mut rx = handler.message_rx.lock().await; + while let Some((message, addr)) = rx.recv().await { if let Err(e) = handler.handle_message_internal(message, addr).await { eprintln!("Message handling error from {}: {}", addr, e); } @@ -224,23 +311,40 @@ impl BellandeMeshSync { .map_err(|e| BellandeMeshError::Custom(format!("Invalid address: {}", e)))?; let http_addr = SocketAddr::new(addr.ip(), addr.port() + HTTP_PORT_OFFSET); + let metrics = self.metrics.clone(); let handler = self.clone(); + let make_service = make_service_fn(move |_| { let handler = handler.clone(); + let metrics = metrics.clone(); + async move { Ok::<_, Infallible>(service_fn(move |req| { let handler = handler.clone(); - async move { handler.handle_http_request(req).await } + let metrics = metrics.clone(); + + async move { + metrics.increment_sync_operations(); + handler.handle_http_request(req).await + } })) } }); - let server = Server::bind(&http_addr).serve(make_service); - tokio::spawn(server); + let server = Server::bind(&http_addr) + .serve(make_service) + .with_graceful_shutdown(Self::shutdown_signal()); + tokio::spawn(server); Ok(()) } + async fn shutdown_signal() { + if let Err(e) = signal::ctrl_c().await { + eprintln!("Failed to install CTRL+C signal handler: {}", e); + } + } + async fn start_https_server(&self) -> Result<(), BellandeMeshError> { let addr = self .config @@ -248,27 +352,41 @@ impl BellandeMeshSync { .parse::() .map_err(|e| BellandeMeshError::Custom(format!("Invalid address: {}", e)))?; let https_addr = SocketAddr::new(addr.ip(), addr.port() + HTTPS_PORT_OFFSET); - - let tls_cfg = self.tls_config.clone(); let handler = self.clone(); - let acceptor = TlsAcceptor::from(tls_cfg); + let tls_config = self.tls_config.clone(); let listener = TokioTcpListener::bind(https_addr).await?; tokio::spawn(async move { while handler.is_running().unwrap_or(false) { if let Ok((stream, addr)) = listener.accept().await { - let acceptor = acceptor.clone(); let handler = handler.clone(); + let tls_config = tls_config.clone(); tokio::spawn(async move { - match acceptor.accept(stream).await { - Ok(tls_stream) => { + // Create TLS server connection + let tls_conn = match rustls::ServerConnection::new(tls_config.clone()) { + Ok(conn) => conn, + Err(e) => { + eprintln!("TLS connection creation error: {}", e); + return; + } + }; + + // Create async TLS stream + let mut tls_stream = tokio_rustls::server::TlsStream::new(stream, tls_conn); + + // Perform TLS handshake + match tls_stream.do_handshake().await { + Ok(_) => { + // Handle secure connection if let Err(e) = handler.handle_https_connection(tls_stream).await { eprintln!("HTTPS error from {}: {}", addr, e); } } - Err(e) => eprintln!("TLS accept error from {}: {}", addr, e), + Err(e) => { + eprintln!("TLS handshake error from {}: {}", addr, e); + } } }); } @@ -451,22 +569,228 @@ impl BellandeMeshSync { Message::DataRequest { ids } => { self.handle_data_request(&ids, src).await?; } - Message::Ping { sender, token } => { - self.handle_ping(sender, token, src).await?; + Message::Ping { sender: _, token } => { + self.handle_ping(token, src).await?; } - Message::Pong { sender, token } => { - self.handle_pong(sender, token, src).await?; + Message::Pong { sender, token: _ } => { + self.handle_pong(sender, src).await?; + } + Message::Store { + key, + value, + sender: _, + token: _, + } => { + self.handle_store(key, value, src).await?; + } + Message::FindNode { + target, + sender: _, + token: _, + } => { + self.handle_find_node(target, src).await?; + } + Message::FindValue { + key, + sender: _, + token: _, + } => { + self.handle_find_value(key, src).await?; + } + Message::Nodes { + nodes, + sender: _, + token: _, + } => { + self.handle_nodes(nodes, src).await?; + } + Message::Value { + key, + value, + sender: _, + token: _, + } => { + self.handle_value(key, value, src).await?; } Message::Heartbeat => { self.update_node_last_seen(src).await?; } - _ => { - eprintln!("Unhandled message type from {}", src); + _ => {} + } + Ok(()) + } + + async fn handle_store( + &self, + key: Vec, + value: Vec, + src: SocketAddr, + ) -> Result<(), BellandeMeshError> { + self.metrics.increment_sync_operations(); + + let chunk = DataChunk::new( + value.clone(), + "checksum".to_string(), + 0, + self.get_local_id()?, + vec![], + ); + + let nodes = self + .nodes + .read() + .map_err(|_| BellandeMeshError::LockError)?; + if let Some(node) = nodes.iter().find(|n| n.address == src) { + let mut data = node + .data + .write() + .map_err(|_| BellandeMeshError::LockError)?; + data.insert(NodeId::from_bytes(&key), chunk); + + // Replicate to closest nodes + let closest_nodes = self.find_closest_nodes(&NodeId::from_bytes(&key), 3); + for target_node in closest_nodes { + let store_msg = Message::Store { + key: key.clone(), + value: value.clone(), + sender: self.get_local_id()?, + token: rand::random(), + }; + if let Err(e) = self.send_message(target_node.address, &store_msg).await { + eprintln!( + "Failed to replicate store to {}: {}", + target_node.address, e + ); + } + } + } + + Ok(()) + } + + async fn handle_find_node( + &self, + target: NodeId, + src: SocketAddr, + ) -> Result<(), BellandeMeshError> { + let closest_nodes = self.find_closest_nodes(&target, 20).await; + let response = Message::Nodes { + nodes: closest_nodes, + sender: self.get_local_id().await?, + token: rand::random(), + }; + self.send_message(src, &response).await?; + Ok(()) + } + + async fn handle_find_value( + &self, + key: Vec, + src: SocketAddr, + ) -> Result<(), BellandeMeshError> { + let node_id = NodeId::from_bytes(&key); + let nodes = self + .nodes + .read() + .map_err(|_| BellandeMeshError::LockError)?; + + for node in nodes.iter() { + if let Ok(data) = node.data.read() { + if let Some(chunk) = data.get(&node_id) { + let response = Message::Value { + key: key.clone(), + value: chunk.content.clone(), + sender: self.get_local_id()?, + token: rand::random(), + }; + return self.send_message(src, &response).await; + } + } + } + + // If not found, return closest nodes + let closest_nodes = self.find_closest_nodes(&node_id, 20); + let response = Message::Nodes { + nodes: closest_nodes, + sender: self.get_local_id()?, + token: rand::random(), + }; + self.send_message(src, &response).await?; + Ok(()) + } + + async fn handle_nodes( + &self, + new_nodes: Vec, + _src: SocketAddr, + ) -> Result<(), BellandeMeshError> { + let mut nodes = self.nodes.write().await; + for new_node in new_nodes { + if !nodes.iter().any(|n| n.id == new_node.id) { + nodes.push(new_node); + self.metrics.increment_sync_operations(); } } Ok(()) } + async fn handle_value( + &self, + key: Vec, + value: Vec, + _src: SocketAddr, + ) -> Result<(), BellandeMeshError> { + let chunk = DataChunk::new( + value, + "checksum".to_string(), + 0, + self.get_local_id()?, + vec![], + ); + + let mut nodes = self + .nodes + .write() + .map_err(|_| BellandeMeshError::LockError)?; + if let Some(node) = nodes.first_mut() { + if let Ok(mut data) = node.data.write() { + data.insert(NodeId::from_bytes(&key), chunk); + self.metrics.increment_sync_operations(); + } + } + Ok(()) + } + + async fn find_closest_nodes(&self, target: &NodeId, count: usize) -> Vec { + let nodes = self.nodes.read().await; + let mut distances: Vec<_> = nodes + .iter() + .map(|node| (node.clone(), self.calculate_distance(&node.id, target))) + .collect(); + + distances.sort_by(|a, b| a.1.cmp(&b.1)); + distances + .into_iter() + .take(count) + .map(|(node, _)| node) + .collect() + } + + // XOR distance metric for Kademlia-like routing + fn calculate_distance(&self, node1: &NodeId, node2: &NodeId) -> u64 { + let n1_bytes = node1.to_bytes(); + let n2_bytes = node2.to_bytes(); + let mut distance = 0u64; + + for i in 0..8 { + if i < n1_bytes.len() && i < n2_bytes.len() { + distance ^= (n1_bytes[i] ^ n2_bytes[i]) as u64; + } + } + + distance + } + async fn update_node_last_seen(&self, addr: SocketAddr) -> Result<(), BellandeMeshError> { let mut nodes = self .nodes @@ -560,12 +884,7 @@ impl BellandeMeshSync { Ok(()) } - async fn handle_ping( - &self, - sender: NodeId, - token: u64, - src: SocketAddr, - ) -> Result<(), BellandeMeshError> { + async fn handle_ping(&self, token: u64, src: SocketAddr) -> Result<(), BellandeMeshError> { let response = Message::Pong { sender: self.get_local_id()?, token, @@ -574,12 +893,7 @@ impl BellandeMeshSync { Ok(()) } - async fn handle_pong( - &self, - sender: NodeId, - token: u64, - src: SocketAddr, - ) -> Result<(), BellandeMeshError> { + async fn handle_pong(&self, sender: NodeId, src: SocketAddr) -> Result<(), BellandeMeshError> { let mut nodes = self .nodes .write() @@ -590,23 +904,35 @@ impl BellandeMeshSync { Ok(()) } - async fn broadcast_new_node(&self, new_node: &Node) -> Result<(), BellandeMeshError> { - let nodes = self - .nodes - .read() - .map_err(|_| BellandeMeshError::LockError)?; - let message = Message::JoinResponse { - accepted: true, - nodes: vec![new_node.clone()], - }; - - for node in nodes.iter() { - if node.id != new_node.id { - if let Err(e) = self.send_message(node.address, &message).await { - eprintln!("Failed to broadcast new node to {}: {}", node.address, e); - } + async fn restore_nodes(&self, nodes: Vec) -> Result<(), BellandeMeshError> { + let mut current_nodes = self.nodes.write().await; + for node in nodes { + if !current_nodes.iter().any(|n| n.id == node.id) { + self.metrics.increment_sync_operations(); + current_nodes.push(node); } } + Ok(()) + } + + pub async fn send_message( + &self, + addr: SocketAddr, + msg: &Message, + ) -> Result<(), BellandeMeshError> { + let data = + bincode::serialize(msg).map_err(|e| BellandeMeshError::Serialization(e.to_string()))?; + + if data.len() > MAX_MESSAGE_SIZE { + return Err(BellandeMeshError::Custom("Message too large".to_string())); + } + + let mut stream = + TcpStream::connect(addr).map_err(|e| BellandeMeshError::NetworkError(e.to_string()))?; + + stream.write_all(&(data.len() as u32).to_be_bytes())?; + stream.write_all(&data)?; + stream.flush()?; Ok(()) } @@ -647,6 +973,141 @@ impl BellandeMeshSync { Ok(()) } + pub async fn restore_data_chunks( + &self, + node_id: NodeId, + chunks: Vec, + ) -> Result<(), BellandeMeshError> { + let nodes = self.nodes.read().await; + if let Some(node) = nodes.iter().find(|n| n.id == node_id) { + let mut data = node + .data + .write() + .map_err(|_| BellandeMeshError::LockError)?; + for chunk in chunks { + data.insert(chunk.id.clone(), chunk); + self.metrics.increment_sync_operations(); + } + } + Ok(()) + } + + async fn send_response( + &self, + src: SocketAddr, + closest_nodes: Vec, + ) -> Result<(), BellandeMeshError> { + let response = Message::Nodes { + nodes: closest_nodes, + sender: self.get_local_id().await?, + token: rand::random(), + }; + self.send_message(src, &response).await + } + + async fn start_metrics_collection(&self) -> Result<(), BellandeMeshError> { + let metrics = self.metrics.clone(); + let running = self.running.clone(); + + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(60)); + while *running.read().await { + interval.tick().await; + if let Err(e) = metrics.update_metrics().await { + eprintln!("Metrics error: {}", e); + } + } + }); + + Ok(()) + } + + pub async fn set_max_connections(&self, max_conn: usize) -> Result<(), BellandeMeshError> { + let nodes = self.nodes.read().await; + if nodes.len() > max_conn { + // Remove excess connections, keeping the oldest/most reliable ones + let mut nodes = self.nodes.write().await; + nodes.sort_by(|a, b| b.last_seen.cmp(&a.last_seen)); + nodes.truncate(max_conn); + self.update_stats(|stats| stats.active_nodes = max_conn) + .await; + } + Ok(()) + } + + pub async fn broadcast_data(&self, data: Vec) -> Result<(), BellandeMeshError> { + let nodes = self.nodes.read().await; + let message = Message::Store { + key: vec![], // Broadcast key + value: data, + sender: self.get_local_id().await?, + token: rand::random(), + }; + + let mut send_errors = Vec::new(); + for node in nodes.iter() { + if let Err(e) = self.send_message(node.address, &message).await { + send_errors.push((node.id.clone(), e)); + } + } + + // Update metrics + self.metrics.increment_sync_operations(); + self.update_stats(|stats| stats.total_messages += 1).await; + + // If any errors occurred, return the first one + if let Some((node_id, error)) = send_errors.first() { + return Err(BellandeMeshError::Custom(format!( + "Failed to broadcast to node {}: {}", + node_id, error + ))); + } + + Ok(()) + } + + pub async fn get_network_stats(&self) -> Result { + let stats = self.stats.read().await; + Ok(NetworkStats { + tcp_connections: stats.tcp_connections, + udp_packets_received: stats.udp_packets_received, + http_requests: stats.http_requests, + https_requests: stats.https_requests, + active_nodes: self.nodes.read().await.len(), + total_messages: stats.total_messages, + start_time: stats.start_time, + last_sync: stats.last_sync, + }) + } + + pub async fn send_data_to_node( + &self, + node_id: NodeId, + data: Vec, + ) -> Result<(), BellandeMeshError> { + let nodes = self.nodes.read().await; + let node = nodes + .iter() + .find(|n| n.id == node_id) + .ok_or_else(|| BellandeMeshError::Custom("Node not found".to_string()))?; + + let message = Message::Store { + key: node_id.to_bytes(), + value: data, + sender: self.get_local_id().await?, + token: rand::random(), + }; + + // Send the message + self.send_message(node.address, &message).await?; + + // Update metrics + self.metrics.increment_sync_operations(); + self.update_stats(|stats| stats.total_messages += 1).await; + + Ok(()) + } + async fn cleanup_dead_nodes(&self) -> Result<(), BellandeMeshError> { let timeout = Duration::from_secs(self.config.node_timeout); let mut nodes = self @@ -691,24 +1152,6 @@ impl BellandeMeshSync { Ok(()) } - async fn send_message(&self, addr: SocketAddr, msg: &Message) -> Result<(), BellandeMeshError> { - let data = - bincode::serialize(msg).map_err(|e| BellandeMeshError::Serialization(e.to_string()))?; - - if data.len() > MAX_MESSAGE_SIZE { - return Err(BellandeMeshError::Custom("Message too large".to_string())); - } - - let mut stream = - TcpStream::connect(addr).map_err(|e| BellandeMeshError::NetworkError(e.to_string()))?; - - stream.write_all(&(data.len() as u32).to_be_bytes())?; - stream.write_all(&data)?; - stream.flush()?; - - Ok(()) - } - async fn read_message(&self, stream: &mut TcpStream) -> Result { let mut len_buf = [0u8; 4]; stream.read_exact(&mut len_buf)?; @@ -729,8 +1172,9 @@ impl BellandeMeshSync { let stats = self .stats .read() - .map(|stats| stats.clone()) + .map(|guard| guard.clone()) .unwrap_or_else(|_| NetworkStats::default()); + serde_json::to_string_pretty(&stats).unwrap_or_else(|_| "Error getting status".to_string()) } @@ -750,12 +1194,75 @@ impl BellandeMeshSync { .map(|guard| *guard) } - fn get_local_id(&self) -> Result { + pub async fn get_local_id(&self) -> Result { self.nodes .read() .map_err(|_| BellandeMeshError::LockError) .map(|nodes| nodes.first().map(|n| n.id).unwrap_or_else(|| NodeId::new())) } + + async fn broadcast_new_node(&self, new_node: &Node) -> Result<(), BellandeMeshError> { + let nodes = self + .nodes + .read() + .map_err(|_| BellandeMeshError::LockError)?; + let message = Message::JoinResponse { + accepted: true, + nodes: vec![new_node.clone()], + }; + + for node in nodes.iter() { + if node.id != new_node.id { + if let Err(e) = self.send_message(node.address, &message).await { + eprintln!("Failed to broadcast new node to {}: {}", node.address, e); + } + } + } + + Ok(()) + } + + pub async fn broadcast_message(&self, message: Message) -> Result<(), BellandeMeshError> { + let nodes = self + .nodes + .read() + .map_err(|_| BellandeMeshError::LockError)?; + + for node in nodes.iter() { + if let Err(e) = self.send_message(node.address, &message).await { + eprintln!("Failed to broadcast message to {}: {}", node.address, e); + } + } + + Ok(()) + } + + pub async fn get_node_count(&self) -> Result { + Ok(self + .nodes + .read() + .map_err(|_| BellandeMeshError::LockError)? + .len()) + } + + pub async fn get_node_list(&self) -> Result, BellandeMeshError> { + Ok(self + .nodes + .read() + .map_err(|_| BellandeMeshError::LockError)? + .iter() + .map(|node| node.id) + .collect()) + } + + pub async fn is_node_connected(&self, node_id: &NodeId) -> Result { + Ok(self + .nodes + .read() + .map_err(|_| BellandeMeshError::LockError)? + .iter() + .any(|node| &node.id == node_id)) + } } impl Clone for BellandeMeshSync { @@ -768,8 +1275,10 @@ impl Clone for BellandeMeshSync { http_client: self.http_client.clone(), https_client: self.https_client.clone(), stats: Arc::clone(&self.stats), + metrics: Arc::clone(&self.metrics), message_tx: self.message_tx.clone(), message_rx: Arc::clone(&self.message_rx), + cancel_token: CancellationToken::new(), } } } diff --git a/src/node/node.rs b/src/node/node.rs index 4a8ff25..675a4ec 100644 --- a/src/node/node.rs +++ b/src/node/node.rs @@ -13,6 +13,7 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use crate::data::data::DataChunk; use rand::{thread_rng, RngCore}; use serde::{Deserialize, Serialize}; use std::cmp::Ordering; @@ -31,6 +32,10 @@ impl NodeId { NodeId(id) } + pub fn to_bytes(&self) -> Vec { + bincode::serialize(self).unwrap_or_default() + } + pub fn from_bytes(bytes: &[u8]) -> Self { let mut id = [0u8; 32]; let len = std::cmp::min(bytes.len(), 32); @@ -68,29 +73,6 @@ impl PublicKey { } } -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct DataChunk { - pub id: NodeId, - pub content: Vec, - pub timestamp: SystemTime, - pub author: NodeId, -} - -impl DataChunk { - pub fn new(id: NodeId, content: Vec, author: NodeId) -> Self { - Self { - id, - content, - timestamp: SystemTime::now(), - author, - } - } - - pub fn size(&self) -> usize { - std::mem::size_of::() * 2 + self.content.len() + std::mem::size_of::() - } -} - #[derive(Debug, Clone, Serialize, Deserialize)] pub enum Message { Ping { @@ -203,7 +185,7 @@ impl Node { Ok(mut data) => { let now = SystemTime::now(); data.retain(|_, chunk| { - now.duration_since(chunk.timestamp) + now.duration_since(chunk.last_modified) .map(|age| age <= max_age) .unwrap_or(false) }); diff --git a/src/persistence/persistence.rs b/src/persistence/persistence.rs index be56af7..681d0fc 100644 --- a/src/persistence/persistence.rs +++ b/src/persistence/persistence.rs @@ -13,8 +13,9 @@ // You should have received a copy of the GNU General Public License // along with this program. If not, see . +use crate::data::data::DataChunk; use crate::error::error::BellandeMeshError; -use crate::node::node::{DataChunk, Node, NodeId, PublicKey}; +use crate::node::node::{Node, NodeId, PublicKey}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::fs::{File, OpenOptions};