From e36f88bca4d577dae0910aa87c4af335c59580d4 Mon Sep 17 00:00:00 2001 From: RonaldsonBellande Date: Fri, 25 Oct 2024 21:46:27 -0400 Subject: [PATCH] latest pushes --- Cargo.toml | 3 +- src/bellande_mesh_sync.rs | 187 +++++++++++++++--- src/mesh/mesh.rs | 334 +++++++++++++++++++-------------- src/persistence/persistence.rs | 120 +++++++++--- 4 files changed, 454 insertions(+), 190 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index f35ecf5..b861b52 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,7 +8,7 @@ license = "GPL-3.0-or-later" repository = "https://github.com/Architecture-Mechanism/bellande_mesh_sync" documentation = "https://bellande-architecture-mechanism-research-innovation-center.org/bellande_mesh_sync/docs" readme = "README.md" -keywords = ["p2p", "synchronization", "distributed-systems", "mesh-network", "bellande_mesh_sync"] +keywords = ["synchronization", "distributed-systems", "mesh-network", "bellande_mesh_sync"] categories = ["network-programming", "asynchronous"] [lib] @@ -29,3 +29,4 @@ chrono = { version = "0.4", features = ["serde"] } sled = "0.34" tokio-util = { version = "0.7", features = ["full"] } async-trait = "0.1" +tokio-rustls = "0.24" diff --git a/src/bellande_mesh_sync.rs b/src/bellande_mesh_sync.rs index 478001b..03838de 100644 --- a/src/bellande_mesh_sync.rs +++ b/src/bellande_mesh_sync.rs @@ -51,6 +51,24 @@ pub struct MeshOptions { pub enable_persistence: bool, /// Custom persistence directory pub persistence_path: Option, + /// Node timeout duration in seconds + pub node_timeout: Option, + /// Enable automatic node discovery + pub enable_discovery: bool, + /// Bootstrap nodes for initial connection + pub bootstrap_nodes: Vec, + /// Enable encryption for all communications + pub enable_encryption: bool, + /// Custom TCP port for mesh communication + pub tcp_port: Option, + /// Custom UDP port for mesh communication + pub udp_port: Option, + /// Maximum retry attempts for failed connections + pub max_retries: Option, + /// Enable automatic data replication + pub enable_replication: bool, + /// Number of data replicas to maintain + pub replication_factor: Option, } impl Default for MeshOptions { @@ -63,6 +81,15 @@ impl Default for MeshOptions { max_connections: Some(1000), enable_persistence: false, persistence_path: None, + node_timeout: Some(300), + enable_discovery: true, + bootstrap_nodes: Vec::new(), + enable_encryption: true, + tcp_port: None, + udp_port: None, + max_retries: Some(3), + enable_replication: true, + replication_factor: Some(3), } } } @@ -109,13 +136,34 @@ pub async fn init_with_options( // Restore persisted data if available if let Some(persistence) = &persistence_manager { - let nodes = persistence.load_nodes()?; - bellande_mesh.restore_nodes(nodes).await?; + // Load nodes from persistence + let nodes = persistence + .load_nodes() + .map_err(|e| BellandeMeshError::Custom(format!("Failed to load nodes: {}", e)))?; - // Load data chunks for each node + // Restore the nodes structure + bellande_mesh + .restore_nodes(nodes.clone()) + .await + .map_err(|e| BellandeMeshError::Custom(format!("Failed to restore nodes: {}", e)))?; + + // Then load and restore data chunks for each node for node in nodes { - let chunks = persistence.load_data_chunks(&node.id)?; - bellande_mesh.restore_data_chunks(node.id, chunks).await?; + match persistence.load_data_chunks(&node.id) { + Ok(chunks) => { + if let Err(e) = bellande_mesh.restore_data_chunks(node.id, chunks).await { + eprintln!( + "Error restoring chunks for node {}: {}", + node.id.to_hex(), + e + ); + } + } + Err(e) => { + eprintln!("Error loading chunks for node {}: {}", node.id.to_hex(), e); + continue; + } + } } } @@ -142,7 +190,7 @@ pub async fn stop(bellande_mesh: &BellandeMeshSync) -> Result<(), BellandeMeshEr bellande_mesh.stop().await } -/// Broadcast a message to all nodes in the network +// Message and Data Handling pub async fn broadcast( bellande_mesh: &BellandeMeshSync, data: Vec, @@ -150,27 +198,6 @@ pub async fn broadcast( bellande_mesh.broadcast_data(data).await } -/// Get the current network statistics -pub async fn get_stats( - bellande_mesh: &BellandeMeshSync, -) -> Result { - bellande_mesh.get_network_stats().await -} - -/// Get the list of currently connected nodes -pub async fn get_nodes(bellande_mesh: &BellandeMeshSync) -> Result, BellandeMeshError> { - bellande_mesh.get_all_nodes().await -} - -/// Check if a specific node is connected to the network -pub async fn is_node_connected( - bellande_mesh: &BellandeMeshSync, - node_id: NodeId, -) -> Result { - bellande_mesh.is_node_connected(&node_id).await -} - -/// Send a message to a specific node pub async fn send_to_node( bellande_mesh: &BellandeMeshSync, node_id: NodeId, @@ -178,3 +205,109 @@ pub async fn send_to_node( ) -> Result<(), BellandeMeshError> { bellande_mesh.send_data_to_node(node_id, data).await } + +// Node Management +pub async fn get_nodes(bellande_mesh: &BellandeMeshSync) -> Result, BellandeMeshError> { + bellande_mesh.get_all_nodes().await +} + +pub async fn get_active_nodes( + bellande_mesh: &BellandeMeshSync, +) -> Result, BellandeMeshError> { + bellande_mesh.get_all_active_nodes().await +} + +pub async fn get_nodes_paginated( + bellande_mesh: &BellandeMeshSync, + offset: usize, + limit: usize, +) -> Result<(Vec, usize), BellandeMeshError> { + bellande_mesh.get_nodes_paginated(offset, limit).await +} + +pub async fn is_node_connected( + bellande_mesh: &BellandeMeshSync, + node_id: NodeId, +) -> Result { + bellande_mesh.is_node_connected(&node_id).await +} + +pub async fn restore_nodes( + bellande_mesh: &BellandeMeshSync, + nodes: Vec, +) -> Result<(), BellandeMeshError> { + bellande_mesh.restore_nodes(nodes).await +} + +// Network Statistics and Monitoring +pub async fn get_stats( + bellande_mesh: &BellandeMeshSync, +) -> Result { + bellande_mesh.get_network_stats().await +} + +pub async fn get_node_count(bellande_mesh: &BellandeMeshSync) -> Result { + bellande_mesh.get_node_count().await +} + +pub async fn get_node_list( + bellande_mesh: &BellandeMeshSync, +) -> Result, BellandeMeshError> { + bellande_mesh.get_node_list().await +} + +// Maintenance Operations +pub async fn start_metrics_collection( + bellande_mesh: &BellandeMeshSync, + interval: u64, +) -> Result<(), BellandeMeshError> { + bellande_mesh.start_metrics_collection(interval).await +} + +pub async fn set_max_connections( + bellande_mesh: &BellandeMeshSync, + max_conn: usize, +) -> Result<(), BellandeMeshError> { + bellande_mesh.set_max_connections(max_conn).await +} + +pub async fn cleanup_dead_nodes(bellande_mesh: &BellandeMeshSync) -> Result<(), BellandeMeshError> { + bellande_mesh.cleanup_dead_nodes().await +} + +// Node Discovery and Communication +pub async fn find_closest_nodes( + bellande_mesh: &BellandeMeshSync, + target: &NodeId, + count: usize, +) -> Result, BellandeMeshError> { + Ok(bellande_mesh.find_closest_nodes(target, count).await) +} + +pub async fn broadcast_new_node( + bellande_mesh: &BellandeMeshSync, + new_node: &Node, +) -> Result<(), BellandeMeshError> { + bellande_mesh.broadcast_new_node(new_node).await +} + +// Protocol Functions +pub async fn handle_join_request( + bellande_mesh: &BellandeMeshSync, + data: Vec, +) -> Result<(), BellandeMeshError> { + bellande_mesh.handle_join_request(data).await +} + +// Utility Functions +pub async fn get_local_id(bellande_mesh: &BellandeMeshSync) -> Result { + bellande_mesh.get_local_id().await +} + +pub async fn get_status(bellande_mesh: &BellandeMeshSync) -> String { + bellande_mesh.get_status().await +} + +pub async fn is_running(bellande_mesh: &BellandeMeshSync) -> Result { + bellande_mesh.is_running().await +} diff --git a/src/mesh/mesh.rs b/src/mesh/mesh.rs index 28f3a8b..a1d2059 100644 --- a/src/mesh/mesh.rs +++ b/src/mesh/mesh.rs @@ -18,6 +18,7 @@ use crate::data::data::DataChunk; use crate::error::error::BellandeMeshError; pub use crate::metrics::metrics::MetricsManager; use crate::node::node::{Message, Node, NodeId, PublicKey}; +use hyper::server::conn::AddrStream; use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Client, Request, Response, Server, StatusCode}; use hyper_rustls::HttpsConnectorBuilder; @@ -34,7 +35,6 @@ use std::time::{Duration, SystemTime}; use tokio::net::{TcpListener as TokioTcpListener, UdpSocket as TokioUdpSocket}; use tokio::signal; use tokio::sync::{mpsc, Mutex}; -use tokio::time::sleep; use tokio_util::sync::CancellationToken; // Constants @@ -227,7 +227,7 @@ impl BellandeMeshSync { Ok(()) } - async fn start_message_handler(&self) -> Result<(), BellandeMeshError> { + pub async fn start_message_handler(&self) -> Result<(), BellandeMeshError> { let handler = self.clone(); tokio::spawn(async move { let mut rx = handler.message_rx.lock().await; @@ -240,7 +240,7 @@ impl BellandeMeshSync { Ok(()) } - async fn start_protocol_listeners(&self) -> Result<(), BellandeMeshError> { + pub async fn start_protocol_listeners(&self) -> Result<(), BellandeMeshError> { self.start_tcp_listener().await?; self.start_udp_listener().await?; self.start_http_server().await?; @@ -248,7 +248,7 @@ impl BellandeMeshSync { Ok(()) } - async fn start_tcp_listener(&self) -> Result<(), BellandeMeshError> { + pub async fn start_tcp_listener(&self) -> Result<(), BellandeMeshError> { let addr = self .config .listen_address @@ -273,22 +273,23 @@ impl BellandeMeshSync { Ok(()) } - async fn start_udp_listener(&self) -> Result<(), BellandeMeshError> { + pub async fn start_udp_listener(&self) -> Result<(), BellandeMeshError> { let addr = self .config .listen_address .parse::() .map_err(|e| BellandeMeshError::Custom(format!("Invalid address: {}", e)))?; let socket = TokioUdpSocket::bind(addr).await?; - let handler = self.clone(); + let handler = Arc::new(self.clone()); tokio::spawn(async move { let mut buf = [0u8; UDP_BUFFER_SIZE]; - while handler.is_running().unwrap_or(false) { + loop { match socket.recv_from(&mut buf).await { Ok((len, src)) => { let handler = handler.clone(); let data = buf[..len].to_vec(); + tokio::spawn(async move { if let Err(e) = handler.handle_udp_packet(&data, src).await { eprintln!("UDP error from {}: {}", src, e); @@ -303,18 +304,17 @@ impl BellandeMeshSync { Ok(()) } - async fn start_http_server(&self) -> Result<(), BellandeMeshError> { + pub async fn start_http_server(&self) -> Result<(), BellandeMeshError> { let addr = self .config .listen_address .parse::() .map_err(|e| BellandeMeshError::Custom(format!("Invalid address: {}", e)))?; let http_addr = SocketAddr::new(addr.ip(), addr.port() + HTTP_PORT_OFFSET); - let metrics = self.metrics.clone(); let handler = self.clone(); - let make_service = make_service_fn(move |_| { + let make_service = make_service_fn(move |_conn: &AddrStream| { let handler = handler.clone(); let metrics = metrics.clone(); @@ -331,21 +331,32 @@ impl BellandeMeshSync { } }); - let server = Server::bind(&http_addr) - .serve(make_service) - .with_graceful_shutdown(Self::shutdown_signal()); + let server = match Server::try_bind(&http_addr) { + Ok(builder) => builder.serve(make_service), + Err(e) => { + return Err(BellandeMeshError::Custom(format!( + "Server bind error: {}", + e + ))) + } + }; + + tokio::spawn(async move { + if let Err(e) = server.await { + eprintln!("Server error: {}", e); + } + }); - tokio::spawn(server); Ok(()) } - async fn shutdown_signal() { + pub async fn shutdown_signal() { if let Err(e) = signal::ctrl_c().await { eprintln!("Failed to install CTRL+C signal handler: {}", e); } } - async fn start_https_server(&self) -> Result<(), BellandeMeshError> { + pub async fn start_https_server(&self) -> Result<(), BellandeMeshError> { let addr = self .config .listen_address @@ -354,31 +365,20 @@ impl BellandeMeshSync { let https_addr = SocketAddr::new(addr.ip(), addr.port() + HTTPS_PORT_OFFSET); let handler = self.clone(); let tls_config = self.tls_config.clone(); - let listener = TokioTcpListener::bind(https_addr).await?; + // Create TLS acceptor from config + let tls_acceptor = tokio_rustls::TlsAcceptor::from(tls_config); + tokio::spawn(async move { while handler.is_running().unwrap_or(false) { if let Ok((stream, addr)) = listener.accept().await { let handler = handler.clone(); - let tls_config = tls_config.clone(); + let acceptor = tls_acceptor.clone(); tokio::spawn(async move { - // Create TLS server connection - let tls_conn = match rustls::ServerConnection::new(tls_config.clone()) { - Ok(conn) => conn, - Err(e) => { - eprintln!("TLS connection creation error: {}", e); - return; - } - }; - - // Create async TLS stream - let mut tls_stream = tokio_rustls::server::TlsStream::new(stream, tls_conn); - - // Perform TLS handshake - match tls_stream.do_handshake().await { - Ok(_) => { + match acceptor.accept(stream).await { + Ok(tls_stream) => { // Handle secure connection if let Err(e) = handler.handle_https_connection(tls_stream).await { eprintln!("HTTPS error from {}: {}", addr, e); @@ -396,41 +396,96 @@ impl BellandeMeshSync { Ok(()) } - async fn start_maintenance_tasks(&self) -> Result<(), BellandeMeshError> { + pub async fn start_maintenance_tasks(&self) -> Result<(), BellandeMeshError> { let sync_handler = self.clone(); + let sync_running = Arc::clone(&self.running); + tokio::spawn(async move { - while sync_handler.is_running().unwrap_or(false) { + let mut interval = tokio::time::interval(SYNC_INTERVAL); + while sync_running.read().await.clone() { if let Err(e) = sync_handler.sync_with_peers().await { eprintln!("Sync error: {}", e); } - sleep(SYNC_INTERVAL).await; + interval.tick().await; } }); let cleanup_handler = self.clone(); + let cleanup_running = Arc::clone(&self.running); + tokio::spawn(async move { - while cleanup_handler.is_running().unwrap_or(false) { + let mut interval = tokio::time::interval(CLEANUP_INTERVAL); + while cleanup_running.read().await.clone() { if let Err(e) = cleanup_handler.cleanup_dead_nodes().await { eprintln!("Cleanup error: {}", e); } - sleep(CLEANUP_INTERVAL).await; + interval.tick().await; } }); let ping_handler = self.clone(); + let ping_running = Arc::clone(&self.running); + tokio::spawn(async move { - while ping_handler.is_running().unwrap_or(false) { + let mut interval = tokio::time::interval(PING_INTERVAL); + while ping_running.read().await.clone() { if let Err(e) = ping_handler.send_ping_to_all_nodes().await { eprintln!("Ping error: {}", e); } - sleep(PING_INTERVAL).await; + interval.tick().await; } }); Ok(()) } - async fn handle_tcp_connection(&self, stream: S) -> Result<(), BellandeMeshError> + pub async fn get_all_nodes(&self) -> Result, BellandeMeshError> { + let nodes = self.nodes.read().await; + Ok(nodes.clone()) + } + + // Acquire read lock on nodes + pub async fn get_all_nodes_detailed(&self) -> Result, BellandeMeshError> { + let nodes = self.nodes.read().await; + + let nodes_copy = nodes.clone(); + self.metrics.increment_sync_operations(); + + { + let mut stats = self.stats.write().await; + stats.active_nodes = nodes_copy.len(); + } + + Ok(nodes_copy) + } + + pub async fn get_all_active_nodes(&self) -> Result, BellandeMeshError> { + let nodes = self.nodes.read().await; + + // Filter active nodes + let active_nodes: Vec = nodes + .iter() + .filter(|node| node.is_alive(Duration::from_secs(self.config.node_timeout))) + .cloned() + .collect(); + + Ok(active_nodes) + } + + pub async fn get_nodes_paginated( + &self, + offset: usize, + limit: usize, + ) -> Result<(Vec, usize), BellandeMeshError> { + let nodes = self.nodes.read().await; + + let total = nodes.len(); + let paginated_nodes: Vec = nodes.iter().skip(offset).take(limit).cloned().collect(); + + Ok((paginated_nodes, total)) + } + + pub async fn handle_tcp_connection(&self, stream: S) -> Result<(), BellandeMeshError> where S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, { @@ -456,7 +511,7 @@ impl BellandeMeshSync { Ok(()) } - async fn read_async_message( + pub async fn read_async_message( &self, reader: &mut S, buf: &mut [u8], @@ -479,7 +534,7 @@ impl BellandeMeshSync { .map_err(|e| BellandeMeshError::Deserialization(e.to_string())) } - async fn write_async_message( + pub async fn write_async_message( &self, writer: &mut S, message: &Message, @@ -503,7 +558,7 @@ impl BellandeMeshSync { Ok(()) } - async fn handle_udp_packet( + pub async fn handle_udp_packet( &self, data: &[u8], src: SocketAddr, @@ -515,7 +570,10 @@ impl BellandeMeshSync { self.handle_message_internal(message, src).await } - async fn handle_http_request(&self, req: Request) -> Result, Infallible> { + pub async fn handle_http_request( + &self, + req: Request, + ) -> Result, Infallible> { self.update_stats(|stats| stats.http_requests += 1); let response = match (req.method(), req.uri().path()) { @@ -545,7 +603,7 @@ impl BellandeMeshSync { Ok(response) } - async fn handle_https_connection(&self, stream: S) -> Result<(), BellandeMeshError> + pub async fn handle_https_connection(&self, stream: S) -> Result<(), BellandeMeshError> where S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin, { @@ -553,7 +611,7 @@ impl BellandeMeshSync { self.handle_tcp_connection(stream).await } - async fn handle_message_internal( + pub async fn handle_message_internal( &self, message: Message, src: SocketAddr, @@ -620,7 +678,7 @@ impl BellandeMeshSync { Ok(()) } - async fn handle_store( + pub async fn handle_store( &self, key: Vec, value: Vec, @@ -632,7 +690,7 @@ impl BellandeMeshSync { value.clone(), "checksum".to_string(), 0, - self.get_local_id()?, + self.get_local_id().await?, vec![], ); @@ -648,12 +706,12 @@ impl BellandeMeshSync { data.insert(NodeId::from_bytes(&key), chunk); // Replicate to closest nodes - let closest_nodes = self.find_closest_nodes(&NodeId::from_bytes(&key), 3); + let closest_nodes = self.find_closest_nodes(&NodeId::from_bytes(&key), 3).await; for target_node in closest_nodes { let store_msg = Message::Store { key: key.clone(), value: value.clone(), - sender: self.get_local_id()?, + sender: self.get_local_id().await?, token: rand::random(), }; if let Err(e) = self.send_message(target_node.address, &store_msg).await { @@ -668,7 +726,7 @@ impl BellandeMeshSync { Ok(()) } - async fn handle_find_node( + pub async fn handle_find_node( &self, target: NodeId, src: SocketAddr, @@ -683,7 +741,7 @@ impl BellandeMeshSync { Ok(()) } - async fn handle_find_value( + pub async fn handle_find_value( &self, key: Vec, src: SocketAddr, @@ -700,7 +758,7 @@ impl BellandeMeshSync { let response = Message::Value { key: key.clone(), value: chunk.content.clone(), - sender: self.get_local_id()?, + sender: self.get_local_id().await?, token: rand::random(), }; return self.send_message(src, &response).await; @@ -709,17 +767,17 @@ impl BellandeMeshSync { } // If not found, return closest nodes - let closest_nodes = self.find_closest_nodes(&node_id, 20); + let closest_nodes = self.find_closest_nodes(&node_id, 20).await; let response = Message::Nodes { nodes: closest_nodes, - sender: self.get_local_id()?, + sender: self.get_local_id().await?, token: rand::random(), }; self.send_message(src, &response).await?; Ok(()) } - async fn handle_nodes( + pub async fn handle_nodes( &self, new_nodes: Vec, _src: SocketAddr, @@ -734,7 +792,7 @@ impl BellandeMeshSync { Ok(()) } - async fn handle_value( + pub async fn handle_value( &self, key: Vec, value: Vec, @@ -744,7 +802,7 @@ impl BellandeMeshSync { value, "checksum".to_string(), 0, - self.get_local_id()?, + self.get_local_id().await?, vec![], ); @@ -761,7 +819,19 @@ impl BellandeMeshSync { Ok(()) } - async fn find_closest_nodes(&self, target: &NodeId, count: usize) -> Vec { + pub async fn update_stats(&self, updater: F) -> Result<(), BellandeMeshError> + where + F: FnOnce(&mut NetworkStats), + { + let mut stats = self + .stats + .write() + .map_err(|_| BellandeMeshError::LockError)?; + updater(&mut stats); + Ok(()) + } + + pub async fn find_closest_nodes(&self, target: &NodeId, count: usize) -> Vec { let nodes = self.nodes.read().await; let mut distances: Vec<_> = nodes .iter() @@ -791,7 +861,7 @@ impl BellandeMeshSync { distance } - async fn update_node_last_seen(&self, addr: SocketAddr) -> Result<(), BellandeMeshError> { + pub async fn update_node_last_seen(&self, addr: SocketAddr) -> Result<(), BellandeMeshError> { let mut nodes = self .nodes .write() @@ -802,7 +872,7 @@ impl BellandeMeshSync { Ok(()) } - async fn handle_join_request(&self, data: Vec) -> Result<(), BellandeMeshError> { + pub async fn handle_join_request(&self, data: Vec) -> Result<(), BellandeMeshError> { let request: Message = bincode::deserialize(&data) .map_err(|e| BellandeMeshError::Deserialization(e.to_string()))?; @@ -821,7 +891,7 @@ impl BellandeMeshSync { } } - async fn handle_join_request_internal( + pub async fn handle_join_request_internal( &self, id: NodeId, public_key: PublicKey, @@ -844,7 +914,7 @@ impl BellandeMeshSync { Ok(()) } - async fn handle_data_sync(&self, chunks: Vec) -> Result<(), BellandeMeshError> { + pub async fn handle_data_sync(&self, chunks: Vec) -> Result<(), BellandeMeshError> { let nodes = self .nodes .read() @@ -859,7 +929,7 @@ impl BellandeMeshSync { Ok(()) } - async fn handle_data_request( + pub async fn handle_data_request( &self, ids: &[NodeId], src: SocketAddr, @@ -884,16 +954,20 @@ impl BellandeMeshSync { Ok(()) } - async fn handle_ping(&self, token: u64, src: SocketAddr) -> Result<(), BellandeMeshError> { + pub async fn handle_ping(&self, token: u64, src: SocketAddr) -> Result<(), BellandeMeshError> { let response = Message::Pong { - sender: self.get_local_id()?, + sender: self.get_local_id().await?, token, }; self.send_message(src, &response).await?; Ok(()) } - async fn handle_pong(&self, sender: NodeId, src: SocketAddr) -> Result<(), BellandeMeshError> { + pub async fn handle_pong( + &self, + sender: NodeId, + src: SocketAddr, + ) -> Result<(), BellandeMeshError> { let mut nodes = self .nodes .write() @@ -904,7 +978,7 @@ impl BellandeMeshSync { Ok(()) } - async fn restore_nodes(&self, nodes: Vec) -> Result<(), BellandeMeshError> { + pub async fn restore_nodes(&self, nodes: Vec) -> Result<(), BellandeMeshError> { let mut current_nodes = self.nodes.write().await; for node in nodes { if !current_nodes.iter().any(|n| n.id == node.id) { @@ -937,7 +1011,7 @@ impl BellandeMeshSync { Ok(()) } - async fn sync_with_peers(&self) -> Result<(), BellandeMeshError> { + pub async fn sync_with_peers(&self) -> Result<(), BellandeMeshError> { let nodes = { let nodes_guard = self .nodes @@ -978,21 +1052,24 @@ impl BellandeMeshSync { node_id: NodeId, chunks: Vec, ) -> Result<(), BellandeMeshError> { + // Get read lock on nodes let nodes = self.nodes.read().await; + + // Find the target node if let Some(node) = nodes.iter().find(|n| n.id == node_id) { - let mut data = node - .data - .write() - .map_err(|_| BellandeMeshError::LockError)?; + // Get write lock on node's data + let mut data = node.data.write().await; + + // Insert all chunks for chunk in chunks { data.insert(chunk.id.clone(), chunk); - self.metrics.increment_sync_operations(); } } + Ok(()) } - async fn send_response( + pub async fn send_response( &self, src: SocketAddr, closest_nodes: Vec, @@ -1005,17 +1082,15 @@ impl BellandeMeshSync { self.send_message(src, &response).await } - async fn start_metrics_collection(&self) -> Result<(), BellandeMeshError> { + pub async fn start_metrics_collection(&self, interval: u64) -> Result<(), BellandeMeshError> { let metrics = self.metrics.clone(); let running = self.running.clone(); tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_secs(60)); - while *running.read().await { + let mut interval = tokio::time::interval(Duration::from_secs(interval)); + while let Ok(true) = running.read().map(|guard| *guard) { interval.tick().await; - if let Err(e) = metrics.update_metrics().await { - eprintln!("Metrics error: {}", e); - } + metrics.increment_sync_operations(); } }); @@ -1023,57 +1098,56 @@ impl BellandeMeshSync { } pub async fn set_max_connections(&self, max_conn: usize) -> Result<(), BellandeMeshError> { - let nodes = self.nodes.read().await; + let mut nodes = self + .nodes + .write() + .map_err(|_| BellandeMeshError::LockError)?; if nodes.len() > max_conn { - // Remove excess connections, keeping the oldest/most reliable ones - let mut nodes = self.nodes.write().await; nodes.sort_by(|a, b| b.last_seen.cmp(&a.last_seen)); nodes.truncate(max_conn); - self.update_stats(|stats| stats.active_nodes = max_conn) - .await; } Ok(()) } pub async fn broadcast_data(&self, data: Vec) -> Result<(), BellandeMeshError> { - let nodes = self.nodes.read().await; + let nodes = self + .nodes + .read() + .map_err(|_| BellandeMeshError::LockError)?; + let local_id = self.get_local_id().await?; + let message = Message::Store { - key: vec![], // Broadcast key + key: vec![], value: data, - sender: self.get_local_id().await?, + sender: local_id, token: rand::random(), }; - let mut send_errors = Vec::new(); for node in nodes.iter() { if let Err(e) = self.send_message(node.address, &message).await { - send_errors.push((node.id.clone(), e)); + eprintln!("Failed to broadcast to {}: {}", node.address, e); } } - // Update metrics - self.metrics.increment_sync_operations(); - self.update_stats(|stats| stats.total_messages += 1).await; - - // If any errors occurred, return the first one - if let Some((node_id, error)) = send_errors.first() { - return Err(BellandeMeshError::Custom(format!( - "Failed to broadcast to node {}: {}", - node_id, error - ))); - } - Ok(()) } pub async fn get_network_stats(&self) -> Result { - let stats = self.stats.read().await; + let stats = self + .stats + .read() + .map_err(|_| BellandeMeshError::LockError)?; + let nodes = self + .nodes + .read() + .map_err(|_| BellandeMeshError::LockError)?; + Ok(NetworkStats { tcp_connections: stats.tcp_connections, udp_packets_received: stats.udp_packets_received, http_requests: stats.http_requests, https_requests: stats.https_requests, - active_nodes: self.nodes.read().await.len(), + active_nodes: nodes.len(), total_messages: stats.total_messages, start_time: stats.start_time, last_sync: stats.last_sync, @@ -1085,30 +1159,27 @@ impl BellandeMeshSync { node_id: NodeId, data: Vec, ) -> Result<(), BellandeMeshError> { - let nodes = self.nodes.read().await; + let nodes = self + .nodes + .read() + .map_err(|_| BellandeMeshError::LockError)?; let node = nodes .iter() .find(|n| n.id == node_id) .ok_or_else(|| BellandeMeshError::Custom("Node not found".to_string()))?; + let local_id = self.get_local_id().await?; let message = Message::Store { key: node_id.to_bytes(), value: data, - sender: self.get_local_id().await?, + sender: local_id, token: rand::random(), }; - // Send the message - self.send_message(node.address, &message).await?; - - // Update metrics - self.metrics.increment_sync_operations(); - self.update_stats(|stats| stats.total_messages += 1).await; - - Ok(()) + self.send_message(node.address, &message).await } - async fn cleanup_dead_nodes(&self) -> Result<(), BellandeMeshError> { + pub async fn cleanup_dead_nodes(&self) -> Result<(), BellandeMeshError> { let timeout = Duration::from_secs(self.config.node_timeout); let mut nodes = self .nodes @@ -1132,14 +1203,15 @@ impl BellandeMeshSync { Ok(()) } - async fn send_ping_to_all_nodes(&self) -> Result<(), BellandeMeshError> { + pub async fn send_ping_to_all_nodes(&self) -> Result<(), BellandeMeshError> { let nodes = self .nodes .read() .map_err(|_| BellandeMeshError::LockError)?; + let local_id = self.get_local_id().await?; let token = rand::random::(); let ping = Message::Ping { - sender: self.get_local_id()?, + sender: local_id, token, }; @@ -1152,7 +1224,7 @@ impl BellandeMeshSync { Ok(()) } - async fn read_message(&self, stream: &mut TcpStream) -> Result { + pub async fn read_message(&self, stream: &mut TcpStream) -> Result { let mut len_buf = [0u8; 4]; stream.read_exact(&mut len_buf)?; let len = u32::from_be_bytes(len_buf) as usize; @@ -1168,7 +1240,7 @@ impl BellandeMeshSync { .map_err(|e| BellandeMeshError::Deserialization(e.to_string())) } - async fn get_status(&self) -> String { + pub async fn get_status(&self) -> String { let stats = self .stats .read() @@ -1178,16 +1250,7 @@ impl BellandeMeshSync { serde_json::to_string_pretty(&stats).unwrap_or_else(|_| "Error getting status".to_string()) } - fn update_stats(&self, updater: F) - where - F: FnOnce(&mut NetworkStats), - { - if let Ok(mut stats) = self.stats.write() { - updater(&mut stats); - } - } - - fn is_running(&self) -> Result { + pub async fn is_running(&self) -> Result { self.running .read() .map_err(|_| BellandeMeshError::LockError) @@ -1201,7 +1264,7 @@ impl BellandeMeshSync { .map(|nodes| nodes.first().map(|n| n.id).unwrap_or_else(|| NodeId::new())) } - async fn broadcast_new_node(&self, new_node: &Node) -> Result<(), BellandeMeshError> { + pub async fn broadcast_new_node(&self, new_node: &Node) -> Result<(), BellandeMeshError> { let nodes = self .nodes .read() @@ -1256,12 +1319,11 @@ impl BellandeMeshSync { } pub async fn is_node_connected(&self, node_id: &NodeId) -> Result { - Ok(self + let nodes = self .nodes .read() - .map_err(|_| BellandeMeshError::LockError)? - .iter() - .any(|node| &node.id == node_id)) + .map_err(|_| BellandeMeshError::LockError)?; + Ok(nodes.iter().any(|node| &node.id == node_id)) } } diff --git a/src/persistence/persistence.rs b/src/persistence/persistence.rs index 681d0fc..79c44a9 100644 --- a/src/persistence/persistence.rs +++ b/src/persistence/persistence.rs @@ -119,52 +119,120 @@ impl PersistenceManager { Ok(nodes) } + pub fn load_data_chunks(&self, node_id: &NodeId) -> Result, BellandeMeshError> { + let chunks_dir = self.data_dir.join("data_chunks").join(node_id.to_hex()); + + if !chunks_dir.exists() { + return Ok(Vec::new()); + } + + let mut chunks = Vec::new(); + + const SUPPORTED_EXTENSIONS: [&str; 2] = ["bellande", "dat"]; + + for entry in std::fs::read_dir(chunks_dir)? { + let entry = entry?; + let path = entry.path(); + + if let Some(extension) = path.extension().and_then(|s| s.to_str()) { + if SUPPORTED_EXTENSIONS.contains(&extension) { + match self.read_chunk_file(&path) { + Ok(chunk) => chunks.push(chunk), + Err(e) => { + eprintln!("Error reading chunk file {:?}: {}", path, e); + continue; + } + } + } + } + } + + chunks.sort_by(|a, b| a.id.cmp(&b.id)); + + Ok(chunks) + } + + fn read_chunk_file(&self, path: &Path) -> Result { + let file = self.get_connection(path)?; + let reader = BufReader::new(file); + + let extension = path + .extension() + .and_then(|s| s.to_str()) + .ok_or_else(|| BellandeMeshError::Custom("Invalid file extension".to_string()))?; + + match extension { + "bellande" => bincode::deserialize_from(reader).map_err(|e| { + BellandeMeshError::Deserialization(format!( + "Failed to deserialize .bellande file: {}", + e + )) + }), + "dat" => bincode::deserialize_from(reader).map_err(|e| { + BellandeMeshError::Deserialization(format!( + "Failed to deserialize .dat file: {}", + e + )) + }), + _ => Err(BellandeMeshError::Custom( + "Unsupported file extension".to_string(), + )), + } + } + pub fn save_data_chunk( &self, node_id: &NodeId, chunk: &DataChunk, + use_bellande: bool, ) -> Result<(), BellandeMeshError> { - let path = self - .data_dir - .join("data_chunks") - .join(node_id.to_hex()) - .join(format!("{}.dat", chunk.id.to_hex())); + let chunks_dir = self.data_dir.join("data_chunks").join(node_id.to_hex()); + std::fs::create_dir_all(&chunks_dir)?; + + let extension = if use_bellande { "bellande" } else { "dat" }; + let file_name = format!("{}.{}", chunk.id.to_hex(), extension); + let path = chunks_dir.join(file_name); - std::fs::create_dir_all(path.parent().unwrap())?; let file = self.get_connection(&path)?; - let writer = BufWriter::new(file); - - bincode::serialize_into(writer, chunk) + bincode::serialize_into(file, chunk) .map_err(|e| BellandeMeshError::Serialization(e.to_string()))?; Ok(()) } - pub fn load_data_chunks( + pub async fn migrate_chunk_format( &self, node_id: &NodeId, - ) -> Result, BellandeMeshError> { + to_bellande: bool, + ) -> Result<(), BellandeMeshError> { + let chunks = self.load_data_chunks(node_id)?; + let chunks_dir = self.data_dir.join("data_chunks").join(node_id.to_hex()); + let temp_dir = chunks_dir.with_extension("temp"); + std::fs::create_dir_all(&temp_dir)?; - if !chunks_dir.exists() { - return Ok(HashMap::new()); + for chunk in &chunks { + let extension = if to_bellande { "bellande" } else { "dat" }; + let file_name = format!("{}.{}", chunk.id.to_hex(), extension); + let path = temp_dir.join(file_name); + + let file = self.get_connection(&path)?; + bincode::serialize_into(file, chunk) + .map_err(|e| BellandeMeshError::Serialization(e.to_string()))?; } - let mut chunks = HashMap::new(); - for entry in std::fs::read_dir(chunks_dir)? { - let entry = entry?; - let path = entry.path(); - if path.extension().and_then(|s| s.to_str()) == Some("dat") { - let file = self.get_connection(&path)?; - let reader = BufReader::new(file); - - let chunk: DataChunk = bincode::deserialize_from(reader) - .map_err(|e| BellandeMeshError::Deserialization(e.to_string()))?; - chunks.insert(chunk.id, chunk); - } + let backup_dir = chunks_dir.with_extension("backup"); + if chunks_dir.exists() { + std::fs::rename(&chunks_dir, &backup_dir)?; } - Ok(chunks) + std::fs::rename(&temp_dir, &chunks_dir)?; + + if backup_dir.exists() { + std::fs::remove_dir_all(backup_dir)?; + } + + Ok(()) } fn get_connection(&self, path: &Path) -> Result {