Skip to content

Commit

Permalink
feat: concurrent fetching
Browse files Browse the repository at this point in the history
  • Loading branch information
pacman82 committed Oct 27, 2024
1 parent 7620b34 commit fffa508
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 23 deletions.
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ features = ["snap", "brotli", "flate2", "lz4", "zstd", "base64"]
# features implying the use of UTF-16 for queries, connection strings and error messages. This
# should work on any system
[target.'cfg(target_os = "windows")'.dependencies]
odbc-api = { version = "8", features = ["odbc_version_3_5"] }
odbc-api = { version = "9", features = ["odbc_version_3_5"] }

# On linux we assume use of a UTF-8 locale. So we set the narrow features implying that for queries,
# connection strings and error messages the driver and driver manager supply utf8-strings. This
Expand All @@ -57,7 +57,7 @@ odbc-api = { version = "8", features = ["odbc_version_3_5"] }
# importantly, UTF-8 is likely to be more battled tested on these platforms, while UTF-16 is "only"
# required by the standard.
[target.'cfg(not(target_os = "windows"))'.dependencies]
odbc-api = { version = "8", features = ["narrow", "odbc_version_3_5"] }
odbc-api = { version = "9", features = ["narrow", "odbc_version_3_5"] }

[dev-dependencies]
assert_cmd = "2.0.16"
Expand Down
2 changes: 0 additions & 2 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,6 @@ pub struct QueryOpt {
/// up to a factor of two in case writing to parquet takes just as much time as fetching from
/// the database. Usually io to the database is the bottleneck so the actual speedup is likely
/// lower, but often still significant.
///
/// CURRENTLY THIS VALUE IS UNSTABLE AND IGNORED
#[arg(long)]
concurrent_fetching: bool,
/// When the size of the currently written parquet files goes beyond this threshold the current
Expand Down
87 changes: 77 additions & 10 deletions src/query/fetch_batch.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
use anyhow::{bail, Error};
use std::mem::swap;

use anyhow::Error;
use log::info;
use odbc_api::{buffers::ColumnarAnyBuffer, BlockCursor, Cursor};
use odbc_api::{
buffers::ColumnarAnyBuffer, BlockCursor, ConcurrentBlockCursor, Cursor, RowSetBuffer,
};

use crate::parquet_buffer::ParquetBuffer;

use super::{
batch_size_limit::BatchSizeLimit,
conversion_strategy::ConversionStrategy,
};
use super::{batch_size_limit::BatchSizeLimit, conversion_strategy::ConversionStrategy};

pub trait FetchBatch {
/// Maximum batch size in rows. This is used to allocate the parquet buffer of correct size.
Expand All @@ -17,14 +18,18 @@ pub trait FetchBatch {
fn next_batch(&mut self) -> Result<Option<&ColumnarAnyBuffer>, odbc_api::Error>;
}

pub fn fetch_strategy<'a>(
pub fn fetch_strategy(
concurrent_fetching: bool,
cursor: impl Cursor + 'a,
cursor: impl Cursor + 'static + Send,
conversion_strategy: &ConversionStrategy,
batch_size_limit: BatchSizeLimit,
) -> Result<Box<dyn FetchBatch + 'a>, Error> {
) -> Result<Box<dyn FetchBatch>, Error> {
if concurrent_fetching {
bail!("Concurrent fetching not yet supported")
Ok(Box::new(ConcurrentFetch::new(
cursor,
conversion_strategy,
batch_size_limit,
)?))
} else {
Ok(Box::new(SequentialFetch::new(
cursor,
Expand Down Expand Up @@ -83,3 +88,65 @@ where
self.block_cursor.row_array_size()
}
}

/// Use a concurrent cursor and an extra buffer. One buffer's content is read and written into
/// parquet, while the other is filled in an extra system thread.
struct ConcurrentFetch<C: Cursor> {
    // This buffer is read from and its contents is written into parquet.
    buffer: ColumnarAnyBuffer,
    // Owns the second buffer; fetches into it on a dedicated system thread while `buffer`
    // is being consumed. The two buffers are exchanged after every fetched batch.
    block_cursor: ConcurrentBlockCursor<C, ColumnarAnyBuffer>,
}

impl<C> ConcurrentFetch<C>
where
    C: Cursor + Send + 'static,
{
    /// Creates a concurrent fetch strategy. Allocates two identical fetch buffers sized
    /// according to `batch_size_limit`: one is bound to the cursor and filled on a dedicated
    /// system thread, the other is kept locally and read while writing parquet. The buffers
    /// are exchanged after each fetched batch.
    ///
    /// # Errors
    ///
    /// Fails if the batch size cannot be determined from the limit, or if binding the fetch
    /// buffer to the cursor fails.
    pub fn new(
        cursor: C,
        table_strategy: &ConversionStrategy,
        batch_size_limit: BatchSizeLimit,
    ) -> Result<Self, Error> {
        let mem_usage_odbc_buffer_per_row: usize = table_strategy.fetch_buffer_size_per_row();
        let total_mem_usage_per_row =
            mem_usage_odbc_buffer_per_row + ParquetBuffer::MEMORY_USAGE_BYTES_PER_ROW;
        // Fixed typo in log message: "alloctated" -> "allocated".
        info!(
            "Memory usage per row is 2x {} bytes (buffer is allocated two times, because of \
            concurrent fetching). This excludes memory directly allocated by the ODBC driver.",
            total_mem_usage_per_row,
        );

        let batch_size_row = batch_size_limit.batch_size_in_rows(total_mem_usage_per_row)?;

        info!("Batch size set to {} rows.", batch_size_row);

        // One buffer is bound to the cursor and filled by the fetch thread ...
        let fetch_buffer = table_strategy.allocate_fetch_buffer(batch_size_row);
        // ... the other starts out locally and is swapped in as batches arrive.
        let buffer = table_strategy.allocate_fetch_buffer(batch_size_row);

        let block_cursor = cursor.bind_buffer(fetch_buffer)?;
        // Spawns the system thread which fetches batches concurrently.
        let block_cursor = ConcurrentBlockCursor::from_block_cursor(block_cursor);
        Ok(Self {
            buffer,
            block_cursor,
        })
    }
}

impl<C> FetchBatch for ConcurrentFetch<C>
where
    C: Cursor,
{
    /// Receives the next batch from the fetch thread. Returns `None` once the cursor is
    /// exhausted.
    fn next_batch(&mut self) -> Result<Option<&ColumnarAnyBuffer>, odbc_api::Error> {
        match self.block_cursor.fetch()? {
            Some(mut filled) => {
                // Exchange the freshly fetched buffer with the one we finished consuming.
                swap(&mut filled, &mut self.buffer);
                // Hand the spent buffer back to the fetch thread so it can be refilled.
                self.block_cursor.fill(filled);
                Ok(Some(&self.buffer))
            }
            None => Ok(None),
        }
    }

    fn max_batch_size_in_rows(&self) -> usize {
        self.buffer.row_array_size()
    }
}
5 changes: 2 additions & 3 deletions tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,10 +312,9 @@ fn query_sales() {
}

#[test]
#[should_panic]
fn query_sales_concurrently() {
// Setup table for test
let table_name = "QuerySales";
let table_name = "QuerySalesConcurrently";
let mut table = TableMssql::new(table_name, &["DATE", "TIME(7)", "INT", "DECIMAL(10,2)"]);
table.insert_rows_as_text(&[
["2020-09-09", "00:05:34", "54", "9.99"],
Expand Down Expand Up @@ -345,8 +344,8 @@ fn query_sales_concurrently() {
"query",
out_str,
"--connection-string",
"--concurrent_fetching",
MSSQL,
"--concurrent-fetching",
&query,
])
.assert()
Expand Down

0 comments on commit fffa508

Please sign in to comment.