latest pushes

This commit is contained in:
Ronaldson Bellande 2024-10-25 00:10:40 -04:00
parent 6c02e9a0a5
commit 28749bd777
7 changed files with 780 additions and 110 deletions

View File

@ -15,7 +15,7 @@ categories = ["network-programming", "asynchronous"]
path = "src/bellande_mesh_sync.rs" path = "src/bellande_mesh_sync.rs"
[dependencies] [dependencies]
tokio = { version = "1.28", features = ["rt", "net", "time", "sync", "macros"] } tokio = { version = "1.0", features = ["full"] }
hyper-rustls = { version = "0.24", features = ["http1", "http2"] } hyper-rustls = { version = "0.24", features = ["http1", "http2"] }
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
hyper = { version = "0.14", features = ["full"] } hyper = { version = "0.14", features = ["full"] }
@ -24,3 +24,8 @@ serde_json = "1.0"
bincode = "1.3" bincode = "1.3"
rand = "0.8" rand = "0.8"
uuid = { version = "1.3", features = ["v4", "serde"] } uuid = { version = "1.3", features = ["v4", "serde"] }
rustls = "0.21"
chrono = { version = "0.4", features = ["serde"] }
sled = "0.34"
tokio-util = { version = "0.7", features = ["full"] }
async-trait = "0.1"

View File

@ -32,6 +32,38 @@
- Token validation - Token validation
- Node authentication - Node authentication
# Usage
```
use bellande_mesh_sync::{init, init_with_options, start, stop, MeshOptions, Config};
async fn example() -> Result<(), BellandeMeshError> {
// Basic initialization
let config = Config {
listen_address: "127.0.0.1:8000".to_string(),
node_timeout: 300,
};
let mesh = init(config.clone()).await?;
start(&mesh).await?;
// Or with custom options
let options = MeshOptions {
dev_mode: true,
metrics_interval: Some(30),
enable_persistence: true,
..Default::default()
};
let mesh = init_with_options(config, options).await?;
start(&mesh).await?;
// Use other functionalities
broadcast(&mesh, b"Hello network!".to_vec()).await?;
let stats = get_stats(&mesh).await?;
let nodes = get_nodes(&mesh).await?;
stop(&mesh).await?;
Ok(())
}
```
## Website Crates ## Website Crates
- https://crates.io/crates/bellande_mesh_sync - https://crates.io/crates/bellande_mesh_sync

View File

