Iceberg sync (#771)
SergeiPatiakin authored Jan 8, 2025
1 parent 565b168 commit c126a02
Showing 7 changed files with 271 additions and 35 deletions.
26 changes: 8 additions & 18 deletions src/context/iceberg.rs
@@ -12,7 +12,7 @@ use datafusion::error::Result;
use datafusion::execution::{RecordBatchStream, TaskContext};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_plan::ExecutionPlanProperties;
use datafusion_common::{DataFusionError, TableReference};
use datafusion_common::DataFusionError;
use futures::stream::select_all;
use futures::{pin_mut, StreamExt, TryStream, TryStreamExt};
use iceberg::io::FileIO;
@@ -22,7 +22,6 @@ use iceberg::spec::{
ManifestWriter, Operation, PartitionSpec, Snapshot, SnapshotReference,
SnapshotRetention, Struct, Summary, TableMetadata, TableMetadataBuilder,
};
use iceberg::table::Table;
use iceberg::writer::file_writer::location_generator::{
DefaultFileNameGenerator, DefaultLocationGenerator,
};
@@ -35,7 +34,7 @@ use tracing::{info, warn};
use url::Url;
use uuid::Uuid;

use super::{LakehouseTableProvider, SeafowlContext};
use super::SeafowlContext;

use thiserror::Error;

@@ -156,15 +155,13 @@ const DEFAULT_SCHEMA_ID: i32 = 0;
pub async fn record_batches_to_iceberg(
record_batch_stream: impl TryStream<Item = Result<RecordBatch, DataLoadingError>>,
arrow_schema: SchemaRef,
table: &Table,
file_io: &FileIO,
table_location: &str,
) -> Result<(), DataLoadingError> {
pin_mut!(record_batch_stream);

let table_location = table.metadata().location();
let table_base_url = Url::parse(table_location).unwrap();

let file_io = table.file_io();

let version_hint_location = format!("{}/metadata/version-hint.text", table_base_url);
let version_hint_input = file_io.new_input(&version_hint_location)?;
let old_version_hint: Option<u64> = if version_hint_input.exists().await? {
@@ -386,18 +383,10 @@ pub async fn record_batches_to_iceberg(
impl SeafowlContext {
pub async fn plan_to_iceberg_table(
&self,
name: impl Into<TableReference>,
file_io: &FileIO,
table_location: &str,
plan: &Arc<dyn ExecutionPlan>,
) -> Result<()> {
let provider = match self.get_lakehouse_table_provider(name).await? {
LakehouseTableProvider::Iceberg(p) => p,
_ => {
return Err(DataFusionError::Internal(
"Expected iceberg provider".to_string(),
));
}
};
let table = provider.table();
let schema = plan.schema();
let mut streams: Vec<Pin<Box<dyn RecordBatchStream + Send>>> = vec![];
for i in 0..plan.output_partitioning().partition_count() {
@@ -411,7 +400,8 @@ impl SeafowlContext {
DataLoadingError::BadInputError(format!("Datafusion error: {}", e))
}),
schema,
&table,
file_io,
table_location,
)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
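Since `record_batches_to_iceberg` no longer takes an `iceberg::table::Table`, it can be driven by any `FileIO` plus a metadata location, with no catalog lookup. A minimal sketch of calling the new signature — the bucket path, the empty input stream, and the import path are illustrative assumptions, not part of this commit:

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;
use futures::stream;
use iceberg::io::FileIO;

// Assumed to be importable from this crate (defined in src/context/iceberg.rs).
use seafowl::context::iceberg::{record_batches_to_iceberg, DataLoadingError};

async fn write_batches_example() -> Result<(), Box<dyn std::error::Error>> {
    let arrow_schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int64, false)]));

    // Any FileIO works now; no catalog Table handle is required.
    let table_location = "s3://my-bucket/my-table"; // assumed location
    let file_io = FileIO::from_path(table_location)?.build()?;

    // Empty stream, just to show the expected item type.
    let batches = stream::iter(Vec::<Result<RecordBatch, DataLoadingError>>::new());

    record_batches_to_iceberg(batches, arrow_schema, &file_io, table_location).await?;
    Ok(())
}
```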
7 changes: 5 additions & 2 deletions src/context/physical.rs
@@ -206,8 +206,11 @@ impl SeafowlContext {
.await?;
Ok(make_dummy_exec())
}
LakehouseTableProvider::Iceberg(_) => {
self.plan_to_iceberg_table(table_name.clone(), &physical)
LakehouseTableProvider::Iceberg(provider) => {
let table = provider.table();
let table_location = table.metadata().location();
let file_io = table.file_io();
self.plan_to_iceberg_table(file_io, table_location, &physical)
.await?;
Ok(make_dummy_exec())
}
38 changes: 33 additions & 5 deletions src/frontend/flight/handler.rs
@@ -10,7 +10,9 @@ use dashmap::DashMap;
use datafusion::common::Result;
use datafusion::execution::SendableRecordBatchStream;
use datafusion_common::DataFusionError;
use iceberg::io::FileIO;
use lazy_static::lazy_static;
use object_store_factory::object_store_opts_to_file_io_props;
use prost::Message;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
@@ -23,7 +25,7 @@ use url::Url;
use crate::context::SeafowlContext;
use crate::sync::schema::SyncSchema;
use crate::sync::writer::SeafowlDataSyncWriter;
use crate::sync::{LakehouseSyncTarget, SyncError, SyncResult};
use crate::sync::{IcebergSyncTarget, LakehouseSyncTarget, SyncError, SyncResult};

lazy_static! {
pub static ref SEAFOWL_SQL_DATA: SqlInfoData = {
@@ -163,7 +165,7 @@ impl SeafowlFlightHandler {
});
}

let (sync_target, url) = match cmd.format() {
let sync_target = match cmd.format() {
TableFormat::Delta => {
let log_store = match cmd.store {
None => self
@@ -186,14 +188,40 @@
.await?
}
};
let url = log_store.root_uri();
(LakehouseSyncTarget::Delta(log_store), url)
LakehouseSyncTarget::Delta(log_store)
}
TableFormat::Iceberg => {
return Err(SyncError::NotImplemented);
let (location, file_io) = match cmd.store {
None => {
return Err(SyncError::NotImplemented);
}
Some(store_loc) => {
let location = store_loc.location;
let options = store_loc.options;
let file_io_props = object_store_opts_to_file_io_props(&options);
let file_io = FileIO::from_path(&location)
.unwrap()
.with_props(file_io_props)
.build()?;
(location, file_io)
}
};

// Create the full path to table metadata by combining the object store location and
// relative table metadata path
let absolute_path = format!(
"{}/{}",
location.trim_end_matches("/"),
cmd.path.trim_start_matches("/")
);
LakehouseSyncTarget::Iceberg(IcebergSyncTarget {
file_io,
url: absolute_path.clone(),
})
}
};

let url = sync_target.get_url();
let num_batches = batches.len();

debug!("Processing data change with {num_rows} rows, {num_batches} batches, descriptor {sync_schema}, url {url} from origin {:?} at position {:?}",
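The absolute metadata path is produced by trimming redundant slashes before joining the store location and the relative path from the command. A standalone sketch of that joining rule (the helper name `join_location` and the example paths are made up for illustration):

```rust
/// Join an object store root and a relative path, tolerating a trailing
/// slash on the root and a leading slash on the relative part.
fn join_location(location: &str, relative: &str) -> String {
    format!(
        "{}/{}",
        location.trim_end_matches('/'),
        relative.trim_start_matches('/')
    )
}

fn main() {
    // Both spellings collapse to the same absolute metadata path.
    assert_eq!(
        join_location("s3://bucket/table/", "/metadata/v3.metadata.json"),
        "s3://bucket/table/metadata/v3.metadata.json"
    );
    assert_eq!(
        join_location("s3://bucket/table", "metadata/v3.metadata.json"),
        "s3://bucket/table/metadata/v3.metadata.json"
    );
}
```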
16 changes: 15 additions & 1 deletion src/sync/mod.rs
@@ -1,5 +1,6 @@
use crate::sync::writer::SeafowlDataSyncWriter;
use deltalake::logstore::LogStore;
use iceberg::io::FileIO;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;
@@ -38,6 +39,9 @@ pub enum SyncError {
#[error(transparent)]
DeltaTableError(#[from] deltalake::errors::DeltaTableError),

#[error(transparent)]
IcebergError(#[from] iceberg::Error),

#[error(transparent)]
ObjectStoreError(#[from] object_store::Error),
}
@@ -59,7 +63,8 @@ pub(super) struct SyncCommitInfo {

#[derive(Clone, Debug)]
pub struct IcebergSyncTarget {
url: String,
pub file_io: FileIO,
pub url: String,
}

#[derive(Clone, Debug)]
@@ -68,6 +73,15 @@ pub enum LakehouseSyncTarget {
Iceberg(IcebergSyncTarget),
}

impl LakehouseSyncTarget {
pub fn get_url(&self) -> String {
match self {
LakehouseSyncTarget::Iceberg(IcebergSyncTarget { url, .. }) => url.clone(),
LakehouseSyncTarget::Delta(log_store) => log_store.root_uri(),
}
}
}

impl SyncCommitInfo {
pub(super) fn new(
origin: impl Into<Origin>,
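`get_url` gives both sync formats a single accessor for the target URL, which the Flight handler above now uses instead of computing the Delta root URI at its call site. A hedged sketch of constructing an Iceberg target and reading the URL back (the metadata path is an assumption; the types are the ones defined in src/sync/mod.rs and are assumed to be in scope):

```rust
use iceberg::io::FileIO;

// Hypothetical helper: build an Iceberg sync target and check its URL.
fn iceberg_target_example() -> Result<LakehouseSyncTarget, iceberg::Error> {
    let url = "s3://bucket/table/metadata/v3.metadata.json".to_string(); // assumed path
    let file_io = FileIO::from_path(&url)?.build()?;

    let target = LakehouseSyncTarget::Iceberg(IcebergSyncTarget { file_io, url });
    assert_eq!(target.get_url(), "s3://bucket/table/metadata/v3.metadata.json");
    Ok(target)
}
```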
53 changes: 51 additions & 2 deletions src/sync/planner.rs
@@ -15,10 +15,12 @@ use datafusion::execution::session_state::{SessionState, SessionStateBuilder};
use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_optimizer::pruning::PruningPredicate;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::{Column, DFSchemaRef, JoinType, ScalarValue, ToDFSchema};
use datafusion_common::{
Column, DFSchema, DFSchemaRef, JoinType, ScalarValue, ToDFSchema,
};
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::{
col, lit, when, Expr, LogicalPlan, LogicalPlanBuilder, Projection,
col, lit, when, EmptyRelation, Expr, LogicalPlan, LogicalPlanBuilder, Projection,
};
use datafusion_functions_nested::expr_fn::make_array;
use deltalake::delta_datafusion::DeltaTableProvider;
@@ -30,6 +32,8 @@ use std::sync::Arc;
use std::time::Instant;
use tracing::{debug, info, trace, warn};

use super::SyncError;

pub(super) const LOWER_REL: &str = "__lower_rel";
pub(super) const UPPER_REL: &str = "__upper_rel";
pub(super) const UPSERT_COL: &str = "__upsert_col";
@@ -48,6 +52,51 @@ impl SeafowlSyncPlanner {
}
}

pub(super) async fn plan_iceberg_syncs(
&self,
syncs: &[DataSyncItem],
table_schema: Arc<arrow_schema::Schema>,
) -> SyncResult<Arc<dyn ExecutionPlan>> {
for sync in syncs {
for sc in sync.sync_schema.columns() {
if sc.role() == ColumnRole::OldPk {
for batch in &sync.data {
let null_count = batch
.column_by_name(sc.field().name())
.expect("Old PK array must exist")
.null_count();
if batch.num_rows() != null_count {
return Err(SyncError::InvalidMessage {
reason: "Old PK for Iceberg contains non-null value"
.to_string(),
});
}
}
}
}
}

let df_table_schema = DFSchema::try_from(table_schema.clone())?;

let base_plan =
LogicalPlanBuilder::new(LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::new(df_table_schema),
}))
.alias(LOWER_REL)?
.build()?;

let base_df = DataFrame::new(self.session_state(), base_plan);

let (sync_schema, sync_df) = self.squash_syncs(syncs)?;
let (sync_schema, sync_df) = self.normalize_syncs(&sync_schema, sync_df)?;
let input_df = self
.apply_syncs(table_schema, base_df, sync_df, &sync_schema)
.await?;
let input_plan = input_df.create_physical_plan().await?;
Ok(input_plan)
}

// Construct a plan for flushing the pending syncs to the provided table.
// Return the plan and the files that are re-written by it (to be removed from the table state).
pub(super) async fn plan_delta_syncs(
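`plan_iceberg_syncs` only accepts pure inserts: every old-PK column in every batch must be entirely null, which it verifies by comparing the column's null count against the batch's row count. A self-contained sketch of that comparison on a made-up batch (the column name and values are illustrative):

```rust
use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::record_batch::RecordBatch;

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "old_pk_id",
        DataType::Int64,
        true,
    )]));

    // All-null old-PK column: a pure insert, which the check accepts.
    let all_null: ArrayRef = Arc::new(Int64Array::from(vec![None::<i64>, None]));
    let inserts = RecordBatch::try_new(schema.clone(), vec![all_null]).unwrap();
    assert_eq!(inserts.column(0).null_count(), inserts.num_rows());

    // A non-null old PK marks an update or delete, which the Iceberg path rejects.
    let with_update: ArrayRef = Arc::new(Int64Array::from(vec![Some(42_i64), None]));
    let updates = RecordBatch::try_new(schema, vec![with_update]).unwrap();
    assert_ne!(updates.column(0).null_count(), updates.num_rows());
}
```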
