Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature Request] Ingestion management APIs for pull-based ingestion #17442

Open
varunbharadwaj opened this issue Feb 24, 2025 · 0 comments
Open
Labels
enhancement Enhancement or improvement to existing feature or request Indexing Indexing, Bulk Indexing and anything related to indexing untriaged

Comments

@varunbharadwaj
Copy link
Contributor

varunbharadwaj commented Feb 24, 2025

Is your feature request related to a problem? Please describe

Create ingestion management APIs for pull-based ingestion for the following.

  1. Pause and resume ingestion.
  2. Update offset.
  3. Get ingestion state.
  4. Update error strategy (update_settings API will be used)

As a separate effort, we also plan to support custom consumer configurations and add an API to update consumer config.

Describe the solution you'd like

Following APIs will be created.

Pause Ingestion

POST /<index>/ingestion/_pause

Path parameters:
index: the index for which ingestion needs to be paused.

Description:
This API can be used to pause ingestion for a given index. Optionally, a list of shards can be provided in the request body. If shards are provided, ingestion will be paused only on the provided shards.

Request:
Request body is optional

{
  "shards": [1,2,3,4] # list of shardIDs
}

Response:

{
  "shards" : {
    "total": 4,
    "successful": 4,
    "failed": 0
  }
}

Resume Ingestion

POST /<index>/ingestion/_resume

Path parameters:
index: the index for which ingestion needs to be resumed.

Description:
This API can be used to resume ingestion for a given index. Optionally, a list of shards and reset pointers can be provided in the request body.

  • If shards are provided, only provided shards will be resumed.
  • If reset settings are provided, consumers will be reset before resuming.
  • If both shards and reset settings are provided, consumers for provided shards will be reset and resumed.
  • If no shards/reset settings are provided, all shards will be resumed without resetting consumers.

Request:

{
  "shards": [1,2,3,4], # list of shardIDs
  "reset_settings": {
    "mode": "reset mode, example, rewind_by_offset/rewind_by_timestamp",
    "value": "1"
  }
}

Response:

{
  "shards" : {
    "total": 4,
    "successful": 4,
    "failed": 0
  }
}

Get Ingestion State

GET /<index>/ingestion/_state

Path parameters:
index: the index for which ingestion state needs to be returned.

Description:
This API returns the current state of ingestion for the provided index. Optionally, a list of shards can be provided.

Request:
Request body is optional

{
  "shards": [1,2,3,4] # list of shardIDs
}

Response:

{
  "index": "indexName",
  "ingestion_state" [
    {
      "shard": 1,
      "poller_state": "paused/polling",
      "pointer": "current ingestion shard (batch) pointer",
      "error_strategy": "block/drop"
    },
    {
      "shard": 2,
      "poller_state": "paused/polling",
      "pointer": "\",
      "error_strategy": "block/drop"
    }

  ]
}

Update Error Strategy

update_settings API will be used to update the error strategy. IndexSettingsHandler for “index.ingestion_source.error_strategy” will be registered in the IngestionEngine, which will update the error strategy in the poller and writer threads.

Related component

Indexing

Describe alternatives you've considered

No response

Additional context

No response

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement Enhancement or improvement to existing feature or request Indexing Indexing, Bulk Indexing and anything related to indexing untriaged
Projects
None yet
Development

No branches or pull requests

1 participant