spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-0.1.0 vs spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-0.1.1
- old
+ new
@@ -1,80 +1,126 @@
-require "spec_helper"
+require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/elasticsearch"
+require "elasticsearch"
describe "inputs/elasticsearch" do
-
- search_response = <<-RESPONSE
- {
- "_scroll_id":"xxx",
- "took":5,
- "timed_out":false,
- "_shards":{"total":15,"successful":15,"failed":0},
- "hits":{
- "total":1000050,
- "max_score":1.0,
- "hits":[
- {
- "_index":"logstash2",
- "_type":"logs",
- "_id":"AmaqL7VuSWKF-F6N_Gz72g",
- "_score":1.0,
- "_source" : {
- "message":"foobar",
- "@version":"1",
- "@timestamp":"2014-05-19T21:08:39.000Z",
- "host":"colin-mbp13r"
- }
- }
- ]
+  it "should retrieve json event from elasticsearch" do
+
+ config = %q[
+ input {
+ elasticsearch {
+ hosts => ["node01"]
+ scan => false
+ query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
+ }
}
- }
- RESPONSE
+ ]
- scroll_response = <<-RESPONSE
- {
- "hits":{
- "hits":[]
+ response = {
+ "_scroll_id" => "cXVlcnlUaGVuRmV0Y2g",
+ "took" => 27,
+ "timed_out" => false,
+ "_shards" => {
+ "total" => 169,
+ "successful" => 169,
+ "failed" => 0
+ },
+ "hits" => {
+ "total" => 1,
+ "max_score" => 1.0,
+ "hits" => [ {
+ "_index" => "logstash-2014.10.12",
+ "_type" => "logs",
+ "_id" => "C5b2xLQwTZa76jBmHIbwHQ",
+ "_score" => 1.0,
+ "fields" => {"message" => ["ohayo"] }
+ } ]
}
}
- RESPONSE
- config <<-CONFIG
- input {
- elasticsearch {
- host => "localhost"
- scan => false
+    scroll_response = {
+      "_scroll_id" => "r453Wc1jh0caLJhSDg",
+      "hits" => { "hits" => [] }
+    }
+
+    client = Elasticsearch::Client.new
+    expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client)
+    expect(client).to receive(:search).with(any_args).and_return(response)
+    expect(client).to receive(:scroll).with({:body=>"cXVlcnlUaGVuRmV0Y2g", :scroll=>"1m"}).and_return(scroll_response)
+
+ pipeline = LogStash::Pipeline.new(config)
+ queue = Queue.new
+ pipeline.instance_eval do
+ @output_func = lambda { |event| queue << event }
+ end
+ pipeline_thread = Thread.new { pipeline.run }
+ event = queue.pop
+
+ insist { event["fields"]["message"] } == [ "ohayo" ]
+
+ pipeline_thread.join
+ end
+
+  it "should retrieve json event from elasticsearch with scan" do
+
+ config = %q[
+ input {
+ elasticsearch {
+ hosts => ["node01"]
+ scan => true
+ query => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }'
+ }
}
+ ]
+
+ scan_response = {
+ "_scroll_id" => "DcrY3G1xff6SB",
}
- CONFIG
- it "should retrieve json event from elasticseach" do
- # I somewhat duplicated our "input" rspec extension because I needed to add mocks for the the actual ES calls
- # and rspec expectations need to be in "it" statement but the "input" extension defines the "it"
- # TODO(colin) see how we can improve our rspec extension to better integrate in these scenarios
+ scroll_responses = [
+ {
+ "_scroll_id" => "cXVlcnlUaGVuRmV0Y2g",
+ "took" => 27,
+ "timed_out" => false,
+ "_shards" => {
+ "total" => 169,
+ "successful" => 169,
+ "failed" => 0
+ },
+ "hits" => {
+ "total" => 1,
+ "max_score" => 1.0,
+ "hits" => [ {
+ "_index" => "logstash-2014.10.12",
+ "_type" => "logs",
+ "_id" => "C5b2xLQwTZa76jBmHIbwHQ",
+ "_score" => 1.0,
+ "fields" => {"message" => ["ohayo"] }
+ } ]
+ }
+ },
+ {
+ "_scroll_id" => "r453Wc1jh0caLJhSDg",
+ "hits" => { "hits" => [] }
+ }
+ ]
- expect_any_instance_of(LogStash::Inputs::Elasticsearch).to receive(:execute_search_request).and_return(search_response)
- expect_any_instance_of(LogStash::Inputs::Elasticsearch).to receive(:execute_scroll_request).with(any_args).and_return(scroll_response)
+ client = Elasticsearch::Client.new
+ expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client)
+ expect(client).to receive(:search).with(any_args).and_return(scan_response)
+ expect(client).to receive(:scroll).with({:body=>"DcrY3G1xff6SB", :scroll=>"1m"}).and_return(scroll_responses.first)
+ expect(client).to receive(:scroll).with({:body=>"cXVlcnlUaGVuRmV0Y2g", :scroll=>"1m"}).and_return(scroll_responses.last)
pipeline = LogStash::Pipeline.new(config)
queue = Queue.new
pipeline.instance_eval do
@output_func = lambda { |event| queue << event }
end
pipeline_thread = Thread.new { pipeline.run }
event = queue.pop
- insist { event["message"] } == "foobar"
+ insist { event["fields"]["message"] } == [ "ohayo" ]
- # do not call pipeline.shutdown here, as it will stop the plugin execution randomly
- # and maybe kill input before calling execute_scroll_request.
- # TODO(colin) we should rework the pipeliene shutdown to allow a soft/clean shutdown mecanism,
- # using a shutdown event which can be fed into each plugin queue and when the plugin sees it
- # exits after completing its processing.
- #
- # pipeline.shutdown
- #
- # instead, since our scroll_response will terminate the plugin, we can just join the pipeline thread
pipeline_thread.join
end
end