diff --git a/CHANGELOG.md b/CHANGELOG.md
index c9d9ae6f..1a46b00a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,5 @@
+## 11.22.12
+ - Properly handle http code 413 (Payload Too Large) [#1199](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1199)
 ## 11.22.11
 - Remove irrelevant log warning about elastic stack version [#1202](https://github.com/logstash-plugins/logstash-output-elasticsearch/pull/1202)
 
diff --git a/docs/index.asciidoc b/docs/index.asciidoc
index 514722a3..07f7efdc 100644
--- a/docs/index.asciidoc
+++ b/docs/index.asciidoc
@@ -196,7 +196,23 @@
 This plugin uses the Elasticsearch bulk API to optimize its imports into Elastic
 either partial or total failures. The bulk API sends batches of requests to an HTTP endpoint.
 Error codes for the HTTP request are handled differently than error codes for individual documents.
-HTTP requests to the bulk API are expected to return a 200 response code. All other response codes are retried indefinitely.
+
+HTTP requests to the bulk API are expected to return a 200 response code. All other response codes are retried indefinitely,
+including 413 (Payload Too Large) responses.
+
+If you want to handle large payloads differently, you can configure 413 responses to go to the Dead Letter Queue instead:
+
+[source,ruby]
+-----
+output {
+  elasticsearch {
+    hosts => ["localhost:9200"]
+    dlq_custom_codes => [413] # Send 413 errors to DLQ instead of retrying
+  }
+}
+-----
+
+This will capture oversized payloads in the DLQ for analysis rather than retrying them.
 
 The following document errors are handled as follows:
diff --git a/lib/logstash/outputs/elasticsearch/http_client.rb b/lib/logstash/outputs/elasticsearch/http_client.rb
index e0b70e36..120d3e67 100644
--- a/lib/logstash/outputs/elasticsearch/http_client.rb
+++ b/lib/logstash/outputs/elasticsearch/http_client.rb
@@ -182,22 +182,20 @@ def join_bulk_responses(bulk_responses)
 
   def bulk_send(body_stream, batch_actions)
     params = compression_level? ?
{:headers => {"Content-Encoding" => "gzip"}} : {} - response = @pool.post(@bulk_path, params, body_stream.string) - - @bulk_response_metrics.increment(response.code.to_s) - - case response.code - when 200 # OK - LogStash::Json.load(response.body) - when 413 # Payload Too Large + begin + response = @pool.post(@bulk_path, params, body_stream.string) + @bulk_response_metrics.increment(response.code.to_s) + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + @bulk_response_metrics.increment(e.response_code.to_s) + raise e unless e.response_code == 413 + # special handling for 413, treat it as a document level issue logger.warn("Bulk request rejected: `413 Payload Too Large`", :action_count => batch_actions.size, :content_length => body_stream.size) - emulate_batch_error_response(batch_actions, response.code, 'payload_too_large') - else - url = ::LogStash::Util::SafeURI.new(response.final_url) - raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new( - response.code, url, body_stream.to_s, response.body - ) + return emulate_batch_error_response(batch_actions, 413, 'payload_too_large') + rescue => e # it may be a network issue instead, re-raise + raise e end + + LogStash::Json.load(response.body) end def emulate_batch_error_response(actions, http_code, reason) @@ -411,6 +409,9 @@ def host_to_url(h) def exists?(path, use_get=false) response = use_get ? 
@pool.get(path) : @pool.head(path) response.code >= 200 && response.code <= 299 + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + return false if e.response_code == 404 + raise e end def template_exists?(template_endpoint, name) @@ -421,6 +422,8 @@ def template_put(template_endpoint, name, template) path = "#{template_endpoint}/#{name}" logger.info("Installing Elasticsearch template", name: name) @pool.put(path, nil, LogStash::Json.dump(template)) + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + raise e unless e.response_code == 404 end # ILM methods @@ -432,17 +435,15 @@ def rollover_alias_exists?(name) # Create a new rollover alias def rollover_alias_put(alias_name, alias_definition) - begin - @pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition)) - logger.info("Created rollover alias", name: alias_name) - # If the rollover alias already exists, ignore the error that comes back from Elasticsearch - rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e - if e.response_code == 400 - logger.info("Rollover alias already exists, skipping", name: alias_name) - return - end - raise e + @pool.put(CGI::escape(alias_name), nil, LogStash::Json.dump(alias_definition)) + logger.info("Created rollover alias", name: alias_name) + # If the rollover alias already exists, ignore the error that comes back from Elasticsearch + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + if e.response_code == 400 + logger.info("Rollover alias already exists, skipping", name: alias_name) + return end + raise e end def get_xpack_info diff --git a/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb b/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb index c9e49ec7..11f85b53 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb +++ 
b/lib/logstash/outputs/elasticsearch/http_client/manticore_adapter.rb @@ -76,11 +76,8 @@ def perform_request(url, method, path, params={}, body=nil) raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::HostUnreachableError.new(e, request_uri_as_string) end - # 404s are excluded because they are valid codes in the case of - # template installation. We might need a better story around this later - # but for our current purposes this is correct code = resp.code - if code < 200 || code > 299 && code != 404 + if code < 200 || code > 299 # assume anything not 2xx is an error that the layer above needs to interpret raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new(code, request_uri, body, resp.body) end diff --git a/lib/logstash/outputs/elasticsearch/http_client/pool.rb b/lib/logstash/outputs/elasticsearch/http_client/pool.rb index 68715066..1ef9d0f9 100644 --- a/lib/logstash/outputs/elasticsearch/http_client/pool.rb +++ b/lib/logstash/outputs/elasticsearch/http_client/pool.rb @@ -253,13 +253,11 @@ def get_license(url) def health_check_request(url) logger.debug("Running health check to see if an Elasticsearch connection is working", :healthcheck_url => url.sanitized.to_s, :path => @healthcheck_path) - begin - response = perform_request_to_url(url, :head, @healthcheck_path) - return response, nil - rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e - logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message) - return nil, e - end + response = perform_request_to_url(url, :head, @healthcheck_path) + return response, nil + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + logger.warn("Health check failed", code: e.response_code, url: e.url, message: e.message) + return nil, e end def healthcheck!(register_phase = true) @@ -312,13 +310,11 @@ def healthcheck!(register_phase = true) end def get_root_path(url, params={}) - begin - 
resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params) - return resp, nil - rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e - logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body) - return nil, e - end + resp = perform_request_to_url(url, :get, ROOT_URI_PATH, params) + return resp, nil + rescue ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError => e + logger.warn("Elasticsearch main endpoint returns #{e.response_code}", message: e.message, body: e.response_body) + return nil, e end def test_serverless_connection(url, root_response) diff --git a/logstash-output-elasticsearch.gemspec b/logstash-output-elasticsearch.gemspec index e8d5f858..1909085c 100644 --- a/logstash-output-elasticsearch.gemspec +++ b/logstash-output-elasticsearch.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-elasticsearch' - s.version = '11.22.11' + s.version = '11.22.12' s.licenses = ['apache-2.0'] s.summary = "Stores logs in Elasticsearch" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" diff --git a/spec/unit/outputs/elasticsearch_spec.rb b/spec/unit/outputs/elasticsearch_spec.rb index b71e8720..35c90804 100644 --- a/spec/unit/outputs/elasticsearch_spec.rb +++ b/spec/unit/outputs/elasticsearch_spec.rb @@ -915,7 +915,12 @@ allow(elasticsearch_output_instance.client.pool).to receive(:post) do |path, params, body| if body.length > max_bytes max_bytes *= 2 # ensure a successful retry - double("Response", :code => 413, :body => "") + raise ::LogStash::Outputs::ElasticSearch::HttpClient::Pool::BadResponseCodeError.new( + 413, + "test-url", + body, + "" + ) else double("Response", :code => 200, :body => '{"errors":false,"items":[{"index":{"status":200,"result":"created"}}]}') end