Skip to content

Commit 2e5f0a2

Browse files
authored
Improve LocalStreamWriter along with error types (#106)
This PR disallows taking stream_writer as mutable if any inner mutex is held. File creation logic is improved and is now atomic, no two threads can call set_entry at the same time. File creation was not guarded well so there could have arrived a situation where two threads tried to create the file then in that case one of thread would have failed and errored. Changes - set_entry takes writer which ensures no other thread can call set_entry at the same time. File creation is atomic. - Added error type StreamWriterError
1 parent 196c93e commit 2e5f0a2

File tree

2 files changed

+69
-44
lines changed

2 files changed

+69
-44
lines changed

server/src/event.rs

+68-43
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,17 @@ use std::fs::OpenOptions;
2929
use std::io::BufReader;
3030
use std::sync::Arc;
3131
use std::sync::Mutex;
32+
use std::sync::MutexGuard;
3233
use std::sync::RwLock;
3334

3435
use crate::metadata;
35-
use crate::metadata::STREAM_INFO;
3636
use crate::option::CONFIG;
3737
use crate::response;
3838
use crate::storage::ObjectStorage;
3939
use crate::Error;
4040

4141
type LocalWriter = Mutex<Option<StreamWriter<std::fs::File>>>;
42+
type LocalWriterGuard<'a> = MutexGuard<'a, Option<StreamWriter<std::fs::File>>>;
4243

4344
lazy_static! {
4445
#[derive(Default)]
@@ -47,102 +48,126 @@ lazy_static! {
4748

4849
impl STREAM_WRITERS {
4950
// append to a existing stream
50-
fn append_to_local(stream: &str, record: &RecordBatch) -> Result<(), ()> {
51-
let hashmap_guard = STREAM_WRITERS.read().unwrap();
51+
fn append_to_local(stream: &str, record: &RecordBatch) -> Result<(), StreamWriterError> {
52+
let hashmap_guard = STREAM_WRITERS
53+
.read()
54+
.map_err(|_| StreamWriterError::RwPoisioned)?;
55+
5256
match hashmap_guard.get(stream) {
5357
Some(localwriter) => {
54-
let mut writer_guard = localwriter.lock().unwrap();
58+
let mut writer_guard = localwriter
59+
.lock()
60+
.map_err(|_| StreamWriterError::MutexPoisioned)?;
61+
62+
// if it's some writer then we write without dropping any lock
63+
// hashmap cannot be brought mutably at any point until this finishes
5564
if let Some(ref mut writer) = *writer_guard {
56-
writer.write(record).map_err(|_| ())?;
65+
writer.write(record).map_err(StreamWriterError::Writer)?;
5766
} else {
58-
drop(writer_guard);
59-
drop(hashmap_guard);
60-
STREAM_WRITERS::set_entry(stream, record).unwrap();
67+
// pass on this mutex to set entry so that it can be reused
68+
// we have a guard for underlying entry thus
69+
// hashmap must not be availible as mutable to any other thread
70+
STREAM_WRITERS::set_entry(writer_guard, stream, record)?;
6171
}
6272
}
73+
// entry is not present thus we create it
6374
None => {
75+
// this requires mutable borrow of the map so we drop this read lock and wait for write lock
6476
drop(hashmap_guard);
65-
STREAM_WRITERS::create_entry(stream.to_string(), record).unwrap();
77+
STREAM_WRITERS::create_entry(stream.to_string(), record)?;
6678
}
6779
};
6880
Ok(())
6981
}
7082

7183
// create a new entry with new stream_writer
72-
// todo: error type
7384
// Only create entry for valid streams
74-
fn create_entry(stream: String, record: &RecordBatch) -> Result<(), ()> {
75-
let mut hashmap_guard = STREAM_WRITERS.write().unwrap();
76-
77-
if STREAM_INFO.schema(&stream).is_err() {
78-
return Err(());
79-
}
85+
fn create_entry(stream: String, record: &RecordBatch) -> Result<(), StreamWriterError> {
86+
let mut hashmap_guard = STREAM_WRITERS
87+
.write()
88+
.map_err(|_| StreamWriterError::RwPoisioned)?;
8089

8190
let file = OpenOptions::new()
8291
.append(true)
8392
.create_new(true)
8493
.open(data_file_path(&stream))
85-
.map_err(|_| ())?;
94+
.map_err(StreamWriterError::Io)?;
8695

87-
let mut stream_writer = StreamWriter::try_new(file, &record.schema()).map_err(|_| ())?;
88-
stream_writer.write(record).map_err(|_| ())?;
96+
let mut stream_writer = StreamWriter::try_new(file, &record.schema())
97+
.expect("File and RecordBatch both are checked");
98+
99+
stream_writer
100+
.write(record)
101+
.map_err(StreamWriterError::Writer)?;
89102

90103
hashmap_guard.insert(stream, Mutex::new(Some(stream_writer)));
91104

92105
Ok(())
93106
}
94107

95108
// Deleting a logstream requires that metadata is deleted first
96-
pub fn delete_entry(stream: &str) -> Result<(), ()> {
97-
let mut hashmap_guard = STREAM_WRITERS.write().unwrap();
98-
99-
if STREAM_INFO.schema(stream).is_ok() {
100-
return Err(());
101-
}
109+
pub fn delete_entry(stream: &str) -> Result<(), StreamWriterError> {
110+
let mut hashmap_guard = STREAM_WRITERS
111+
.write()
112+
.map_err(|_| StreamWriterError::RwPoisioned)?;
102113

103114
hashmap_guard.remove(stream);
104115

105116
Ok(())
106117
}
107118

108-
fn set_entry(stream: &str, record: &RecordBatch) -> Result<(), ()> {
119+
fn set_entry(
120+
mut writer_guard: LocalWriterGuard,
121+
stream: &str,
122+
record: &RecordBatch,
123+
) -> Result<(), StreamWriterError> {
109124
let file = OpenOptions::new()
110125
.append(true)
111126
.create_new(true)
112127
.open(data_file_path(stream))
113-
.map_err(|_| ())?;
128+
.map_err(StreamWriterError::Io)?;
114129

115-
let mut stream_writer = StreamWriter::try_new(file, &record.schema()).map_err(|_| ())?;
116-
stream_writer.write(record).map_err(|_| ())?;
130+
let mut stream_writer = StreamWriter::try_new(file, &record.schema())
131+
.expect("File and RecordBatch both are checked");
117132

118-
STREAM_WRITERS
119-
.read()
120-
.expect("Current Thread should not hold any lock")
121-
.get(stream)
122-
.expect("set entry is only called on valid entries")
123-
.lock()
124-
.expect("Poisioning is not handled yet")
125-
.replace(stream_writer); // replace the stream writer behind this mutex
133+
stream_writer
134+
.write(record)
135+
.map_err(StreamWriterError::Writer)?;
136+
137+
writer_guard.replace(stream_writer); // replace the stream writer behind this mutex
126138

127139
Ok(())
128140
}
129141

130142
// Unset the entry so that
131-
pub fn unset_entry(stream: &str) {
132-
let guard = STREAM_WRITERS.read().unwrap();
143+
pub fn unset_entry(stream: &str) -> Result<(), StreamWriterError> {
144+
let guard = STREAM_WRITERS
145+
.read()
146+
.map_err(|_| StreamWriterError::RwPoisioned)?;
133147
let stream_writer = match guard.get(stream) {
134148
Some(writer) => writer,
135-
None => return,
149+
None => return Ok(()),
136150
};
137151
stream_writer
138152
.lock()
139-
.expect("Poisioning is not handled yet")
153+
.map_err(|_| StreamWriterError::MutexPoisioned)?
140154
.take();
155+
156+
Ok(())
141157
}
142158
}
143159

144160
#[derive(Debug, thiserror::Error)]
145-
enum StreamWriterError {}
161+
pub enum StreamWriterError {
162+
#[error("Arrow writer failed: {0}")]
163+
Writer(arrow::error::ArrowError),
164+
#[error("Io Error when creating new file: {0}")]
165+
Io(std::io::Error),
166+
#[error("RwLock was poisioned")]
167+
RwPoisioned,
168+
#[error("Mutex was poisioned")]
169+
MutexPoisioned,
170+
}
146171

147172
fn data_file_path(stream_name: &str) -> String {
148173
format!(
@@ -253,7 +278,7 @@ impl Event {
253278
let rb = event.next()?.ok_or(Error::MissingRecord)?;
254279
let stream_name = &self.stream_name;
255280

256-
STREAM_WRITERS::append_to_local(stream_name, &rb).map_err(|_| Error::MissingRecord)?;
281+
STREAM_WRITERS::append_to_local(stream_name, &rb).unwrap();
257282

258283
Ok(0)
259284
}

server/src/storage.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ impl StorageDir {
199199
let record_tmp_file_path = self.temp_dir.join(filename.clone() + ".tmp");
200200
fs::rename(self.data_path.join("data.records"), &record_tmp_file_path)
201201
.map_err(|_| MoveDataError::Rename)?;
202-
event::STREAM_WRITERS::unset_entry(&self.stream_name);
202+
event::STREAM_WRITERS::unset_entry(&self.stream_name).unwrap();
203203
let file = File::open(&record_tmp_file_path).map_err(|_| MoveDataError::Open)?;
204204
let reader = StreamReader::try_new(file, None)?;
205205
let schema = reader.schema();

0 commit comments

Comments
 (0)