spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-0.1.1 vs spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-0.1.2

- old
+ new

@@ -1,13 +1,12 @@
+# encoding: utf-8
 require "logstash/devutils/rspec/spec_helper"
 require "logstash/inputs/elasticsearch"
 require "elasticsearch"

 describe "inputs/elasticsearch" do
-
   it "should retrieve json event from elasticseach" do
-
     config = %q[
       input {
         elasticsearch {
           hosts => ["node01"]
           scan => false
@@ -31,11 +30,11 @@
         "hits" => [ {
           "_index" => "logstash-2014.10.12",
           "_type" => "logs",
           "_id" => "C5b2xLQwTZa76jBmHIbwHQ",
           "_score" => 1.0,
-          "fields" => {"message" => ["ohayo"] }
+          "_source" => { "message" => ["ohayo"] }
         } ]
       }
     }

     scroll_reponse = {
@@ -44,27 +43,19 @@
     }

     client = Elasticsearch::Client.new
     expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client)
     expect(client).to receive(:search).with(any_args).and_return(response)
-    expect(client).to receive(:scroll).with({:body=>"cXVlcnlUaGVuRmV0Y2g", :scroll=>"1m"}).and_return(scroll_reponse)
+    expect(client).to receive(:scroll).with({ :body => "cXVlcnlUaGVuRmV0Y2g", :scroll=> "1m" }).and_return(scroll_reponse)

-    pipeline = LogStash::Pipeline.new(config)
-    queue = Queue.new
-    pipeline.instance_eval do
-      @output_func = lambda { |event| queue << event }
-    end
-    pipeline_thread = Thread.new { pipeline.run }
-    event = queue.pop
+    event = fetch_event(config)

-    insist { event["fields"]["message"] } == [ "ohayo" ]
-
-    pipeline_thread.join
+    insist { event }.is_a?(LogStash::Event)
+    insist { event["message"] } == [ "ohayo" ]
   end

   it "should retrieve json event from elasticseach with scan" do
-
     config = %q[
       input {
         elasticsearch {
           hosts => ["node01"]
           scan => true
@@ -93,11 +84,11 @@
         "hits" => [ {
           "_index" => "logstash-2014.10.12",
           "_type" => "logs",
           "_id" => "C5b2xLQwTZa76jBmHIbwHQ",
           "_score" => 1.0,
-          "fields" => {"message" => ["ohayo"] }
+          "_source" => { "message" => ["ohayo"] }
         } ]
       }
     },
     {
       "_scroll_id" => "r453Wc1jh0caLJhSDg",
@@ -106,21 +97,202 @@
     ]

     client = Elasticsearch::Client.new
     expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client)
     expect(client).to receive(:search).with(any_args).and_return(scan_response)
-    expect(client).to receive(:scroll).with({:body=>"DcrY3G1xff6SB", :scroll=>"1m"}).and_return(scroll_responses.first)
-    expect(client).to receive(:scroll).with({:body=>"cXVlcnlUaGVuRmV0Y2g", :scroll=>"1m"}).and_return(scroll_responses.last)
+    expect(client).to receive(:scroll).with({ :body => "DcrY3G1xff6SB", :scroll => "1m" }).and_return(scroll_responses.first)
+    expect(client).to receive(:scroll).with({ :body=> "cXVlcnlUaGVuRmV0Y2g", :scroll => "1m" }).and_return(scroll_responses.last)

-    pipeline = LogStash::Pipeline.new(config)
-    queue = Queue.new
-    pipeline.instance_eval do
-      @output_func = lambda { |event| queue << event }
+    event = fetch_event(config)
+
+    insist { event }.is_a?(LogStash::Event)
+    insist { event["message"] } == [ "ohayo" ]
+  end
+
+  context "with Elasticsearch document information" do
+    let!(:response) do
+      {
+        "_scroll_id" => "cXVlcnlUaGVuRmV0Y2g",
+        "took" => 27,
+        "timed_out" => false,
+        "_shards" => {
+          "total" => 169,
+          "successful" => 169,
+          "failed" => 0
+        },
+        "hits" => {
+          "total" => 1,
+          "max_score" => 1.0,
+          "hits" => [ {
+            "_index" => "logstash-2014.10.12",
+            "_type" => "logs",
+            "_id" => "C5b2xLQwTZa76jBmHIbwHQ",
+            "_score" => 1.0,
+            "_source" => {
+              "message" => ["ohayo"],
+              "metadata_with_hash" => { "awesome" => "logstash" },
+              "metadata_with_string" => "a string"
+            }
+          } ]
+        }
+      }
     end
-    pipeline_thread = Thread.new { pipeline.run }
-    event = queue.pop
-    insist { event["fields"]["message"] } == [ "ohayo" ]
+    let(:scroll_reponse) do
+      {
+        "_scroll_id" => "r453Wc1jh0caLJhSDg",
+        "hits" => { "hits" => [] }
+      }
+    end

