Skip to content

Commit

Permalink
Rewrite execution of microbatch models to avoid blocking the main thread (#11332)
Browse files Browse the repository at this point in the history

* Push orchestration of batches previously in the `RunTask` into `MicrobatchModelRunner`

* Split `MicrobatchModelRunner` into two separate runners

`MicrobatchModelRunner` is now an orchestrator of `MicrobatchBatchRunner`s, the latter being what handles actual batch execution

* Introduce new `DbtThreadPool` that knows if it's been closed

* Enable `MicrobatchModelRunner` to shut down gracefully when it detects the thread pool has been closed
  • Loading branch information
QMalcolm committed Mar 3, 2025
1 parent 5e5d3bd commit 78c1f1d
Show file tree
Hide file tree
Showing 9 changed files with 445 additions and 379 deletions.
6 changes: 6 additions & 0 deletions .changes/unreleased/Fixes-20250303-131440.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Fixes
body: Fix microbatch execution to not block main thread nor hang
time: 2025-03-03T13:14:40.432874-06:00
custom:
Author: QMalcolm
Issue: 11243 11306
18 changes: 18 additions & 0 deletions core/dbt/graph/thread_pool.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
from __future__ import annotations

from multiprocessing.pool import ThreadPool


class DbtThreadPool(ThreadPool):
    """A ThreadPool that tracks whether or not it's been closed.

    The base ``ThreadPool`` exposes no public way to ask whether it still
    accepts work. This subclass records a ``closed`` flag as soon as the
    pool is shut down, whether through ``close()`` or ``terminate()``
    (the latter is also what the context-manager exit invokes), so callers
    can check ``is_closed()`` before submitting more tasks.
    """

    def __init__(self, *args, **kwargs):
        """Create the pool; all arguments are forwarded to ``ThreadPool``."""
        super().__init__(*args, **kwargs)
        self.closed = False

    def close(self) -> None:
        """Mark the pool as closed, then stop it from accepting new tasks."""
        # Flip the flag before delegating so observers see the pool as
        # closed by the time the underlying shutdown starts.
        self.closed = True
        super().close()

    def terminate(self) -> None:
        """Mark the pool as closed, then terminate it.

        A terminated pool can no longer accept work, so it must report
        itself as closed as well; without this, ``is_closed()`` stays
        ``False`` after ``terminate()`` or after leaving a ``with`` block.
        """
        self.closed = True
        super().terminate()

    def is_closed(self) -> bool:
        """Return True once ``close()`` or ``terminate()`` has been called."""
        return self.closed
9 changes: 5 additions & 4 deletions core/dbt/materializations/incremental/microbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ def build_batches(self, start: datetime, end: datetime) -> List[BatchType]:

return batches

def build_jinja_context_for_batch(self, incremental_batch: bool) -> Dict[str, Any]:
@staticmethod
def build_jinja_context_for_batch(model: ModelNode, incremental_batch: bool) -> Dict[str, Any]:
"""
Create context with entries that reflect microbatch model + incremental execution state
Expand All @@ -109,9 +110,9 @@ def build_jinja_context_for_batch(self, incremental_batch: bool) -> Dict[str, An
jinja_context: Dict[str, Any] = {}

# Microbatch model properties
jinja_context["model"] = self.model.to_dict()
jinja_context["sql"] = self.model.compiled_code
jinja_context["compiled_code"] = self.model.compiled_code
jinja_context["model"] = model.to_dict()
jinja_context["sql"] = model.compiled_code
jinja_context["compiled_code"] = model.compiled_code

# Add incremental context variables for batches running incrementally
if incremental_batch:
Expand Down
12 changes: 7 additions & 5 deletions core/dbt/task/build.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,8 @@ def call_model_and_unit_tests_runner(self, node, pool) -> RunResult:
runner.do_skip(cause=cause)

if isinstance(runner, MicrobatchModelRunner):
return self.handle_microbatch_model(runner, pool)
runner.set_parent_task(self)
runner.set_pool(pool)

Check warning on line 173 in core/dbt/task/build.py

View check run for this annotation

Codecov / codecov/patch

core/dbt/task/build.py#L172-L173

Added lines #L172 - L173 were not covered by tests

return self.call_runner(runner)

Expand All @@ -184,10 +185,11 @@ def handle_job_queue_node(self, node, pool, callback):
runner.do_skip(cause=cause)

if isinstance(runner, MicrobatchModelRunner):
callback(self.handle_microbatch_model(runner, pool))
else:
args = [runner]
self._submit(pool, args, callback)
runner.set_parent_task(self)
runner.set_pool(pool)

args = [runner]
self._submit(pool, args, callback)

# Make a map of model unique_ids to selected unit test unique_ids,
# for processing before the model.
Expand Down
Loading

0 comments on commit 78c1f1d

Please sign in to comment.