Skip to content

Commit 92d72cf

Browse files
[Internal] Update Jobs list_runs function to support paginated responses (databricks#890)
## What changes are proposed in this pull request? Introduces logic in the extension for the jobs ListRuns call. The extended logic accounts for the new response format of API 2.2. The API 2.1 format returns all tasks and job_clusters for each run in the runs list. The API 2.2 format truncates tasks and job_clusters to 100 elements. The extended ListRuns logic calls GetRun for each truncated run in the list to populate the full list of tasks and job_clusters. The code consumes the generator that is returned from super().list_runs and produces a generator as well. ## How is this tested? Unit tests and manual tests. Manual tests were done in two modes: using API 2.2 and using API 2.1. So this code is compatible with both API versions. --------- Co-authored-by: hectorcast-db <hector.castejon@databricks.com>
1 parent e550ca1 commit 92d72cf

File tree

3 files changed

+282
-5
lines changed

3 files changed

+282
-5
lines changed

NEXT_CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
### Internal Changes
1212
* Update Jobs ListJobs API to support paginated responses ([#896](https://github.com/databricks/databricks-sdk-py/pull/896))
13+
* Update Jobs ListRuns API to support paginated responses ([#890](https://github.com/databricks/databricks-sdk-py/pull/890))
1314
* Introduce automated tagging ([#888](https://github.com/databricks/databricks-sdk-py/pull/888))
1415
* Update Jobs GetJob API to support paginated responses ([#869](https://github.com/databricks/databricks-sdk-py/pull/869)).
1516

databricks/sdk/mixins/jobs.py

+80-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from typing import Iterator, Optional
22

33
from databricks.sdk.service import jobs
4-
from databricks.sdk.service.jobs import BaseJob, Job
4+
from databricks.sdk.service.jobs import BaseJob, BaseRun, Job, RunType
55

66

77
class JobsExt(jobs.JobsAPI):
@@ -59,6 +59,85 @@ def list(self,
5959
delattr(job, 'has_more')
6060
yield job
6161

62+
def list_runs(self,
              *,
              active_only: Optional[bool] = None,
              completed_only: Optional[bool] = None,
              expand_tasks: Optional[bool] = None,
              job_id: Optional[int] = None,
              limit: Optional[int] = None,
              offset: Optional[int] = None,
              page_token: Optional[str] = None,
              run_type: Optional[RunType] = None,
              start_time_from: Optional[int] = None,
              start_time_to: Optional[int] = None) -> Iterator[BaseRun]:
    """List job runs.

    List runs in descending order by start time. When `expand_tasks` is set and a run reports
    `has_more`, the run is re-fetched through `get_run` and its tasks, job_clusters, job_parameters
    and repair_history are replaced with the values from that call.

    :param active_only: bool (optional)
      If active_only is `true`, only active runs are included in the results; otherwise, lists both active
      and completed runs. An active run is a run in the `QUEUED`, `PENDING`, `RUNNING`, or `TERMINATING`.
      This field cannot be `true` when completed_only is `true`.
    :param completed_only: bool (optional)
      If completed_only is `true`, only completed runs are included in the results; otherwise, lists both
      active and completed runs. This field cannot be `true` when active_only is `true`.
    :param expand_tasks: bool (optional)
      Whether to include task and cluster details in the response. Note that in API 2.2, only the first
      100 elements will be shown. Use :method:jobs/getrun to paginate through all tasks and clusters.
    :param job_id: int (optional)
      The job for which to list runs. If omitted, the Jobs service lists runs from all jobs.
    :param limit: int (optional)
      The number of runs to return. This value must be greater than 0 and less than 25. The default value
      is 20. If a request specifies a limit of 0, the service instead uses the maximum limit.
    :param offset: int (optional)
      The offset of the first run to return, relative to the most recent run. Deprecated since June 2023.
      Use `page_token` to iterate through the pages instead.
    :param page_token: str (optional)
      Use `next_page_token` or `prev_page_token` returned from the previous request to list the next or
      previous page of runs respectively.
    :param run_type: :class:`RunType` (optional)
      The type of runs to return. For a description of run types, see :method:jobs/getRun.
    :param start_time_from: int (optional)
      Show runs that started _at or after_ this value. The value must be a UTC timestamp in milliseconds.
      Can be combined with _start_time_to_ to filter by a time range.
    :param start_time_to: int (optional)
      Show runs that started _at or before_ this value. The value must be a UTC timestamp in milliseconds.
      Can be combined with _start_time_from_ to filter by a time range.

    :returns: Iterator over :class:`BaseRun`
    """
    # fetch runs with limited elements in top level arrays
    runs_list = super().list_runs(active_only=active_only,
                                  completed_only=completed_only,
                                  expand_tasks=expand_tasks,
                                  job_id=job_id,
                                  limit=limit,
                                  offset=offset,
                                  page_token=page_token,
                                  run_type=run_type,
                                  start_time_from=start_time_from,
                                  start_time_to=start_time_to)

    if not expand_tasks:
        # Without task expansion there is nothing to complete, so pass the runs through as-is.
        # Return explicitly: otherwise control falls through to the expansion loop below and
        # only the fact that the iterator is already exhausted keeps it from doing more work.
        yield from runs_list
        return

    # fully fetch all top level arrays for each run in the list
    for run in runs_list:
        if run.has_more:
            # The list response was truncated for this run; take the complete arrays from get_run.
            run_from_get_call = self.get_run(run.run_id)
            run.tasks = run_from_get_call.tasks
            run.job_clusters = run_from_get_call.job_clusters
            run.job_parameters = run_from_get_call.job_parameters
            run.repair_history = run_from_get_call.repair_history
        # Remove has_more fields for each run in the list.
        # This field in Jobs API 2.2 is useful for pagination. It indicates if there are more than 100 tasks or job_clusters in the run.
        # This function hides pagination details from the user. So the field does not play useful role here.
        if hasattr(run, 'has_more'):
            delattr(run, 'has_more')
        yield run
140+
62141
def get_run(self,
63142
run_id: int,
64143
*,

tests/test_jobs_mixin.py

+201-4
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,14 @@
55
from databricks.sdk import WorkspaceClient
66

77

8-
def make_getrun_path_pattern(run_id: int, page_token: str) -> Pattern[str]:
9-
return re.compile(
10-
rf'{re.escape("http://localhost/api/")}2.\d{re.escape(f"/jobs/runs/get?page_token={page_token}&run_id={run_id}")}'
11-
)
8+
def make_getrun_path_pattern(run_id: int, page_token: Optional[str] = None) -> Pattern[str]:
    """Build a regex matching a mocked ``jobs/runs/get`` URL for ``run_id``.

    :param run_id: run id expected in the ``run_id`` query parameter.
    :param page_token: when truthy, the pattern additionally requires this ``page_token`` query
      parameter in front of ``run_id``; otherwise only ``run_id`` is expected.
    :returns: compiled pattern accepting any single-digit minor API version (``2.0`` .. ``2.9``).
    """
    if page_token:
        query = f"/jobs/runs/get?page_token={page_token}&run_id={run_id}"
    else:
        query = f"/jobs/runs/get?run_id={run_id}"
    base = re.escape("http://localhost/api/")
    # The dot of the API version is escaped so that it only matches a literal "."
    # (an unescaped "." would accept any character, e.g. "2x2").
    return re.compile(rf'{base}2\.\d{re.escape(query)}')
1216

1317

1418
def make_getjob_path_pattern(job_id: int, page_token: Optional[str] = None) -> Pattern[str]:
@@ -27,6 +31,12 @@ def make_listjobs_path_pattern(page_token: str) -> Pattern[str]:
2731
)
2832

2933

34+
def make_listruns_path_pattern(page_token: str) -> Pattern[str]:
    """Build a regex matching a mocked ``jobs/runs/list`` URL carrying ``page_token``.

    The pattern accepts an optional ``expand_tasks=true|false`` query parameter directly in
    front of ``page_token`` and any single-digit minor API version (``2.0`` .. ``2.9``).

    :param page_token: page token expected in the ``page_token`` query parameter.
    :returns: compiled pattern for use as a request matcher.
    """
    base = re.escape("http://localhost/api/")
    path = re.escape("/jobs/runs/list")
    token = re.escape(page_token)
    # The dot of the API version is escaped so that it only matches a literal ".".
    return re.compile(rf'{base}2\.\d{path}\?(?:expand_tasks=(?:true|false)&)?page_token={token}')
38+
39+
3040
def test_get_run_with_no_pagination(config, requests_mock):
3141
run1 = {"tasks": [{"run_id": 0}, {"run_id": 1}], }
3242
requests_mock.get(make_getrun_path_pattern(1337, "initialToken"), text=json.dumps(run1))
@@ -617,3 +627,190 @@ def test_list_jobs_with_many_tasks(config, requests_mock):
617627
# check that job_id 300 was never used in jobs/get call
618628
history = requests_mock.request_history
619629
assert all('300' not in request.qs.get("job_id", ['']) for request in history)
630+
631+
632+
def test_list_runs_without_task_expansion(config, requests_mock):
    """With expand_tasks=False, list_runs pages through runs/list and returns the runs unchanged."""
    first_page_runs = [
        {"run_id": 100, "run_name": "run100"},
        {
            "run_id": 200,
            "run_name": "run200",
            "job_parameters": [
                {"name": "param1", "default": "default1"},
                {"name": "param2", "default": "default2"},
            ],
        },
        {"run_id": 300, "run_name": "run300"},
    ]
    second_page_runs = [
        {
            "run_id": 400,
            "run_name": "run400",
            "repair_history": [{"id": "repair400_1"}, {"id": "repair400_2"}],
        },
    ]
    # Mocked runs/list responses keyed by the page token that requests them.
    responses_by_token = {
        "initialToken": {"runs": first_page_runs, "next_page_token": "tokenToSecondPage"},
        "tokenToSecondPage": {"runs": second_page_runs},
    }
    for token, payload in responses_by_token.items():
        requests_mock.get(make_listruns_path_pattern(token), text=json.dumps(payload))

    w = WorkspaceClient(config=config)

    fetched_runs = list(w.jobs.list_runs(expand_tasks=False, page_token="initialToken"))
    fetched_as_dicts = [fetched.as_dict() for fetched in fetched_runs]

    # Runs from both pages come back in order, exactly as the service returned them.
    assert fetched_as_dicts == first_page_runs + second_page_runs

    # only two requests should be made, both of them jobs/runs/list requests
    assert requests_mock.call_count == 2
705+
706+
707+
def test_list_runs(config, requests_mock):
    """With expand_tasks=True, runs flagged has_more get their tasks completed via runs/get."""

    def tasks(*task_keys):
        # Build a task list payload from bare task keys.
        return [{"task_key": key} for key in task_keys]

    # runs/list pages: runs 100 and 400 are flagged as truncated, 200 and 300 are complete.
    listruns_page1 = {
        "runs": [
            {"run_id": 100, "tasks": tasks("taskkey101", "taskkey102"), "has_more": True},
            {"run_id": 200, "tasks": tasks("taskkey201")},
            {"run_id": 300, "tasks": tasks("taskkey301")},
        ],
        "next_page_token": "tokenToSecondPage",
    }
    listruns_page2 = {
        "runs": [
            {"run_id": 400, "tasks": tasks("taskkey401", "taskkey402"), "has_more": True},
        ],
    }

    # runs/get pages for the two truncated runs.
    getrun_100_page1 = {
        "run_id": 100,
        "tasks": tasks("taskkey101", "taskkey102"),
        "next_page_token": "tokenToSecondPage_100",
    }
    getrun_100_page2 = {"run_id": 100, "tasks": tasks("taskkey103")}
    getrun_400_page1 = {
        "run_id": 400,
        "tasks": tasks("taskkey401", "taskkey403"),
        "next_page_token": "tokenToSecondPage_400",
    }
    getrun_400_page2 = {"run_id": 400, "tasks": tasks("taskkey402", "taskkey404")}

    mocked_responses = [
        (make_listruns_path_pattern("initialToken"), listruns_page1),
        (make_listruns_path_pattern("tokenToSecondPage"), listruns_page2),
        (make_getrun_path_pattern(100), getrun_100_page1),
        (make_getrun_path_pattern(100, "tokenToSecondPage_100"), getrun_100_page2),
        (make_getrun_path_pattern(400), getrun_400_page1),
        (make_getrun_path_pattern(400, "tokenToSecondPage_400"), getrun_400_page2),
    ]
    for url_pattern, payload in mocked_responses:
        requests_mock.get(url_pattern, text=json.dumps(payload))

    w = WorkspaceClient(config=config)

    fetched_runs = list(w.jobs.list_runs(expand_tasks=True, page_token="initialToken"))
    fetched_as_dicts = [fetched.as_dict() for fetched in fetched_runs]

    expected_runs = [
        {"run_id": 100, "tasks": tasks("taskkey101", "taskkey102", "taskkey103")},
        {"run_id": 200, "tasks": tasks("taskkey201")},
        {"run_id": 300, "tasks": tasks("taskkey301")},
        {"run_id": 400, "tasks": tasks("taskkey401", "taskkey403", "taskkey402", "taskkey404")},
    ]
    assert fetched_as_dicts == expected_runs

    # check that run_id 200 and 300 were never used in a runs/get call
    history = requests_mock.request_history
    assert all('300' not in request.qs.get("run_id", ['']) for request in history)
    assert all('200' not in request.qs.get("run_id", ['']) for request in history)

0 commit comments

Comments
 (0)