-    pipeline_thread.join
+    let(:client) { Elasticsearch::Client.new }
+
+    before do
+      expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client)
+      expect(client).to receive(:search).with(any_args).and_return(response)
+      allow(client).to receive(:scroll).with({ :body => "cXVlcnlUaGVuRmV0Y2g", :scroll => "1m" }).and_return(scroll_reponse)
+    end
+
+    context 'when defining docinfo' do
+      let(:config_metadata) do
+        %q[
+          input {
+            elasticsearch {
+              hosts => ["node01"]
+              scan => false
+              query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
+              docinfo => true
+            }
+          }
+        ]
+      end
+
+      it 'merges the values if the `docinfo_target` already exist in the `_source` document' do
+        metadata_field = 'metadata_with_hash'
+
+        config_metadata_with_hash = %Q[
+          input {
+            elasticsearch {
+              hosts => ["node01"]
+              scan => false
+              query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
+              docinfo => true
+              docinfo_target => '#{metadata_field}'
+            }
+          }
+        ]
+
+        event = fetch_event(config_metadata_with_hash)
+
+        expect(event[metadata_field]["_index"]).to eq('logstash-2014.10.12')
+        expect(event[metadata_field]["_type"]).to eq('logs')
+        expect(event[metadata_field]["_id"]).to eq('C5b2xLQwTZa76jBmHIbwHQ')
+        expect(event[metadata_field]["awesome"]).to eq("logstash")
+      end
+
+      it 'thows an exception if the `docinfo_target` exist but is not of type hash' do
+        metadata_field = 'metadata_with_string'
+
+        config_metadata_with_string = %Q[
+          input {
+            elasticsearch {
+              hosts => ["node01"]
+              scan => false
+              query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
+              docinfo => true
+              docinfo_target => '#{metadata_field}'
+            }
+          }
+        ]
+
+        pipeline = LogStash::Pipeline.new(config_metadata_with_string)
+        queue = Queue.new
+        pipeline.instance_eval do
+          @output_func = lambda { |event| queue << event }
+        end
+
+        expect { pipeline.run }.to raise_error(Exception, /incompatible event/)
+      end
+
+      it "should move the document info to the @metadata field" do
+        event = fetch_event(config_metadata)
+
+        expect(event["[@metadata][_index]"]).to eq('logstash-2014.10.12')
+        expect(event["[@metadata][_type]"]).to eq('logs')
+        expect(event["[@metadata][_id]"]).to eq('C5b2xLQwTZa76jBmHIbwHQ')
+      end
+
+      it 'should move the document information to the specified field' do
+        config = %q[
+          input {
+            elasticsearch {
+              hosts => ["node01"]
+              scan => false
+              query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
+              docinfo => true
+              docinfo_target => 'meta'
+            }
+          }
+        ]
+        event = fetch_event(config)
+
+        expect(event["[meta][_index]"]).to eq('logstash-2014.10.12')
+        expect(event["[meta][_type]"]).to eq('logs')
+        expect(event["[meta][_id]"]).to eq('C5b2xLQwTZa76jBmHIbwHQ')
+      end
+
+      it "should allow to specify which fields from the document info to save to the @metadata field" do
+        fields = ["_index"]
+        config = %Q[
+          input {
+            elasticsearch {
+              hosts => ["node01"]
+              scan => false
+              query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
+              docinfo => true
+              docinfo_fields => #{fields}
+            }
+          }]
+
+        event = fetch_event(config)
+
+        expect(event["@metadata"].keys).to eq(fields)
+        expect(event["[@metadata][_type]"]).to eq(nil)
+        expect(event["[@metadata][_index]"]).to eq('logstash-2014.10.12')
+        expect(event["[@metadata][_id]"]).to eq(nil)
+      end
+    end
+
+    context "when not defining the docinfo" do
+      it 'should keep the document information in the root of the event' do
+        config = %q[
+          input {
+            elasticsearch {
+              hosts => ["node01"]
+              scan => false
+              query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
+            }
+          }
+        ]
+        event = fetch_event(config)
+
+        expect(event["[@metadata][_index]"]).to eq(nil)
+        expect(event["[@metadata][_type]"]).to eq(nil)
+        expect(event["[@metadata][_id]"]).to eq(nil)
+      end
+    end
   end
+end
+
+def fetch_event(config)
+  pipeline = LogStash::Pipeline.new(config)
+  queue = Queue.new
+  pipeline.instance_eval do
+    @output_func = lambda { |event| queue << event }
+  end
+  pipeline_thread = Thread.new { pipeline.run }
+  event = queue.pop
+
+  pipeline_thread.join
+
+  return event
 end