spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-4.14.0 vs spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-4.15.0
- old
+ new
@@ -49,10 +49,21 @@
it "should raise ConfigurationError" do
expect { plugin.register }.to raise_error(LogStash::ConfigurationError)
end
end
+
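+ # a negative `retries` value is rejected at register time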
+ context "retry" do
+ let(:config) do
+ {
+ "retries" => -1
+ }
+ end
+ it "should raise an exception with negative number" do
+ expect { plugin.register }.to raise_error(LogStash::ConfigurationError)
+ end
+ end
end
it_behaves_like "an interruptible input plugin" do
let(:config) do
{
@@ -191,22 +202,22 @@
end
context 'with `slices => 1`' do
let(:slices) { 1 }
it 'runs just one slice' do
- expect(plugin).to receive(:do_run_slice).with(duck_type(:<<))
+ expect(plugin).to receive(:do_run_slice).with(duck_type(:<<), nil)
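+ # the new second argument is presumably the slice id; nil for a single un-sliced run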
expect(Thread).to_not receive(:new)
plugin.register
plugin.run([])
end
end
context 'without slices directive' do
let(:config) { super().tap { |h| h.delete('slices') } }
it 'runs just one slice' do
- expect(plugin).to receive(:do_run_slice).with(duck_type(:<<))
+ expect(plugin).to receive(:do_run_slice).with(duck_type(:<<), nil)
expect(Thread).to_not receive(:new)
plugin.register
plugin.run([])
end
@@ -313,11 +324,10 @@
})
end
end
# END SLICE 1
- let(:client) { Elasticsearch::Client.new }
# RSpec mock validations are not threadsafe.
# Allow caller to synchronize.
def synchronize_method!(object, method_name)
original_method = object.method(method_name)
@@ -327,74 +337,117 @@
original_method.call(*method_args,&method_block)
end
end
end
- before(:each) do
- expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client)
- plugin.register
+ describe "with normal response" do
+ before(:each) do
+ expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client)
+ plugin.register
- expect(client).to receive(:clear_scroll).and_return(nil)
+ expect(client).to receive(:clear_scroll).and_return(nil)
- # SLICE0 is a three-page scroll in which the last page is empty
- slice0_query = LogStash::Json.dump(query.merge('slice' => { 'id' => 0, 'max' => 2}))
- expect(client).to receive(:search).with(hash_including(:body => slice0_query)).and_return(slice0_response0)
- expect(client).to receive(:scroll).with(hash_including(:body => { :scroll_id => slice0_scroll1 })).and_return(slice0_response1)
- expect(client).to receive(:scroll).with(hash_including(:body => { :scroll_id => slice0_scroll2 })).and_return(slice0_response2)
- allow(client).to receive(:ping)
+ # SLICE0 is a three-page scroll in which the last page is empty
+ slice0_query = LogStash::Json.dump(query.merge('slice' => { 'id' => 0, 'max' => 2}))
+ expect(client).to receive(:search).with(hash_including(:body => slice0_query)).and_return(slice0_response0)
+ expect(client).to receive(:scroll).with(hash_including(:body => { :scroll_id => slice0_scroll1 })).and_return(slice0_response1)
+ expect(client).to receive(:scroll).with(hash_including(:body => { :scroll_id => slice0_scroll2 })).and_return(slice0_response2)
+ allow(client).to receive(:ping)
- # SLICE1 is a two-page scroll in which the last page has no next scroll id
- slice1_query = LogStash::Json.dump(query.merge('slice' => { 'id' => 1, 'max' => 2}))
- expect(client).to receive(:search).with(hash_including(:body => slice1_query)).and_return(slice1_response0)
- expect(client).to receive(:scroll).with(hash_including(:body => { :scroll_id => slice1_scroll1 })).and_return(slice1_response1)
+ # SLICE1 is a two-page scroll in which the last page has no next scroll id
+ slice1_query = LogStash::Json.dump(query.merge('slice' => { 'id' => 1, 'max' => 2}))
+ expect(client).to receive(:search).with(hash_including(:body => slice1_query)).and_return(slice1_response0)
+ expect(client).to receive(:scroll).with(hash_including(:body => { :scroll_id => slice1_scroll1 })).and_return(slice1_response1)
- synchronize_method!(plugin, :scroll_request)
- synchronize_method!(plugin, :search_request)
- end
+ synchronize_method!(plugin, :scroll_request)
+ synchronize_method!(plugin, :search_request)
+ end
- let(:emitted_events) do
- queue = Queue.new # since we are running slices in threads, we need a thread-safe queue.
- plugin.run(queue)
- events = []
- events << queue.pop until queue.empty?
- events
- end
+ let(:client) { Elasticsearch::Client.new }
- let(:emitted_event_ids) do
- emitted_events.map { |event| event.get('[@metadata][_id]') }
- end
+ let(:emitted_events) do
+ queue = Queue.new # since we are running slices in threads, we need a thread-safe queue.
+ plugin.run(queue)
+ events = []
+ events << queue.pop until queue.empty?
+ events
+ end
- it 'emits the hits on the first page of the first slice' do
- expect(emitted_event_ids).to include('slice0-response0-item0')
- expect(emitted_event_ids).to include('slice0-response0-item1')
- end
- it 'emits the hits on the second page of the first slice' do
- expect(emitted_event_ids).to include('slice0-response1-item0')
- end
+ let(:emitted_event_ids) do
+ emitted_events.map { |event| event.get('[@metadata][_id]') }
+ end
- it 'emits the hits on the first page of the second slice' do
- expect(emitted_event_ids).to include('slice1-response0-item0')
- expect(emitted_event_ids).to include('slice1-response0-item1')
- end
+ it 'emits the hits on the first page of the first slice' do
+ expect(emitted_event_ids).to include('slice0-response0-item0')
+ expect(emitted_event_ids).to include('slice0-response0-item1')
+ end
+ it 'emits the hits on the second page of the first slice' do
+ expect(emitted_event_ids).to include('slice0-response1-item0')
+ end
- it 'emits the hitson the second page of the second slice' do
- expect(emitted_event_ids).to include('slice1-response1-item0')
- expect(emitted_event_ids).to include('slice1-response1-item1')
- end
+ it 'emits the hits on the first page of the second slice' do
+ expect(emitted_event_ids).to include('slice1-response0-item0')
+ expect(emitted_event_ids).to include('slice1-response0-item1')
+ end
- it 'does not double-emit' do
- expect(emitted_event_ids.uniq).to eq(emitted_event_ids)
+ it 'emits the hits on the second page of the second slice' do
+ expect(emitted_event_ids).to include('slice1-response1-item0')
+ expect(emitted_event_ids).to include('slice1-response1-item1')
+ end
+
+ it 'does not double-emit' do
+ expect(emitted_event_ids.uniq).to eq(emitted_event_ids)
+ end
+
+ it 'emits events with appropriate fields' do
+ emitted_events.each do |event|
+ expect(event).to be_a(LogStash::Event)
+ expect(event.get('message')).to eq(['hello, world'])
+ expect(event.get('[@metadata][_id]')).to_not be_nil
+ expect(event.get('[@metadata][_id]')).to_not be_empty
+ expect(event.get('[@metadata][_index]')).to start_with('logstash-')
+ end
+ end
end
- it 'emits events with appropriate fields' do
- emitted_events.each do |event|
- expect(event).to be_a(LogStash::Event)
- expect(event.get('message')).to eq(['hello, world'])
- expect(event.get('[@metadata][_id]')).to_not be_nil
- expect(event.get('[@metadata][_id]')).to_not be_empty
- expect(event.get('[@metadata][_index]')).to start_with('logstash-')
+ describe "with scroll request fail" do
+ before(:each) do
+ expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client)
+ plugin.register
+
+ expect(client).to receive(:clear_scroll).and_return(nil)
+
+ # SLICE0 is a scroll whose second page raises an exception
+ slice0_query = LogStash::Json.dump(query.merge('slice' => { 'id' => 0, 'max' => 2}))
+ expect(client).to receive(:search).with(hash_including(:body => slice0_query)).and_return(slice0_response0)
+ expect(client).to receive(:scroll).with(hash_including(:body => { :scroll_id => slice0_scroll1 })).and_raise("boom")
+ allow(client).to receive(:ping)
+
+ # SLICE1 is a two-page scroll in which the last page has no next scroll id
+ slice1_query = LogStash::Json.dump(query.merge('slice' => { 'id' => 1, 'max' => 2}))
+ expect(client).to receive(:search).with(hash_including(:body => slice1_query)).and_return(slice1_response0)
+ expect(client).to receive(:scroll).with(hash_including(:body => { :scroll_id => slice1_scroll1 })).and_return(slice1_response1)
+
+ synchronize_method!(plugin, :scroll_request)
+ synchronize_method!(plugin, :search_request)
end
+
+ let(:client) { Elasticsearch::Client.new }
+
+ it 'does not insert events into the queue' do
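+ # parallel_slice returns a [success, events] pair per slice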
+ expect(plugin).to receive(:parallel_slice).and_wrap_original do |m, *args|
+ slice0, slice1 = m.call
+ expect(slice0[0]).to be_falsey
+ expect(slice1[0]).to be_truthy
+ expect(slice1[1].size).to eq(4) # four items from SLICE1
+ [slice0, slice1]
+ end
+
+ queue = Queue.new
+ plugin.run(queue)
+ expect(queue.size).to eq(0)
+ end
end
end
end
context "with Elasticsearch document information" do
@@ -888,17 +941,94 @@
begin
expect(plugin).to receive(:do_run) {
queue << LogStash::Event.new({})
}.at_least(:twice)
runner = Thread.start { plugin.run(queue) }
- sleep 3.0
+ expect(queue.pop).not_to be_nil
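+ # reach into the scheduler internals to verify the cron interval instead of sleeping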
+ cron_jobs = plugin.instance_variable_get(:@_scheduler).instance_variable_get(:@impl).jobs
+ expect(cron_jobs[0].next_time - cron_jobs[0].last_time).to be <= 5.0
+ expect(queue.pop).not_to be_nil
ensure
plugin.do_stop
runner.join if runner
end
- expect(queue.size).to be >= 2
end
+ end
+
+ context "retries" do
+ let(:mock_response) do
+ {
+ "_scroll_id" => "cXVlcnlUaGVuRmV0Y2g",
+ "took" => 27,
+ "timed_out" => false,
+ "_shards" => {
+ "total" => 169,
+ "successful" => 169,
+ "failed" => 0
+ },
+ "hits" => {
+ "total" => 1,
+ "max_score" => 1.0,
+ "hits" => [ {
+ "_index" => "logstash-2014.10.12",
+ "_type" => "logs",
+ "_id" => "C5b2xLQwTZa76jBmHIbwHQ",
+ "_score" => 1.0,
+ "_source" => { "message" => ["ohayo"] }
+ } ]
+ }
+ }
+ end
+
+ let(:mock_scroll_response) do
+ {
+ "_scroll_id" => "r453Wc1jh0caLJhSDg",
+ "hits" => { "hits" => [] }
+ }
+ end
+
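+ # stub the client: the search returns a single hit, the follow-up scroll an empty page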
+ before(:each) do
+ client = Elasticsearch::Client.new
+ allow(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client)
+ allow(client).to receive(:search).with(any_args).and_return(mock_response)
+ allow(client).to receive(:scroll).with({ :body => { :scroll_id => "cXVlcnlUaGVuRmV0Y2g" }, :scroll=> "1m" }).and_return(mock_scroll_response)
+ allow(client).to receive(:clear_scroll).and_return(nil)
+ allow(client).to receive(:ping)
+ end
+
+ let(:config) do
+ {
+ "hosts" => ["localhost"],
+ "query" => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }',
+ "retries" => 1
+ }
+ end
+
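+ # with `retries => 1` the plugin makes at most two attempts (the original plus one retry)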
+ it "retry and log error when all search request fail" do
+ expect(plugin.logger).to receive(:error).with(/Tried .* unsuccessfully/,
+ hash_including(:message => 'Manticore::UnknownException'))
+ expect(plugin.logger).to receive(:warn).twice.with(/Attempt to .* but failed/,
+ hash_including(:exception => "Manticore::UnknownException"))
+ expect(plugin).to receive(:search_request).with(instance_of(Hash)).and_raise(Manticore::UnknownException).at_least(:twice)
+
+ plugin.register
+
+ expect{ plugin.run(queue) }.not_to raise_error
+ expect(queue.size).to eq(0)
+ end
+
+ it "retry successfully when search request fail for one time" do
+ expect(plugin.logger).to receive(:warn).once.with(/Attempt to .* but failed/,
+ hash_including(:exception => "Manticore::UnknownException"))
+ expect(plugin).to receive(:search_request).with(instance_of(Hash)).once.and_raise(Manticore::UnknownException)
+ expect(plugin).to receive(:search_request).with(instance_of(Hash)).once.and_call_original
+
+ plugin.register
+
+ expect{ plugin.run(queue) }.not_to raise_error
+ expect(queue.size).to eq(1)
+ end
end
# @note can be removed once we depend on elasticsearch gem >= 6.x
def extract_transport(client) # on 7.x client.transport is a ES::Transport::Client
client.transport.respond_to?(:transport) ? client.transport.transport : client.transport