latest pushes

This commit is contained in:
Ronaldson Bellande 2024-10-25 21:46:27 -04:00
parent 28749bd777
commit e36f88bca4
4 changed files with 454 additions and 190 deletions

View File

@ -8,7 +8,7 @@ license = "GPL-3.0-or-later"
repository = "https://github.com/Architecture-Mechanism/bellande_mesh_sync"
documentation = "https://bellande-architecture-mechanism-research-innovation-center.org/bellande_mesh_sync/docs"
readme = "README.md"
keywords = ["p2p", "synchronization", "distributed-systems", "mesh-network", "bellande_mesh_sync"]
keywords = ["synchronization", "distributed-systems", "mesh-network", "bellande_mesh_sync"]
categories = ["network-programming", "asynchronous"]
[lib]
@ -29,3 +29,4 @@ chrono = { version = "0.4", features = ["serde"] }
sled = "0.34"
tokio-util = { version = "0.7", features = ["full"] }
async-trait = "0.1"
tokio-rustls = "0.24"

View File

@ -51,6 +51,24 @@ pub struct MeshOptions {
pub enable_persistence: bool,
/// Custom persistence directory
pub persistence_path: Option<PathBuf>,
/// Node timeout duration in seconds
pub node_timeout: Option<u64>,
/// Enable automatic node discovery
pub enable_discovery: bool,
/// Bootstrap nodes for initial connection
pub bootstrap_nodes: Vec<String>,
/// Enable encryption for all communications
pub enable_encryption: bool,
/// Custom TCP port for mesh communication
pub tcp_port: Option<u16>,
/// Custom UDP port for mesh communication
pub udp_port: Option<u16>,
/// Maximum retry attempts for failed connections
pub max_retries: Option<u32>,
/// Enable automatic data replication
pub enable_replication: bool,
/// Number of data replicas to maintain
pub replication_factor: Option<u32>,
}
impl Default for MeshOptions {
@ -63,6 +81,15 @@ impl Default for MeshOptions {
max_connections: Some(1000),
enable_persistence: false,
persistence_path: None,
node_timeout: Some(300),
enable_discovery: true,
bootstrap_nodes: Vec::new(),
enable_encryption: true,
tcp_port: None,
udp_port: None,
max_retries: Some(3),
enable_replication: true,
replication_factor: Some(3),
}
}
}
@ -109,13 +136,34 @@ pub async fn init_with_options(
// Restore persisted data if available
if let Some(persistence) = &persistence_manager {
let nodes = persistence.load_nodes()?;
bellande_mesh.restore_nodes(nodes).await?;
// Load nodes from persistence
let nodes = persistence
.load_nodes()
.map_err(|e| BellandeMeshError::Custom(format!("Failed to load nodes: {}", e)))?;
// Load data chunks for each node
// Restore the nodes structure
bellande_mesh
.restore_nodes(nodes.clone())
.await
.map_err(|e| BellandeMeshError::Custom(format!("Failed to restore nodes: {}", e)))?;
// Then load and restore data chunks for each node
for node in nodes {
let chunks = persistence.load_data_chunks(&node.id)?;
bellande_mesh.restore_data_chunks(node.id, chunks).await?;
match persistence.load_data_chunks(&node.id) {
Ok(chunks) => {
if let Err(e) = bellande_mesh.restore_data_chunks(node.id, chunks).await {
eprintln!(
"Error restoring chunks for node {}: {}",
node.id.to_hex(),
e
);
}
}
Err(e) => {
eprintln!("Error loading chunks for node {}: {}", node.id.to_hex(), e);
continue;
}
}
}
}
@ -142,7 +190,7 @@ pub async fn stop(bellande_mesh: &BellandeMeshSync) -> Result<(), BellandeMeshEr
bellande_mesh.stop().await
}
/// Broadcast a message to all nodes in the network
// Message and Data Handling
pub async fn broadcast(
bellande_mesh: &BellandeMeshSync,
data: Vec<u8>,
@ -150,27 +198,6 @@ pub async fn broadcast(
bellande_mesh.broadcast_data(data).await
}
/// Get the current network statistics
pub async fn get_stats(
bellande_mesh: &BellandeMeshSync,
) -> Result<NetworkStats, BellandeMeshError> {
bellande_mesh.get_network_stats().await
}
/// Get the list of currently connected nodes
pub async fn get_nodes(bellande_mesh: &BellandeMeshSync) -> Result<Vec<Node>, BellandeMeshError> {
bellande_mesh.get_all_nodes().await
}
/// Check if a specific node is connected to the network
pub async fn is_node_connected(
bellande_mesh: &BellandeMeshSync,
node_id: NodeId,
) -> Result<bool, BellandeMeshError> {
bellande_mesh.is_node_connected(&node_id).await
}
/// Send a message to a specific node
pub async fn send_to_node(
bellande_mesh: &BellandeMeshSync,
node_id: NodeId,
@ -178,3 +205,109 @@ pub async fn send_to_node(
) -> Result<(), BellandeMeshError> {
bellande_mesh.send_data_to_node(node_id, data).await
}
// Node Management
pub async fn get_nodes(bellande_mesh: &BellandeMeshSync) -> Result<Vec<Node>, BellandeMeshError> {
bellande_mesh.get_all_nodes().await
}
pub async fn get_active_nodes(
bellande_mesh: &BellandeMeshSync,
) -> Result<Vec<Node>, BellandeMeshError> {
bellande_mesh.get_all_active_nodes().await
}
pub async fn get_nodes_paginated(
bellande_mesh: &BellandeMeshSync,
offset: usize,
limit: usize,
) -> Result<(Vec<Node>, usize), BellandeMeshError> {
bellande_mesh.get_nodes_paginated(offset, limit).await
}
pub async fn is_node_connected(
bellande_mesh: &BellandeMeshSync,
node_id: NodeId,
) -> Result<bool, BellandeMeshError> {
bellande_mesh.is_node_connected(&node_id).await
}
pub async fn restore_nodes(
bellande_mesh: &BellandeMeshSync,
nodes: Vec<Node>,
) -> Result<(), BellandeMeshError> {
bellande_mesh.restore_nodes(nodes).await
}
// Network Statistics and Monitoring
pub async fn get_stats(
bellande_mesh: &BellandeMeshSync,
) -> Result<NetworkStats, BellandeMeshError> {
bellande_mesh.get_network_stats().await
}
pub async fn get_node_count(bellande_mesh: &BellandeMeshSync) -> Result<usize, BellandeMeshError> {
bellande_mesh.get_node_count().await
}
pub async fn get_node_list(
bellande_mesh: &BellandeMeshSync,
) -> Result<Vec<NodeId>, BellandeMeshError> {
bellande_mesh.get_node_list().await
}
// Maintenance Operations
pub async fn start_metrics_collection(
bellande_mesh: &BellandeMeshSync,
interval: u64,
) -> Result<(), BellandeMeshError> {
bellande_mesh.start_metrics_collection(interval).await
}
pub async fn set_max_connections(
bellande_mesh: &BellandeMeshSync,
max_conn: usize,
) -> Result<(), BellandeMeshError> {
bellande_mesh.set_max_connections(max_conn).await
}
pub async fn cleanup_dead_nodes(bellande_mesh: &BellandeMeshSync) -> Result<(), BellandeMeshError> {
bellande_mesh.cleanup_dead_nodes().await
}
// Node Discovery and Communication
pub async fn find_closest_nodes(
bellande_mesh: &BellandeMeshSync,
target: &NodeId,
count: usize,
) -> Result<Vec<Node>, BellandeMeshError> {
Ok(bellande_mesh.find_closest_nodes(target, count).await)
}
pub async fn broadcast_new_node(
bellande_mesh: &BellandeMeshSync,
new_node: &Node,
) -> Result<(), BellandeMeshError> {
bellande_mesh.broadcast_new_node(new_node).await
}
// Protocol Functions
pub async fn handle_join_request(
bellande_mesh: &BellandeMeshSync,
data: Vec<u8>,
) -> Result<(), BellandeMeshError> {
bellande_mesh.handle_join_request(data).await
}
// Utility Functions
pub async fn get_local_id(bellande_mesh: &BellandeMeshSync) -> Result<NodeId, BellandeMeshError> {
bellande_mesh.get_local_id().await
}
pub async fn get_status(bellande_mesh: &BellandeMeshSync) -> String {
bellande_mesh.get_status().await
}
pub async fn is_running(bellande_mesh: &BellandeMeshSync) -> Result<bool, BellandeMeshError> {
bellande_mesh.is_running().await
}

View File

@ -18,6 +18,7 @@ use crate::data::data::DataChunk;
use crate::error::error::BellandeMeshError;
pub use crate::metrics::metrics::MetricsManager;
use crate::node::node::{Message, Node, NodeId, PublicKey};
use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Client, Request, Response, Server, StatusCode};
use hyper_rustls::HttpsConnectorBuilder;
@ -34,7 +35,6 @@ use std::time::{Duration, SystemTime};
use tokio::net::{TcpListener as TokioTcpListener, UdpSocket as TokioUdpSocket};
use tokio::signal;
use tokio::sync::{mpsc, Mutex};
use tokio::time::sleep;
use tokio_util::sync::CancellationToken;
// Constants
@ -227,7 +227,7 @@ impl BellandeMeshSync {
Ok(())
}
async fn start_message_handler(&self) -> Result<(), BellandeMeshError> {
pub async fn start_message_handler(&self) -> Result<(), BellandeMeshError> {
let handler = self.clone();
tokio::spawn(async move {
let mut rx = handler.message_rx.lock().await;
@ -240,7 +240,7 @@ impl BellandeMeshSync {
Ok(())
}
async fn start_protocol_listeners(&self) -> Result<(), BellandeMeshError> {
pub async fn start_protocol_listeners(&self) -> Result<(), BellandeMeshError> {
self.start_tcp_listener().await?;
self.start_udp_listener().await?;
self.start_http_server().await?;
@ -248,7 +248,7 @@ impl BellandeMeshSync {
Ok(())
}
async fn start_tcp_listener(&self) -> Result<(), BellandeMeshError> {
pub async fn start_tcp_listener(&self) -> Result<(), BellandeMeshError> {
let addr = self
.config
.listen_address
@ -273,22 +273,23 @@ impl BellandeMeshSync {
Ok(())
}
async fn start_udp_listener(&self) -> Result<(), BellandeMeshError> {
pub async fn start_udp_listener(&self) -> Result<(), BellandeMeshError> {
let addr = self
.config
.listen_address
.parse::<SocketAddr>()
.map_err(|e| BellandeMeshError::Custom(format!("Invalid address: {}", e)))?;
let socket = TokioUdpSocket::bind(addr).await?;
let handler = self.clone();
let handler = Arc::new(self.clone());
tokio::spawn(async move {
let mut buf = [0u8; UDP_BUFFER_SIZE];
while handler.is_running().unwrap_or(false) {
loop {
match socket.recv_from(&mut buf).await {
Ok((len, src)) => {
let handler = handler.clone();
let data = buf[..len].to_vec();
tokio::spawn(async move {
if let Err(e) = handler.handle_udp_packet(&data, src).await {
eprintln!("UDP error from {}: {}", src, e);
@ -303,18 +304,17 @@ impl BellandeMeshSync {
Ok(())
}
async fn start_http_server(&self) -> Result<(), BellandeMeshError> {
pub async fn start_http_server(&self) -> Result<(), BellandeMeshError> {
let addr = self
.config
.listen_address
.parse::<SocketAddr>()
.map_err(|e| BellandeMeshError::Custom(format!("Invalid address: {}", e)))?;
let http_addr = SocketAddr::new(addr.ip(), addr.port() + HTTP_PORT_OFFSET);
let metrics = self.metrics.clone();
let handler = self.clone();
let make_service = make_service_fn(move |_| {
let make_service = make_service_fn(move |_conn: &AddrStream| {
let handler = handler.clone();
let metrics = metrics.clone();
@ -331,21 +331,32 @@ impl BellandeMeshSync {
}
});
let server = Server::bind(&http_addr)
.serve(make_service)
.with_graceful_shutdown(Self::shutdown_signal());
let server = match Server::try_bind(&http_addr) {
Ok(builder) => builder.serve(make_service),
Err(e) => {
return Err(BellandeMeshError::Custom(format!(
"Server bind error: {}",
e
)))
}
};
tokio::spawn(async move {
if let Err(e) = server.await {
eprintln!("Server error: {}", e);
}
});
tokio::spawn(server);
Ok(())
}
async fn shutdown_signal() {
pub async fn shutdown_signal() {
if let Err(e) = signal::ctrl_c().await {
eprintln!("Failed to install CTRL+C signal handler: {}", e);
}
}
async fn start_https_server(&self) -> Result<(), BellandeMeshError> {
pub async fn start_https_server(&self) -> Result<(), BellandeMeshError> {
let addr = self
.config
.listen_address
@ -354,31 +365,20 @@ impl BellandeMeshSync {
let https_addr = SocketAddr::new(addr.ip(), addr.port() + HTTPS_PORT_OFFSET);
let handler = self.clone();
let tls_config = self.tls_config.clone();
let listener = TokioTcpListener::bind(https_addr).await?;
// Create TLS acceptor from config
let tls_acceptor = tokio_rustls::TlsAcceptor::from(tls_config);
tokio::spawn(async move {
while handler.is_running().unwrap_or(false) {
if let Ok((stream, addr)) = listener.accept().await {
let handler = handler.clone();
let tls_config = tls_config.clone();
let acceptor = tls_acceptor.clone();
tokio::spawn(async move {
// Create TLS server connection
let tls_conn = match rustls::ServerConnection::new(tls_config.clone()) {
Ok(conn) => conn,
Err(e) => {
eprintln!("TLS connection creation error: {}", e);
return;
}
};
// Create async TLS stream
let mut tls_stream = tokio_rustls::server::TlsStream::new(stream, tls_conn);
// Perform TLS handshake
match tls_stream.do_handshake().await {
Ok(_) => {
match acceptor.accept(stream).await {
Ok(tls_stream) => {
// Handle secure connection
if let Err(e) = handler.handle_https_connection(tls_stream).await {
eprintln!("HTTPS error from {}: {}", addr, e);
@ -396,41 +396,96 @@ impl BellandeMeshSync {
Ok(())
}
async fn start_maintenance_tasks(&self) -> Result<(), BellandeMeshError> {
pub async fn start_maintenance_tasks(&self) -> Result<(), BellandeMeshError> {
let sync_handler = self.clone();
let sync_running = Arc::clone(&self.running);
tokio::spawn(async move {
while sync_handler.is_running().unwrap_or(false) {
let mut interval = tokio::time::interval(SYNC_INTERVAL);
while sync_running.read().await.clone() {
if let Err(e) = sync_handler.sync_with_peers().await {
eprintln!("Sync error: {}", e);
}
sleep(SYNC_INTERVAL).await;
interval.tick().await;
}
});
let cleanup_handler = self.clone();
let cleanup_running = Arc::clone(&self.running);
tokio::spawn(async move {
while cleanup_handler.is_running().unwrap_or(false) {
let mut interval = tokio::time::interval(CLEANUP_INTERVAL);
while cleanup_running.read().await.clone() {
if let Err(e) = cleanup_handler.cleanup_dead_nodes().await {
eprintln!("Cleanup error: {}", e);
}
sleep(CLEANUP_INTERVAL).await;
interval.tick().await;
}
});
let ping_handler = self.clone();
let ping_running = Arc::clone(&self.running);
tokio::spawn(async move {
while ping_handler.is_running().unwrap_or(false) {
let mut interval = tokio::time::interval(PING_INTERVAL);
while ping_running.read().await.clone() {
if let Err(e) = ping_handler.send_ping_to_all_nodes().await {
eprintln!("Ping error: {}", e);
}
sleep(PING_INTERVAL).await;
interval.tick().await;
}
});
Ok(())
}
async fn handle_tcp_connection<S>(&self, stream: S) -> Result<(), BellandeMeshError>
pub async fn get_all_nodes(&self) -> Result<Vec<Node>, BellandeMeshError> {
let nodes = self.nodes.read().await;
Ok(nodes.clone())
}
// Acquire read lock on nodes
pub async fn get_all_nodes_detailed(&self) -> Result<Vec<Node>, BellandeMeshError> {
let nodes = self.nodes.read().await;
let nodes_copy = nodes.clone();
self.metrics.increment_sync_operations();
{
let mut stats = self.stats.write().await;
stats.active_nodes = nodes_copy.len();
}
Ok(nodes_copy)
}
pub async fn get_all_active_nodes(&self) -> Result<Vec<Node>, BellandeMeshError> {
let nodes = self.nodes.read().await;
// Filter active nodes
let active_nodes: Vec<Node> = nodes
.iter()
.filter(|node| node.is_alive(Duration::from_secs(self.config.node_timeout)))
.cloned()
.collect();
Ok(active_nodes)
}
pub async fn get_nodes_paginated(
&self,
offset: usize,
limit: usize,
) -> Result<(Vec<Node>, usize), BellandeMeshError> {
let nodes = self.nodes.read().await;
let total = nodes.len();
let paginated_nodes: Vec<Node> = nodes.iter().skip(offset).take(limit).cloned().collect();
Ok((paginated_nodes, total))
}
pub async fn handle_tcp_connection<S>(&self, stream: S) -> Result<(), BellandeMeshError>
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
@ -456,7 +511,7 @@ impl BellandeMeshSync {
Ok(())
}
async fn read_async_message<S>(
pub async fn read_async_message<S>(
&self,
reader: &mut S,
buf: &mut [u8],
@ -479,7 +534,7 @@ impl BellandeMeshSync {
.map_err(|e| BellandeMeshError::Deserialization(e.to_string()))
}
async fn write_async_message<S>(
pub async fn write_async_message<S>(
&self,
writer: &mut S,
message: &Message,
@ -503,7 +558,7 @@ impl BellandeMeshSync {
Ok(())
}
async fn handle_udp_packet(
pub async fn handle_udp_packet(
&self,
data: &[u8],
src: SocketAddr,
@ -515,7 +570,10 @@ impl BellandeMeshSync {
self.handle_message_internal(message, src).await
}
async fn handle_http_request(&self, req: Request<Body>) -> Result<Response<Body>, Infallible> {
pub async fn handle_http_request(
&self,
req: Request<Body>,
) -> Result<Response<Body>, Infallible> {
self.update_stats(|stats| stats.http_requests += 1);
let response = match (req.method(), req.uri().path()) {
@ -545,7 +603,7 @@ impl BellandeMeshSync {
Ok(response)
}
async fn handle_https_connection<S>(&self, stream: S) -> Result<(), BellandeMeshError>
pub async fn handle_https_connection<S>(&self, stream: S) -> Result<(), BellandeMeshError>
where
S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin,
{
@ -553,7 +611,7 @@ impl BellandeMeshSync {
self.handle_tcp_connection(stream).await
}
async fn handle_message_internal(
pub async fn handle_message_internal(
&self,
message: Message,
src: SocketAddr,
@ -620,7 +678,7 @@ impl BellandeMeshSync {
Ok(())
}
async fn handle_store(
pub async fn handle_store(
&self,
key: Vec<u8>,
value: Vec<u8>,
@ -632,7 +690,7 @@ impl BellandeMeshSync {
value.clone(),
"checksum".to_string(),
0,
self.get_local_id()?,
self.get_local_id().await?,
vec![],
);
@ -648,12 +706,12 @@ impl BellandeMeshSync {
data.insert(NodeId::from_bytes(&key), chunk);
// Replicate to closest nodes
let closest_nodes = self.find_closest_nodes(&NodeId::from_bytes(&key), 3);
let closest_nodes = self.find_closest_nodes(&NodeId::from_bytes(&key), 3).await;
for target_node in closest_nodes {
let store_msg = Message::Store {
key: key.clone(),
value: value.clone(),
sender: self.get_local_id()?,
sender: self.get_local_id().await?,
token: rand::random(),
};
if let Err(e) = self.send_message(target_node.address, &store_msg).await {
@ -668,7 +726,7 @@ impl BellandeMeshSync {
Ok(())
}
async fn handle_find_node(
pub async fn handle_find_node(
&self,
target: NodeId,
src: SocketAddr,
@ -683,7 +741,7 @@ impl BellandeMeshSync {
Ok(())
}
async fn handle_find_value(
pub async fn handle_find_value(
&self,
key: Vec<u8>,
src: SocketAddr,
@ -700,7 +758,7 @@ impl BellandeMeshSync {
let response = Message::Value {
key: key.clone(),
value: chunk.content.clone(),
sender: self.get_local_id()?,
sender: self.get_local_id().await?,
token: rand::random(),
};
return self.send_message(src, &response).await;
@ -709,17 +767,17 @@ impl BellandeMeshSync {
}
// If not found, return closest nodes
let closest_nodes = self.find_closest_nodes(&node_id, 20);
let closest_nodes = self.find_closest_nodes(&node_id, 20).await;
let response = Message::Nodes {
nodes: closest_nodes,
sender: self.get_local_id()?,
sender: self.get_local_id().await?,
token: rand::random(),
};
self.send_message(src, &response).await?;
Ok(())
}
async fn handle_nodes(
pub async fn handle_nodes(
&self,
new_nodes: Vec<Node>,
_src: SocketAddr,
@ -734,7 +792,7 @@ impl BellandeMeshSync {
Ok(())
}
async fn handle_value(
pub async fn handle_value(
&self,
key: Vec<u8>,
value: Vec<u8>,
@ -744,7 +802,7 @@ impl BellandeMeshSync {
value,
"checksum".to_string(),
0,
self.get_local_id()?,
self.get_local_id().await?,
vec![],
);
@ -761,7 +819,19 @@ impl BellandeMeshSync {
Ok(())
}
async fn find_closest_nodes(&self, target: &NodeId, count: usize) -> Vec<Node> {
pub async fn update_stats<F>(&self, updater: F) -> Result<(), BellandeMeshError>
where
F: FnOnce(&mut NetworkStats),
{
let mut stats = self
.stats
.write()
.map_err(|_| BellandeMeshError::LockError)?;
updater(&mut stats);
Ok(())
}
pub async fn find_closest_nodes(&self, target: &NodeId, count: usize) -> Vec<Node> {
let nodes = self.nodes.read().await;
let mut distances: Vec<_> = nodes
.iter()
@ -791,7 +861,7 @@ impl BellandeMeshSync {
distance
}
async fn update_node_last_seen(&self, addr: SocketAddr) -> Result<(), BellandeMeshError> {
pub async fn update_node_last_seen(&self, addr: SocketAddr) -> Result<(), BellandeMeshError> {
let mut nodes = self
.nodes
.write()
@ -802,7 +872,7 @@ impl BellandeMeshSync {
Ok(())
}
async fn handle_join_request(&self, data: Vec<u8>) -> Result<(), BellandeMeshError> {
pub async fn handle_join_request(&self, data: Vec<u8>) -> Result<(), BellandeMeshError> {
let request: Message = bincode::deserialize(&data)
.map_err(|e| BellandeMeshError::Deserialization(e.to_string()))?;
@ -821,7 +891,7 @@ impl BellandeMeshSync {
}
}
async fn handle_join_request_internal(
pub async fn handle_join_request_internal(
&self,
id: NodeId,
public_key: PublicKey,
@ -844,7 +914,7 @@ impl BellandeMeshSync {
Ok(())
}
async fn handle_data_sync(&self, chunks: Vec<DataChunk>) -> Result<(), BellandeMeshError> {
pub async fn handle_data_sync(&self, chunks: Vec<DataChunk>) -> Result<(), BellandeMeshError> {
let nodes = self
.nodes
.read()
@ -859,7 +929,7 @@ impl BellandeMeshSync {
Ok(())
}
async fn handle_data_request(
pub async fn handle_data_request(
&self,
ids: &[NodeId],
src: SocketAddr,
@ -884,16 +954,20 @@ impl BellandeMeshSync {
Ok(())
}
async fn handle_ping(&self, token: u64, src: SocketAddr) -> Result<(), BellandeMeshError> {
pub async fn handle_ping(&self, token: u64, src: SocketAddr) -> Result<(), BellandeMeshError> {
let response = Message::Pong {
sender: self.get_local_id()?,
sender: self.get_local_id().await?,
token,
};
self.send_message(src, &response).await?;
Ok(())
}
async fn handle_pong(&self, sender: NodeId, src: SocketAddr) -> Result<(), BellandeMeshError> {
pub async fn handle_pong(
&self,
sender: NodeId,
src: SocketAddr,
) -> Result<(), BellandeMeshError> {
let mut nodes = self
.nodes
.write()
@ -904,7 +978,7 @@ impl BellandeMeshSync {
Ok(())
}
async fn restore_nodes(&self, nodes: Vec<Node>) -> Result<(), BellandeMeshError> {
pub async fn restore_nodes(&self, nodes: Vec<Node>) -> Result<(), BellandeMeshError> {
let mut current_nodes = self.nodes.write().await;
for node in nodes {
if !current_nodes.iter().any(|n| n.id == node.id) {
@ -937,7 +1011,7 @@ impl BellandeMeshSync {
Ok(())
}
async fn sync_with_peers(&self) -> Result<(), BellandeMeshError> {
pub async fn sync_with_peers(&self) -> Result<(), BellandeMeshError> {
let nodes = {
let nodes_guard = self
.nodes
@ -978,21 +1052,24 @@ impl BellandeMeshSync {
node_id: NodeId,
chunks: Vec<DataChunk>,
) -> Result<(), BellandeMeshError> {
// Get read lock on nodes
let nodes = self.nodes.read().await;
// Find the target node
if let Some(node) = nodes.iter().find(|n| n.id == node_id) {
let mut data = node
.data
.write()
.map_err(|_| BellandeMeshError::LockError)?;
// Get write lock on node's data
let mut data = node.data.write().await;
// Insert all chunks
for chunk in chunks {
data.insert(chunk.id.clone(), chunk);
self.metrics.increment_sync_operations();
}
}
Ok(())
}
async fn send_response(
pub async fn send_response(
&self,
src: SocketAddr,
closest_nodes: Vec<Node>,
@ -1005,17 +1082,15 @@ impl BellandeMeshSync {
self.send_message(src, &response).await
}
async fn start_metrics_collection(&self) -> Result<(), BellandeMeshError> {
pub async fn start_metrics_collection(&self, interval: u64) -> Result<(), BellandeMeshError> {
let metrics = self.metrics.clone();
let running = self.running.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(60));
while *running.read().await {
let mut interval = tokio::time::interval(Duration::from_secs(interval));
while let Ok(true) = running.read().map(|guard| *guard) {
interval.tick().await;
if let Err(e) = metrics.update_metrics().await {
eprintln!("Metrics error: {}", e);
}
metrics.increment_sync_operations();
}
});
@ -1023,57 +1098,56 @@ impl BellandeMeshSync {
}
pub async fn set_max_connections(&self, max_conn: usize) -> Result<(), BellandeMeshError> {
let nodes = self.nodes.read().await;
let mut nodes = self
.nodes
.write()
.map_err(|_| BellandeMeshError::LockError)?;
if nodes.len() > max_conn {
// Remove excess connections, keeping the oldest/most reliable ones
let mut nodes = self.nodes.write().await;
nodes.sort_by(|a, b| b.last_seen.cmp(&a.last_seen));
nodes.truncate(max_conn);
self.update_stats(|stats| stats.active_nodes = max_conn)
.await;
}
Ok(())
}
pub async fn broadcast_data(&self, data: Vec<u8>) -> Result<(), BellandeMeshError> {
let nodes = self.nodes.read().await;
let nodes = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?;
let local_id = self.get_local_id().await?;
let message = Message::Store {
key: vec![], // Broadcast key
key: vec![],
value: data,
sender: self.get_local_id().await?,
sender: local_id,
token: rand::random(),
};
let mut send_errors = Vec::new();
for node in nodes.iter() {
if let Err(e) = self.send_message(node.address, &message).await {
send_errors.push((node.id.clone(), e));
eprintln!("Failed to broadcast to {}: {}", node.address, e);
}
}
// Update metrics
self.metrics.increment_sync_operations();
self.update_stats(|stats| stats.total_messages += 1).await;
// If any errors occurred, return the first one
if let Some((node_id, error)) = send_errors.first() {
return Err(BellandeMeshError::Custom(format!(
"Failed to broadcast to node {}: {}",
node_id, error
)));
}
Ok(())
}
pub async fn get_network_stats(&self) -> Result<NetworkStats, BellandeMeshError> {
let stats = self.stats.read().await;
let stats = self
.stats
.read()
.map_err(|_| BellandeMeshError::LockError)?;
let nodes = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?;
Ok(NetworkStats {
tcp_connections: stats.tcp_connections,
udp_packets_received: stats.udp_packets_received,
http_requests: stats.http_requests,
https_requests: stats.https_requests,
active_nodes: self.nodes.read().await.len(),
active_nodes: nodes.len(),
total_messages: stats.total_messages,
start_time: stats.start_time,
last_sync: stats.last_sync,
@ -1085,30 +1159,27 @@ impl BellandeMeshSync {
node_id: NodeId,
data: Vec<u8>,
) -> Result<(), BellandeMeshError> {
let nodes = self.nodes.read().await;
let nodes = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?;
let node = nodes
.iter()
.find(|n| n.id == node_id)
.ok_or_else(|| BellandeMeshError::Custom("Node not found".to_string()))?;
let local_id = self.get_local_id().await?;
let message = Message::Store {
key: node_id.to_bytes(),
value: data,
sender: self.get_local_id().await?,
sender: local_id,
token: rand::random(),
};
// Send the message
self.send_message(node.address, &message).await?;
// Update metrics
self.metrics.increment_sync_operations();
self.update_stats(|stats| stats.total_messages += 1).await;
Ok(())
self.send_message(node.address, &message).await
}
async fn cleanup_dead_nodes(&self) -> Result<(), BellandeMeshError> {
pub async fn cleanup_dead_nodes(&self) -> Result<(), BellandeMeshError> {
let timeout = Duration::from_secs(self.config.node_timeout);
let mut nodes = self
.nodes
@ -1132,14 +1203,15 @@ impl BellandeMeshSync {
Ok(())
}
async fn send_ping_to_all_nodes(&self) -> Result<(), BellandeMeshError> {
pub async fn send_ping_to_all_nodes(&self) -> Result<(), BellandeMeshError> {
let nodes = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?;
let local_id = self.get_local_id().await?;
let token = rand::random::<u64>();
let ping = Message::Ping {
sender: self.get_local_id()?,
sender: local_id,
token,
};
@ -1152,7 +1224,7 @@ impl BellandeMeshSync {
Ok(())
}
async fn read_message(&self, stream: &mut TcpStream) -> Result<Message, BellandeMeshError> {
pub async fn read_message(&self, stream: &mut TcpStream) -> Result<Message, BellandeMeshError> {
let mut len_buf = [0u8; 4];
stream.read_exact(&mut len_buf)?;
let len = u32::from_be_bytes(len_buf) as usize;
@ -1168,7 +1240,7 @@ impl BellandeMeshSync {
.map_err(|e| BellandeMeshError::Deserialization(e.to_string()))
}
async fn get_status(&self) -> String {
pub async fn get_status(&self) -> String {
let stats = self
.stats
.read()
@ -1178,16 +1250,7 @@ impl BellandeMeshSync {
serde_json::to_string_pretty(&stats).unwrap_or_else(|_| "Error getting status".to_string())
}
fn update_stats<F>(&self, updater: F)
where
F: FnOnce(&mut NetworkStats),
{
if let Ok(mut stats) = self.stats.write() {
updater(&mut stats);
}
}
fn is_running(&self) -> Result<bool, BellandeMeshError> {
pub async fn is_running(&self) -> Result<bool, BellandeMeshError> {
self.running
.read()
.map_err(|_| BellandeMeshError::LockError)
@ -1201,7 +1264,7 @@ impl BellandeMeshSync {
.map(|nodes| nodes.first().map(|n| n.id).unwrap_or_else(|| NodeId::new()))
}
async fn broadcast_new_node(&self, new_node: &Node) -> Result<(), BellandeMeshError> {
pub async fn broadcast_new_node(&self, new_node: &Node) -> Result<(), BellandeMeshError> {
let nodes = self
.nodes
.read()
@ -1256,12 +1319,11 @@ impl BellandeMeshSync {
}
pub async fn is_node_connected(&self, node_id: &NodeId) -> Result<bool, BellandeMeshError> {
Ok(self
let nodes = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?
.iter()
.any(|node| &node.id == node_id))
.map_err(|_| BellandeMeshError::LockError)?;
Ok(nodes.iter().any(|node| &node.id == node_id))
}
}

View File

@ -119,52 +119,120 @@ impl PersistenceManager {
Ok(nodes)
}
pub fn load_data_chunks(&self, node_id: &NodeId) -> Result<Vec<DataChunk>, BellandeMeshError> {
let chunks_dir = self.data_dir.join("data_chunks").join(node_id.to_hex());
if !chunks_dir.exists() {
return Ok(Vec::new());
}
let mut chunks = Vec::new();
const SUPPORTED_EXTENSIONS: [&str; 2] = ["bellande", "dat"];
for entry in std::fs::read_dir(chunks_dir)? {
let entry = entry?;
let path = entry.path();
if let Some(extension) = path.extension().and_then(|s| s.to_str()) {
if SUPPORTED_EXTENSIONS.contains(&extension) {
match self.read_chunk_file(&path) {
Ok(chunk) => chunks.push(chunk),
Err(e) => {
eprintln!("Error reading chunk file {:?}: {}", path, e);
continue;
}
}
}
}
}
chunks.sort_by(|a, b| a.id.cmp(&b.id));
Ok(chunks)
}
fn read_chunk_file(&self, path: &Path) -> Result<DataChunk, BellandeMeshError> {
let file = self.get_connection(path)?;
let reader = BufReader::new(file);
let extension = path
.extension()
.and_then(|s| s.to_str())
.ok_or_else(|| BellandeMeshError::Custom("Invalid file extension".to_string()))?;
match extension {
"bellande" => bincode::deserialize_from(reader).map_err(|e| {
BellandeMeshError::Deserialization(format!(
"Failed to deserialize .bellande file: {}",
e
))
}),
"dat" => bincode::deserialize_from(reader).map_err(|e| {
BellandeMeshError::Deserialization(format!(
"Failed to deserialize .dat file: {}",
e
))
}),
_ => Err(BellandeMeshError::Custom(
"Unsupported file extension".to_string(),
)),
}
}
pub fn save_data_chunk(
&self,
node_id: &NodeId,
chunk: &DataChunk,
use_bellande: bool,
) -> Result<(), BellandeMeshError> {
let path = self
.data_dir
.join("data_chunks")
.join(node_id.to_hex())
.join(format!("{}.dat", chunk.id.to_hex()));
let chunks_dir = self.data_dir.join("data_chunks").join(node_id.to_hex());
std::fs::create_dir_all(&chunks_dir)?;
let extension = if use_bellande { "bellande" } else { "dat" };
let file_name = format!("{}.{}", chunk.id.to_hex(), extension);
let path = chunks_dir.join(file_name);
std::fs::create_dir_all(path.parent().unwrap())?;
let file = self.get_connection(&path)?;
let writer = BufWriter::new(file);
bincode::serialize_into(writer, chunk)
bincode::serialize_into(file, chunk)
.map_err(|e| BellandeMeshError::Serialization(e.to_string()))?;
Ok(())
}
pub fn load_data_chunks(
pub async fn migrate_chunk_format(
&self,
node_id: &NodeId,
) -> Result<HashMap<NodeId, DataChunk>, BellandeMeshError> {
to_bellande: bool,
) -> Result<(), BellandeMeshError> {
let chunks = self.load_data_chunks(node_id)?;
let chunks_dir = self.data_dir.join("data_chunks").join(node_id.to_hex());
let temp_dir = chunks_dir.with_extension("temp");
std::fs::create_dir_all(&temp_dir)?;
if !chunks_dir.exists() {
return Ok(HashMap::new());
for chunk in &chunks {
let extension = if to_bellande { "bellande" } else { "dat" };
let file_name = format!("{}.{}", chunk.id.to_hex(), extension);
let path = temp_dir.join(file_name);
let file = self.get_connection(&path)?;
bincode::serialize_into(file, chunk)
.map_err(|e| BellandeMeshError::Serialization(e.to_string()))?;
}
let mut chunks = HashMap::new();
for entry in std::fs::read_dir(chunks_dir)? {
let entry = entry?;
let path = entry.path();
if path.extension().and_then(|s| s.to_str()) == Some("dat") {
let file = self.get_connection(&path)?;
let reader = BufReader::new(file);
let chunk: DataChunk = bincode::deserialize_from(reader)
.map_err(|e| BellandeMeshError::Deserialization(e.to_string()))?;
chunks.insert(chunk.id, chunk);
}
let backup_dir = chunks_dir.with_extension("backup");
if chunks_dir.exists() {
std::fs::rename(&chunks_dir, &backup_dir)?;
}
Ok(chunks)
std::fs::rename(&temp_dir, &chunks_dir)?;
if backup_dir.exists() {
std::fs::remove_dir_all(backup_dir)?;
}
Ok(())
}
fn get_connection(&self, path: &Path) -> Result<File, BellandeMeshError> {