@@ -100,38 +100,32 @@ pub trait ObjectStorage: Sync + 'static {
                 continue;
             }
 
-            match sync.move_local_to_temp() {
-                Ok(parquet_size) => {
-                    if let Err(e) = STREAM_INFO.update_stats(&stream, 0, parquet_size) {
-                        log::error!("Couldn't update stream stats. {:?}", e);
-                    }
-                }
-                Err(e) => {
-                    log::error!(
-                        "Error copying parquet from stream directory in [{}] to tmp directory [{}] due to error [{}]",
-                        sync.dir.data_path.to_string_lossy(),
-                        sync.dir.temp_dir.to_string_lossy(),
-                        e
-                    );
-                    continue;
-                }
+            if let Err(e) = sync.move_local_to_temp() {
+                log::error!(
+                    "Error copying parquet from stream directory in [{}] to tmp directory [{}] due to error [{}]",
+                    sync.dir.data_path.to_string_lossy(),
+                    sync.dir.temp_dir.to_string_lossy(),
+                    e
+                );
+                continue;
             }
         }
 
         Ok(())
     }
 
-    async fn s3_sync(&self) -> Result<(), ObjectStorageError> {
+    async fn s3_sync(&self) -> Result<(), MoveDataError> {
         if !Path::new(&CONFIG.parseable.local_disk_path).exists() {
             return Ok(());
         }
 
         let streams = STREAM_INFO.list_streams();
 
         for stream in streams {
+            // get dir
             let dir = StorageDir::new(stream.clone());
-
-            for file in WalkDir::new(dir.temp_dir)
+            // walk dir, find all .tmp files and convert to parquet
+            for file in WalkDir::new(&dir.temp_dir)
                 .min_depth(1)
                 .max_depth(1)
                 .into_iter()
@@ -144,7 +138,55 @@ pub trait ObjectStorage: Sync + 'static {
                         None => false,
                     };
 
-                    !is_tmp
+                    is_tmp
+                })
+            {
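+                // open the staged .tmp file and read it back as arrow record batches,
+                // skipping any batch that fails to decode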
+                let record_tmp_file = file.file_name().unwrap().to_str().unwrap();
+                let file = File::open(&file).map_err(|_| MoveDataError::Open)?;
+                let reader = StreamReader::try_new(file, None)?;
+                let schema = reader.schema();
+                let records = reader.filter_map(|record| match record {
+                    Ok(record) => Some(record),
+                    Err(e) => {
+                        log::warn!("error when reading from arrow stream {:?}", e);
+                        None
+                    }
+                });
+
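+                // write the batches to a .parquet file next to the .tmp file, then drop the .tmp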
+                let parquet_path = dir.temp_dir.join(
+                    record_tmp_file
+                        .strip_suffix(".tmp")
+                        .expect("file has a .tmp extension"),
+                );
+                let parquet_file =
+                    fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
+                let props = WriterProperties::builder().build();
+                let mut writer = ArrowWriter::try_new(parquet_file, schema, Some(props))?;
+
+                for ref record in records {
+                    writer.write(record)?;
+                }
+
+                writer.close()?;
+
+                fs::remove_file(dir.temp_dir.join(record_tmp_file))
+                    .map_err(|_| MoveDataError::Delete)?;
+            }
+
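+            // walk the temp dir again and pick up the generated .parquet files for upload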
+            for file in WalkDir::new(dir.temp_dir)
+                .min_depth(1)
+                .max_depth(1)
+                .into_iter()
+                .filter_map(|file| file.ok())
+                .map(|file| file.path().to_path_buf())
+                .filter(|file| file.is_file())
+                .filter(|file| {
+                    let is_parquet = match file.extension() {
+                        Some(ext) => ext.eq_ignore_ascii_case("parquet"),
+                        None => false,
+                    };
+
+                    is_parquet
                 })
             {
                 let filename = file.file_name().unwrap().to_str().unwrap();
@@ -195,40 +237,11 @@ impl StorageDir {
         fs::create_dir_all(&self.temp_dir)
     }
 
-    pub fn move_local_to_temp(&self, filename: String) -> Result<u64, MoveDataError> {
-        let record_tmp_file_path = self.temp_dir.join(filename.clone() + ".tmp");
-        fs::rename(self.data_path.join("data.records"), &record_tmp_file_path)
-            .map_err(|_| MoveDataError::Rename)?;
+    pub fn move_local_to_temp(&self, filename: String) -> io::Result<()> {
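+        // just stage data.records as <filename>.tmp; conversion to parquet now happens during s3_sync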
+        let record_tmp_file_path = self.temp_dir.join(filename + ".tmp");
+        fs::rename(self.data_path.join("data.records"), &record_tmp_file_path)?;
         event::STREAM_WRITERS::unset_entry(&self.stream_name).unwrap();
-        let file = File::open(&record_tmp_file_path).map_err(|_| MoveDataError::Open)?;
-        let reader = StreamReader::try_new(file, None)?;
-        let schema = reader.schema();
-        let records = reader.filter_map(|record| match record {
-            Ok(record) => Some(record),
-            Err(e) => {
-                log::warn!("error when reading from arrow stream {:?}", e);
-                None
-            }
-        });
-
-        let parquet_path = self.temp_dir.join(filename);
-        let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
-        let props = WriterProperties::builder().build();
-        let mut writer = ArrowWriter::try_new(parquet_file, schema, Some(props))?;
-
-        for ref record in records {
-            writer.write(record)?;
-        }
-
-        writer.close()?;
-
-        fs::remove_file(record_tmp_file_path).map_err(|_| MoveDataError::Delete)?;
-
-        let compressed_size = fs::metadata(parquet_path)
-            .map_err(|_| MoveDataError::Metadata)?
-            .len();
-
-        Ok(compressed_size)
+        Ok(())
     }
 
     pub fn local_data_exists(&self) -> bool {
@@ -238,20 +251,18 @@ impl StorageDir {
 
 #[derive(Debug, thiserror::Error)]
 pub enum MoveDataError {
-    #[error("Failed to rename file")]
-    Rename,
     #[error("Unable to Open file after moving")]
     Open,
     #[error("Unable to create recordbatch stream")]
     Arrow(#[from] ArrowError),
     #[error("Could not generate parquet file")]
     Parquet(#[from] ParquetError),
+    #[error("Object Storage Error {0}")]
+    ObjectStorag(#[from] ObjectStorageError),
     #[error("Could not generate parquet file")]
     Create,
     #[error("Could not delete temp arrow file")]
     Delete,
-    #[error("Could not fetch metadata of moved parquet file")]
-    Metadata,
 }
 
 struct StorageSync {
@@ -266,7 +277,7 @@ impl StorageSync {
         Self { dir, time }
     }
 
-    fn move_local_to_temp(&self) -> Result<u64, MoveDataError> {
+    fn move_local_to_temp(&self) -> io::Result<()> {
         let time = self.time - Duration::minutes(OBJECT_STORE_DATA_GRANULARITY as i64);
         let uri = utils::date_to_prefix(time.date())
             + &utils::hour_to_prefix(time.hour())