spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-4.18.0 vs spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-4.19.0

- old
+ new

@@ -16,11 +16,12 @@ describe LogStash::Inputs::Elasticsearch, :ecs_compatibility_support do let(:plugin) { described_class.new(config) } let(:queue) { Queue.new } let(:build_flavor) { "default" } - let(:cluster_info) { {"version" => {"number" => "7.5.0", "build_flavor" => build_flavor}, "tagline" => "You Know, for Search"} } + let(:es_version) { "7.5.0" } + let(:cluster_info) { {"version" => {"number" => es_version, "build_flavor" => build_flavor}, "tagline" => "You Know, for Search"} } before(:each) do Elasticsearch::Client.send(:define_method, :ping) { } # define no-action ping method allow_any_instance_of(Elasticsearch::Client).to receive(:info).and_return(cluster_info) end @@ -100,10 +101,30 @@ end it "should raise an exception with negative number" do expect { plugin.register }.to raise_error(LogStash::ConfigurationError) end end + + context "search_api" do + before(:each) do + plugin.register + end + + context "ES 8" do + let(:es_version) { "8.10.0" } + it "resolves `auto` to `search_after`" do + expect(plugin.instance_variable_get(:@paginated_search)).to be_a LogStash::Inputs::Elasticsearch::SearchAfter + end + end + + context "ES 7" do + let(:es_version) { "7.17.0" } + it "resolves `auto` to `scroll`" do + expect(plugin.instance_variable_get(:@paginated_search)).to be_a LogStash::Inputs::Elasticsearch::Scroll + end + end + end end it_behaves_like "an interruptible input plugin" do let(:config) do { @@ -242,40 +263,43 @@ end end context 'with `slices => 1`' do let(:slices) { 1 } + before { plugin.register } + it 'runs just one slice' do - expect(plugin).to receive(:do_run_slice).with(duck_type(:<<)) + expect(plugin.instance_variable_get(:@paginated_search)).to receive(:search).with(duck_type(:<<), nil) expect(Thread).to_not receive(:new) - plugin.register plugin.run([]) end end context 'without slices directive' do let(:config) { super().tap { |h| h.delete('slices') } } + before { plugin.register } + it 'runs just one slice' do - expect(plugin).to receive(:do_run_slice).with(duck_type(:<<)) + expect(plugin.instance_variable_get(:@paginated_search)).to receive(:search).with(duck_type(:<<), nil) expect(Thread).to_not receive(:new) - plugin.register plugin.run([]) end end 2.upto(8) do |slice_count| context "with `slices => #{slice_count}`" do let(:slices) { slice_count } + before { plugin.register } + it "runs #{slice_count} independent slices" do expect(Thread).to receive(:new).and_call_original.exactly(slice_count).times slice_count.times do |slice_id| - expect(plugin).to receive(:do_run_slice).with(duck_type(:<<), slice_id) + expect(plugin.instance_variable_get(:@paginated_search)).to receive(:search).with(duck_type(:<<), slice_id) end - plugin.register plugin.run([]) end end end @@ -397,12 +421,12 @@ # SLICE1 is a two-page scroll in which the last page has no next scroll id slice1_query = LogStash::Json.dump(query.merge('slice' => { 'id' => 1, 'max' => 2})) expect(client).to receive(:search).with(hash_including(:body => slice1_query)).and_return(slice1_response0) expect(client).to receive(:scroll).with(hash_including(:body => { :scroll_id => slice1_scroll1 })).and_return(slice1_response1) - synchronize_method!(plugin, :scroll_request) - synchronize_method!(plugin, :search_request) + synchronize_method!(plugin.instance_variable_get(:@paginated_search), :next_page) + synchronize_method!(plugin.instance_variable_get(:@paginated_search), :initial_search) end let(:client) { Elasticsearch::Client.new } let(:emitted_events) do @@ -467,18 +491,18 @@ # SLICE1 is a two-page scroll in which the last page throws exception slice1_query = LogStash::Json.dump(query.merge('slice' => { 'id' => 1, 'max' => 2})) expect(client).to receive(:search).with(hash_including(:body => slice1_query)).and_return(slice1_response0) expect(client).to receive(:scroll).with(hash_including(:body => { :scroll_id => slice1_scroll1 })).and_raise("boom") - synchronize_method!(plugin, :scroll_request) - synchronize_method!(plugin, :search_request) + synchronize_method!(plugin.instance_variable_get(:@paginated_search), :next_page) + synchronize_method!(plugin.instance_variable_get(:@paginated_search), :initial_search) end let(:client) { Elasticsearch::Client.new } it 'insert event to queue without waiting other slices' do - expect(plugin).to receive(:do_run_slice).twice.and_wrap_original do |m, *args| + expect(plugin.instance_variable_get(:@paginated_search)).to receive(:search).twice.and_wrap_original do |m, *args| q = args[0] slice_id = args[1] if slice_id == 0 m.call(*args) expect(q.size).to eq(3) @@ -994,11 +1018,11 @@ plugin.register end it "should properly schedule" do begin - expect(plugin).to receive(:do_run) { + expect(plugin.instance_variable_get(:@paginated_search)).to receive(:do_run) { queue << LogStash::Event.new({}) }.at_least(:twice) runner = Thread.start { plugin.run(queue) } expect(queue.pop).not_to be_nil cron_jobs = plugin.instance_variable_get(:@_scheduler).instance_variable_get(:@impl).jobs @@ -1011,80 +1035,110 @@ end end context "retries" do - let(:mock_response) do - { - "_scroll_id" => "cXVlcnlUaGVuRmV0Y2g", - "took" => 27, - "timed_out" => false, - "_shards" => { - "total" => 169, - "successful" => 169, - "failed" => 0 - }, - "hits" => { - "total" => 1, - "max_score" => 1.0, - "hits" => [ { - "_index" => "logstash-2014.10.12", - "_type" => "logs", - "_id" => "C5b2xLQwTZa76jBmHIbwHQ", - "_score" => 1.0, - "_source" => { "message" => ["ohayo"] } - } ] - } - } - end - - let(:mock_scroll_response) do - { - "_scroll_id" => "r453Wc1jh0caLJhSDg", - "hits" => { "hits" => [] } - } - end - - before(:each) do - client = Elasticsearch::Client.new - allow(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client) - allow(client).to receive(:search).with(any_args).and_return(mock_response) - allow(client).to receive(:scroll).with({ :body => { :scroll_id => "cXVlcnlUaGVuRmV0Y2g" }, :scroll=> "1m" }).and_return(mock_scroll_response) - allow(client).to receive(:clear_scroll).and_return(nil) - allow(client).to receive(:ping) - end - + let(:client) { Elasticsearch::Client.new } let(:config) do { "hosts" => ["localhost"], "query" => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }', "retries" => 1 } end - it "retry and log error when all search request fail" do - expect(plugin.logger).to receive(:error).with(/Tried .* unsuccessfully/, - hash_including(:message => 'Manticore::UnknownException')) - expect(plugin.logger).to receive(:warn).twice.with(/Attempt to .* but failed/, - hash_including(:exception => "Manticore::UnknownException")) - expect(plugin).to receive(:search_request).with(instance_of(Hash)).and_raise(Manticore::UnknownException).at_least(:twice) + shared_examples "a retryable plugin" do + it "retry and log error when all search request fail" do + expect_any_instance_of(LogStash::Helpers::LoggableTry).to receive(:log_failure).with(instance_of(Manticore::UnknownException), instance_of(Integer), instance_of(String)).twice + expect(client).to receive(:search).with(instance_of(Hash)).and_raise(Manticore::UnknownException).at_least(:twice) - plugin.register + plugin.register - expect{ plugin.run(queue) }.not_to raise_error - expect(queue.size).to eq(0) + expect{ plugin.run(queue) }.not_to raise_error + end + + it "retry successfully when search request fail for one time" do + expect_any_instance_of(LogStash::Helpers::LoggableTry).to receive(:log_failure).with(instance_of(Manticore::UnknownException), 1, instance_of(String)) + expect(client).to receive(:search).with(instance_of(Hash)).once.and_raise(Manticore::UnknownException) + expect(client).to receive(:search).with(instance_of(Hash)).once.and_return(search_response) + + plugin.register + + expect{ plugin.run(queue) }.not_to raise_error + end end - it "retry successfully when search request fail for one time" do - expect(plugin.logger).to receive(:warn).once.with(/Attempt to .* but failed/, - hash_including(:exception => "Manticore::UnknownException")) - expect(plugin).to receive(:search_request).with(instance_of(Hash)).once.and_raise(Manticore::UnknownException) - expect(plugin).to receive(:search_request).with(instance_of(Hash)).once.and_call_original + describe "scroll" do + let(:search_response) do + { + "_scroll_id" => "cXVlcnlUaGVuRmV0Y2g", + "took" => 27, + "timed_out" => false, + "_shards" => { + "total" => 169, + "successful" => 169, + "failed" => 0 + }, + "hits" => { + "total" => 1, + "max_score" => 1.0, + "hits" => [ { + "_index" => "logstash-2014.10.12", + "_type" => "logs", + "_id" => "C5b2xLQwTZa76jBmHIbwHQ", + "_score" => 1.0, + "_source" => { "message" => ["ohayo"] } + } ] + } + } + end - plugin.register + let(:empty_scroll_response) do + { + "_scroll_id" => "r453Wc1jh0caLJhSDg", + "hits" => { "hits" => [] } + } + end - expect{ plugin.run(queue) }.not_to raise_error - expect(queue.size).to eq(1) + before(:each) do + allow(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client) + allow(client).to receive(:scroll).with({ :body => { :scroll_id => "cXVlcnlUaGVuRmV0Y2g" }, :scroll=> "1m" }).and_return(empty_scroll_response) + allow(client).to receive(:clear_scroll).and_return(nil) + allow(client).to receive(:ping) + end + + it_behaves_like "a retryable plugin" + end + + describe "search_after" do + let(:es_version) { "8.10.0" } + let(:config) { super().merge({ "search_api" => "search_after" }) } + + let(:search_response) do + { + "took" => 27, + "timed_out" => false, + "_shards" => { + "total" => 169, + "successful" => 169, + "failed" => 0 + }, + "hits" => { + "total" => 1, + "max_score" => 1.0, + "hits" => [ ] # empty hits to break the loop + } + } + end + + before(:each) do + expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client) + expect(client).to receive(:open_point_in_time).once.and_return({ "id" => "cXVlcnlUaGVuRmV0Y2g"}) + expect(client).to receive(:close_point_in_time).once.and_return(nil) + expect(client).to receive(:ping) + end + + it_behaves_like "a retryable plugin" end end # @note can be removed once we depends on elasticsearch gem >= 6.x def extract_transport(client) # on 7.x client.transport is a ES::Transport::Client