spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-4.12.3 vs spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-4.13.0
- old (logstash-input-elasticsearch-4.12.3)
+ new (logstash-input-elasticsearch-4.13.0)
@@ -17,13 +17,20 @@
let(:plugin) { described_class.new(config) }
let(:queue) { Queue.new }
before(:each) do
- Elasticsearch::Client.send(:define_method, :ping) { } # define no-action ping method
+ Elasticsearch::Client.send(:define_method, :ping) { } # define no-action ping method
end
+ let(:base_config) do
+ {
+ 'hosts' => ["localhost"],
+ 'query' => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
+ }
+ end
+
context "register" do
let(:config) do
{
"schedule" => "* * * * * UTC"
}
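
Note: from here on, the 4.13.0 spec derives each context's config from this shared base_config hash via Hash#merge, instead of repeating %q[ input { elasticsearch { ... } } ] pipeline-config strings. A minimal, self-contained sketch of that let/merge pattern (illustrative only; plain RSpec, no Logstash plugin involved):

    # Illustrative sketch, not part of the diff: plain RSpec, no Logstash required.
    require "rspec/autorun"

    RSpec.describe "shared base_config pattern" do
      # Shared baseline; each context below derives its own `config` from it.
      let(:base_config) do
        {
          'hosts' => ["localhost"],
          'query' => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
        }
      end

      let(:config) { base_config }

      context "with docinfo enabled" do
        # Override only the keys under test; everything else stays shared.
        let(:config) { base_config.merge('docinfo' => true) }

        it "keeps the shared keys and adds the override" do
          expect(config).to include('hosts' => ["localhost"], 'docinfo' => true)
        end
      end

      context "without overrides" do
        it "falls back to the shared baseline" do
          expect(config).to eq(base_config)
        end
      end
    end
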
@@ -76,18 +83,14 @@
before(:each) do
allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility)
end
let(:config) do
- %q[
- input {
- elasticsearch {
- hosts => ["localhost"]
- query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
- }
- }
- ]
+ {
+ 'hosts' => ["localhost"],
+ 'query' => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
+ }
end
let(:mock_response) do
{
"_scroll_id" => "cXVlcnlUaGVuRmV0Y2g",
@@ -126,36 +129,32 @@
expect(client).to receive(:scroll).with({ :body => { :scroll_id => "cXVlcnlUaGVuRmV0Y2g" }, :scroll=> "1m" }).and_return(mock_scroll_response)
expect(client).to receive(:clear_scroll).and_return(nil)
expect(client).to receive(:ping)
end
+ before { plugin.register }
+
it 'creates the events from the hits' do
- event = input(config) do |pipeline, queue|
- queue.pop
- end
+ plugin.run queue
+ event = queue.pop
expect(event).to be_a(LogStash::Event)
expect(event.get("message")).to eql [ "ohayo" ]
end
context 'when a target is set' do
let(:config) do
- %q[
- input {
- elasticsearch {
- hosts => ["localhost"]
- query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
- target => "[@metadata][_source]"
- }
- }
- ]
+ {
+ 'hosts' => ["localhost"],
+ 'query' => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }',
+ 'target' => "[@metadata][_source]"
+ }
end
it 'creates the event using the target' do
- event = input(config) do |pipeline, queue|
- queue.pop
- end
+ plugin.run queue
+ event = queue.pop
expect(event).to be_a(LogStash::Event)
expect(event.get("[@metadata][_source][message]")).to eql [ "ohayo" ]
end
end
@@ -448,28 +447,25 @@
before(:each) do
allow_any_instance_of(described_class).to receive(:ecs_compatibility).and_return(ecs_compatibility)
end
- context 'with docinfo enabled' do
- let(:config_metadata) do
- %q[
- input {
- elasticsearch {
- hosts => ["localhost"]
- query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
- docinfo => true
- }
- }
- ]
+ before do
+ if do_register
+ plugin.register
+ plugin.run queue
end
+ end
- it "provides document info under metadata" do
- event = input(config_metadata) do |pipeline, queue|
- queue.pop
- end
+ let(:do_register) { true }
+ let(:event) { queue.pop }
+
+ context 'with docinfo enabled' do
+ let(:config) { base_config.merge 'docinfo' => true }
+
+ it "provides document info under metadata" do
if ecs_select.active_mode == :disabled
expect(event.get("[@metadata][_index]")).to eq('logstash-2014.10.12')
expect(event.get("[@metadata][_type]")).to eq('logs')
expect(event.get("[@metadata][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ')
else
@@ -477,127 +473,76 @@
expect(event.get("[@metadata][input][elasticsearch][_type]")).to eq('logs')
expect(event.get("[@metadata][input][elasticsearch][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ')
end
end
- it 'merges values if the `docinfo_target` already exist in the `_source` document' do
- config_metadata_with_hash = %Q[
- input {
- elasticsearch {
- hosts => ["localhost"]
- query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
- docinfo => true
- docinfo_target => 'metadata_with_hash'
- }
- }
- ]
+ context 'with docinfo_target' do
+ let(:config) { base_config.merge 'docinfo' => true, 'docinfo_target' => docinfo_target }
+ let(:docinfo_target) { 'metadata_with_hash' }
- event = input(config_metadata_with_hash) do |pipeline, queue|
- queue.pop
+ it 'merges values if the `docinfo_target` already exist in the `_source` document' do
+ expect(event.get("[metadata_with_hash][_index]")).to eq('logstash-2014.10.12')
+ expect(event.get("[metadata_with_hash][_type]")).to eq('logs')
+ expect(event.get("[metadata_with_hash][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ')
+ expect(event.get("[metadata_with_hash][awesome]")).to eq("logstash")
end
- expect(event.get("[metadata_with_hash][_index]")).to eq('logstash-2014.10.12')
- expect(event.get("[metadata_with_hash][_type]")).to eq('logs')
- expect(event.get("[metadata_with_hash][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ')
- expect(event.get("[metadata_with_hash][awesome]")).to eq("logstash")
+ context 'non-existent' do
+ let(:docinfo_target) { 'meta' }
+
+ it 'should move the document information to the specified field' do
+ expect(event.get("[meta][_index]")).to eq('logstash-2014.10.12')
+ expect(event.get("[meta][_type]")).to eq('logs')
+ expect(event.get("[meta][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ')
+ end
+
+ end
+
end
context 'if the `docinfo_target` exist but is not of type hash' do
- let (:config) { {
- "hosts" => ["localhost"],
- "query" => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }',
- "docinfo" => true,
- "docinfo_target" => 'metadata_with_string'
- } }
- it 'thows an exception if the `docinfo_target` exist but is not of type hash' do
+ let(:config) { base_config.merge 'docinfo' => true, "docinfo_target" => 'metadata_with_string' }
+ let(:do_register) { false }
+
+ it 'raises an exception if the `docinfo_target` exist but is not of type hash' do
expect(client).not_to receive(:clear_scroll)
plugin.register
expect { plugin.run([]) }.to raise_error(Exception, /incompatible event/)
end
- end
- it 'should move the document information to the specified field' do
- config = %q[
- input {
- elasticsearch {
- hosts => ["localhost"]
- query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
- docinfo => true
- docinfo_target => 'meta'
- }
- }
- ]
- event = input(config) do |pipeline, queue|
- queue.pop
- end
-
- expect(event.get("[meta][_index]")).to eq('logstash-2014.10.12')
- expect(event.get("[meta][_type]")).to eq('logs')
- expect(event.get("[meta][_id]")).to eq('C5b2xLQwTZa76jBmHIbwHQ')
end
- it "allows to specify which fields from the document info to save to metadata" do
- fields = ["_index"]
- config = %Q[
- input {
- elasticsearch {
- hosts => ["localhost"]
- query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
- docinfo => true
- docinfo_fields => #{fields}
- }
- }]
+ context 'with docinfo_fields' do
+ let(:config) { base_config.merge 'docinfo' => true, "docinfo_fields" => ["_index"] }
- event = input(config) do |pipeline, queue|
- queue.pop
+ it "allows to specify which fields from the document info to save to metadata" do
+ meta_base = event.get(ecs_select.active_mode == :disabled ? "@metadata" : "[@metadata][input][elasticsearch]")
+ expect(meta_base.keys).to eql ["_index"]
end
- meta_base = event.get(ecs_select.active_mode == :disabled ? "@metadata" : "[@metadata][input][elasticsearch]")
- expect(meta_base.keys).to eq(fields)
end
- it 'should be able to reference metadata fields in `add_field` decorations' do
- config = %q[
- input {
- elasticsearch {
- hosts => ["localhost"]
- query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
- docinfo => true
- add_field => {
- 'identifier' => "foo:%{[@metadata][_type]}:%{[@metadata][_id]}"
- }
- }
- }
- ]
+ context 'add_field' do
+ let(:config) { base_config.merge 'docinfo' => true,
+ 'add_field' => { 'identifier' => "foo:%{[@metadata][_type]}:%{[@metadata][_id]}" } }
- event = input(config) do |pipeline, queue|
- queue.pop
- end
+ it 'should be able to reference metadata fields in `add_field` decorations' do
+ expect(event.get('identifier')).to eq('foo:logs:C5b2xLQwTZa76jBmHIbwHQ')
+ end if ecs_select.active_mode == :disabled
- expect(event.get('identifier')).to eq('foo:logs:C5b2xLQwTZa76jBmHIbwHQ')
- end if ecs_select.active_mode == :disabled
+ end
end
- end
+ context "when not defining the docinfo" do
+ let(:config) { base_config }
- context "when not defining the docinfo" do
- it 'should keep the document information in the root of the event' do
- config = %q[
- input {
- elasticsearch {
- hosts => ["localhost"]
- query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
- }
- }
- ]
- event = input(config) do |pipeline, queue|
- queue.pop
+ it 'should keep the document information in the root of the event' do
+ expect(event.get("[@metadata]")).to be_empty
end
-
- expect(event.get("[@metadata]")).to be_empty
end
+
end
end
describe "client" do
let(:config) do
@@ -738,13 +683,11 @@
@t = java.lang.Thread.new(
proc do
begin
@server = WEBrick::HTTPServer.new :Port => 0, :DocumentRoot => ".",
:Logger => Cabin::Channel.get, # silence WEBrick logging
- :StartCallback => Proc.new {
- queue.push("started")
- }
+ :StartCallback => Proc.new { queue.push("started") }
@port = @server.config[:Port]
@server.mount_proc '/' do |req, res|
res.body = '''
{
"name": "ce7ccfb438e8",
@@ -809,15 +752,13 @@
res['Content-Type'] = 'application/json'
@first_request = req
@first_req_waiter.countDown()
end
-
-
@server.start
rescue => e
- puts "Error in webserver thread #{e}"
+ warn "ERROR in webserver thread #{e.inspect}\n #{e.backtrace.join("\n ")}"
# ignore
end
end
)
@t.daemon = true
@@ -912,10 +853,12 @@
mock_client
end
plugin.register
end
+
+ after { plugin.do_stop }
end
end
context 'connect_timeout_seconds' do
include_examples('configurable timeout', 'connect_timeout_seconds', :connect_timeout)
@@ -940,16 +883,19 @@
before do
plugin.register
end
it "should properly schedule" do
- expect(plugin).to receive(:do_run) {
- queue << LogStash::Event.new({})
- }.at_least(:twice)
- runner = Thread.start { plugin.run(queue) }
- sleep 3.0
- plugin.stop
- runner.join
+ begin
+ expect(plugin).to receive(:do_run) {
+ queue << LogStash::Event.new({})
+ }.at_least(:twice)
+ runner = Thread.start { plugin.run(queue) }
+ sleep 3.0
+ ensure
+ plugin.do_stop
+ runner.join if runner
+ end
expect(queue.size).to be >= 2
end
end
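
Note: the reworked scheduling example above moves the stop/join calls into an ensure block so the background runner thread is always cleaned up, even when the at_least(:twice) expectation fails. A minimal, self-contained illustration of that cleanup pattern (hypothetical Worker class standing in for the plugin; not the plugin's actual API):

    # Illustrative sketch, not part of the diff: a stand-in Worker instead of
    # the real plugin, showing why stop/join belong in an `ensure` block.
    require "rspec/autorun"

    class Worker
      def run(queue)
        @running = true
        queue << :tick while @running && sleep(0.1)
      end

      def stop
        @running = false
      end
    end

    RSpec.describe Worker do
      it "keeps ticking until stopped" do
        queue  = Queue.new
        worker = Worker.new
        begin
          runner = Thread.start { worker.run(queue) }
          sleep 0.5
        ensure
          worker.stop            # analogous to plugin.do_stop in the spec
          runner.join if runner  # never leave the background thread running
        end
        expect(queue.size).to be >= 2
      end
    end
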