spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-4.9.3 vs spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-4.10.0

- old
+ new

@@ -6,17 +6,15 @@ require "timecop" require "stud/temporary" require "time" require "date" -class LogStash::Inputs::TestableElasticsearch < LogStash::Inputs::Elasticsearch - attr_reader :client -end +require 'logstash/plugin_mixins/ecs_compatibility_support/spec_helper' -describe LogStash::Inputs::TestableElasticsearch do +describe LogStash::Inputs::Elasticsearch, :ecs_compatibility_support do - let(:plugin) { LogStash::Inputs::TestableElasticsearch.new(config) } + let(:plugin) { described_class.new(config) } let(:queue) { Queue.new } it_behaves_like "an interruptible input plugin" do let(:esclient) { double("elasticsearch-client") } let(:config) do @@ -38,11 +36,17 @@ allow(esclient).to receive(:scroll) { { "hits" => { "hits" => [hit] } } } allow(esclient).to receive(:clear_scroll).and_return(nil) end end - context 'creating events from Elasticsearch' do + + ecs_compatibility_matrix(:disabled, :v1, :v8) do |ecs_select| + + before(:each) do + allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility) + end + let(:config) do %q[ input { elasticsearch { hosts => ["localhost"] @@ -95,11 +99,10 @@ event = input(config) do |pipeline, queue| queue.pop end expect(event).to be_a(LogStash::Event) - puts event.to_hash_with_metadata expect(event.get("message")).to eql [ "ohayo" ] end context 'when a target is set' do let(:config) do @@ -118,14 +121,14 @@ event = input(config) do |pipeline, queue| queue.pop end expect(event).to be_a(LogStash::Event) - puts event.to_hash_with_metadata expect(event.get("[@metadata][_source][message]")).to eql [ "ohayo" ] end end + end # This spec is an adapter-spec, ensuring that we send the right sequence of messages to our Elasticsearch Client # to support sliced scrolling. The underlying implementation will spawn its own threads to consume, so we must be # careful to use thread-safe constructs. @@ -133,10 +136,11 @@ let(:config) do { 'query' => "#{LogStash::Json.dump(query)}", 'slices' => slices, 'docinfo' => true, # include ids + 'docinfo_target' => '[@metadata]' } end let(:query) do { "query" => { @@ -403,131 +407,144 @@ expect(client).to receive(:search).with(any_args).and_return(response) allow(client).to receive(:scroll).with({ :body => {:scroll_id => "cXVlcnlUaGVuRmV0Y2g"}, :scroll => "1m" }).and_return(scroll_reponse) allow(client).to receive(:clear_scroll).and_return(nil) end - context 'when defining docinfo' do - let(:config_metadata) do - %q[ - input { - elasticsearch { - hosts => ["localhost"] - query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }' - docinfo => true - } - } - ] + ecs_compatibility_matrix(:disabled, :v1, :v8) do |ecs_select| + + before(:each) do + allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility) end - it 'merges the values if the `docinfo_target` already exist in the `_source` document' do - config_metadata_with_hash = %Q[ - input { - elasticsearch { - hosts => ["localhost"] - query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }' - docinfo => true - docinfo_target => 'metadata_with_hash' + context 'with docinfo enabled' do + let(:config_metadata) do + %q[ + input { + elasticsearch { + hosts => ["localhost"] + query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }' + docinfo => true + } } - } - ] + ] + end - event = input(config_metadata_with_hash) do |pipeline, queue| - queue.pop + it "provides document info under metadata" do + event = input(config_metadata) do |pipeline, queue| + queue.pop + end + + if ecs_select.active_mode == :disabled + expect(event.get("[@metadata][_index]")).to eq('logstash-2014.10.12') + expect(event.get("[@metadata][_type]")).to eq('logs') + expect(event.get("[@metadata][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ') + else + expect(event.get("[@metadata][input][elasticsearch][_index]")).to eq('logstash-2014.10.12') + expect(event.get("[@metadata][input][elasticsearch][_type]")).to eq('logs') + expect(event.get("[@metadata][input][elasticsearch][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ') + end end - expect(event.get("[metadata_with_hash][_index]")).to eq('logstash-2014.10.12') - expect(event.get("[metadata_with_hash][_type]")).to eq('logs') - expect(event.get("[metadata_with_hash][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ') - expect(event.get("[metadata_with_hash][awesome]")).to eq("logstash") - end + it 'merges values if the `docinfo_target` already exist in the `_source` document' do + config_metadata_with_hash = %Q[ + input { + elasticsearch { + hosts => ["localhost"] + query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }' + docinfo => true + docinfo_target => 'metadata_with_hash' + } + } + ] - context 'if the `docinfo_target` exist but is not of type hash' do - let (:config) { { - "hosts" => ["localhost"], - "query" => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }', - "docinfo" => true, - "docinfo_target" => 'metadata_with_string' - } } - it 'thows an exception if the `docinfo_target` exist but is not of type hash' do - expect(client).not_to receive(:clear_scroll) - plugin.register - expect { plugin.run([]) }.to raise_error(Exception, /incompatible event/) + event = input(config_metadata_with_hash) do |pipeline, queue| + queue.pop + end + + expect(event.get("[metadata_with_hash][_index]")).to eq('logstash-2014.10.12') + expect(event.get("[metadata_with_hash][_type]")).to eq('logs') + expect(event.get("[metadata_with_hash][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ') + expect(event.get("[metadata_with_hash][awesome]")).to eq("logstash") end - end - it "should move the document info to the @metadata field" do - event = input(config_metadata) do |pipeline, queue| - queue.pop + context 'if the `docinfo_target` exist but is not of type hash' do + let (:config) { { + "hosts" => ["localhost"], + "query" => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }', + "docinfo" => true, + "docinfo_target" => 'metadata_with_string' + } } + it 'thows an exception if the `docinfo_target` exist but is not of type hash' do + expect(client).not_to receive(:clear_scroll) + plugin.register + expect { plugin.run([]) }.to raise_error(Exception, /incompatible event/) + end end - expect(event.get("[@metadata][_index]")).to eq('logstash-2014.10.12') - expect(event.get("[@metadata][_type]")).to eq('logs') - expect(event.get("[@metadata][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ') - end - - it 'should move the document information to the specified field' do - config = %q[ - input { - elasticsearch { - hosts => ["localhost"] - query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }' - docinfo => true - docinfo_target => 'meta' + it 'should move the document information to the specified field' do + config = %q[ + input { + elasticsearch { + hosts => ["localhost"] + query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }' + docinfo => true + docinfo_target => 'meta' + } } - } - ] - event = input(config) do |pipeline, queue| - queue.pop + ] + event = input(config) do |pipeline, queue| + queue.pop + end + + expect(event.get("[meta][_index]")).to eq('logstash-2014.10.12') + expect(event.get("[meta][_type]")).to eq('logs') + expect(event.get("[meta][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ') end - expect(event.get("[meta][_index]")).to eq('logstash-2014.10.12') - expect(event.get("[meta][_type]")).to eq('logs') - expect(event.get("[meta][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ') - end + it "allows to specify which fields from the document info to save to metadata" do + fields = ["_index"] + config = %Q[ + input { + elasticsearch { + hosts => ["localhost"] + query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }' + docinfo => true + docinfo_fields => #{fields} + } + }] - it "should allow to specify which fields from the document info to save to the @metadata field" do - fields = ["_index"] - config = %Q[ + event = input(config) do |pipeline, queue| + queue.pop + end + + meta_base = event.get(ecs_select.active_mode == :disabled ? "@metadata" : "[@metadata][input][elasticsearch]") + expect(meta_base.keys).to eq(fields) + end + + it 'should be able to reference metadata fields in `add_field` decorations' do + config = %q[ input { elasticsearch { hosts => ["localhost"] query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }' docinfo => true - docinfo_fields => #{fields} + add_field => { + 'identifier' => "foo:%{[@metadata][_type]}:%{[@metadata][_id]}" + } } - }] - - event = input(config) do |pipeline, queue| - queue.pop - end - - expect(event.get("@metadata").keys).to eq(fields) - expect(event.get("[@metadata][_type]")).to eq(nil) - expect(event.get("[@metadata][_index]")).to eq('logstash-2014.10.12') - expect(event.get("[@metadata][_id]")).to eq(nil) - end - - it 'should be able to reference metadata fields in `add_field` decorations' do - config = %q[ - input { - elasticsearch { - hosts => ["localhost"] - query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }' - docinfo => true - add_field => { - 'identifier' => "foo:%{[@metadata][_type]}:%{[@metadata][_id]}" - } } - } - ] + ] - event = input(config) do |pipeline, queue| - queue.pop - end + event = input(config) do |pipeline, queue| + queue.pop + end - expect(event.get('identifier')).to eq('foo:logs:C5b2xLQwTZa76jBmHIbwHQ') + expect(event.get('identifier')).to eq('foo:logs:C5b2xLQwTZa76jBmHIbwHQ') + end if ecs_select.active_mode == :disabled + end + end context "when not defining the docinfo" do it 'should keep the document information in the root of the event' do config = %q[ @@ -540,13 +557,11 @@ ] event = input(config) do |pipeline, queue| queue.pop end - expect(event.get("[@metadata][_index]")).to eq(nil) - expect(event.get("[@metadata][_type]")).to eq(nil) - expect(event.get("[@metadata][_id]")).to eq(nil) + expect(event.get("[@metadata]")).to be_empty end end end describe "client" do @@ -566,17 +581,17 @@ let(:config) { super().merge({ 'cloud_id' => valid_cloud_id }) } it "should set host(s)" do plugin.register client = plugin.send(:client) - expect( client.transport.instance_variable_get(:@hosts) ).to eql [{ - :scheme => "https", - :host => "ac31ebb90241773157043c34fd26fd46.us-central1.gcp.cloud.es.io", - :port => 9243, - :path => "", - :protocol => "https" - }] + expect( extract_transport(client).hosts ).to eql [{ + :scheme => "https", + :host => "ac31ebb90241773157043c34fd26fd46.us-central1.gcp.cloud.es.io", + :port => 9243, + :path => "", + :protocol => "https" + }] end context 'invalid' do let(:config) { super().merge({ 'cloud_id' => 'invalid:dXMtY2VudHJhbDEuZ2NwLmNsb3VkLmVzLmlv' }) } @@ -598,11 +613,11 @@ let(:config) { super().merge({ 'cloud_auth' => LogStash::Util::Password.new('elastic:my-passwd-00') }) } it "should set authorization" do plugin.register client = plugin.send(:client) - auth_header = client.transport.instance_variable_get(:@options)[:transport_options][:headers]['Authorization'] + auth_header = extract_transport(client).options[:transport_options][:headers]['Authorization'] expect( auth_header ).to eql "Basic #{Base64.encode64('elastic:my-passwd-00').rstrip}" end context 'invalid' do @@ -635,11 +650,11 @@ let(:config) { super().merge({ 'api_key' => LogStash::Util::Password.new('foo:bar'), "ssl" => true }) } it "should set authorization" do plugin.register client = plugin.send(:client) - auth_header = client.transport.instance_variable_get(:@options)[:transport_options][:headers]['Authorization'] + auth_header = extract_transport(client).options[:transport_options][:headers]['Authorization'] expect( auth_header ).to eql "ApiKey #{Base64.strict_encode64('foo:bar')}" end context 'user also set' do @@ -656,11 +671,11 @@ let(:config) { super().merge({ 'proxy' => 'http://localhost:1234' }) } it "should set proxy" do plugin.register client = plugin.send(:client) - proxy = client.transport.instance_variable_get(:@options)[:transport_options][:proxy] + proxy = extract_transport(client).options[:transport_options][:proxy] expect( proxy ).to eql "http://localhost:1234" end context 'invalid' do @@ -668,11 +683,11 @@ it "should not set proxy" do plugin.register client = plugin.send(:client) - expect( client.transport.instance_variable_get(:@options)[:transport_options] ).to_not include(:proxy) + expect( extract_transport(client).options[:transport_options] ).to_not include(:proxy) end end end shared_examples'configurable timeout' do |config_name, manticore_transport_option| @@ -754,6 +769,12 @@ expect(queue.size).to eq(2) Timecop.return end end + + # @note can be removed once we depends on elasticsearch gem >= 6.x + def extract_transport(client) # on 7.x client.transport is a ES::Transport::Client + client.transport.respond_to?(:transport) ? client.transport.transport : client.transport + end + end