Diff of spec/inputs/elasticsearch_spec.rb between logstash-input-elasticsearch-4.14.0 (old) and logstash-input-elasticsearch-4.15.0 (new)

- old: line present only in logstash-input-elasticsearch-4.14.0 (removed in 4.15.0)
+ new: line present only in logstash-input-elasticsearch-4.15.0 (added in 4.15.0)

@@ -49,10 +49,21 @@ it "should raise ConfigurationError" do expect { plugin.register }.to raise_error(LogStash::ConfigurationError) end end + + context "retry" do + let(:config) do + { + "retries" => -1 + } + end + it "should raise an exception with negative number" do + expect { plugin.register }.to raise_error(LogStash::ConfigurationError) + end + end end it_behaves_like "an interruptible input plugin" do let(:config) do { @@ -191,22 +202,22 @@ end context 'with `slices => 1`' do let(:slices) { 1 } it 'runs just one slice' do - expect(plugin).to receive(:do_run_slice).with(duck_type(:<<)) + expect(plugin).to receive(:do_run_slice).with(duck_type(:<<), nil) expect(Thread).to_not receive(:new) plugin.register plugin.run([]) end end context 'without slices directive' do let(:config) { super().tap { |h| h.delete('slices') } } it 'runs just one slice' do - expect(plugin).to receive(:do_run_slice).with(duck_type(:<<)) + expect(plugin).to receive(:do_run_slice).with(duck_type(:<<), nil) expect(Thread).to_not receive(:new) plugin.register plugin.run([]) end @@ -313,11 +324,10 @@ }) end end # END SLICE 1 - let(:client) { Elasticsearch::Client.new } # RSpec mocks validations are not threadsafe. # Allow caller to synchronize. 
def synchronize_method!(object, method_name) original_method = object.method(method_name) @@ -327,74 +337,117 @@ original_method.call(*method_args,&method_block) end end end - before(:each) do - expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client) - plugin.register + describe "with normal response" do + before(:each) do + expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client) + plugin.register - expect(client).to receive(:clear_scroll).and_return(nil) + expect(client).to receive(:clear_scroll).and_return(nil) - # SLICE0 is a three-page scroll in which the last page is empty - slice0_query = LogStash::Json.dump(query.merge('slice' => { 'id' => 0, 'max' => 2})) - expect(client).to receive(:search).with(hash_including(:body => slice0_query)).and_return(slice0_response0) - expect(client).to receive(:scroll).with(hash_including(:body => { :scroll_id => slice0_scroll1 })).and_return(slice0_response1) - expect(client).to receive(:scroll).with(hash_including(:body => { :scroll_id => slice0_scroll2 })).and_return(slice0_response2) - allow(client).to receive(:ping) + # SLICE0 is a three-page scroll in which the last page is empty + slice0_query = LogStash::Json.dump(query.merge('slice' => { 'id' => 0, 'max' => 2})) + expect(client).to receive(:search).with(hash_including(:body => slice0_query)).and_return(slice0_response0) + expect(client).to receive(:scroll).with(hash_including(:body => { :scroll_id => slice0_scroll1 })).and_return(slice0_response1) + expect(client).to receive(:scroll).with(hash_including(:body => { :scroll_id => slice0_scroll2 })).and_return(slice0_response2) + allow(client).to receive(:ping) - # SLICE1 is a two-page scroll in which the last page has no next scroll id - slice1_query = LogStash::Json.dump(query.merge('slice' => { 'id' => 1, 'max' => 2})) - expect(client).to receive(:search).with(hash_including(:body => slice1_query)).and_return(slice1_response0) - expect(client).to 
receive(:scroll).with(hash_including(:body => { :scroll_id => slice1_scroll1 })).and_return(slice1_response1) + # SLICE1 is a two-page scroll in which the last page has no next scroll id + slice1_query = LogStash::Json.dump(query.merge('slice' => { 'id' => 1, 'max' => 2})) + expect(client).to receive(:search).with(hash_including(:body => slice1_query)).and_return(slice1_response0) + expect(client).to receive(:scroll).with(hash_including(:body => { :scroll_id => slice1_scroll1 })).and_return(slice1_response1) - synchronize_method!(plugin, :scroll_request) - synchronize_method!(plugin, :search_request) - end + synchronize_method!(plugin, :scroll_request) + synchronize_method!(plugin, :search_request) + end - let(:emitted_events) do - queue = Queue.new # since we are running slices in threads, we need a thread-safe queue. - plugin.run(queue) - events = [] - events << queue.pop until queue.empty? - events - end + let(:client) { Elasticsearch::Client.new } - let(:emitted_event_ids) do - emitted_events.map { |event| event.get('[@metadata][_id]') } - end + let(:emitted_events) do + queue = Queue.new # since we are running slices in threads, we need a thread-safe queue. + plugin.run(queue) + events = [] + events << queue.pop until queue.empty? 
+ events + end - it 'emits the hits on the first page of the first slice' do - expect(emitted_event_ids).to include('slice0-response0-item0') - expect(emitted_event_ids).to include('slice0-response0-item1') - end - it 'emits the hits on the second page of the first slice' do - expect(emitted_event_ids).to include('slice0-response1-item0') - end + let(:emitted_event_ids) do + emitted_events.map { |event| event.get('[@metadata][_id]') } + end - it 'emits the hits on the first page of the second slice' do - expect(emitted_event_ids).to include('slice1-response0-item0') - expect(emitted_event_ids).to include('slice1-response0-item1') - end + it 'emits the hits on the first page of the first slice' do + expect(emitted_event_ids).to include('slice0-response0-item0') + expect(emitted_event_ids).to include('slice0-response0-item1') + end + it 'emits the hits on the second page of the first slice' do + expect(emitted_event_ids).to include('slice0-response1-item0') + end - it 'emits the hitson the second page of the second slice' do - expect(emitted_event_ids).to include('slice1-response1-item0') - expect(emitted_event_ids).to include('slice1-response1-item1') - end + it 'emits the hits on the first page of the second slice' do + expect(emitted_event_ids).to include('slice1-response0-item0') + expect(emitted_event_ids).to include('slice1-response0-item1') + end - it 'does not double-emit' do - expect(emitted_event_ids.uniq).to eq(emitted_event_ids) + it 'emits the hits on the second page of the second slice' do + expect(emitted_event_ids).to include('slice1-response1-item0') + expect(emitted_event_ids).to include('slice1-response1-item1') + end + + it 'does not double-emit' do + expect(emitted_event_ids.uniq).to eq(emitted_event_ids) + end + + it 'emits events with appropriate fields' do + emitted_events.each do |event| + expect(event).to be_a(LogStash::Event) + expect(event.get('message')).to eq(['hello, world']) + expect(event.get('[@metadata][_id]')).to_not be_nil + 
expect(event.get('[@metadata][_id]')).to_not be_empty + expect(event.get('[@metadata][_index]')).to start_with('logstash-') + end + end end - it 'emits events with appropriate fields' do - emitted_events.each do |event| - expect(event).to be_a(LogStash::Event) - expect(event.get('message')).to eq(['hello, world']) - expect(event.get('[@metadata][_id]')).to_not be_nil - expect(event.get('[@metadata][_id]')).to_not be_empty - expect(event.get('[@metadata][_index]')).to start_with('logstash-') + describe "with scroll request fail" do + before(:each) do + expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client) + plugin.register + + expect(client).to receive(:clear_scroll).and_return(nil) + + # SLICE0 is a three-page scroll in which the second page throw exception + slice0_query = LogStash::Json.dump(query.merge('slice' => { 'id' => 0, 'max' => 2})) + expect(client).to receive(:search).with(hash_including(:body => slice0_query)).and_return(slice0_response0) + expect(client).to receive(:scroll).with(hash_including(:body => { :scroll_id => slice0_scroll1 })).and_raise("boom") + allow(client).to receive(:ping) + + # SLICE1 is a two-page scroll in which the last page has no next scroll id + slice1_query = LogStash::Json.dump(query.merge('slice' => { 'id' => 1, 'max' => 2})) + expect(client).to receive(:search).with(hash_including(:body => slice1_query)).and_return(slice1_response0) + expect(client).to receive(:scroll).with(hash_including(:body => { :scroll_id => slice1_scroll1 })).and_return(slice1_response1) + + synchronize_method!(plugin, :scroll_request) + synchronize_method!(plugin, :search_request) end + + let(:client) { Elasticsearch::Client.new } + + it 'does not insert event to queue' do + expect(plugin).to receive(:parallel_slice).and_wrap_original do |m, *args| + slice0, slice1 = m.call + expect(slice0[0]).to be_falsey + expect(slice1[0]).to be_truthy + expect(slice1[1].size).to eq(4) # four items from SLICE1 + [slice0, slice1] + end + + 
queue = Queue.new + plugin.run(queue) + expect(queue.size).to eq(0) + end end end end context "with Elasticsearch document information" do @@ -888,17 +941,94 @@ begin expect(plugin).to receive(:do_run) { queue << LogStash::Event.new({}) }.at_least(:twice) runner = Thread.start { plugin.run(queue) } - sleep 3.0 + expect(queue.pop).not_to be_nil + cron_jobs = plugin.instance_variable_get(:@_scheduler).instance_variable_get(:@impl).jobs + expect(cron_jobs[0].next_time - cron_jobs[0].last_time).to be <= 5.0 + expect(queue.pop).not_to be_nil ensure plugin.do_stop runner.join if runner end - expect(queue.size).to be >= 2 end + end + + context "retries" do + let(:mock_response) do + { + "_scroll_id" => "cXVlcnlUaGVuRmV0Y2g", + "took" => 27, + "timed_out" => false, + "_shards" => { + "total" => 169, + "successful" => 169, + "failed" => 0 + }, + "hits" => { + "total" => 1, + "max_score" => 1.0, + "hits" => [ { + "_index" => "logstash-2014.10.12", + "_type" => "logs", + "_id" => "C5b2xLQwTZa76jBmHIbwHQ", + "_score" => 1.0, + "_source" => { "message" => ["ohayo"] } + } ] + } + } + end + + let(:mock_scroll_response) do + { + "_scroll_id" => "r453Wc1jh0caLJhSDg", + "hits" => { "hits" => [] } + } + end + + before(:each) do + client = Elasticsearch::Client.new + allow(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client) + allow(client).to receive(:search).with(any_args).and_return(mock_response) + allow(client).to receive(:scroll).with({ :body => { :scroll_id => "cXVlcnlUaGVuRmV0Y2g" }, :scroll=> "1m" }).and_return(mock_scroll_response) + allow(client).to receive(:clear_scroll).and_return(nil) + allow(client).to receive(:ping) + end + + let(:config) do + { + "hosts" => ["localhost"], + "query" => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }', + "retries" => 1 + } + end + + it "retry and log error when all search request fail" do + expect(plugin.logger).to receive(:error).with(/Tried .* unsuccessfully/, + hash_including(:message 
=> 'Manticore::UnknownException')) + expect(plugin.logger).to receive(:warn).twice.with(/Attempt to .* but failed/, + hash_including(:exception => "Manticore::UnknownException")) + expect(plugin).to receive(:search_request).with(instance_of(Hash)).and_raise(Manticore::UnknownException).at_least(:twice) + + plugin.register + + expect{ plugin.run(queue) }.not_to raise_error + expect(queue.size).to eq(0) + end + + it "retry successfully when search request fail for one time" do + expect(plugin.logger).to receive(:warn).once.with(/Attempt to .* but failed/, + hash_including(:exception => "Manticore::UnknownException")) + expect(plugin).to receive(:search_request).with(instance_of(Hash)).once.and_raise(Manticore::UnknownException) + expect(plugin).to receive(:search_request).with(instance_of(Hash)).once.and_call_original + + plugin.register + + expect{ plugin.run(queue) }.not_to raise_error + expect(queue.size).to eq(1) + end end # @note can be removed once we depends on elasticsearch gem >= 6.x def extract_transport(client) # on 7.x client.transport is a ES::Transport::Client client.transport.respond_to?(:transport) ? client.transport.transport : client.transport