spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-0.1.0 vs spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-0.1.1

- old
+ new

@@ -1,80 +1,126 @@ -require "spec_helper" +require "logstash/devutils/rspec/spec_helper" require "logstash/inputs/elasticsearch" +require "elasticsearch" describe "inputs/elasticsearch" do - - search_response = <<-RESPONSE - { - "_scroll_id":"xxx", - "took":5, - "timed_out":false, - "_shards":{"total":15,"successful":15,"failed":0}, - "hits":{ - "total":1000050, - "max_score":1.0, - "hits":[ - { - "_index":"logstash2", - "_type":"logs", - "_id":"AmaqL7VuSWKF-F6N_Gz72g", - "_score":1.0, - "_source" : { - "message":"foobar", - "@version":"1", - "@timestamp":"2014-05-19T21:08:39.000Z", - "host":"colin-mbp13r" - } - } - ] + it "should retrieve json event from elasticseach" do + + config = %q[ + input { + elasticsearch { + hosts => ["node01"] + scan => false + query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }' + } } - } - RESPONSE + ] - scroll_response = <<-RESPONSE - { - "hits":{ - "hits":[] + response = { + "_scroll_id" => "cXVlcnlUaGVuRmV0Y2g", + "took" => 27, + "timed_out" => false, + "_shards" => { + "total" => 169, + "successful" => 169, + "failed" => 0 + }, + "hits" => { + "total" => 1, + "max_score" => 1.0, + "hits" => [ { + "_index" => "logstash-2014.10.12", + "_type" => "logs", + "_id" => "C5b2xLQwTZa76jBmHIbwHQ", + "_score" => 1.0, + "fields" => {"message" => ["ohayo"] } + } ] } } - RESPONSE - config <<-CONFIG - input { - elasticsearch { - host => "localhost" - scan => false + scroll_reponse = { + "_scroll_id" => "r453Wc1jh0caLJhSDg", + "hits" => { "hits" => [] } + } + + client = Elasticsearch::Client.new + expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client) + expect(client).to receive(:search).with(any_args).and_return(response) + expect(client).to receive(:scroll).with({:body=>"cXVlcnlUaGVuRmV0Y2g", :scroll=>"1m"}).and_return(scroll_reponse) + + pipeline = LogStash::Pipeline.new(config) + queue = Queue.new + pipeline.instance_eval do + @output_func = lambda { |event| queue << event } + end + pipeline_thread = Thread.new { pipeline.run } + event = queue.pop + + insist { event["fields"]["message"] } == [ "ohayo" ] + + pipeline_thread.join + end + + it "should retrieve json event from elasticseach with scan" do + + config = %q[ + input { + elasticsearch { + hosts => ["node01"] + scan => true + query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }' + } } + ] + + scan_response = { + "_scroll_id" => "DcrY3G1xff6SB", } - CONFIG - it "should retrieve json event from elasticseach" do - # I somewhat duplicated our "input" rspec extension because I needed to add mocks for the the actual ES calls - # and rspec expectations need to be in "it" statement but the "input" extension defines the "it" - # TODO(colin) see how we can improve our rspec extension to better integrate in these scenarios + scroll_responses = [ + { + "_scroll_id" => "cXVlcnlUaGVuRmV0Y2g", + "took" => 27, + "timed_out" => false, + "_shards" => { + "total" => 169, + "successful" => 169, + "failed" => 0 + }, + "hits" => { + "total" => 1, + "max_score" => 1.0, + "hits" => [ { + "_index" => "logstash-2014.10.12", + "_type" => "logs", + "_id" => "C5b2xLQwTZa76jBmHIbwHQ", + "_score" => 1.0, + "fields" => {"message" => ["ohayo"] } + } ] + } + }, + { + "_scroll_id" => "r453Wc1jh0caLJhSDg", + "hits" => { "hits" => [] } + } + ] - expect_any_instance_of(LogStash::Inputs::Elasticsearch).to receive(:execute_search_request).and_return(search_response) - expect_any_instance_of(LogStash::Inputs::Elasticsearch).to receive(:execute_scroll_request).with(any_args).and_return(scroll_response) + client = Elasticsearch::Client.new + expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client) + expect(client).to receive(:search).with(any_args).and_return(scan_response) + expect(client).to receive(:scroll).with({:body=>"DcrY3G1xff6SB", :scroll=>"1m"}).and_return(scroll_responses.first) + expect(client).to receive(:scroll).with({:body=>"cXVlcnlUaGVuRmV0Y2g", :scroll=>"1m"}).and_return(scroll_responses.last) pipeline = LogStash::Pipeline.new(config) queue = Queue.new pipeline.instance_eval do @output_func = lambda { |event| queue << event } end pipeline_thread = Thread.new { pipeline.run } event = queue.pop - insist { event["message"] } == "foobar" + insist { event["fields"]["message"] } == [ "ohayo" ] - # do not call pipeline.shutdown here, as it will stop the plugin execution randomly - # and maybe kill input before calling execute_scroll_request. - # TODO(colin) we should rework the pipeliene shutdown to allow a soft/clean shutdown mecanism, - # using a shutdown event which can be fed into each plugin queue and when the plugin sees it - # exits after completing its processing. - # - # pipeline.shutdown - # - # instead, since our scroll_response will terminate the plugin, we can just join the pipeline thread pipeline_thread.join end end