@ -13,6 +13,9 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
use std::path::PathBuf;
use std::sync::Arc;
mod config; mod config;
mod data; mod data;
mod dht; mod dht;
@ -26,17 +29,152 @@ mod persistence;
pub use crate::config::config::Config; pub use crate::config::config::Config;
pub use crate::error::error::BellandeMeshError; pub use crate::error::error::BellandeMeshError;
pub use crate::mesh::mesh::BellandeMeshSync; pub use crate::mesh::mesh::BellandeMeshSync;
use crate::mesh::mesh::NetworkStats;
pub use crate::metrics::metrics::MetricsManager;
pub use crate::node::node::{Node, NodeId, PublicKey};
pub use crate::persistence::persistence::PersistenceManager;
/// Initialize the BellandeMeshSync System /// Configuration options for initializing the BellandeMeshSync system
/// This function takes a configuration and sets up the BellandeMeshSync System. #[derive(Debug, Clone)]
/// It returns a BellandeMeshSync instance that can be used to interact with the mesh network. pub struct MeshOptions {
/// Path to the TLS certificate file
pub cert_path: Option<PathBuf>,
/// Path to the TLS private key file
pub key_path: Option<PathBuf>,
/// Enable development mode (generates self-signed certificates)
pub dev_mode: bool,
/// Custom metrics collection interval in seconds
pub metrics_interval: Option<u64>,
/// Maximum number of concurrent connections
pub max_connections: Option<usize>,
/// Enable persistence layer
pub enable_persistence: bool,
/// Custom persistence directory
pub persistence_path: Option<PathBuf>,
}
impl Default for MeshOptions {
fn default() -> Self {
Self {
cert_path: None,
key_path: None,
dev_mode: false,
metrics_interval: Some(60),
max_connections: Some(1000),
enable_persistence: false,
persistence_path: None,
}
}
}
/// Initialize the BellandeMeshSync System with default options
pub async fn init(config: Config) -> Result<BellandeMeshSync, BellandeMeshError> { pub async fn init(config: Config) -> Result<BellandeMeshSync, BellandeMeshError> {
let bellande_mesh = BellandeMeshSync::new(&config)?; init_with_options(config, MeshOptions::default()).await
}
/// Initialize the BellandeMeshSync System with custom options
pub async fn init_with_options(
config: Config,
options: MeshOptions,
) -> Result<BellandeMeshSync, BellandeMeshError> {
// Initialize metrics manager
let metrics = Arc::new(MetricsManager::new());
// Initialize persistence if enabled
let persistence_manager = if options.enable_persistence {
let path = options
.persistence_path
.unwrap_or_else(|| PathBuf::from("data"));
Some(PersistenceManager::new(path.to_str().unwrap_or("data"))?)
} else {
None
};
// Initialize the mesh network with appropriate certificates
let bellande_mesh = match (options.cert_path, options.key_path, options.dev_mode) {
(Some(cert), Some(key), false) => {
BellandeMeshSync::new_with_certs(&config, cert, key, metrics.clone())?
}
(None, None, true) => {
// Use default certificates for development
let cert = PathBuf::from("certs/server.crt");
let key = PathBuf::from("certs/server.key");
BellandeMeshSync::new_with_certs(&config, cert, key, metrics.clone())?
}
(None, None, false) => BellandeMeshSync::new_with_metrics(&config, metrics.clone())?,
_ => return Err(BellandeMeshError::Custom(
"Invalid certificate configuration. Provide both cert and key paths, or enable dev mode, or neither.".into()
)),
};
// Restore persisted data if available
if let Some(persistence) = &persistence_manager {
let nodes = persistence.load_nodes()?;
bellande_mesh.restore_nodes(nodes).await?;
// Load data chunks for each node
for node in nodes {
let chunks = persistence.load_data_chunks(&node.id)?;
bellande_mesh.restore_data_chunks(node.id, chunks).await?;
}
}
// Configure metrics collection if enabled
if let Some(interval) = options.metrics_interval {
bellande_mesh.start_metrics_collection(interval).await?;
}
// Set connection limits if specified
if let Some(max_conn) = options.max_connections {
bellande_mesh.set_max_connections(max_conn).await?;
}
Ok(bellande_mesh) Ok(bellande_mesh)
} }
/// Start the BellandeMeshSync System /// Start the BellandeMeshSync System
/// This function takes a BellandeMeshSync instance and starts the mesh network operations.
pub async fn start(bellande_mesh: &BellandeMeshSync) -> Result<(), BellandeMeshError> { pub async fn start(bellande_mesh: &BellandeMeshSync) -> Result<(), BellandeMeshError> {
bellande_mesh.start().await bellande_mesh.start().await
} }
/// Stop the BellandeMeshSync System
pub async fn stop(bellande_mesh: &BellandeMeshSync) -> Result<(), BellandeMeshError> {
bellande_mesh.stop().await
}
/// Broadcast a message to all nodes in the network
pub async fn broadcast(
bellande_mesh: &BellandeMeshSync,
data: Vec<u8>,
) -> Result<(), BellandeMeshError> {
bellande_mesh.broadcast_data(data).await
}
/// Get the current network statistics
pub async fn get_stats(
bellande_mesh: &BellandeMeshSync,
) -> Result<NetworkStats, BellandeMeshError> {
bellande_mesh.get_network_stats().await
}
/// Get the list of currently connected nodes
pub async fn get_nodes(bellande_mesh: &BellandeMeshSync) -> Result<Vec<Node>, BellandeMeshError> {
bellande_mesh.get_all_nodes().await
}
/// Check if a specific node is connected to the network
pub async fn is_node_connected(
bellande_mesh: &BellandeMeshSync,
node_id: NodeId,
) -> Result<bool, BellandeMeshError> {
bellande_mesh.is_node_connected(&node_id).await
}
/// Send a message to a specific node
pub async fn send_to_node(
bellande_mesh: &BellandeMeshSync,
node_id: NodeId,
data: Vec<u8>,
) -> Result<(), BellandeMeshError> {
bellande_mesh.send_data_to_node(node_id, data).await
}

View File

@ -13,18 +13,18 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::node::node::NodeId;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::time::SystemTime; use std::time::SystemTime;
use uuid::Uuid;
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct DataChunk { pub struct DataChunk {
pub id: Uuid, pub id: NodeId,
pub content: Vec<u8>, pub content: Vec<u8>,
pub checksum: String, pub checksum: String,
pub version: u64, pub version: u64,
pub last_modified: SystemTime, pub last_modified: SystemTime,
pub author: Uuid, pub author: NodeId,
pub parent_versions: Vec<u64>, pub parent_versions: Vec<u64>,
} }
@ -33,17 +33,20 @@ impl DataChunk {
content: Vec<u8>, content: Vec<u8>,
checksum: String, checksum: String,
version: u64, version: u64,
author: Uuid, author: NodeId,
parent_versions: Vec<u64>, parent_versions: Vec<u64>,
) -> Self { ) -> Self {
Self { Self {
id: Uuid::new_v4(), id: NodeId::new(),
content, content,
checksum, checksum,
version, version,
last_modified: SystemTime::now(), last_modified: SystemTime::now(),
author, author: NodeId::new(),
parent_versions, parent_versions,
} }
} }
pub fn size(&self) -> usize {
std::mem::size_of::<NodeId>() * 2 + self.content.len() + std::mem::size_of::<SystemTime>()
}
} }

View File

