spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-4.2.1 vs spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-4.3.0

- old
+ new

@@ -82,9 +82,242 @@ insist { event }.is_a?(LogStash::Event) insist { event.get("message") } == [ "ohayo" ] end + + # This spec is an adapter-spec, ensuring that we send the right sequence of messages to our Elasticsearch Client + # to support sliced scrolling. The underlying implementation will spawn its own threads to consume, so we must be + # careful to use thread-safe constructs. + context "with managed sliced scrolling" do + let(:config) do + { + 'query' => "#{LogStash::Json.dump(query)}", + 'slices' => slices, + 'docinfo' => true, # include ids + } + end + let(:query) do + { + "query" => { + "match" => { "city_name" => "Okinawa" } + }, + "fields" => ["message"] + } + end + let(:slices) { 2 } + + context 'with `slices => 0`' do + let(:slices) { 0 } + it 'fails to register' do + expect { plugin.register }.to raise_error(LogStash::ConfigurationError) + end + end + + context 'with `slices => 1`' do + let(:slices) { 1 } + it 'runs just one slice' do + expect(plugin).to receive(:do_run_slice).with(duck_type(:<<)) + expect(Thread).to_not receive(:new) + + plugin.register + plugin.run([]) + end + end + + context 'without slices directive' do + let(:config) { super().except('slices') } + it 'runs just one slice' do + expect(plugin).to receive(:do_run_slice).with(duck_type(:<<)) + expect(Thread).to_not receive(:new) + + plugin.register + plugin.run([]) + end + end + + 2.upto(8) do |slice_count| + context "with `slices => #{slice_count}`" do + let(:slices) { slice_count } + it "runs #{slice_count} independent slices" do + expect(Thread).to receive(:new).and_call_original.exactly(slice_count).times + slice_count.times do |slice_id| + expect(plugin).to receive(:do_run_slice).with(duck_type(:<<), slice_id) + end + + plugin.register + plugin.run([]) + end + end + end + + # This section of specs heavily mocks the Elasticsearch::Client, and ensures that the Elasticsearch Input Plugin + # behaves as expected when handling a series of sliced, scrolled requests/responses. 
    context 'adapter/integration' do
      # Common skeleton of an Elasticsearch search/scroll response; individual
      # fixtures merge their scroll id and hits into it.
      let(:response_template) do
        {
          "took" => 12,
          "timed_out" => false,
          "shards" => {
            "total" => 6,
            "successful" => 6,
            "failed" => 0
          }
        }
      end

      # Skeleton of the "hits" envelope; each fixture overwrites the inner "hits" array.
      let(:hits_template) do
        {
          "total" => 4,
          "max_score" => 1.0,
          "hits" => []
        }
      end

      # Skeleton of a single hit; fixtures merge in a unique '_id' so emitted
      # events can be traced back to the page/slice that produced them.
      let(:hit_template) do
        {
          "_index" => "logstash-2018.08.23",
          "_type" => "logs",
          "_score" => 1.0,
          "_source" => { "message" => ["hello, world"] }
        }
      end

      # BEGIN SLICE 0: a sequence of THREE scrolled responses containing 2, 1, and 0 items
      # end-of-slice is reached when slice0_response2 is empty.
      # NOTE: the begin/end here is purely a visual grouping of related `let` fixtures;
      # it has no control-flow effect.
      begin
        let(:slice0_response0) do
          response_template.merge({
            "_scroll_id" => slice0_scroll1,
            "hits" => hits_template.merge("hits" => [
              hit_template.merge('_id' => "slice0-response0-item0"),
              hit_template.merge('_id' => "slice0-response0-item1")
            ])
          })
        end
        let(:slice0_scroll1) { 'slice:0,scroll:1' }
        let(:slice0_response1) do
          response_template.merge({
            "_scroll_id" => slice0_scroll2,
            "hits" => hits_template.merge("hits" => [
              hit_template.merge('_id' => "slice0-response1-item0")
            ])
          })
        end
        let(:slice0_scroll2) { 'slice:0,scroll:2' }
        let(:slice0_response2) do
          response_template.merge(
            "_scroll_id" => slice0_scroll3,
            "hits" => hits_template.merge({"hits" => []})
          )
        end
        let(:slice0_scroll3) { 'slice:0,scroll:3' }
      end
      # END SLICE 0

      # BEGIN SLICE 1: a sequence of TWO scrolled responses containing 2 and 2 items.
