Skip to content

Commit 5308808

Browse files
fix: detect datetime column on ingestion (#975)
This PR adds logic to override known timestamp fields that were previously inferred as string fields. This ensures timestamps are always identified correctly, including for dynamic streams. Part of #965 --------- Co-authored-by: Nikhil Sinha <131262146+nikhilsinhaparseable@users.noreply.github.com>
1 parent 5400db8 commit 5308808

File tree

3 files changed

+100
-12
lines changed

3 files changed

+100
-12
lines changed

server/src/event/format.rs

+94-8
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ pub trait EventFormat: Sized {
102102
if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
103103
return Err(anyhow!("Schema mismatch"));
104104
}
105-
new_schema = update_field_type_in_schema(new_schema, time_partition);
105+
new_schema = update_field_type_in_schema(new_schema, None, time_partition, None);
106106
let rb = Self::decode(data, new_schema.clone())?;
107107
let tags_arr = StringArray::from_iter_values(std::iter::repeat(&tags).take(rb.num_rows()));
108108
let metadata_arr =
@@ -147,19 +147,101 @@ pub trait EventFormat: Sized {
147147
}
148148
}
149149

150+
pub fn get_existing_fields(
151+
inferred_schema: Arc<Schema>,
152+
existing_schema: Option<&HashMap<String, Arc<Field>>>,
153+
) -> Vec<Arc<Field>> {
154+
let mut existing_fields = Vec::new();
155+
156+
for field in inferred_schema.fields.iter() {
157+
if existing_schema.map_or(false, |schema| schema.contains_key(field.name())) {
158+
existing_fields.push(field.clone());
159+
}
160+
}
161+
162+
existing_fields
163+
}
164+
165+
pub fn get_existing_timestamp_fields(
166+
existing_schema: &HashMap<String, Arc<Field>>,
167+
) -> Vec<Arc<Field>> {
168+
let mut timestamp_fields = Vec::new();
169+
170+
for field in existing_schema.values() {
171+
if let DataType::Timestamp(TimeUnit::Millisecond, None) = field.data_type() {
172+
timestamp_fields.push(field.clone());
173+
}
174+
}
175+
176+
timestamp_fields
177+
}
178+
179+
pub fn override_timestamp_fields(
180+
inferred_schema: Arc<Schema>,
181+
existing_timestamp_fields: &[Arc<Field>],
182+
) -> Arc<Schema> {
183+
let timestamp_field_names: Vec<&str> = existing_timestamp_fields
184+
.iter()
185+
.map(|field| field.name().as_str())
186+
.collect();
187+
188+
let updated_fields: Vec<Arc<Field>> = inferred_schema
189+
.fields()
190+
.iter()
191+
.map(|field| {
192+
if timestamp_field_names.contains(&field.name().as_str()) {
193+
Arc::new(Field::new(
194+
field.name(),
195+
DataType::Timestamp(TimeUnit::Millisecond, None),
196+
field.is_nullable(),
197+
))
198+
} else {
199+
field.clone()
200+
}
201+
})
202+
.collect();
203+
204+
Arc::new(Schema::new(updated_fields))
205+
}
206+
150207
pub fn update_field_type_in_schema(
151-
schema: Arc<Schema>,
208+
inferred_schema: Arc<Schema>,
209+
existing_schema: Option<&HashMap<String, Arc<Field>>>,
152210
time_partition: Option<String>,
211+
log_records: Option<&Vec<Value>>,
153212
) -> Arc<Schema> {
213+
let mut updated_schema = inferred_schema.clone();
214+
215+
if let Some(existing_schema) = existing_schema {
216+
let existing_fields = get_existing_fields(inferred_schema.clone(), Some(existing_schema));
217+
let existing_timestamp_fields = get_existing_timestamp_fields(existing_schema);
218+
// overriding known timestamp fields which were inferred as string fields
219+
updated_schema = override_timestamp_fields(updated_schema, &existing_timestamp_fields);
220+
let existing_field_names: Vec<String> = existing_fields
221+
.iter()
222+
.map(|field| field.name().clone())
223+
.collect();
224+
225+
if let Some(log_records) = log_records {
226+
for log_record in log_records {
227+
updated_schema = Arc::new(update_data_type_to_datetime(
228+
(*updated_schema).clone(),
229+
log_record.clone(),
230+
existing_field_names.clone(),
231+
));
232+
}
233+
}
234+
}
235+
154236
if time_partition.is_none() {
155-
return schema;
237+
return updated_schema;
156238
}
157-
let field_name = time_partition.unwrap();
158-
let new_schema: Vec<Field> = schema
239+
let time_partition_field_name = time_partition.unwrap();
240+
let new_schema: Vec<Field> = updated_schema
159241
.fields()
160242
.iter()
161243
.map(|field| {
162-
if *field.name() == field_name {
244+
if *field.name() == time_partition_field_name {
163245
if field.data_type() == &DataType::Utf8 {
164246
let new_data_type = DataType::Timestamp(TimeUnit::Millisecond, None);
165247
Field::new(field.name().clone(), new_data_type, true)
@@ -174,12 +256,16 @@ pub fn update_field_type_in_schema(
174256
Arc::new(Schema::new(new_schema))
175257
}
176258

177-
pub fn update_data_type_to_datetime(schema: Schema, value: Value) -> Schema {
259+
pub fn update_data_type_to_datetime(
260+
schema: Schema,
261+
value: Value,
262+
ignore_field_names: Vec<String>,
263+
) -> Schema {
178264
let new_schema: Vec<Field> = schema
179265
.fields()
180266
.iter()
181267
.map(|field| {
182-
if field.data_type() == &DataType::Utf8 {
268+
if field.data_type() == &DataType::Utf8 && !ignore_field_names.contains(field.name()) {
183269
if let Value::Object(map) = &value {
184270
if let Some(Value::String(s)) = map.get(field.name()) {
185271
if DateTime::parse_from_rfc3339(s).is_ok() {

server/src/event/format/json.rs

+2
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,9 @@ impl EventFormat for Event {
7171
Ok(mut infer_schema) => {
7272
let new_infer_schema = super::super::format::update_field_type_in_schema(
7373
Arc::new(infer_schema),
74+
Some(&stream_schema),
7475
time_partition,
76+
Some(&value_arr),
7577
);
7678
infer_schema = Schema::new(new_infer_schema.fields().clone());
7779
if let Err(err) = Schema::try_merge(vec![

server/src/handlers/http/logstream.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ pub async fn list(_: HttpRequest) -> impl Responder {
9393

9494
pub async fn detect_schema(body: Bytes) -> Result<impl Responder, StreamError> {
9595
let body_val: Value = serde_json::from_slice(&body)?;
96-
let value_arr: Vec<Value> = match body_val {
96+
let log_records: Vec<Value> = match body_val {
9797
Value::Array(arr) => arr,
9898
value @ Value::Object(_) => vec![value],
9999
_ => {
@@ -104,9 +104,9 @@ pub async fn detect_schema(body: Bytes) -> Result<impl Responder, StreamError> {
104104
}
105105
};
106106

107-
let mut schema = infer_json_schema_from_iterator(value_arr.iter().map(Ok)).unwrap();
108-
for value in value_arr {
109-
schema = update_data_type_to_datetime(schema, value);
107+
let mut schema = infer_json_schema_from_iterator(log_records.iter().map(Ok)).unwrap();
108+
for log_record in log_records {
109+
schema = update_data_type_to_datetime(schema, log_record, Vec::new());
110110
}
111111
Ok((web::Json(schema), StatusCode::OK))
112112
}

0 commit comments

Comments
 (0)