diff --git a/Cargo.toml b/Cargo.toml index 6f66f91..82d2bba 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ slog = "2.7.0" slog-term = "2.9.1" rand = "0.8" chrono = "0.4" +thiserror = "1.0" [dev-dependencies] tempfile = "3.10.1" diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..3af5544 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,56 @@ +// organization : SpacewalkHq +// License : MIT License + +use std::net::SocketAddr; +use thiserror::Error; + +/// wrapper around std library error +pub type Result = std::result::Result; + +#[derive(Error, Debug)] +pub enum Error { + // Storage layer specific error + #[error("Storage error {0}")] + Store(#[from] StorageError), + // Network layer specific error + #[error("Network error {0}")] + Network(#[from] NetworkError), + // To handle all std lib io error + #[error("File error {0}")] + Io(#[from] std::io::Error), + /// Some other error occurred. + #[error("unknown error {0}")] + Unknown(#[from] Box), +} + +#[derive(Error, Debug)] +pub enum NetworkError { + #[error("Accepting incoming connection failed")] + AcceptError, + #[error("Connection is closed")] + ConnectionClosedError, + #[error("Connection to {0} failed")] + ConnectError(SocketAddr), + #[error("Failed binding to {0}")] + BindError(SocketAddr), + #[error("Broadcast failed")] + BroadcastError, +} + +#[derive(Error, Debug)] +pub enum StorageError { + #[error("File is empty")] + EmptyFile, + #[error("File is corrupted")] + CorruptFile, + #[error("Data integrity check failed!")] + DataIntegrityError, + #[error("Storing log failed")] + StoreError, + #[error("Log compaction failed")] + CompactionError, + #[error("Log retrieval failed")] + RetrieveError, + #[error("Reading file metadata failed")] + MetaDataError, +} diff --git a/src/lib.rs b/src/lib.rs index dacd69a..a710423 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,7 @@ // organization : SpacewalkHq // License : MIT License +pub mod error; pub mod log; pub mod network; pub mod server; diff --git a/src/network.rs b/src/network.rs index f5e93ea..cfde7d5 100644 --- a/src/network.rs +++ b/src/network.rs @@ -1,32 +1,27 @@ // organization : SpacewalkHq // License : MIT License -use crate::parse_ip_address; -use async_trait::async_trait; -use futures::future::join_all; -use std::error::Error; use std::net::SocketAddr; use std::sync::Arc; + +use async_trait::async_trait; +use futures::future::join_all; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::net::{TcpListener, TcpStream}; use tokio::sync::Mutex; +use crate::error::NetworkError::ConnectionClosedError; +use crate::error::Result; +use crate::error::{Error, NetworkError}; +use crate::parse_ip_address; + #[async_trait] pub trait NetworkLayer: Send + Sync { - async fn send( - &self, - address: &str, - port: &str, - data: &[u8], - ) -> Result<(), Box>; - async fn receive(&self) -> Result, Box>; - async fn broadcast( - &self, - data: &[u8], - addresses: Vec, - ) -> Result<(), Box>; - async fn open(&self) -> Result<(), Box>; - async fn close(self) -> Result<(), Box>; + async fn send(&self, address: &str, port: &str, data: &[u8]) -> Result<()>; + async fn receive(&self) -> Result>; + async fn broadcast(&self, data: &[u8], addresses: Vec) -> Result<()>; + async fn open(&self) -> Result<()>; + async fn close(self) -> Result<()>; } #[derive(Debug, Clone)] @@ -47,23 +42,20 @@ impl TCPManager { } } - async fn async_send( - data: &[u8], - address: SocketAddr, - ) -> Result<(), Box> { - let mut stream = TcpStream::connect(address).await?; - stream.write_all(data).await?; + async fn async_send(data: &[u8], address: SocketAddr) -> Result<()> { + let mut stream = TcpStream::connect(address).await.map_err(Error::Io)?; + stream.write_all(data).await.map_err(Error::Io)?; Ok(()) } - async fn handle_receive(&self) -> Result, Box> { + async fn handle_receive(&self) -> Result> { let mut data = Vec::new(); let listener = self.listener.lock().await; if let Some(listener) = &*listener { - let (mut stream, _) = listener.accept().await?; + let (mut stream, _) = listener.accept().await.map_err(Error::Io)?; let mut buffer = Vec::new(); let mut reader = tokio::io::BufReader::new(&mut stream); - reader.read_to_end(&mut buffer).await?; + reader.read_to_end(&mut buffer).await.map_err(Error::Io)?; data = buffer; } Ok(data) @@ -72,26 +64,17 @@ impl TCPManager { #[async_trait] impl NetworkLayer for TCPManager { - async fn send( - &self, - address: &str, - port: &str, - data: &[u8], - ) -> Result<(), Box> { - let addr: SocketAddr = format!("{}:{}", address, port).parse()?; + async fn send(&self, address: &str, port: &str, data: &[u8]) -> Result<()> { + let addr: SocketAddr = format!("{}:{}", address, port).parse().unwrap(); Self::async_send(data, addr).await?; Ok(()) } - async fn receive(&self) -> Result, Box> { + async fn receive(&self) -> Result> { self.handle_receive().await } - async fn broadcast( - &self, - data: &[u8], - addresses: Vec, - ) -> Result<(), Box> { + async fn broadcast(&self, data: &[u8], addresses: Vec) -> Result<()> { let futures = addresses.into_iter().map(|address| { let (ip, port) = parse_ip_address(&address); let addr: SocketAddr = format!("{}:{}", ip, port).parse().unwrap(); @@ -100,26 +83,29 @@ impl NetworkLayer for TCPManager { join_all(futures) .await .into_iter() - .collect::>()?; + .collect::>() + .map_err(|_e| NetworkError::BroadcastError)?; Ok(()) } - async fn open(&self) -> Result<(), Box> { + async fn open(&self) -> Result<()> { let mut is_open = self.is_open.lock().await; if *is_open { - return Err("Listener is already open".into()); + return Err(Error::Unknown("Listener is already open".into())); } - let addr: SocketAddr = format!("{}:{}", self.address, self.port).parse()?; - let listener = TcpListener::bind(addr).await?; + let addr: SocketAddr = format!("{}:{}", self.address, self.port).parse().unwrap(); + let listener = TcpListener::bind(addr) + .await + .map_err(|_e| NetworkError::BindError(addr))?; *self.listener.lock().await = Some(listener); *is_open = true; Ok(()) } - async fn close(self) -> Result<(), Box> { + async fn close(self) -> Result<()> { let mut is_open = self.is_open.lock().await; if !*is_open { - return Err("Listener is not open".into()); + return Err(Error::Network(ConnectionClosedError)); } *self.listener.lock().await = None; *is_open = false; @@ -129,9 +115,10 @@ impl NetworkLayer for TCPManager { #[cfg(test)] mod tests { - use crate::network::{NetworkLayer, TCPManager}; use tokio::task::JoinSet; + use crate::network::{NetworkLayer, TCPManager}; + const LOCALHOST: &str = "127.0.0.1"; #[tokio::test] diff --git a/src/storage.rs b/src/storage.rs index deb0922..ff7496c 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1,7 +1,6 @@ // Organization: SpacewalkHq // License: MIT License -use std::error::Error; use std::path::{Path, PathBuf}; use async_trait::async_trait; @@ -10,16 +9,19 @@ use sha2::{Digest, Sha256}; use tokio::fs::{self, File}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use crate::error::StorageError::CorruptFile; +use crate::error::{Error, Result, StorageError}; + const MAX_FILE_SIZE: u64 = 1_000_000; const CHECKSUM_LEN: usize = 64; #[async_trait] pub trait Storage { - async fn store(&self, data: &[u8]) -> Result<(), Box>; - async fn retrieve(&self) -> Result, Box>; - async fn compaction(&self) -> Result<(), Box>; - async fn delete(&self) -> Result<(), Box>; - async fn turned_malicious(&self) -> Result<(), Box>; + async fn store(&self, data: &[u8]) -> Result<()>; + async fn retrieve(&self) -> Result>; + async fn compaction(&self) -> Result<()>; + async fn delete(&self) -> Result<()>; + async fn turned_malicious(&self) -> Result<()>; } #[derive(Clone)] @@ -38,27 +40,29 @@ impl LocalStorage { } } - async fn store_async(&self, data: &[u8]) -> Result<(), Box> { + async fn store_async(&self, data: &[u8]) -> Result<()> { let checksum = calculate_checksum(data); let data_with_checksum = [data, checksum.as_slice()].concat(); - let mut file = File::create(&self.path).await?; - file.write_all(&data_with_checksum).await?; - file.flush().await?; + let mut file = File::create(&self.path).await.map_err(Error::Io)?; + file.write_all(&data_with_checksum) + .await + .map_err(Error::Io)?; + file.flush().await.map_err(Error::Io)?; Ok(()) } - async fn retrieve_async(&self) -> Result, Box> { - let mut file = File::open(&self.path).await?; + async fn retrieve_async(&self) -> Result> { + let mut file = File::open(&self.path).await.map_err(Error::Io)?; let mut buffer = Vec::new(); - file.read_to_end(&mut buffer).await?; + file.read_to_end(&mut buffer).await.map_err(Error::Io)?; if buffer.is_empty() { - return Err("File is empty".into()); + return Err(Error::Store(StorageError::EmptyFile)); } if buffer.len() < CHECKSUM_LEN { - return Err("File is potentially malicious".into()); + return Err(Error::Store(StorageError::CorruptFile)); } let data = &buffer[..buffer.len() - 64]; @@ -66,21 +70,20 @@ impl LocalStorage { let calculated_checksum = calculate_checksum(data); if stored_checksum != calculated_checksum { - return Err("Data integrity check failed!".into()); + return Err(Error::Store(StorageError::DataIntegrityError)); } Ok(data.to_vec()) } - async fn delete_async(&self) -> Result<(), Box> { - fs::remove_file(&self.path).await?; + async fn delete_async(&self) -> Result<()> { + fs::remove_file(&self.path).await.map_err(Error::Io)?; Ok(()) } - async fn compaction_async(&self) -> Result<(), Box> { + async fn compaction_async(&self) -> Result<()> { // If file size is greater than 1MB, then compact it - let metadata = fs::metadata(&self.path).await?; - println!("file size {}", metadata.len()); + let metadata = fs::metadata(&self.path).await.map_err(Error::Io)?; if metadata.len() > MAX_FILE_SIZE { self.delete_async().await?; } @@ -90,29 +93,29 @@ impl LocalStorage { #[async_trait] impl Storage for LocalStorage { - async fn store(&self, data: &[u8]) -> Result<(), Box> { + async fn store(&self, data: &[u8]) -> Result<()> { self.store_async(data).await } - async fn retrieve(&self) -> Result, Box> { + async fn retrieve(&self) -> Result> { self.retrieve_async().await } - async fn compaction(&self) -> Result<(), Box> { + async fn compaction(&self) -> Result<()> { self.compaction_async().await } - async fn delete(&self) -> Result<(), Box> { + async fn delete(&self) -> Result<()> { self.delete_async().await } - async fn turned_malicious(&self) -> Result<(), Box> { + async fn turned_malicious(&self) -> Result<()> { // Check if the file is tampered with self.retrieve().await?; - let metadata = fs::metadata(&self.path).await?; + let metadata = fs::metadata(&self.path).await.map_err(Error::Io)?; if metadata.len() > MAX_FILE_SIZE { - return Err("File is potentially malicious".into()); + return Err(Error::Store(CorruptFile)); } Ok(()) }