
Commit c66e1e3

Fix: TypeError on calling parallel_bulk. (opensearch-project#601)
* Fix: TypeError on calling parallel_bulk.
* Added a sample that uses a bulk function generator.

Signed-off-by: dblock <dblock@amazon.com>
1 parent 567ede3 commit c66e1e3
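
For context, the sketch below shows the kind of call this commit enables. It mirrors the new sample and tests rather than the exact report in #601, and the host, index name, and documents are hypothetical:

```python
from opensearchpy import OpenSearch, helpers

# hypothetical local cluster and data; adjust to your environment
client = OpenSearch(hosts=["http://localhost:9200"])
docs = [{"_index": "my-index", "_id": i, "value": i} for i in range(100)]

# Before this commit, parallel_bulk had no raise_on_error/raise_on_exception
# parameters and mis-ordered the arguments of its internal call into
# _process_bulk_chunk, which reportedly surfaced as a TypeError (#601).
for success, item in helpers.parallel_bulk(
    client,
    actions=docs,
    chunk_size=10,
    raise_on_error=False,
    raise_on_exception=False,
):
    if not success:
        print(item)
```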

5 files changed (+172 -3 lines):

- CHANGELOG.md
- guides/bulk.md
- opensearchpy/helpers/actions.py
- samples/bulk/bulk-helpers.py
- test_opensearchpy/test_helpers/test_actions.py

CHANGELOG.md (+1)
@@ -7,6 +7,7 @@ Inspired from [Keep a Changelog](https://keepachangelog.com/en/1.0.0/)
 ### Deprecated
 ### Removed
 ### Fixed
+- Fix `TypeError` on `parallel_bulk` ([#601](https://github.com/opensearch-project/opensearch-py/pull/601))
 ### Security
 
 ## [2.4.1]

guides/bulk.md (+58)
@@ -1,6 +1,8 @@
 - [Bulk Indexing](#bulk-indexing)
   - [Line-Delimited JSON](#line-delimited-json)
   - [Bulk Helper](#bulk-helper)
+  - [Parallel Bulk](#parallel-bulk)
+  - [Data Generator](#data-generator)
 
 # Bulk Indexing
 
@@ -46,6 +48,8 @@ data = [
 response = client.bulk(data)
 if response["errors"]:
     print(f"There were errors!")
+    for item in response["items"]:
+        print(f"{item['index']['status']}: {item['index']['error']['type']}")
 else:
     print(f"Bulk-inserted {len(response['items'])} items.")
 ```
@@ -69,3 +73,57 @@ response = helpers.bulk(client, docs, max_retries=3)
 print(response)
 ```
 
+## Parallel Bulk
+
+Bulk helpers support `parallel_bulk`, which distributes bulk requests across a thread pool and accepts options such as `chunk_size`, `max_chunk_bytes`, and flags that turn off raising on errors and exceptions.
+
+```python
+succeeded = []
+failed = []
+for success, item in helpers.parallel_bulk(client,
+    actions=data,
+    chunk_size=10,
+    raise_on_error=False,
+    raise_on_exception=False,
+    max_chunk_bytes=20 * 1024 * 1024,
+    request_timeout=60):
+
+    if success:
+        succeeded.append(item)
+    else:
+        failed.append(item)
+
+if len(failed) > 0:
+    print(f"There were {len(failed)} errors:")
+    for item in failed:
+        print(f"{item['index']['error']}: {item['index']['exception']}")
+
+if len(succeeded) > 0:
+    print(f"Bulk-inserted {len(succeeded)} items.")
+```
+
+## Data Generator
+
+Use a data generator function with the bulk helpers instead of building arrays.
+
+```python
+def _generate_data():
+    for i in range(100):
+        yield {"_index": index_name, "_id": i, "value": i}
+
+succeeded = []
+failed = []
+for success, item in helpers.parallel_bulk(client, actions=_generate_data()):
+    if success:
+        succeeded.append(item)
+    else:
+        failed.append(item)
+
+if len(failed) > 0:
+    print(f"There were {len(failed)} errors:")
+    for item in failed:
+        print(item["index"]["error"])
+
+if len(succeeded) > 0:
+    print(f"Bulk-inserted {len(succeeded)} items (parallel_bulk).")
+```

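One note on the guide addition above: the bulk helpers consume any iterable of actions, so the same generator also works with the non-parallel helpers, as the updated sample below does with `streaming_bulk`. A minimal sketch, reusing `client` and `_generate_data` as defined in the guide:

```python
from opensearchpy import helpers

# client and _generate_data are assumed to exist as in the guide above.
# streaming_bulk serializes and sends chunks lazily off the generator,
# so the full action list is never materialized in memory.
for success, item in helpers.streaming_bulk(client, actions=_generate_data()):
    if not success:
        print(item["index"]["error"])
```
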
opensearchpy/helpers/actions.py (+10 -1)
@@ -442,6 +442,8 @@ def parallel_bulk(
     max_chunk_bytes: int = 100 * 1024 * 1024,
     queue_size: int = 4,
     expand_action_callback: Any = expand_action,
+    raise_on_exception: bool = True,
+    raise_on_error: bool = True,
     ignore_status: Any = (),
     *args: Any,
     **kwargs: Any
@@ -485,7 +487,14 @@ def _setup_queues(self) -> None:
     for result in pool.imap(
         lambda bulk_chunk: list(
             _process_bulk_chunk(
-                client, bulk_chunk[1], bulk_chunk[0], ignore_status, *args, **kwargs
+                client,
+                bulk_chunk[1],
+                bulk_chunk[0],
+                raise_on_exception,
+                raise_on_error,
+                ignore_status,
+                *args,
+                **kwargs
             )
         ),
         _chunk_actions(
samples/bulk/bulk-helpers.py (+51 -1)
@@ -12,6 +12,7 @@
 
 
 import os
+from typing import Any
 
 from opensearchpy import OpenSearch, helpers
 
@@ -49,8 +50,57 @@
 for i in range(100):
     data.append({"_index": index_name, "_id": i, "value": i})
 
+# serialized bulk raising an exception on error
 rc = helpers.bulk(client, data)
-print(f"Bulk-inserted {rc[0]} items.")
+print(f"Bulk-inserted {rc[0]} items (bulk).")
+
+# parallel bulk with explicit error checking
+succeeded = []
+failed = []
+for success, item in helpers.parallel_bulk(
+    client,
+    actions=data,
+    chunk_size=10,
+    raise_on_error=False,
+    raise_on_exception=False,
+    max_chunk_bytes=20 * 1024 * 1024,
+    request_timeout=60,
+):
+    if success:
+        succeeded.append(item)
+    else:
+        failed.append(item)
+
+if len(failed) > 0:
+    print(f"There were {len(failed)} errors:")
+    for item in failed:
+        print(item["index"]["error"])
+
+if len(succeeded) > 0:
+    print(f"Bulk-inserted {len(succeeded)} items (parallel_bulk).")
+
+
+# streaming bulk with a data generator
+def _generate_data() -> Any:
+    for i in range(100):
+        yield {"_index": index_name, "_id": i, "value": i}
+
+
+succeeded = []
+failed = []
+for success, item in helpers.streaming_bulk(client, actions=_generate_data()):
+    if success:
+        succeeded.append(item)
+    else:
+        failed.append(item)
+
+if len(failed) > 0:
+    print(f"There were {len(failed)} errors:")
+    for item in failed:
+        print(item["index"]["error"])
+
+if len(succeeded) > 0:
+    print(f"Bulk-inserted {len(succeeded)} items (streaming_bulk).")
 
 # delete index
 client.indices.delete(index=index_name)

test_opensearchpy/test_helpers/test_actions.py (+52 -1)
@@ -67,11 +67,62 @@ def test_all_chunks_sent(self, _process_bulk_chunk: Any) -> None:
 
         self.assertEqual(50, mock_process_bulk_chunk.call_count)  # type: ignore
 
+    @mock.patch("opensearchpy.OpenSearch.bulk")
+    def test_with_all_options(self, _bulk: Any) -> None:
+        actions = ({"x": i} for i in range(100))
+        list(
+            helpers.parallel_bulk(
+                OpenSearch(),
+                actions=actions,
+                chunk_size=2,
+                raise_on_error=False,
+                raise_on_exception=False,
+                max_chunk_bytes=20 * 1024 * 1024,
+                request_timeout=160,
+                ignore_status=(123),
+            )
+        )
+
+        self.assertEqual(50, _bulk.call_count)
+        _bulk.assert_called_with(
+            '{"index":{}}\n{"x":98}\n{"index":{}}\n{"x":99}\n', request_timeout=160
+        )
+
+    @mock.patch("opensearchpy.helpers.actions._process_bulk_chunk")
+    def test_process_bulk_chunk_with_all_options(
+        self, _process_bulk_chunk: Any
+    ) -> None:
+        actions = ({"x": i} for i in range(100))
+        client = OpenSearch()
+        list(
+            helpers.parallel_bulk(
+                client,
+                actions=actions,
+                chunk_size=2,
+                raise_on_error=True,
+                raise_on_exception=True,
+                max_chunk_bytes=20 * 1024 * 1024,
+                request_timeout=160,
+                ignore_status=(123),
+            )
+        )
+
+        self.assertEqual(50, _process_bulk_chunk.call_count)
+        _process_bulk_chunk.assert_called_with(
+            client,
+            ['{"index":{}}', '{"x":98}', '{"index":{}}', '{"x":99}'],
+            [({"index": {}}, {"x": 98}), ({"index": {}}, {"x": 99})],
+            True,
+            True,
+            123,
+            request_timeout=160,
+        )
+
     @pytest.mark.skip  # type: ignore
     @mock.patch(
         "opensearchpy.helpers.actions._process_bulk_chunk",
         # make sure we spend some time in the thread
-        side_effect=lambda *a: [
+        side_effect=lambda *args, **kwargs: [
             (True, time.sleep(0.001) or threading.current_thread().ident)  # type: ignore
         ],
     )
