Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Item OnFailure Callbacks Not Executed When Bulk Request Fails #626

Closed
kellen-miller opened this issue Oct 1, 2024 · 5 comments · Fixed by #627
Closed

[BUG] Item OnFailure Callbacks Not Executed When Bulk Request Fails #626

kellen-miller opened this issue Oct 1, 2024 · 5 comments · Fixed by #627
Labels
bug Something isn't working

Comments

@kellen-miller
Copy link
Contributor

kellen-miller commented Oct 1, 2024

Is your feature request related to a problem?

I have been using the bulk indexing API and noticed that in the event a bulk request fails to reach OpenSearch, it is impossible to know which items were in that bulk request. Even if a failure callback is registered to an item, it is not executed. Only the bulk indexer's OnError function is executed, and the items are silently flushed from the worker's buffer.

What solution would you like?

This results in the sender not knowing if an item successfully made it to Opensearch or not, which can result in data loss. I would like that on a bulk request failure, if an item in the buffer has a failure callback registered to it, it is executed and passed the bulk indexer error. I can open a PR if needed.

Specifically this error check:
https://github.com/opensearch-project/opensearch-go/blob/main/opensearchutil/bulk_indexer.go#L505-L511

Would become:

func (w *worker) flush(ctx context.Context) error {
	// ...
	
	blk, err = w.bi.config.Client.Bulk(ctx, req)
	if err != nil {
		return w.handleBulkError(ctx, fmt.Errorf("flush: %w", err))
	}
	
	// ...
}
func (w *worker) handleBulkError(ctx context.Context, err error) error {
	atomic.AddUint64(&w.bi.stats.numFailed, uint64(len(w.items)))

	if w.bi.config.OnError != nil {
		w.bi.config.OnError(ctx, err)
	}

	for i := range w.items {
		if w.items[i].OnFailure != nil {
			w.items[i].OnFailure(ctx, w.items[i], opensearchapi.BulkRespItem{}, err)
		}
	}

	return err
}

What alternatives have you considered?

An external inbox pattern could be used, but that adds a lot of complexity when a simple code change could be used to mitigate this.

Do you have any additional context?

Implementing this feature would allow developers to handle item-level failures more effectively, especially in applications where data integrity is paramount. It ensures that each item's failure is acknowledged and can be retried or logged as needed.

@kellen-miller kellen-miller added enhancement New feature or request untriaged labels Oct 1, 2024
@kellen-miller
Copy link
Contributor Author

kellen-miller commented Oct 1, 2024

Also unrelated, but I noticed this comment while looking through the code to create this feature request.

https://github.com/opensearch-project/opensearch-go/blob/main/opensearchutil/bulk_indexer.go#L521-L527

Ranging over the map will set the op and info to the last key and value iterated through, not the first. Should probably be removed to not confuse others in the future as to what the behavior is.

@dblock
Copy link
Member

dblock commented Oct 2, 2024

This looks like a bug. Want to try fixing it? Or even a failing test would be very useful.

@dblock dblock added bug Something isn't working and removed enhancement New feature or request untriaged labels Oct 2, 2024
@kellen-miller
Copy link
Contributor Author

kellen-miller commented Oct 2, 2024

Yeah I can open a PR for a bugfix, already have it done on my local. Let me know what the process is.

@kellen-miller kellen-miller changed the title [FEATURE] Execute Item OnFailure Callbacks When Bulk Request Fails [BUG] Item OnFailure Callbacks Not Executed When Bulk Request Fails Oct 2, 2024
@dblock
Copy link
Member

dblock commented Oct 2, 2024

Yeah I can open a PR for a bugfix, already have it done on my local. Let me know what the process is.

Awesome, start here, but basically write a test, fix the bug, make a pull request, iterate until CI passes. I am here to help if you run into any issues.

@kellen-miller
Copy link
Contributor Author

Opened PR #627

dblock pushed a commit that referenced this issue Oct 3, 2024
Signed-off-by: Kellen Miller <kellen.miller@finxact.com>
Co-authored-by: Kellen Miller <kellen.miller@finxact.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging a pull request may close this issue.

2 participants