@ -14,25 +14,41 @@
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::config::config::Config; use crate::config::config::Config;
use crate::data::data::DataChunk;
use crate::error::error::BellandeMeshError; use crate::error::error::BellandeMeshError;
use crate::node::node::{DataChunk, Message, Node, NodeId, PublicKey}; pub use crate::metrics::metrics::MetricsManager;
use crate::node::node::{Message, Node, NodeId, PublicKey};
use hyper::service::{make_service_fn, service_fn}; use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Client, Request, Response, Server, StatusCode}; use hyper::{Body, Client, Request, Response, Server, StatusCode};
use hyper_rustls::HttpsConnectorBuilder; use hyper_rustls::HttpsConnectorBuilder;
use rustls::{Certificate, PrivateKey, ServerConfig};
use serde::Serialize; use serde::Serialize;
use serde_json;
use std::convert::Infallible; use std::convert::Infallible;
use std::fs;
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::net::{SocketAddr, TcpStream}; use std::net::{SocketAddr, TcpStream};
use std::path::Path;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime}; use std::time::{Duration, SystemTime};
use tokio::net::{TcpListener as TokioTcpListener, UdpSocket as TokioUdpSocket}; use tokio::net::{TcpListener as TokioTcpListener, UdpSocket as TokioUdpSocket};
use tokio::sync::mpsc; use tokio::signal;
use tokio::sync::{mpsc, Mutex};
use tokio::time::sleep; use tokio::time::sleep;
use tokio_rustls::rustls::{Certificate, PrivateKey, ServerConfig}; use tokio_util::sync::CancellationToken;
use tokio_rustls::TlsAcceptor;
// Constants
const UDP_BUFFER_SIZE: usize = 65536;
const HTTP_PORT_OFFSET: u16 = 1;
const HTTPS_PORT_OFFSET: u16 = 2;
const MAX_MESSAGE_SIZE: usize = 1024 * 1024; // 1MB
const CHANNEL_BUFFER_SIZE: usize = 1000;
const SYNC_INTERVAL: Duration = Duration::from_secs(60);
const CLEANUP_INTERVAL: Duration = Duration::from_secs(300);
const PING_INTERVAL: Duration = Duration::from_secs(30);
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
struct NetworkStats { pub struct NetworkStats {
tcp_connections: usize, tcp_connections: usize,
udp_packets_received: usize, udp_packets_received: usize,
http_requests: usize, http_requests: usize,
@ -66,12 +82,39 @@ pub struct BellandeMeshSync {
http_client: Client<hyper::client::HttpConnector>, http_client: Client<hyper::client::HttpConnector>,
https_client: Client<hyper_rustls::HttpsConnector<hyper::client::HttpConnector>>, https_client: Client<hyper_rustls::HttpsConnector<hyper::client::HttpConnector>>,
stats: Arc<RwLock<NetworkStats>>, stats: Arc<RwLock<NetworkStats>>,
metrics: Arc<MetricsManager>,
message_tx: mpsc::Sender<(Message, SocketAddr)>, message_tx: mpsc::Sender<(Message, SocketAddr)>,
message_rx: Arc<tokio::sync::Mutex<mpsc::Receiver<(Message, SocketAddr)>>>, message_rx: Arc<Mutex<mpsc::Receiver<(Message, SocketAddr)>>>,
cancel_token: CancellationToken,
}
#[async_trait::async_trait]
pub trait MeshTransport: Send + Sync {
async fn start(&self) -> Result<(), BellandeMeshError>;
async fn stop(&self) -> Result<(), BellandeMeshError>;
async fn broadcast_data(&self, data: Vec<u8>) -> Result<(), BellandeMeshError>;
async fn get_network_stats(&self) -> Result<NetworkStats, BellandeMeshError>;
async fn get_all_nodes(&self) -> Result<Vec<Node>, BellandeMeshError>;
async fn is_node_connected(&self, node_id: &NodeId) -> Result<bool, BellandeMeshError>;
async fn send_data_to_node(
&self,
node_id: NodeId,
data: Vec<u8>,
) -> Result<(), BellandeMeshError>;
async fn restore_data_chunks(
&self,
node_id: NodeId,
chunks: Vec<DataChunk>,
) -> Result<(), BellandeMeshError>;
async fn start_metrics_collection(&self, interval: u64) -> Result<(), BellandeMeshError>;
async fn set_max_connections(&self, max_conn: usize) -> Result<(), BellandeMeshError>;
} }
impl BellandeMeshSync { impl BellandeMeshSync {
pub fn new(config: &Config) -> Result<Self, BellandeMeshError> { pub fn new_with_metrics(
config: &Config,
metrics: Arc<MetricsManager>,
) -> Result<Self, BellandeMeshError> {
let tls_config = Self::create_tls_config()?; let tls_config = Self::create_tls_config()?;
let http_client = Client::new(); let http_client = Client::new();
let https_connector = HttpsConnectorBuilder::new() let https_connector = HttpsConnectorBuilder::new()
@ -82,6 +125,7 @@ impl BellandeMeshSync {
let https_client = Client::builder().build(https_connector); let https_client = Client::builder().build(https_connector);
let (message_tx, message_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE); let (message_tx, message_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE);
let stats = Arc::new(RwLock::new(NetworkStats::default()));
Ok(Self { Ok(Self {
config: Arc::new(config.clone()), config: Arc::new(config.clone()),
@ -90,16 +134,58 @@ impl BellandeMeshSync {
tls_config: Arc::new(tls_config), tls_config: Arc::new(tls_config),
http_client, http_client,
https_client, https_client,
stats: Arc::new(RwLock::new(NetworkStats::default())), stats,
metrics,
message_tx, message_tx,
message_rx: Arc::new(tokio::sync::Mutex::new(message_rx)), message_rx: Arc::new(Mutex::new(message_rx)),
cancel_token: CancellationToken::new(),
}) })
} }
pub fn new_with_certs(
config: &Config,
cert_path: std::path::PathBuf,
key_path: std::path::PathBuf,
metrics: Arc<MetricsManager>,
) -> Result<Self, BellandeMeshError> {
let tls_config = Self::create_tls_config_from_files(&cert_path, &key_path)?;
let http_client = Client::new();
let https_connector = HttpsConnectorBuilder::new()
.with_native_roots()
.https_only()
.enable_http1()
.build();
let https_client = Client::builder().build(https_connector);
let (message_tx, message_rx) = mpsc::channel(CHANNEL_BUFFER_SIZE);
let stats = Arc::new(RwLock::new(NetworkStats::default()));
Ok(Self {
config: Arc::new(config.clone()),
nodes: Arc::new(RwLock::new(Vec::new())),
running: Arc::new(RwLock::new(true)),
tls_config: Arc::new(tls_config),
http_client,
https_client,
stats,
metrics,
message_tx,
message_rx: Arc::new(Mutex::new(message_rx)),
cancel_token: CancellationToken::new(),
})
}
// Default TLS configuration using embedded certificates
fn create_tls_config() -> Result<ServerConfig, BellandeMeshError> { fn create_tls_config() -> Result<ServerConfig, BellandeMeshError> {
let cert_path = Path::new("certs/server.crt"); let cert_path = Path::new("certs/server.crt");
let key_path = Path::new("certs/server.key"); let key_path = Path::new("certs/server.key");
Self::create_tls_config_from_files(cert_path, key_path)
}
fn create_tls_config_from_files(
cert_path: &Path,
key_path: &Path,
) -> Result<ServerConfig, BellandeMeshError> {
let cert_data = fs::read(cert_path) let cert_data = fs::read(cert_path)
.map_err(|e| BellandeMeshError::Custom(format!("Failed to read certificate: {}", e)))?; .map_err(|e| BellandeMeshError::Custom(format!("Failed to read certificate: {}", e)))?;
let key_data = fs::read(key_path) let key_data = fs::read(key_path)
@ -144,7 +230,8 @@ impl BellandeMeshSync {
async fn start_message_handler(&self) -> Result<(), BellandeMeshError> { async fn start_message_handler(&self) -> Result<(), BellandeMeshError> {
let handler = self.clone(); let handler = self.clone();
tokio::spawn(async move { tokio::spawn(async move {
while let Some((message, addr)) = handler.message_rx.recv().await { let mut rx = handler.message_rx.lock().await;
while let Some((message, addr)) = rx.recv().await {
if let Err(e) = handler.handle_message_internal(message, addr).await { if let Err(e) = handler.handle_message_internal(message, addr).await {
eprintln!("Message handling error from {}: {}", addr, e); eprintln!("Message handling error from {}: {}", addr, e);
} }
@ -224,23 +311,40 @@ impl BellandeMeshSync {
.map_err(|e| BellandeMeshError::Custom(format!("Invalid address: {}", e)))?; .map_err(|e| BellandeMeshError::Custom(format!("Invalid address: {}", e)))?;
let http_addr = SocketAddr::new(addr.ip(), addr.port() + HTTP_PORT_OFFSET); let http_addr = SocketAddr::new(addr.ip(), addr.port() + HTTP_PORT_OFFSET);
let metrics = self.metrics.clone();
let handler = self.clone(); let handler = self.clone();
let make_service = make_service_fn(move |_| { let make_service = make_service_fn(move |_| {
let handler = handler.clone(); let handler = handler.clone();
let metrics = metrics.clone();
async move { async move {
Ok::<_, Infallible>(service_fn(move |req| { Ok::<_, Infallible>(service_fn(move |req| {
let handler = handler.clone(); let handler = handler.clone();
async move { handler.handle_http_request(req).await } let metrics = metrics.clone();
async move {
metrics.increment_sync_operations();
handler.handle_http_request(req).await
}
})) }))
} }
}); });
let server = Server::bind(&http_addr).serve(make_service); let server = Server::bind(&http_addr)
tokio::spawn(server); .serve(make_service)
.with_graceful_shutdown(Self::shutdown_signal());
tokio::spawn(server);
Ok(()) Ok(())
} }
async fn shutdown_signal() {
if let Err(e) = signal::ctrl_c().await {
eprintln!("Failed to install CTRL+C signal handler: {}", e);
}
}
async fn start_https_server(&self) -> Result<(), BellandeMeshError> { async fn start_https_server(&self) -> Result<(), BellandeMeshError> {
let addr = self let addr = self
.config .config
@ -248,27 +352,41 @@ impl BellandeMeshSync {
.parse::<SocketAddr>() .parse::<SocketAddr>()
.map_err(|e| BellandeMeshError::Custom(format!("Invalid address: {}", e)))?; .map_err(|e| BellandeMeshError::Custom(format!("Invalid address: {}", e)))?;
let https_addr = SocketAddr::new(addr.ip(), addr.port() + HTTPS_PORT_OFFSET); let https_addr = SocketAddr::new(addr.ip(), addr.port() + HTTPS_PORT_OFFSET);
let tls_cfg = self.tls_config.clone();
let handler = self.clone(); let handler = self.clone();
let acceptor = TlsAcceptor::from(tls_cfg); let tls_config = self.tls_config.clone();
let listener = TokioTcpListener::bind(https_addr).await?; let listener = TokioTcpListener::bind(https_addr).await?;
tokio::spawn(async move { tokio::spawn(async move {
while handler.is_running().unwrap_or(false) { while handler.is_running().unwrap_or(false) {
if let Ok((stream, addr)) = listener.accept().await { if let Ok((stream, addr)) = listener.accept().await {
let acceptor = acceptor.clone();
let handler = handler.clone(); let handler = handler.clone();
let tls_config = tls_config.clone();
tokio::spawn(async move { tokio::spawn(async move {
match acceptor.accept(stream).await { // Create TLS server connection
Ok(tls_stream) => { let tls_conn = match rustls::ServerConnection::new(tls_config.clone()) {
Ok(conn) => conn,
Err(e) => {
eprintln!("TLS connection creation error: {}", e);
return;
}
};
// Create async TLS stream
let mut tls_stream = tokio_rustls::server::TlsStream::new(stream, tls_conn);
// Perform TLS handshake
match tls_stream.do_handshake().await {
Ok(_) => {
// Handle secure connection
if let Err(e) = handler.handle_https_connection(tls_stream).await { if let Err(e) = handler.handle_https_connection(tls_stream).await {
eprintln!("HTTPS error from {}: {}", addr, e); eprintln!("HTTPS error from {}: {}", addr, e);
} }
} }
Err(e) => eprintln!("TLS accept error from {}: {}", addr, e), Err(e) => {
eprintln!("TLS handshake error from {}: {}", addr, e);
}
} }
}); });
} }
@ -451,22 +569,228 @@ impl BellandeMeshSync {
Message::DataRequest { ids } => { Message::DataRequest { ids } => {
self.handle_data_request(&ids, src).await?; self.handle_data_request(&ids, src).await?;
} }
Message::Ping { sender, token } => { Message::Ping { sender: _, token } => {
self.handle_ping(sender, token, src).await?; self.handle_ping(token, src).await?;
} }
Message::Pong { sender, token } => { Message::Pong { sender, token: _ } => {
self.handle_pong(sender, token, src).await?; self.handle_pong(sender, src).await?;
}
Message::Store {
key,
value,
sender: _,
token: _,
} => {
self.handle_store(key, value, src).await?;
}
Message::FindNode {
target,
sender: _,
token: _,
} => {
self.handle_find_node(target, src).await?;
}
Message::FindValue {
key,
sender: _,
token: _,
} => {
self.handle_find_value(key, src).await?;
}
Message::Nodes {
nodes,
sender: _,
token: _,
} => {
self.handle_nodes(nodes, src).await?;
}
Message::Value {
key,
value,
sender: _,
token: _,
} => {
self.handle_value(key, value, src).await?;
} }
Message::Heartbeat => { Message::Heartbeat => {
self.update_node_last_seen(src).await?; self.update_node_last_seen(src).await?;
} }
_ => { _ => {}
eprintln!("Unhandled message type from {}", src); }
Ok(())
}
async fn handle_store(
&self,
key: Vec<u8>,
value: Vec<u8>,
src: SocketAddr,
) -> Result<(), BellandeMeshError> {
self.metrics.increment_sync_operations();
let chunk = DataChunk::new(
value.clone(),
"checksum".to_string(),
0,
self.get_local_id()?,
vec![],
);
let nodes = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?;
if let Some(node) = nodes.iter().find(|n| n.address == src) {
let mut data = node
.data
.write()
.map_err(|_| BellandeMeshError::LockError)?;
data.insert(NodeId::from_bytes(&key), chunk);
// Replicate to closest nodes
let closest_nodes = self.find_closest_nodes(&NodeId::from_bytes(&key), 3);
for target_node in closest_nodes {
let store_msg = Message::Store {
key: key.clone(),
value: value.clone(),
sender: self.get_local_id()?,
token: rand::random(),
};
if let Err(e) = self.send_message(target_node.address, &store_msg).await {
eprintln!(
"Failed to replicate store to {}: {}",
target_node.address, e
);
}
}
}
Ok(())
}
async fn handle_find_node(
&self,
target: NodeId,
src: SocketAddr,
) -> Result<(), BellandeMeshError> {
let closest_nodes = self.find_closest_nodes(&target, 20).await;
let response = Message::Nodes {
nodes: closest_nodes,
sender: self.get_local_id().await?,
token: rand::random(),
};
self.send_message(src, &response).await?;
Ok(())
}
async fn handle_find_value(
&self,
key: Vec<u8>,
src: SocketAddr,
) -> Result<(), BellandeMeshError> {
let node_id = NodeId::from_bytes(&key);
let nodes = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?;
for node in nodes.iter() {
if let Ok(data) = node.data.read() {
if let Some(chunk) = data.get(&node_id) {
let response = Message::Value {
key: key.clone(),
value: chunk.content.clone(),
sender: self.get_local_id()?,
token: rand::random(),
};
return self.send_message(src, &response).await;
}
}
}
// If not found, return closest nodes
let closest_nodes = self.find_closest_nodes(&node_id, 20);
let response = Message::Nodes {
nodes: closest_nodes,
sender: self.get_local_id()?,
token: rand::random(),
};
self.send_message(src, &response).await?;
Ok(())
}
async fn handle_nodes(
&self,
new_nodes: Vec<Node>,
_src: SocketAddr,
) -> Result<(), BellandeMeshError> {
let mut nodes = self.nodes.write().await;
for new_node in new_nodes {
if !nodes.iter().any(|n| n.id == new_node.id) {
nodes.push(new_node);
self.metrics.increment_sync_operations();
} }
} }
Ok(()) Ok(())
} }
async fn handle_value(
&self,
key: Vec<u8>,
value: Vec<u8>,
_src: SocketAddr,
) -> Result<(), BellandeMeshError> {
let chunk = DataChunk::new(
value,
"checksum".to_string(),
0,
self.get_local_id()?,
vec![],
);
let mut nodes = self
.nodes
.write()
.map_err(|_| BellandeMeshError::LockError)?;
if let Some(node) = nodes.first_mut() {
if let Ok(mut data) = node.data.write() {
data.insert(NodeId::from_bytes(&key), chunk);
self.metrics.increment_sync_operations();
}
}
Ok(())
}
async fn find_closest_nodes(&self, target: &NodeId, count: usize) -> Vec<Node> {
let nodes = self.nodes.read().await;
let mut distances: Vec<_> = nodes
.iter()
.map(|node| (node.clone(), self.calculate_distance(&node.id, target)))
.collect();
distances.sort_by(|a, b| a.1.cmp(&b.1));
distances
.into_iter()
.take(count)
.map(|(node, _)| node)
.collect()
}
// XOR distance metric for Kademlia-like routing
fn calculate_distance(&self, node1: &NodeId, node2: &NodeId) -> u64 {
let n1_bytes = node1.to_bytes();
let n2_bytes = node2.to_bytes();
let mut distance = 0u64;
for i in 0..8 {
if i < n1_bytes.len() && i < n2_bytes.len() {
distance ^= (n1_bytes[i] ^ n2_bytes[i]) as u64;
}
}
distance
}
async fn update_node_last_seen(&self, addr: SocketAddr) -> Result<(), BellandeMeshError> { async fn update_node_last_seen(&self, addr: SocketAddr) -> Result<(), BellandeMeshError> {
let mut nodes = self let mut nodes = self
.nodes .nodes
@ -560,12 +884,7 @@ impl BellandeMeshSync {
Ok(()) Ok(())
} }
async fn handle_ping( async fn handle_ping(&self, token: u64, src: SocketAddr) -> Result<(), BellandeMeshError> {
&self,
sender: NodeId,
token: u64,
src: SocketAddr,
) -> Result<(), BellandeMeshError> {
let response = Message::Pong { let response = Message::Pong {
sender: self.get_local_id()?, sender: self.get_local_id()?,
token, token,
@ -574,12 +893,7 @@ impl BellandeMeshSync {
Ok(()) Ok(())
} }
async fn handle_pong( async fn handle_pong(&self, sender: NodeId, src: SocketAddr) -> Result<(), BellandeMeshError> {
&self,
sender: NodeId,
token: u64,
src: SocketAddr,
) -> Result<(), BellandeMeshError> {
let mut nodes = self let mut nodes = self
.nodes .nodes
.write() .write()
@ -590,23 +904,35 @@ impl BellandeMeshSync {
Ok(()) Ok(())
} }
async fn broadcast_new_node(&self, new_node: &Node) -> Result<(), BellandeMeshError> { async fn restore_nodes(&self, nodes: Vec<Node>) -> Result<(), BellandeMeshError> {
let nodes = self let mut current_nodes = self.nodes.write().await;
.nodes for node in nodes {
.read() if !current_nodes.iter().any(|n| n.id == node.id) {
.map_err(|_| BellandeMeshError::LockError)?; self.metrics.increment_sync_operations();
let message = Message::JoinResponse { current_nodes.push(node);
accepted: true,
nodes: vec![new_node.clone()],
};
for node in nodes.iter() {
if node.id != new_node.id {
if let Err(e) = self.send_message(node.address, &message).await {
eprintln!("Failed to broadcast new node to {}: {}", node.address, e);
}
} }
} }
Ok(())
}
pub async fn send_message(
&self,
addr: SocketAddr,
msg: &Message,
) -> Result<(), BellandeMeshError> {
let data =
bincode::serialize(msg).map_err(|e| BellandeMeshError::Serialization(e.to_string()))?;
if data.len() > MAX_MESSAGE_SIZE {
return Err(BellandeMeshError::Custom("Message too large".to_string()));
}
let mut stream =
TcpStream::connect(addr).map_err(|e| BellandeMeshError::NetworkError(e.to_string()))?;
stream.write_all(&(data.len() as u32).to_be_bytes())?;
stream.write_all(&data)?;
stream.flush()?;
Ok(()) Ok(())
} }
@ -647,6 +973,141 @@ impl BellandeMeshSync {
Ok(()) Ok(())
} }
pub async fn restore_data_chunks(
&self,
node_id: NodeId,
chunks: Vec<DataChunk>,
) -> Result<(), BellandeMeshError> {
let nodes = self.nodes.read().await;
if let Some(node) = nodes.iter().find(|n| n.id == node_id) {
let mut data = node
.data
.write()
.map_err(|_| BellandeMeshError::LockError)?;
for chunk in chunks {
data.insert(chunk.id.clone(), chunk);
self.metrics.increment_sync_operations();
}
}
Ok(())
}
async fn send_response(
&self,
src: SocketAddr,
closest_nodes: Vec<Node>,
) -> Result<(), BellandeMeshError> {
let response = Message::Nodes {
nodes: closest_nodes,
sender: self.get_local_id().await?,
token: rand::random(),
};
self.send_message(src, &response).await
}
async fn start_metrics_collection(&self) -> Result<(), BellandeMeshError> {
let metrics = self.metrics.clone();
let running = self.running.clone();
tokio::spawn(async move {
let mut interval = tokio::time::interval(Duration::from_secs(60));
while *running.read().await {
interval.tick().await;
if let Err(e) = metrics.update_metrics().await {
eprintln!("Metrics error: {}", e);
}
}
});
Ok(())
}
pub async fn set_max_connections(&self, max_conn: usize) -> Result<(), BellandeMeshError> {
let nodes = self.nodes.read().await;
if nodes.len() > max_conn {
// Remove excess connections, keeping the oldest/most reliable ones
let mut nodes = self.nodes.write().await;
nodes.sort_by(|a, b| b.last_seen.cmp(&a.last_seen));
nodes.truncate(max_conn);
self.update_stats(|stats| stats.active_nodes = max_conn)
.await;
}
Ok(())
}
pub async fn broadcast_data(&self, data: Vec<u8>) -> Result<(), BellandeMeshError> {
let nodes = self.nodes.read().await;
let message = Message::Store {
key: vec![], // Broadcast key
value: data,
sender: self.get_local_id().await?,
token: rand::random(),
};
let mut send_errors = Vec::new();
for node in nodes.iter() {
if let Err(e) = self.send_message(node.address, &message).await {
send_errors.push((node.id.clone(), e));
}
}
// Update metrics
self.metrics.increment_sync_operations();
self.update_stats(|stats| stats.total_messages += 1).await;
// If any errors occurred, return the first one
if let Some((node_id, error)) = send_errors.first() {
return Err(BellandeMeshError::Custom(format!(
"Failed to broadcast to node {}: {}",
node_id, error
)));
}
Ok(())
}
pub async fn get_network_stats(&self) -> Result<NetworkStats, BellandeMeshError> {
let stats = self.stats.read().await;
Ok(NetworkStats {
tcp_connections: stats.tcp_connections,
udp_packets_received: stats.udp_packets_received,
http_requests: stats.http_requests,
https_requests: stats.https_requests,
active_nodes: self.nodes.read().await.len(),
total_messages: stats.total_messages,
start_time: stats.start_time,
last_sync: stats.last_sync,
})
}
pub async fn send_data_to_node(
&self,
node_id: NodeId,
data: Vec<u8>,
) -> Result<(), BellandeMeshError> {
let nodes = self.nodes.read().await;
let node = nodes
.iter()
.find(|n| n.id == node_id)
.ok_or_else(|| BellandeMeshError::Custom("Node not found".to_string()))?;
let message = Message::Store {
key: node_id.to_bytes(),
value: data,
sender: self.get_local_id().await?,
token: rand::random(),
};
// Send the message
self.send_message(node.address, &message).await?;
// Update metrics
self.metrics.increment_sync_operations();
self.update_stats(|stats| stats.total_messages += 1).await;
Ok(())
}
async fn cleanup_dead_nodes(&self) -> Result<(), BellandeMeshError> { async fn cleanup_dead_nodes(&self) -> Result<(), BellandeMeshError> {
let timeout = Duration::from_secs(self.config.node_timeout); let timeout = Duration::from_secs(self.config.node_timeout);
let mut nodes = self let mut nodes = self
@ -691,24 +1152,6 @@ impl BellandeMeshSync {
Ok(()) Ok(())
} }
async fn send_message(&self, addr: SocketAddr, msg: &Message) -> Result<(), BellandeMeshError> {
let data =
bincode::serialize(msg).map_err(|e| BellandeMeshError::Serialization(e.to_string()))?;
if data.len() > MAX_MESSAGE_SIZE {
return Err(BellandeMeshError::Custom("Message too large".to_string()));
}
let mut stream =
TcpStream::connect(addr).map_err(|e| BellandeMeshError::NetworkError(e.to_string()))?;
stream.write_all(&(data.len() as u32).to_be_bytes())?;
stream.write_all(&data)?;
stream.flush()?;
Ok(())
}
async fn read_message(&self, stream: &mut TcpStream) -> Result<Message, BellandeMeshError> { async fn read_message(&self, stream: &mut TcpStream) -> Result<Message, BellandeMeshError> {
let mut len_buf = [0u8; 4]; let mut len_buf = [0u8; 4];
stream.read_exact(&mut len_buf)?; stream.read_exact(&mut len_buf)?;
@ -729,8 +1172,9 @@ impl BellandeMeshSync {
let stats = self let stats = self
.stats .stats
.read() .read()
.map(|stats| stats.clone()) .map(|guard| guard.clone())
.unwrap_or_else(|_| NetworkStats::default()); .unwrap_or_else(|_| NetworkStats::default());
serde_json::to_string_pretty(&stats).unwrap_or_else(|_| "Error getting status".to_string()) serde_json::to_string_pretty(&stats).unwrap_or_else(|_| "Error getting status".to_string())
} }
@ -750,12 +1194,75 @@ impl BellandeMeshSync {
.map(|guard| *guard) .map(|guard| *guard)
} }
fn get_local_id(&self) -> Result<NodeId, BellandeMeshError> { pub async fn get_local_id(&self) -> Result<NodeId, BellandeMeshError> {
self.nodes self.nodes
.read() .read()
.map_err(|_| BellandeMeshError::LockError) .map_err(|_| BellandeMeshError::LockError)
.map(|nodes| nodes.first().map(|n| n.id).unwrap_or_else(|| NodeId::new())) .map(|nodes| nodes.first().map(|n| n.id).unwrap_or_else(|| NodeId::new()))
} }
async fn broadcast_new_node(&self, new_node: &Node) -> Result<(), BellandeMeshError> {
let nodes = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?;
let message = Message::JoinResponse {
accepted: true,
nodes: vec![new_node.clone()],
};
for node in nodes.iter() {
if node.id != new_node.id {
if let Err(e) = self.send_message(node.address, &message).await {
eprintln!("Failed to broadcast new node to {}: {}", node.address, e);
}
}
}
Ok(())
}
pub async fn broadcast_message(&self, message: Message) -> Result<(), BellandeMeshError> {
let nodes = self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?;
for node in nodes.iter() {
if let Err(e) = self.send_message(node.address, &message).await {
eprintln!("Failed to broadcast message to {}: {}", node.address, e);
}
}
Ok(())
}
pub async fn get_node_count(&self) -> Result<usize, BellandeMeshError> {
Ok(self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?
.len())
}
pub async fn get_node_list(&self) -> Result<Vec<NodeId>, BellandeMeshError> {
Ok(self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?
.iter()
.map(|node| node.id)
.collect())
}
pub async fn is_node_connected(&self, node_id: &NodeId) -> Result<bool, BellandeMeshError> {
Ok(self
.nodes
.read()
.map_err(|_| BellandeMeshError::LockError)?
.iter()
.any(|node| &node.id == node_id))
}
} }
impl Clone for BellandeMeshSync { impl Clone for BellandeMeshSync {
@ -768,8 +1275,10 @@ impl Clone for BellandeMeshSync {
http_client: self.http_client.clone(), http_client: self.http_client.clone(),
https_client: self.https_client.clone(), https_client: self.https_client.clone(),
stats: Arc::clone(&self.stats), stats: Arc::clone(&self.stats),
metrics: Arc::clone(&self.metrics),
message_tx: self.message_tx.clone(), message_tx: self.message_tx.clone(),
message_rx: Arc::clone(&self.message_rx), message_rx: Arc::clone(&self.message_rx),
cancel_token: CancellationToken::new(),
} }
} }
} }

View File

@ -13,6 +13,7 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::data::data::DataChunk;
use rand::{thread_rng, RngCore}; use rand::{thread_rng, RngCore};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::cmp::Ordering; use std::cmp::Ordering;
@ -31,6 +32,10 @@ impl NodeId {
NodeId(id) NodeId(id)
} }
pub fn to_bytes(&self) -> Vec<u8> {
bincode::serialize(self).unwrap_or_default()
}
pub fn from_bytes(bytes: &[u8]) -> Self { pub fn from_bytes(bytes: &[u8]) -> Self {
let mut id = [0u8; 32]; let mut id = [0u8; 32];
let len = std::cmp::min(bytes.len(), 32); let len = std::cmp::min(bytes.len(), 32);
@ -68,29 +73,6 @@ impl PublicKey {
} }
} }
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DataChunk {
pub id: NodeId,
pub content: Vec<u8>,
pub timestamp: SystemTime,
pub author: NodeId,
}
impl DataChunk {
pub fn new(id: NodeId, content: Vec<u8>, author: NodeId) -> Self {
Self {
id,
content,
timestamp: SystemTime::now(),
author,
}
}
pub fn size(&self) -> usize {
std::mem::size_of::<NodeId>() * 2 + self.content.len() + std::mem::size_of::<SystemTime>()
}
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Message { pub enum Message {
Ping { Ping {
@ -203,7 +185,7 @@ impl Node {
Ok(mut data) => { Ok(mut data) => {
let now = SystemTime::now(); let now = SystemTime::now();
data.retain(|_, chunk| { data.retain(|_, chunk| {
now.duration_since(chunk.timestamp) now.duration_since(chunk.last_modified)
.map(|age| age <= max_age) .map(|age| age <= max_age)
.unwrap_or(false) .unwrap_or(false)
}); });

View File

@ -13,8 +13,9 @@
// You should have received a copy of the GNU General Public License // You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>. // along with this program. If not, see <https://www.gnu.org/licenses/>.
use crate::data::data::DataChunk;
use crate::error::error::BellandeMeshError; use crate::error::error::BellandeMeshError;
use crate::node::node::{DataChunk, Node, NodeId, PublicKey}; use crate::node::node::{Node, NodeId, PublicKey};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::fs::{File, OpenOptions}; use std::fs::{File, OpenOptions};