spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-4.12.3 vs spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-4.13.0

- old
+ new

@@ -17,13 +17,20 @@ let(:plugin) { described_class.new(config) } let(:queue) { Queue.new } before(:each) do - Elasticsearch::Client.send(:define_method, :ping) { } # define no-action ping method + Elasticsearch::Client.send(:define_method, :ping) { } # define no-action ping method end + let(:base_config) do + { + 'hosts' => ["localhost"], + 'query' => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }' + } + end + context "register" do let(:config) do { "schedule" => "* * * * * UTC" } @@ -76,18 +83,14 @@ before(:each) do allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility) end let(:config) do - %q[ - input { - elasticsearch { - hosts => ["localhost"] - query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }' - } - } - ] + { + 'hosts' => ["localhost"], + 'query' => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }' + } end let(:mock_response) do { "_scroll_id" => "cXVlcnlUaGVuRmV0Y2g", @@ -126,36 +129,32 @@ expect(client).to receive(:scroll).with({ :body => { :scroll_id => "cXVlcnlUaGVuRmV0Y2g" }, :scroll=> "1m" }).and_return(mock_scroll_response) expect(client).to receive(:clear_scroll).and_return(nil) expect(client).to receive(:ping) end + before { plugin.register } + it 'creates the events from the hits' do - event = input(config) do |pipeline, queue| - queue.pop - end + plugin.run queue + event = queue.pop expect(event).to be_a(LogStash::Event) expect(event.get("message")).to eql [ "ohayo" ] end context 'when a target is set' do let(:config) do - %q[ - input { - elasticsearch { - hosts => ["localhost"] - query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }' - target => "[@metadata][_source]" - } - } - ] + { + 'hosts' => ["localhost"], + 'query' => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }', + 'target' => "[@metadata][_source]" + } end it 'creates the event using the target' do - event = input(config) do |pipeline, queue| - queue.pop - end + plugin.run queue + event = queue.pop expect(event).to be_a(LogStash::Event) expect(event.get("[@metadata][_source][message]")).to eql [ "ohayo" ] end end @@ -448,28 +447,25 @@ before(:each) do allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility) end - context 'with docinfo enabled' do - let(:config_metadata) do - %q[ - input { - elasticsearch { - hosts => ["localhost"] - query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }' - docinfo => true - } - } - ] + before do + if do_register + plugin.register + plugin.run queue end + end - it "provides document info under metadata" do - event = input(config_metadata) do |pipeline, queue| - queue.pop - end + let(:do_register) { true } + let(:event) { queue.pop } + + context 'with docinfo enabled' do + let(:config) { base_config.merge 'docinfo' => true } + + it "provides document info under metadata" do if ecs_select.active_mode == :disabled expect(event.get("[@metadata][_index]")).to eq('logstash-2014.10.12') expect(event.get("[@metadata][_type]")).to eq('logs') expect(event.get("[@metadata][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ') else @@ -477,127 +473,76 @@ expect(event.get("[@metadata][input][elasticsearch][_type]")).to eq('logs') expect(event.get("[@metadata][input][elasticsearch][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ') end end - it 'merges values if the `docinfo_target` already exist in the `_source` document' do - config_metadata_with_hash = %Q[ - input { - elasticsearch { - hosts => ["localhost"] - query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }' - docinfo => true - docinfo_target => 'metadata_with_hash' - } - } - ] + context 'with docinfo_target' do + let(:config) { base_config.merge 'docinfo' => true, 'docinfo_target' => docinfo_target } + let(:docinfo_target) { 'metadata_with_hash' } - event = input(config_metadata_with_hash) do |pipeline, queue| - queue.pop + it 'merges values if the `docinfo_target` already exist in the `_source` document' do + expect(event.get("[metadata_with_hash][_index]")).to eq('logstash-2014.10.12') + expect(event.get("[metadata_with_hash][_type]")).to eq('logs') + expect(event.get("[metadata_with_hash][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ') + expect(event.get("[metadata_with_hash][awesome]")).to eq("logstash") end - expect(event.get("[metadata_with_hash][_index]")).to eq('logstash-2014.10.12') - expect(event.get("[metadata_with_hash][_type]")).to eq('logs') - expect(event.get("[metadata_with_hash][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ') - expect(event.get("[metadata_with_hash][awesome]")).to eq("logstash") + context 'non-existent' do + let(:docinfo_target) { 'meta' } + + it 'should move the document information to the specified field' do + expect(event.get("[meta][_index]")).to eq('logstash-2014.10.12') + expect(event.get("[meta][_type]")).to eq('logs') + expect(event.get("[meta][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ') + end + + end + end context 'if the `docinfo_target` exist but is not of type hash' do - let (:config) { { - "hosts" => ["localhost"], - "query" => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }', - "docinfo" => true, - "docinfo_target" => 'metadata_with_string' - } } - it 'thows an exception if the `docinfo_target` exist but is not of type hash' do + let(:config) { base_config.merge 'docinfo' => true, "docinfo_target" => 'metadata_with_string' } + let(:do_register) { false } + + it 'raises an exception if the `docinfo_target` exist but is not of type hash' do expect(client).not_to receive(:clear_scroll) plugin.register expect { plugin.run([]) }.to raise_error(Exception, /incompatible event/) end - end - it 'should move the document information to the specified field' do - config = %q[ - input { - elasticsearch { - hosts => ["localhost"] - query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }' - docinfo => true - docinfo_target => 'meta' - } - } - ] - event = input(config) do |pipeline, queue| - queue.pop - end - - expect(event.get("[meta][_index]")).to eq('logstash-2014.10.12') - expect(event.get("[meta][_type]")).to eq('logs') - expect(event.get("[meta][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ') end - it "allows to specify which fields from the document info to save to metadata" do - fields = ["_index"] - config = %Q[ - input { - elasticsearch { - hosts => ["localhost"] - query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }' - docinfo => true - docinfo_fields => #{fields} - } - }] + context 'with docinfo_fields' do + let(:config) { base_config.merge 'docinfo' => true, "docinfo_fields" => ["_index"] } - event = input(config) do |pipeline, queue| - queue.pop + it "allows to specify which fields from the document info to save to metadata" do + meta_base = event.get(ecs_select.active_mode == :disabled ? "@metadata" : "[@metadata][input][elasticsearch]") + expect(meta_base.keys).to eql ["_index"] end - meta_base = event.get(ecs_select.active_mode == :disabled ? "@metadata" : "[@metadata][input][elasticsearch]") - expect(meta_base.keys).to eq(fields) end - it 'should be able to reference metadata fields in `add_field` decorations' do - config = %q[ - input { - elasticsearch { - hosts => ["localhost"] - query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }' - docinfo => true - add_field => { - 'identifier' => "foo:%{[@metadata][_type]}:%{[@metadata][_id]}" - } - } - } - ] + context 'add_field' do + let(:config) { base_config.merge 'docinfo' => true, + 'add_field' => { 'identifier' => "foo:%{[@metadata][_type]}:%{[@metadata][_id]}" } } - event = input(config) do |pipeline, queue| - queue.pop - end + it 'should be able to reference metadata fields in `add_field` decorations' do + expect(event.get('identifier')).to eq('foo:logs:C5b2xLQwTZa76jBmHIbwHQ') + end if ecs_select.active_mode == :disabled - expect(event.get('identifier')).to eq('foo:logs:C5b2xLQwTZa76jBmHIbwHQ') - end if ecs_select.active_mode == :disabled + end end - end + context "when not defining the docinfo" do + let(:config) { base_config } - context "when not defining the docinfo" do - it 'should keep the document information in the root of the event' do - config = %q[ - input { - elasticsearch { - hosts => ["localhost"] - query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }' - } - } - ] - event = input(config) do |pipeline, queue| - queue.pop + it 'should keep the document information in the root of the event' do + expect(event.get("[@metadata]")).to be_empty end - - expect(event.get("[@metadata]")).to be_empty end + end end describe "client" do let(:config) do @@ -738,13 +683,11 @@ @t = java.lang.Thread.new( proc do begin @server = WEBrick::HTTPServer.new :Port => 0, :DocumentRoot => ".", :Logger => Cabin::Channel.get, # silence WEBrick logging - :StartCallback => Proc.new { - queue.push("started") - } + :StartCallback => Proc.new { queue.push("started") } @port = @server.config[:Port] @server.mount_proc '/' do |req, res| res.body = ''' { "name": "ce7ccfb438e8", @@ -809,15 +752,13 @@ res['Content-Type'] = 'application/json' @first_request = req @first_req_waiter.countDown() end - - @server.start rescue => e - puts "Error in webserver thread #{e}" + warn "ERROR in webserver thread #{e.inspect}\n #{e.backtrace.join("\n ")}" # ignore end end ) @t.daemon = true @@ -912,10 +853,12 @@ mock_client end plugin.register end + + after { plugin.do_stop } end end context 'connect_timeout_seconds' do include_examples('configurable timeout', 'connect_timeout_seconds', :connect_timeout) @@ -940,16 +883,19 @@ before do plugin.register end it "should properly schedule" do - expect(plugin).to receive(:do_run) { - queue << LogStash::Event.new({}) - }.at_least(:twice) - runner = Thread.start { plugin.run(queue) } - sleep 3.0 - plugin.stop - runner.join + begin + expect(plugin).to receive(:do_run) { + queue << LogStash::Event.new({}) + }.at_least(:twice) + runner = Thread.start { plugin.run(queue) } + sleep 3.0 + ensure + plugin.do_stop + runner.join if runner + end expect(queue.size).to be >= 2 end end