Skip to content

Commit

Permalink
Update Streaming Avro Record Output (#433)
Browse files Browse the repository at this point in the history
This PR captures the progress made toward debugging the streaming
implementation.

Changes:
* Reverted the revert for Publish Time. The implementation was correct;
it was just missing one more step.
* Added the missing step for publish time. When reading the stream, a
flag is now passed that indicates whether to include the publish time.
  * In the execution phase, no publish time (false).
  * In the preparation phase, publish time (true).
  • Loading branch information
kevinjnguyen authored Jun 17, 2023
2 parents 1911554 + e81d5b7 commit cb631e4
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 40 deletions.
45 changes: 30 additions & 15 deletions crates/sparrow-runtime/src/metadata/raw_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,17 @@ impl RawMetadata {
}
source_data::Source::PulsarSubscription(ps) => {
let config = ps.config.as_ref().ok_or(Error::PulsarSubscription)?;
Ok(Self::try_from_pulsar(config).await?.sparrow_metadata)
// The `_publish_time` is metadata on the pulsar message, and required
// by the `prepare` step. However, that is not part of the user's schema.
// The prepare path calls `try_from_pulsar` directly, so for all other cases
// we explicitly set the schema to not include the `_publish_time` column.
//
// The "prepare from pulsar" step is an experimental feature, and will
// likely change in the future, so we're okay with this hack for now.
let should_include_publish_time = false;
Ok(Self::try_from_pulsar(config, should_include_publish_time)
.await?
.sparrow_metadata)
}
}
}
Expand Down Expand Up @@ -171,6 +181,7 @@ impl RawMetadata {
/// Create a `RawMetadata` from a Pulsar topic.
pub(crate) async fn try_from_pulsar(
config: &PulsarConfig,
include_publish_time: bool,
) -> error_stack::Result<PulsarMetadata, Error> {
// the user-defined schema in the topic
let pulsar_schema = streams::pulsar::schema::get_pulsar_schema(
Expand All @@ -183,21 +194,25 @@ impl RawMetadata {
.await
.change_context_lazy(|| Error::PulsarSchema("unable to get schema".to_owned()))?;

// inject _publish_time field so that we have a consistent column to sort on
// (this will always be our time_column in Pulsar sources)
let publish_time = Arc::new(Field::new(
"_publish_time",
TimestampMillisecondType::DATA_TYPE,
false,
));
let new_fields: Vec<_> = pulsar_schema
.fields
.iter()
.cloned()
.chain(std::iter::once(publish_time))
.collect();
tracing::debug!("pulsar schema fields: {:?}", new_fields);
let new_fields = if include_publish_time {
// inject _publish_time field so that we have a consistent column to sort on
// (this will always be our time_column in Pulsar sources)
let publish_time = Arc::new(Field::new(
"_publish_time",
TimestampMillisecondType::DATA_TYPE,
false,
));
pulsar_schema
.fields
.iter()
.cloned()
.chain(std::iter::once(publish_time))
.collect()
} else {
pulsar_schema.fields.clone()
};

tracing::debug!("pulsar schema fields: {:?}", new_fields);
Ok(PulsarMetadata {
user_schema: Arc::new(pulsar_schema),
sparrow_metadata: Self::from_raw_schema(Arc::new(Schema::new(new_fields)))?,
Expand Down
2 changes: 1 addition & 1 deletion crates/sparrow-runtime/src/prepare.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ async fn reader_from_pulsar<'a>(
) -> error_stack::Result<BoxStream<'a, error_stack::Result<(RecordBatch, RecordBatch), Error>>, Error>
{
let pulsar_config = pulsar_subscription.config.as_ref().ok_or(Error::Internal)?;
let pm = RawMetadata::try_from_pulsar(pulsar_config)
let pm = RawMetadata::try_from_pulsar(pulsar_config, true)
.await
.change_context(Error::CreatePulsarReader)?;

Expand Down
23 changes: 12 additions & 11 deletions crates/sparrow-runtime/src/read/stream_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,32 +65,33 @@ pub(crate) async fn stream_reader(
subscription_id: pulsar_subscription,
last_publish_time: 0,
};
let raw_metadata = RawMetadata::try_from_pulsar(pulsar_config)
let pulsar_metadata = RawMetadata::try_from_pulsar(pulsar_config, false)
.await
.change_context(Error::CreateStream)?;

// Verify the provided table schema matches the topic schema
verify_schema_match(
raw_metadata.user_schema.clone(),
pulsar_metadata.user_schema.clone(),
table_info.schema().clone(),
)?;

// The projected schema should come from the table_schema, which includes converted
// timestamp column, dropped decimal columns, etc.
// i.e. any changes we make to the raw schema to be able to process rows.
let projected_schema = if let Some(columns) = &projected_columns {
projected_schema(raw_metadata.sparrow_metadata.table_schema, columns)
projected_schema(pulsar_metadata.sparrow_metadata.table_schema, columns)
.change_context(Error::CreateStream)?
} else {
raw_metadata.sparrow_metadata.table_schema
pulsar_metadata.sparrow_metadata.table_schema
};

let consumer =
streams::pulsar::stream::consumer(&pulsar_subscription, raw_metadata.user_schema.clone())
.await
.change_context(Error::CreateStream)?;
let consumer = streams::pulsar::stream::consumer(
&pulsar_subscription,
pulsar_metadata.user_schema.clone(),
)
.await
.change_context(Error::CreateStream)?;
let stream = streams::pulsar::stream::execution_stream(
raw_metadata.sparrow_metadata.raw_schema.clone(),
pulsar_metadata.sparrow_metadata.raw_schema.clone(),
projected_schema.clone(),
consumer,
pulsar_subscription.last_publish_time,
Expand All @@ -106,7 +107,7 @@ pub(crate) async fn stream_reader(
let mut input_stream = prepare::execute_input_stream::prepare_input(
stream.boxed(),
table_config,
raw_metadata.user_schema.clone(),
pulsar_metadata.user_schema.clone(),
projected_schema,
0,
requested_slice,
Expand Down
39 changes: 26 additions & 13 deletions crates/sparrow-runtime/src/streams/pulsar/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ use std::time::Duration;
use tokio::time::timeout;

pub struct AvroWrapper {
value: Value,
user_record: Value,
projected_record: Value,
}

/// Creates a pulsar stream to be used during execution in a long-lived process.
Expand All @@ -37,7 +38,7 @@ pub fn execution_stream(
last_publish_time: i64,
) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
async_stream::try_stream! {
let mut reader = PulsarReader::new(raw_schema, projected_schema, consumer, last_publish_time, false);
let mut reader = PulsarReader::new(raw_schema, projected_schema, consumer, last_publish_time, false, false);
loop {
// Indefinitely reads messages from the stream
if let Some(next) = reader.next_result_async().await? {
Expand All @@ -59,7 +60,7 @@ pub fn preparation_stream(
last_publish_time: i64,
) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
async_stream::try_stream! {
let mut reader = PulsarReader::new(schema.clone(), schema, consumer, last_publish_time, true );
let mut reader = PulsarReader::new(schema.clone(), schema, consumer, last_publish_time, true , true);
while let Some(next) = reader.next_result_async().await? {
yield next
}
Expand All @@ -79,6 +80,7 @@ struct PulsarReader {
/// when publishing a message at the client. There is a chance that the broker
/// reorders messages internally.
require_ordered_publish_time: bool,
should_include_publish_time: bool,
}

#[derive(Debug)]
Expand Down Expand Up @@ -136,18 +138,20 @@ impl DeserializeMessage for AvroWrapper {
})
.into_report()
.change_context(DeserializeError::Avro)?;
let mut fields = match value {
let user_fields = match value {
Value::Record(record) => record,
_ => error_stack::bail!(DeserializeError::UnsupportedType),
};
let mut projected_fields = user_fields.clone();
// the Payload data only contains the fields for the user-defined (raw) schema,
// so we inject the publish time from the metadata
fields.push((
projected_fields.push((
"_publish_time".to_string(),
Value::TimestampMillis(payload.metadata.publish_time as i64),
));
Ok(AvroWrapper {
value: Value::Record(fields),
user_record: Value::Record(user_fields),
projected_record: Value::Record(projected_fields),
})
}
}
Expand All @@ -159,13 +163,15 @@ impl PulsarReader {
consumer: Consumer<AvroWrapper, TokioExecutor>,
last_publish_time: i64,
require_ordered_publish_time: bool,
should_include_publish_time: bool,
) -> Self {
PulsarReader {
raw_schema,
projected_schema,
consumer,
last_publish_time,
require_ordered_publish_time,
should_include_publish_time,
}
}

Expand Down Expand Up @@ -210,15 +216,21 @@ impl PulsarReader {
return Err(ArrowError::from_external_error(Box::new(wrapped_error)));
}
};
match aw.value {
let aw_records = if self.should_include_publish_time {
aw.projected_record
} else {
aw.user_record
};

match aw_records {
Value::Record(fields) => {
avro_values.push(fields);
}
_ => {
let e = error_stack::report!(DeserializeError::UnsupportedType)
.attach_printable(format!(
"expected a record but got {:?}",
aw.value
aw_records
));
return Err(ArrowError::from_external_error(Box::new(
DeserializeErrorWrapper::from(e),
Expand Down Expand Up @@ -260,17 +272,18 @@ impl PulsarReader {
}
}
}

tracing::debug!("read {} messages", avro_values.len());
match avro_values.len() {
0 => Ok(None),
_ => {
let arrow_data = sparrow_arrow::avro::avro_to_arrow(avro_values)
.map_err(|e| ArrowError::from_external_error(Box::new(e)))?;
let arrow_data = sparrow_arrow::avro::avro_to_arrow(avro_values).map_err(|e| {
tracing::error!("avro_to_arrow error: {}", e);
ArrowError::from_external_error(Box::new(e))
})?;
let batch = RecordBatch::try_new(self.raw_schema.clone(), arrow_data)?;

// Note that the _last_publish_time is dropped here. This field is added for the purposes of
// prepare, where the `time` column is automatically set to the `_last_publish_time`.
// Note that the _publish_time is dropped here. This field is added for the purposes of
// prepare, where the `time` column is automatically set to the `_publish_time`.
let columns_to_read = get_columns_to_read(&self.raw_schema, &self.projected_schema);
let columns: Vec<_> = columns_to_read
.iter()
Expand Down

0 comments on commit cb631e4

Please sign in to comment.