+ # end-of-slice is reached when slice1_response1 does not contain a next scroll id + begin + let(:slice1_response0) do + response_template.merge({ + "_scroll_id" => slice1_scroll1, + "hits" => hits_template.merge("hits" => [ + hit_template.merge('_id' => "slice1-response0-item0"), + hit_template.merge('_id' => "slice1-response0-item1") + ]) + }) + end + let(:slice1_scroll1) { 'slice:1,scroll:1' } + let(:slice1_response1) do + response_template.merge({ + "hits" => hits_template.merge("hits" => [ + hit_template.merge('_id' => "slice1-response1-item0"), + hit_template.merge('_id' => "slice1-response1-item1") + ]) + }) + end + end + # END SLICE 1 + + let(:client) { Elasticsearch::Client.new } + + # RSpec mocks validations are not threadsafe. + # Allow caller to synchronize. + def synchronize_method!(object, method_name) + original_method = object.method(method_name) + mutex = Mutex.new + allow(object).to receive(method_name).with(any_args) do |*method_args, &method_block| + mutex.synchronize do + original_method.call(*method_args,&method_block) + end + end + end + + before(:each) do + expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client) + plugin.register + + # SLICE0 is a three-page scroll in which the last page is empty + slice0_query = LogStash::Json.dump(query.merge('slice' => { 'id' => 0, 'max' => 2})) + expect(client).to receive(:search).with(hash_including(:body => slice0_query)).and_return(slice0_response0) + expect(client).to receive(:scroll).with(hash_including(:body => { :scroll_id => slice0_scroll1 })).and_return(slice0_response1) + expect(client).to receive(:scroll).with(hash_including(:body => { :scroll_id => slice0_scroll2 })).and_return(slice0_response2) + + # SLICE1 is a two-page scroll in which the last page has no next scroll id + slice1_query = LogStash::Json.dump(query.merge('slice' => { 'id' => 1, 'max' => 2})) + expect(client).to receive(:search).with(hash_including(:body => slice1_query)).and_return(slice1_response0) 
+ expect(client).to receive(:scroll).with(hash_including(:body => { :scroll_id => slice1_scroll1 })).and_return(slice1_response1) + + synchronize_method!(plugin, :scroll_request) + synchronize_method!(plugin, :search_request) + end + + let(:emitted_events) do + queue = Queue.new # since we are running slices in threads, we need a thread-safe queue. + plugin.run(queue) + events = [] + events << queue.pop until queue.empty? + events + end + + let(:emitted_event_ids) do + emitted_events.map { |event| event.get('[@metadata][_id]') } + end + + it 'emits the hits on the first page of the first slice' do + expect(emitted_event_ids).to include('slice0-response0-item0') + expect(emitted_event_ids).to include('slice0-response0-item1') + end + it 'emits the hits on the second page of the first slice' do + expect(emitted_event_ids).to include('slice0-response1-item0') + end + + it 'emits the hits on the first page of the second slice' do + expect(emitted_event_ids).to include('slice1-response0-item0') + expect(emitted_event_ids).to include('slice1-response0-item1') + end + + it 'emits the hitson the second page of the second slice' do + expect(emitted_event_ids).to include('slice1-response1-item0') + expect(emitted_event_ids).to include('slice1-response1-item1') + end + + it 'does not double-emit' do + expect(emitted_event_ids.uniq).to eq(emitted_event_ids) + end + + it 'emits events with appropriate fields' do + emitted_events.each do |event| + expect(event).to be_a(LogStash::Event) + expect(event.get('message')).to eq(['hello, world']) + expect(event.get('[@metadata][_id]')).to_not be_nil + expect(event.get('[@metadata][_id]')).to_not be_empty + expect(event.get('[@metadata][_index]')).to start_with('logstash-') + end + end + end + end + context "with Elasticsearch document information" do let!(:response) do { "_scroll_id" => "cXVlcnlUaGVuRmV0Y2g", "took" => 27,