Skip to content

Commit 6e7e5be

Browse files
committed
WIP Add per-column encryption keys
1 parent 73ffed0 commit 6e7e5be

File tree

4 files changed

+140
-21
lines changed

4 files changed

+140
-21
lines changed

parquet/src/column/writer/mod.rs

+97 -1
Original file line number | Diff line number | Diff line change
@@ -1551,7 +1551,8 @@ mod tests {
15511551
use core::str;
15521552
use rand::distributions::uniform::SampleUniform;
15531553
use std::{fs::File, sync::Arc};
1554-
1554+
#[cfg(feature = "encryption")]
1555+
use crate::encryption::encrypt::EncryptionKey;
15551556
use super::*;
15561557

15571558
#[test]
@@ -3481,6 +3482,101 @@ mod tests {
34813482
assert_eq!(row_count, file_metadata.num_rows() as usize);
34823483
}
34833484

3485+
#[cfg(feature = "encryption")]
3486+
#[test]
3487+
fn test_write_encrypted_column_non_uniform() {
3488+
let message_type = "
3489+
message test_schema {
3490+
OPTIONAL BYTE_ARRAY a (UTF8);
3491+
}
3492+
";
3493+
let schema = Arc::new(parse_message_type(message_type).unwrap());
3494+
let data = vec![ByteArray::from(b"parquet".to_vec()); 7];
3495+
let def_levels = [1, 1, 1, 1, 0, 1, 0, 1, 0, 1];
3496+
3497+
let num_row_groups = 3;
3498+
let num_batches = 3;
3499+
let rows_per_batch = def_levels.len();
3500+
let valid_rows_per_batch = def_levels.iter().filter(|&level| *level > 0).count();
3501+
3502+
let file: File = tempfile::tempfile().unwrap();
3503+
3504+
let builder = WriterProperties::builder();
3505+
let footer_key: &[u8] = "0123456789012345".as_bytes();
3506+
let column_key = EncryptionKey::new(b"1234567890123450".to_vec());
3507+
let file_encryption_properties =
3508+
FileEncryptionProperties::builder(footer_key.to_vec())
3509+
.with_column_key(b"a".to_vec(), column_key.clone()).build();
3510+
3511+
let props = Arc::new(
3512+
builder
3513+
.with_file_encryption_properties(file_encryption_properties)
3514+
.set_data_page_row_count_limit(rows_per_batch)
3515+
.build(),
3516+
);
3517+
let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
3518+
for _ in 0..num_row_groups {
3519+
let mut row_group_writer = writer.next_row_group().unwrap();
3520+
let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
3521+
3522+
for _ in 0..num_batches {
3523+
col_writer
3524+
.typed::<ByteArrayType>()
3525+
.write_batch(&data, Some(&def_levels), None)
3526+
.unwrap();
3527+
}
3528+
3529+
col_writer.close().unwrap();
3530+
row_group_writer.close().unwrap();
3531+
}
3532+
3533+
let _file_metadata = writer.close().unwrap();
3534+
3535+
let decryption_properties = FileDecryptionProperties::builder(footer_key.to_vec())
3536+
.with_column_key(b"a".to_vec(), column_key.key().clone())
3537+
.build()
3538+
.unwrap();
3539+
let options = ArrowReaderOptions::default()
3540+
.with_file_decryption_properties(decryption_properties.clone());
3541+
let metadata = ArrowReaderMetadata::load(&file, options.clone()).unwrap();
3542+
let file_metadata = metadata.metadata.file_metadata();
3543+
3544+
let builder = ParquetRecordBatchReaderBuilder::try_new_with_options(file, options).unwrap();
3545+
let record_reader = builder.build().unwrap();
3546+
3547+
assert_eq!(
3548+
file_metadata.num_rows(),
3549+
(num_row_groups * num_batches * rows_per_batch) as i64
3550+
);
3551+
assert_eq!(file_metadata.schema_descr().num_columns(), 1);
3552+
3553+
assert_eq!(metadata.metadata.num_row_groups(), num_row_groups);
3554+
metadata.metadata.row_groups().iter().for_each(|rg| {
3555+
assert_eq!(rg.num_columns(), 1);
3556+
assert_eq!(rg.num_rows(), (num_batches * rows_per_batch) as i64);
3557+
});
3558+
3559+
let mut row_count = 0;
3560+
for batch in record_reader {
3561+
let batch = batch.unwrap();
3562+
row_count += batch.num_rows();
3563+
3564+
let string_col = batch.column(0).as_string_opt::<i32>().unwrap();
3565+
3566+
let mut valid_count = 0;
3567+
for x in string_col.iter().flatten() {
3568+
valid_count += 1;
3569+
assert_eq!(x, "parquet");
3570+
}
3571+
assert_eq!(
3572+
valid_count,
3573+
valid_rows_per_batch * num_batches * num_row_groups
3574+
);
3575+
}
3576+
3577+
assert_eq!(row_count, file_metadata.num_rows() as usize);
3578+
}
3579+
34843580
#[test]
34853581
fn test_increment_max_binary_chars() {
34863582
let r = increment(vec![0xFF, 0xFE, 0xFD, 0xFF, 0xFF]);

parquet/src/encryption/encrypt.rs

+24 -12
Original file line number | Diff line number | Diff line change
@@ -37,8 +37,13 @@ impl EncryptionKey {
3737
}
3838
}
3939

40-
pub fn set_metadata(&mut self, metadata: Vec<u8>) {
40+
pub fn with_metadata(mut self, metadata: Vec<u8>) -> Self {
4141
self.key_metadata = Some(metadata);
42+
self
43+
}
44+
45+
pub fn key(&self) -> &Vec<u8> {
46+
&self.key
4247
}
4348
}
4449

@@ -98,7 +103,12 @@ impl EncryptionPropertiesBuilder {
98103
}
99104

100105
pub fn with_footer_key_metadata(mut self, metadata: Vec<u8>) -> Self {
101-
self.footer_key.set_metadata(metadata);
106+
self.footer_key = self.footer_key.with_metadata(metadata);
107+
self
108+
}
109+
110+
pub fn with_column_key(mut self, column_name: Vec<u8>, encryption_key: EncryptionKey) -> Self {
111+
self.column_keys.insert(column_name, encryption_key);
102112
self
103113
}
104114

@@ -155,17 +165,19 @@ impl FileEncryptor {
155165
pub fn aad_file_unique(&self) -> &Vec<u8> {
156166
&self.aad_file_unique
157167
}
158-
159-
// let footer_encryptor = RingGcmBlockEncryptor::new(&encryption_properties.footer_key.clone());
160-
// let mut column_encryptors: HashMap<Vec<u8>, Arc<dyn BlockEncryptor>> = HashMap::new();
161-
// if let Some(column_keys) = encryption_properties.column_keys.clone() {
162-
// for (column_name, key) in column_keys.iter() {
163-
// let column_encryptor = Arc::new(RingGcmBlockEncryptor::new(key));
164-
// column_encryptors.insert(column_name.clone(), column_encryptor);
165-
// }
166-
// }
168+
167169
pub(crate) fn get_footer_encryptor(&self) -> RingGcmBlockEncryptor {
168-
RingGcmBlockEncryptor::new(&self.properties.footer_key.key.clone())
170+
RingGcmBlockEncryptor::new(&self.properties.footer_key.key)
171+
}
172+
173+
pub(crate) fn get_column_encryptor(&self, column_path: &Vec<u8>) -> RingGcmBlockEncryptor {
174+
if self.properties.column_keys.is_empty() {
175+
return RingGcmBlockEncryptor::new(&self.properties.footer_key.key());
176+
}
177+
match self.properties.column_keys.get(column_path) {
178+
None => todo!("Handle unencrypted columns"),
179+
Some(column_key) => RingGcmBlockEncryptor::new(&column_key.key())
180+
}
169181
}
170182
}
171183

parquet/src/encryption/page_encryptor.rs

+3
Original file line number | Diff line number | Diff line change
@@ -30,19 +30,22 @@ pub struct PageEncryptor {
3030
row_group_index: usize,
3131
column_index: usize,
3232
page_index: usize,
33+
column_path: Vec<u8>,
3334
}
3435

3536
impl PageEncryptor {
3637
pub fn new(
3738
file_encryptor: Arc<FileEncryptor>,
3839
row_group_index: usize,
3940
column_index: usize,
41+
column_path: Vec<u8>,
4042
) -> Self {
4143
Self {
4244
file_encryptor,
4345
row_group_index,
4446
column_index,
4547
page_index: 0,
48+
column_path,
4649
}
4750
}
4851

parquet/src/file/writer.rs

+16 -8
Original file line number | Diff line number | Diff line change
@@ -579,20 +579,28 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
579579
self.assert_previous_writer_closed()?;
580580

581581
#[cfg(feature = "encryption")]
582-
let page_encryptor = match self.file_encryptor.as_ref() {
583-
None => None,
584-
Some(file_encryptor) => Some(PageEncryptor::new(
585-
file_encryptor.clone(),
586-
self.row_group_index as usize,
587-
self.column_index,
588-
)),
589-
};
582+
let file_encryptor = self.file_encryptor.clone();
583+
#[cfg(feature = "encryption")]
584+
let row_group_index = self.row_group_index as usize;
585+
#[cfg(feature = "encryption")]
586+
let column_index = self.column_index;
590587

591588
Ok(match self.next_column_desc() {
592589
Some(column) => {
593590
let props = self.props.clone();
594591
let (buf, on_close) = self.get_on_close();
595592

593+
#[cfg(feature = "encryption")]
594+
let page_encryptor = match file_encryptor {
595+
None => None,
596+
Some(file_encryptor) => Some(PageEncryptor::new(
597+
file_encryptor,
598+
row_group_index,
599+
column_index,
600+
column.path().string().into_bytes(),
601+
)),
602+
};
603+
596604
#[cfg(feature = "encryption")]
597605
let page_writer =
598606
SerializedPageWriter::new(buf).with_page_encryptor(page_encryptor);

0 commit comments

Comments
 (0)