spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-1.0.1 vs spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-1.0.2
- old
+ new
@@ -1,11 +1,11 @@
# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/elasticsearch"
require "elasticsearch"
-describe "inputs/elasticsearch", :elasticsearch => true do
+describe LogStash::Inputs::Elasticsearch do
it "should retrieve json event from elasticseach" do
config = %q[
input {
elasticsearch {
hosts => ["localhost"]
@@ -45,11 +45,13 @@
client = Elasticsearch::Client.new
expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client)
expect(client).to receive(:search).with(any_args).and_return(response)
expect(client).to receive(:scroll).with({ :body => "cXVlcnlUaGVuRmV0Y2g", :scroll=> "1m" }).and_return(scroll_reponse)
- event = fetch_event(config)
+ event = input(config) do |pipeline, queue|
+ queue.pop
+ end
insist { event }.is_a?(LogStash::Event)
insist { event["message"] } == [ "ohayo" ]
end
@@ -100,11 +102,13 @@
expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client)
expect(client).to receive(:search).with(any_args).and_return(scan_response)
expect(client).to receive(:scroll).with({ :body => "DcrY3G1xff6SB", :scroll => "1m" }).and_return(scroll_responses.first)
expect(client).to receive(:scroll).with({ :body=> "cXVlcnlUaGVuRmV0Y2g", :scroll => "1m" }).and_return(scroll_responses.last)
- event = fetch_event(config)
+ event = input(config) do |pipeline, queue|
+ queue.pop
+ end
insist { event }.is_a?(LogStash::Event)
insist { event["message"] } == [ "ohayo" ]
end
@@ -179,11 +183,13 @@
docinfo_target => '#{metadata_field}'
}
}
]
- event = fetch_event(config_metadata_with_hash)
+ event = input(config_metadata_with_hash) do |pipeline, queue|
+ queue.pop
+ end
expect(event[metadata_field]["_index"]).to eq('logstash-2014.10.12')
expect(event[metadata_field]["_type"]).to eq('logs')
expect(event[metadata_field]["_id"]).to eq('C5b2xLQwTZa76jBmHIbwHQ')
expect(event[metadata_field]["awesome"]).to eq("logstash")
@@ -212,11 +218,13 @@
expect { pipeline.run }.to raise_error(Exception, /incompatible event/)
end
it "should move the document info to the @metadata field" do
- event = fetch_event(config_metadata)
+ event = input(config_metadata) do |pipeline, queue|
+ queue.pop
+ end
expect(event["[@metadata][_index]"]).to eq('logstash-2014.10.12')
expect(event["[@metadata][_type]"]).to eq('logs')
expect(event["[@metadata][_id]"]).to eq('C5b2xLQwTZa76jBmHIbwHQ')
end
@@ -231,11 +239,13 @@
docinfo => true
docinfo_target => 'meta'
}
}
]
- event = fetch_event(config)
+ event = input(config) do |pipeline, queue|
+ queue.pop
+ end
expect(event["[meta][_index]"]).to eq('logstash-2014.10.12')
expect(event["[meta][_type]"]).to eq('logs')
expect(event["[meta][_id]"]).to eq('C5b2xLQwTZa76jBmHIbwHQ')
end
@@ -251,11 +261,13 @@
docinfo => true
docinfo_fields => #{fields}
}
}]
- event = fetch_event(config)
+ event = input(config) do |pipeline, queue|
+ queue.pop
+ end
expect(event["@metadata"].keys).to eq(fields)
expect(event["[@metadata][_type]"]).to eq(nil)
expect(event["[@metadata][_index]"]).to eq('logstash-2014.10.12')
expect(event["[@metadata][_id]"]).to eq(nil)
@@ -271,28 +283,16 @@
scan => false
query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
}
}
]
- event = fetch_event(config)
+ event = input(config) do |pipeline, queue|
+ queue.pop
+ end
expect(event["[@metadata][_index]"]).to eq(nil)
expect(event["[@metadata][_type]"]).to eq(nil)
expect(event["[@metadata][_id]"]).to eq(nil)
end
end
end
-end
-
-def fetch_event(config)
- pipeline = LogStash::Pipeline.new(config)
- queue = Queue.new
- pipeline.instance_eval do
- @output_func = lambda { |event| queue << event }
- end
- pipeline_thread = Thread.new { pipeline.run }
- event = queue.pop
-
- pipeline_thread.join
-
- return event
end