spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-1.0.1 vs spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-1.0.2

- old
+ new

@@ -1,11 +1,11 @@ # encoding: utf-8 require "logstash/devutils/rspec/spec_helper" require "logstash/inputs/elasticsearch" require "elasticsearch" -describe "inputs/elasticsearch", :elasticsearch => true do +describe LogStash::Inputs::Elasticsearch do it "should retrieve json event from elasticseach" do config = %q[ input { elasticsearch { hosts => ["localhost"] @@ -45,11 +45,13 @@ client = Elasticsearch::Client.new expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client) expect(client).to receive(:search).with(any_args).and_return(response) expect(client).to receive(:scroll).with({ :body => "cXVlcnlUaGVuRmV0Y2g", :scroll=> "1m" }).and_return(scroll_reponse) - event = fetch_event(config) + event = input(config) do |pipeline, queue| + queue.pop + end insist { event }.is_a?(LogStash::Event) insist { event["message"] } == [ "ohayo" ] end @@ -100,11 +102,13 @@ expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client) expect(client).to receive(:search).with(any_args).and_return(scan_response) expect(client).to receive(:scroll).with({ :body => "DcrY3G1xff6SB", :scroll => "1m" }).and_return(scroll_responses.first) expect(client).to receive(:scroll).with({ :body=> "cXVlcnlUaGVuRmV0Y2g", :scroll => "1m" }).and_return(scroll_responses.last) - event = fetch_event(config) + event = input(config) do |pipeline, queue| + queue.pop + end insist { event }.is_a?(LogStash::Event) insist { event["message"] } == [ "ohayo" ] end @@ -179,11 +183,13 @@ docinfo_target => '#{metadata_field}' } } ] - event = fetch_event(config_metadata_with_hash) + event = input(config_metadata_with_hash) do |pipeline, queue| + queue.pop + end expect(event[metadata_field]["_index"]).to eq('logstash-2014.10.12') expect(event[metadata_field]["_type"]).to eq('logs') expect(event[metadata_field]["_id"]).to eq('C5b2xLQwTZa76jBmHIbwHQ') expect(event[metadata_field]["awesome"]).to eq("logstash") @@ -212,11 +218,13 @@ expect { pipeline.run }.to raise_error(Exception, /incompatible event/) end it "should move the document info to the @metadata field" do - event = fetch_event(config_metadata) + event = input(config_metadata) do |pipeline, queue| + queue.pop + end expect(event["[@metadata][_index]"]).to eq('logstash-2014.10.12') expect(event["[@metadata][_type]"]).to eq('logs') expect(event["[@metadata][_id]"]).to eq('C5b2xLQwTZa76jBmHIbwHQ') end @@ -231,11 +239,13 @@ docinfo => true docinfo_target => 'meta' } } ] - event = fetch_event(config) + event = input(config) do |pipeline, queue| + queue.pop + end expect(event["[meta][_index]"]).to eq('logstash-2014.10.12') expect(event["[meta][_type]"]).to eq('logs') expect(event["[meta][_id]"]).to eq('C5b2xLQwTZa76jBmHIbwHQ') end @@ -251,11 +261,13 @@ docinfo => true docinfo_fields => #{fields} } }] - event = fetch_event(config) + event = input(config) do |pipeline, queue| + queue.pop + end expect(event["@metadata"].keys).to eq(fields) expect(event["[@metadata][_type]"]).to eq(nil) expect(event["[@metadata][_index]"]).to eq('logstash-2014.10.12') expect(event["[@metadata][_id]"]).to eq(nil) @@ -271,28 +283,16 @@ scan => false query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }' } } ] - event = fetch_event(config) + event = input(config) do |pipeline, queue| + queue.pop + end expect(event["[@metadata][_index]"]).to eq(nil) expect(event["[@metadata][_type]"]).to eq(nil) expect(event["[@metadata][_id]"]).to eq(nil) end end end -end - -def fetch_event(config) - pipeline = LogStash::Pipeline.new(config) - queue = Queue.new - pipeline.instance_eval do - @output_func = lambda { |event| queue << event } - end - pipeline_thread = Thread.new { pipeline.run } - event = queue.pop - - pipeline_thread.join - - return event end