Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add error module #22

Merged
merged 7 commits into from
Aug 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ slog = "2.7.0"
slog-term = "2.9.1"
rand = "0.8"
chrono = "0.4"
thiserror = "1.0"

[dev-dependencies]
tempfile = "3.10.1"
Expand Down
56 changes: 56 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
// organization : SpacewalkHq
// License : MIT License

use std::net::SocketAddr;
use thiserror::Error;

/// wrapper around std library error
pub type Result<T> = std::result::Result<T, Error>;

#[derive(Error, Debug)]
pub enum Error {
// Storage layer specific error
#[error("Storage error {0}")]
Store(#[from] StorageError),
// Network layer specific error
#[error("Network error {0}")]
Network(#[from] NetworkError),
// To handle all std lib io error
#[error("File error {0}")]
Io(#[from] std::io::Error),
/// Some other error occurred.
#[error("unknown error {0}")]
Unknown(#[from] Box<dyn std::error::Error + Sync + Send>),
}

#[derive(Error, Debug)]
pub enum NetworkError {
#[error("Accepting incoming connection failed")]
AcceptError,
#[error("Connection is closed")]
ConnectionClosedError,
#[error("Connection to {0} failed")]
ConnectError(SocketAddr),
#[error("Failed binding to {0}")]
BindError(SocketAddr),
#[error("Broadcast failed")]
BroadcastError,
}

#[derive(Error, Debug)]
pub enum StorageError {
#[error("File is empty")]
EmptyFile,
#[error("File is corrupted")]
CorruptFile,
#[error("Data integrity check failed!")]
DataIntegrityError,
#[error("Storing log failed")]
StoreError,
#[error("Log compaction failed")]
CompactionError,
#[error("Log retrieval failed")]
RetrieveError,
#[error("Reading file metadata failed")]
MetaDataError,
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// organization : SpacewalkHq
// License : MIT License

pub mod error;
pub mod log;
pub mod network;
pub mod server;
Expand Down
83 changes: 35 additions & 48 deletions src/network.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,27 @@
// organization : SpacewalkHq
// License : MIT License

use crate::parse_ip_address;
use async_trait::async_trait;
use futures::future::join_all;
use std::error::Error;
use std::net::SocketAddr;
use std::sync::Arc;

use async_trait::async_trait;
use futures::future::join_all;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::Mutex;

use crate::error::NetworkError::ConnectionClosedError;
use crate::error::Result;
use crate::error::{Error, NetworkError};
use crate::parse_ip_address;

#[async_trait]
pub trait NetworkLayer: Send + Sync {
async fn send(
&self,
address: &str,
port: &str,
data: &[u8],
) -> Result<(), Box<dyn Error + Send + Sync>>;
async fn receive(&self) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>>;
async fn broadcast(
&self,
data: &[u8],
addresses: Vec<String>,
) -> Result<(), Box<dyn Error + Send + Sync>>;
async fn open(&self) -> Result<(), Box<dyn Error + Send + Sync>>;
async fn close(self) -> Result<(), Box<dyn Error + Send + Sync>>;
async fn send(&self, address: &str, port: &str, data: &[u8]) -> Result<()>;
async fn receive(&self) -> Result<Vec<u8>>;
async fn broadcast(&self, data: &[u8], addresses: Vec<String>) -> Result<()>;
async fn open(&self) -> Result<()>;
async fn close(self) -> Result<()>;
}

#[derive(Debug, Clone)]
Expand All @@ -47,23 +42,20 @@ impl TCPManager {
}
}

async fn async_send(
data: &[u8],
address: SocketAddr,
) -> Result<(), Box<dyn Error + Send + Sync>> {
let mut stream = TcpStream::connect(address).await?;
stream.write_all(data).await?;
async fn async_send(data: &[u8], address: SocketAddr) -> Result<()> {
let mut stream = TcpStream::connect(address).await.map_err(Error::Io)?;
stream.write_all(data).await.map_err(Error::Io)?;
Ok(())
}

async fn handle_receive(&self) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
async fn handle_receive(&self) -> Result<Vec<u8>> {
let mut data = Vec::new();
let listener = self.listener.lock().await;
if let Some(listener) = &*listener {
let (mut stream, _) = listener.accept().await?;
let (mut stream, _) = listener.accept().await.map_err(Error::Io)?;
let mut buffer = Vec::new();
let mut reader = tokio::io::BufReader::new(&mut stream);
reader.read_to_end(&mut buffer).await?;
reader.read_to_end(&mut buffer).await.map_err(Error::Io)?;
data = buffer;
}
Ok(data)
Expand All @@ -72,26 +64,17 @@ impl TCPManager {

#[async_trait]
impl NetworkLayer for TCPManager {
async fn send(
&self,
address: &str,
port: &str,
data: &[u8],
) -> Result<(), Box<dyn Error + Send + Sync>> {
let addr: SocketAddr = format!("{}:{}", address, port).parse()?;
async fn send(&self, address: &str, port: &str, data: &[u8]) -> Result<()> {
let addr: SocketAddr = format!("{}:{}", address, port).parse().unwrap();
Self::async_send(data, addr).await?;
Ok(())
}

async fn receive(&self) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
async fn receive(&self) -> Result<Vec<u8>> {
self.handle_receive().await
}

async fn broadcast(
&self,
data: &[u8],
addresses: Vec<String>,
) -> Result<(), Box<dyn Error + Send + Sync>> {
async fn broadcast(&self, data: &[u8], addresses: Vec<String>) -> Result<()> {
let futures = addresses.into_iter().map(|address| {
let (ip, port) = parse_ip_address(&address);
let addr: SocketAddr = format!("{}:{}", ip, port).parse().unwrap();
Expand All @@ -100,26 +83,29 @@ impl NetworkLayer for TCPManager {
join_all(futures)
.await
.into_iter()
.collect::<Result<_, _>>()?;
.collect::<std::result::Result<_, _>>()
.map_err(|_e| NetworkError::BroadcastError)?;
Ok(())
}

async fn open(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
async fn open(&self) -> Result<()> {
let mut is_open = self.is_open.lock().await;
if *is_open {
return Err("Listener is already open".into());
return Err(Error::Unknown("Listener is already open".into()));
}
let addr: SocketAddr = format!("{}:{}", self.address, self.port).parse()?;
let listener = TcpListener::bind(addr).await?;
let addr: SocketAddr = format!("{}:{}", self.address, self.port).parse().unwrap();
let listener = TcpListener::bind(addr)
.await
.map_err(|_e| NetworkError::BindError(addr))?;
*self.listener.lock().await = Some(listener);
*is_open = true;
Ok(())
}

async fn close(self) -> Result<(), Box<dyn Error + Send + Sync>> {
async fn close(self) -> Result<()> {
let mut is_open = self.is_open.lock().await;
if !*is_open {
return Err("Listener is not open".into());
return Err(Error::Network(ConnectionClosedError));
}
*self.listener.lock().await = None;
*is_open = false;
Expand All @@ -129,9 +115,10 @@ impl NetworkLayer for TCPManager {

#[cfg(test)]
mod tests {
use crate::network::{NetworkLayer, TCPManager};
use tokio::task::JoinSet;

use crate::network::{NetworkLayer, TCPManager};

const LOCALHOST: &str = "127.0.0.1";

#[tokio::test]
Expand Down
59 changes: 31 additions & 28 deletions src/storage.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
// Organization: SpacewalkHq
// License: MIT License

use std::error::Error;
use std::path::{Path, PathBuf};

use async_trait::async_trait;
Expand All @@ -10,16 +9,19 @@ use sha2::{Digest, Sha256};
use tokio::fs::{self, File};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

use crate::error::StorageError::CorruptFile;
use crate::error::{Error, Result, StorageError};

const MAX_FILE_SIZE: u64 = 1_000_000;
const CHECKSUM_LEN: usize = 64;

#[async_trait]
pub trait Storage {
async fn store(&self, data: &[u8]) -> Result<(), Box<dyn Error + Send + Sync>>;
async fn retrieve(&self) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>>;
async fn compaction(&self) -> Result<(), Box<dyn Error + Send + Sync>>;
async fn delete(&self) -> Result<(), Box<dyn Error + Send + Sync>>;
async fn turned_malicious(&self) -> Result<(), Box<dyn Error + Send + Sync>>;
async fn store(&self, data: &[u8]) -> Result<()>;
async fn retrieve(&self) -> Result<Vec<u8>>;
async fn compaction(&self) -> Result<()>;
async fn delete(&self) -> Result<()>;
async fn turned_malicious(&self) -> Result<()>;
}

#[derive(Clone)]
Expand All @@ -38,49 +40,50 @@ impl LocalStorage {
}
}

async fn store_async(&self, data: &[u8]) -> Result<(), Box<dyn Error + Send + Sync>> {
async fn store_async(&self, data: &[u8]) -> Result<()> {
let checksum = calculate_checksum(data);
let data_with_checksum = [data, checksum.as_slice()].concat();

let mut file = File::create(&self.path).await?;
file.write_all(&data_with_checksum).await?;
file.flush().await?;
let mut file = File::create(&self.path).await.map_err(Error::Io)?;
file.write_all(&data_with_checksum)
.await
.map_err(Error::Io)?;
file.flush().await.map_err(Error::Io)?;
Ok(())
}

async fn retrieve_async(&self) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
let mut file = File::open(&self.path).await?;
async fn retrieve_async(&self) -> Result<Vec<u8>> {
let mut file = File::open(&self.path).await.map_err(Error::Io)?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer).await?;
file.read_to_end(&mut buffer).await.map_err(Error::Io)?;

if buffer.is_empty() {
return Err("File is empty".into());
return Err(Error::Store(StorageError::EmptyFile));
}

if buffer.len() < CHECKSUM_LEN {
return Err("File is potentially malicious".into());
return Err(Error::Store(StorageError::CorruptFile));
}

let data = &buffer[..buffer.len() - 64];
let stored_checksum = retrieve_checksum(&buffer);
let calculated_checksum = calculate_checksum(data);

if stored_checksum != calculated_checksum {
return Err("Data integrity check failed!".into());
return Err(Error::Store(StorageError::DataIntegrityError));
}

Ok(data.to_vec())
}

async fn delete_async(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
fs::remove_file(&self.path).await?;
async fn delete_async(&self) -> Result<()> {
fs::remove_file(&self.path).await.map_err(Error::Io)?;
Ok(())
}

async fn compaction_async(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
async fn compaction_async(&self) -> Result<()> {
// If file size is greater than 1MB, then compact it
let metadata = fs::metadata(&self.path).await?;
println!("file size {}", metadata.len());
let metadata = fs::metadata(&self.path).await.map_err(Error::Io)?;
if metadata.len() > MAX_FILE_SIZE {
self.delete_async().await?;
}
Expand All @@ -90,29 +93,29 @@ impl LocalStorage {

#[async_trait]
impl Storage for LocalStorage {
async fn store(&self, data: &[u8]) -> Result<(), Box<dyn Error + Send + Sync>> {
async fn store(&self, data: &[u8]) -> Result<()> {
self.store_async(data).await
}

async fn retrieve(&self) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
async fn retrieve(&self) -> Result<Vec<u8>> {
self.retrieve_async().await
}

async fn compaction(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
async fn compaction(&self) -> Result<()> {
self.compaction_async().await
}

async fn delete(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
async fn delete(&self) -> Result<()> {
self.delete_async().await
}

async fn turned_malicious(&self) -> Result<(), Box<dyn Error + Send + Sync>> {
async fn turned_malicious(&self) -> Result<()> {
// Check if the file is tampered with
self.retrieve().await?;
let metadata = fs::metadata(&self.path).await?;
let metadata = fs::metadata(&self.path).await.map_err(Error::Io)?;

if metadata.len() > MAX_FILE_SIZE {
return Err("File is potentially malicious".into());
return Err(Error::Store(CorruptFile));
}
Ok(())
}
Expand Down
Loading