spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-4.9.3 vs spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-4.10.0
- old
+ new
@@ -6,17 +6,15 @@
require "timecop"
require "stud/temporary"
require "time"
require "date"
-class LogStash::Inputs::TestableElasticsearch < LogStash::Inputs::Elasticsearch
- attr_reader :client
-end
+require 'logstash/plugin_mixins/ecs_compatibility_support/spec_helper'
-describe LogStash::Inputs::TestableElasticsearch do
+describe LogStash::Inputs::Elasticsearch, :ecs_compatibility_support do
- let(:plugin) { LogStash::Inputs::TestableElasticsearch.new(config) }
+ let(:plugin) { described_class.new(config) }
let(:queue) { Queue.new }
it_behaves_like "an interruptible input plugin" do
let(:esclient) { double("elasticsearch-client") }
let(:config) do
@@ -38,11 +36,17 @@
allow(esclient).to receive(:scroll) { { "hits" => { "hits" => [hit] } } }
allow(esclient).to receive(:clear_scroll).and_return(nil)
end
end
- context 'creating events from Elasticsearch' do
+
+ ecs_compatibility_matrix(:disabled, :v1, :v8) do |ecs_select|
+
+ before(:each) do
+ allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility)
+ end
+
let(:config) do
%q[
input {
elasticsearch {
hosts => ["localhost"]
@@ -95,11 +99,10 @@
event = input(config) do |pipeline, queue|
queue.pop
end
expect(event).to be_a(LogStash::Event)
- puts event.to_hash_with_metadata
expect(event.get("message")).to eql [ "ohayo" ]
end
context 'when a target is set' do
let(:config) do
@@ -118,14 +121,14 @@
event = input(config) do |pipeline, queue|
queue.pop
end
expect(event).to be_a(LogStash::Event)
- puts event.to_hash_with_metadata
expect(event.get("[@metadata][_source][message]")).to eql [ "ohayo" ]
end
end
+
end
# This spec is an adapter-spec, ensuring that we send the right sequence of messages to our Elasticsearch Client
# to support sliced scrolling. The underlying implementation will spawn its own threads to consume, so we must be
# careful to use thread-safe constructs.
@@ -133,10 +136,11 @@
let(:config) do
{
'query' => "#{LogStash::Json.dump(query)}",
'slices' => slices,
'docinfo' => true, # include ids
+ 'docinfo_target' => '[@metadata]'
}
end
let(:query) do
{
"query" => {
@@ -403,131 +407,144 @@
expect(client).to receive(:search).with(any_args).and_return(response)
allow(client).to receive(:scroll).with({ :body => {:scroll_id => "cXVlcnlUaGVuRmV0Y2g"}, :scroll => "1m" }).and_return(scroll_reponse)
allow(client).to receive(:clear_scroll).and_return(nil)
end
- context 'when defining docinfo' do
- let(:config_metadata) do
- %q[
- input {
- elasticsearch {
- hosts => ["localhost"]
- query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
- docinfo => true
- }
- }
- ]
+ ecs_compatibility_matrix(:disabled, :v1, :v8) do |ecs_select|
+
+ before(:each) do
+ allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility)
end
- it 'merges the values if the `docinfo_target` already exist in the `_source` document' do
- config_metadata_with_hash = %Q[
- input {
- elasticsearch {
- hosts => ["localhost"]
- query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
- docinfo => true
- docinfo_target => 'metadata_with_hash'
+ context 'with docinfo enabled' do
+ let(:config_metadata) do
+ %q[
+ input {
+ elasticsearch {
+ hosts => ["localhost"]
+ query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
+ docinfo => true
+ }
}
- }
- ]
+ ]
+ end
- event = input(config_metadata_with_hash) do |pipeline, queue|
- queue.pop
+ it "provides document info under metadata" do
+ event = input(config_metadata) do |pipeline, queue|
+ queue.pop
+ end
+
+ if ecs_select.active_mode == :disabled
+ expect(event.get("[@metadata][_index]")).to eq('logstash-2014.10.12')
+ expect(event.get("[@metadata][_type]")).to eq('logs')
+ expect(event.get("[@metadata][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ')
+ else
+ expect(event.get("[@metadata][input][elasticsearch][_index]")).to eq('logstash-2014.10.12')
+ expect(event.get("[@metadata][input][elasticsearch][_type]")).to eq('logs')
+ expect(event.get("[@metadata][input][elasticsearch][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ')
+ end
end
- expect(event.get("[metadata_with_hash][_index]")).to eq('logstash-2014.10.12')
- expect(event.get("[metadata_with_hash][_type]")).to eq('logs')
- expect(event.get("[metadata_with_hash][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ')
- expect(event.get("[metadata_with_hash][awesome]")).to eq("logstash")
- end
+ it 'merges values if the `docinfo_target` already exist in the `_source` document' do
+ config_metadata_with_hash = %Q[
+ input {
+ elasticsearch {
+ hosts => ["localhost"]
+ query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
+ docinfo => true
+ docinfo_target => 'metadata_with_hash'
+ }
+ }
+ ]
- context 'if the `docinfo_target` exist but is not of type hash' do
- let (:config) { {
- "hosts" => ["localhost"],
- "query" => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }',
- "docinfo" => true,
- "docinfo_target" => 'metadata_with_string'
- } }
- it 'thows an exception if the `docinfo_target` exist but is not of type hash' do
- expect(client).not_to receive(:clear_scroll)
- plugin.register
- expect { plugin.run([]) }.to raise_error(Exception, /incompatible event/)
+ event = input(config_metadata_with_hash) do |pipeline, queue|
+ queue.pop
+ end
+
+ expect(event.get("[metadata_with_hash][_index]")).to eq('logstash-2014.10.12')
+ expect(event.get("[metadata_with_hash][_type]")).to eq('logs')
+ expect(event.get("[metadata_with_hash][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ')
+ expect(event.get("[metadata_with_hash][awesome]")).to eq("logstash")
end
- end
- it "should move the document info to the @metadata field" do
- event = input(config_metadata) do |pipeline, queue|
- queue.pop
+ context 'if the `docinfo_target` exist but is not of type hash' do
+ let (:config) { {
+ "hosts" => ["localhost"],
+ "query" => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }',
+ "docinfo" => true,
+ "docinfo_target" => 'metadata_with_string'
+ } }
+ it 'thows an exception if the `docinfo_target` exist but is not of type hash' do
+ expect(client).not_to receive(:clear_scroll)
+ plugin.register
+ expect { plugin.run([]) }.to raise_error(Exception, /incompatible event/)
+ end
end
- expect(event.get("[@metadata][_index]")).to eq('logstash-2014.10.12')
- expect(event.get("[@metadata][_type]")).to eq('logs')
- expect(event.get("[@metadata][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ')
- end
-
- it 'should move the document information to the specified field' do
- config = %q[
- input {
- elasticsearch {
- hosts => ["localhost"]
- query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
- docinfo => true
- docinfo_target => 'meta'
+ it 'should move the document information to the specified field' do
+ config = %q[
+ input {
+ elasticsearch {
+ hosts => ["localhost"]
+ query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
+ docinfo => true
+ docinfo_target => 'meta'
+ }
}
- }
- ]
- event = input(config) do |pipeline, queue|
- queue.pop
+ ]
+ event = input(config) do |pipeline, queue|
+ queue.pop
+ end
+
+ expect(event.get("[meta][_index]")).to eq('logstash-2014.10.12')
+ expect(event.get("[meta][_type]")).to eq('logs')
+ expect(event.get("[meta][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ')
end
- expect(event.get("[meta][_index]")).to eq('logstash-2014.10.12')
- expect(event.get("[meta][_type]")).to eq('logs')
- expect(event.get("[meta][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ')
- end
+ it "allows to specify which fields from the document info to save to metadata" do
+ fields = ["_index"]
+ config = %Q[
+ input {
+ elasticsearch {
+ hosts => ["localhost"]
+ query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
+ docinfo => true
+ docinfo_fields => #{fields}
+ }
+ }]
- it "should allow to specify which fields from the document info to save to the @metadata field" do
- fields = ["_index"]
- config = %Q[
+ event = input(config) do |pipeline, queue|
+ queue.pop
+ end
+
+ meta_base = event.get(ecs_select.active_mode == :disabled ? "@metadata" : "[@metadata][input][elasticsearch]")
+ expect(meta_base.keys).to eq(fields)
+ end
+
+ it 'should be able to reference metadata fields in `add_field` decorations' do
+ config = %q[
input {
elasticsearch {
hosts => ["localhost"]
query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
docinfo => true
- docinfo_fields => #{fields}
+ add_field => {
+ 'identifier' => "foo:%{[@metadata][_type]}:%{[@metadata][_id]}"
+ }
}
- }]
-
- event = input(config) do |pipeline, queue|
- queue.pop
- end
-
- expect(event.get("@metadata").keys).to eq(fields)
- expect(event.get("[@metadata][_type]")).to eq(nil)
- expect(event.get("[@metadata][_index]")).to eq('logstash-2014.10.12')
- expect(event.get("[@metadata][_id]")).to eq(nil)
- end
-
- it 'should be able to reference metadata fields in `add_field` decorations' do
- config = %q[
- input {
- elasticsearch {
- hosts => ["localhost"]
- query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
- docinfo => true
- add_field => {
- 'identifier' => "foo:%{[@metadata][_type]}:%{[@metadata][_id]}"
- }
}
- }
- ]
+ ]
- event = input(config) do |pipeline, queue|
- queue.pop
- end
+ event = input(config) do |pipeline, queue|
+ queue.pop
+ end
- expect(event.get('identifier')).to eq('foo:logs:C5b2xLQwTZa76jBmHIbwHQ')
+ expect(event.get('identifier')).to eq('foo:logs:C5b2xLQwTZa76jBmHIbwHQ')
+ end if ecs_select.active_mode == :disabled
+
end
+
end
context "when not defining the docinfo" do
it 'should keep the document information in the root of the event' do
config = %q[
@@ -540,13 +557,11 @@
]
event = input(config) do |pipeline, queue|
queue.pop
end
- expect(event.get("[@metadata][_index]")).to eq(nil)
- expect(event.get("[@metadata][_type]")).to eq(nil)
- expect(event.get("[@metadata][_id]")).to eq(nil)
+ expect(event.get("[@metadata]")).to be_empty
end
end
end
describe "client" do
@@ -566,17 +581,17 @@
let(:config) { super().merge({ 'cloud_id' => valid_cloud_id }) }
it "should set host(s)" do
plugin.register
client = plugin.send(:client)
- expect( client.transport.instance_variable_get(:@hosts) ).to eql [{
- :scheme => "https",
- :host => "ac31ebb90241773157043c34fd26fd46.us-central1.gcp.cloud.es.io",
- :port => 9243,
- :path => "",
- :protocol => "https"
- }]
+ expect( extract_transport(client).hosts ).to eql [{
+ :scheme => "https",
+ :host => "ac31ebb90241773157043c34fd26fd46.us-central1.gcp.cloud.es.io",
+ :port => 9243,
+ :path => "",
+ :protocol => "https"
+ }]
end
context 'invalid' do
let(:config) { super().merge({ 'cloud_id' => 'invalid:dXMtY2VudHJhbDEuZ2NwLmNsb3VkLmVzLmlv' }) }
@@ -598,11 +613,11 @@
let(:config) { super().merge({ 'cloud_auth' => LogStash::Util::Password.new('elastic:my-passwd-00') }) }
it "should set authorization" do
plugin.register
client = plugin.send(:client)
- auth_header = client.transport.instance_variable_get(:@options)[:transport_options][:headers]['Authorization']
+ auth_header = extract_transport(client).options[:transport_options][:headers]['Authorization']
expect( auth_header ).to eql "Basic #{Base64.encode64('elastic:my-passwd-00').rstrip}"
end
context 'invalid' do
@@ -635,11 +650,11 @@
let(:config) { super().merge({ 'api_key' => LogStash::Util::Password.new('foo:bar'), "ssl" => true }) }
it "should set authorization" do
plugin.register
client = plugin.send(:client)
- auth_header = client.transport.instance_variable_get(:@options)[:transport_options][:headers]['Authorization']
+ auth_header = extract_transport(client).options[:transport_options][:headers]['Authorization']
expect( auth_header ).to eql "ApiKey #{Base64.strict_encode64('foo:bar')}"
end
context 'user also set' do
@@ -656,11 +671,11 @@
let(:config) { super().merge({ 'proxy' => 'http://localhost:1234' }) }
it "should set proxy" do
plugin.register
client = plugin.send(:client)
- proxy = client.transport.instance_variable_get(:@options)[:transport_options][:proxy]
+ proxy = extract_transport(client).options[:transport_options][:proxy]
expect( proxy ).to eql "http://localhost:1234"
end
context 'invalid' do
@@ -668,11 +683,11 @@
it "should not set proxy" do
plugin.register
client = plugin.send(:client)
- expect( client.transport.instance_variable_get(:@options)[:transport_options] ).to_not include(:proxy)
+ expect( extract_transport(client).options[:transport_options] ).to_not include(:proxy)
end
end
end
shared_examples'configurable timeout' do |config_name, manticore_transport_option|
@@ -754,6 +769,12 @@
expect(queue.size).to eq(2)
Timecop.return
end
end
+
+ # @note can be removed once we depends on elasticsearch gem >= 6.x
+ def extract_transport(client) # on 7.x client.transport is a ES::Transport::Client
+ client.transport.respond_to?(:transport) ? client.transport.transport : client.transport
+ end
+
end