Skip to content

Commit

Permalink
fix: Fix deadlock in new streaming CSV / NDJSON sinks (#21598)
Browse files Browse the repository at this point in the history
  • Loading branch information
nameexhaustion authored Mar 5, 2025
1 parent 241d34b commit 7943dc1
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 4 deletions.
7 changes: 5 additions & 2 deletions crates/polars-stream/src/nodes/io_sinks/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,14 @@ impl SinkNode for CsvSinkNode {
writer.write_batch(&df)?;

allocation_size = allocation_size.max(buffer.len());

// Must drop before linearizer insert or will deadlock.
drop(consume_token); // Keep the consume_token until here to increase the
// backpressure.

if lin_tx.insert(Priority(Reverse(seq), buffer)).await.is_err() {
return Ok(());
}
drop(consume_token); // Keep the consume_token until here to increase the
// backpressure.
}

PolarsResult::Ok(())
Expand Down
7 changes: 5 additions & 2 deletions crates/polars-stream/src/nodes/io_sinks/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,14 @@ impl SinkNode for NDJsonSinkNode {
writer.write_batch(&df)?;

allocation_size = allocation_size.max(buffer.len());

// Must drop before linearizer insert or will deadlock.
drop(consume_token); // Keep the consume_token until here to increase the
// backpressure.

if lin_tx.insert(Priority(Reverse(seq), buffer)).await.is_err() {
return Ok(());
}
drop(consume_token); // Keep the consume_token until here to increase the
// backpressure.
}

PolarsResult::Ok(())
Expand Down

0 comments on commit 7943dc1

Please sign in to comment.