spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-4.1.1 vs spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-4.2.0
- old
+ new
@@ -1,15 +1,26 @@
# encoding: utf-8
require "logstash/devutils/rspec/spec_helper"
require "logstash/inputs/elasticsearch"
require "elasticsearch"
+require "timecop"
+require "stud/temporary"
+require "time"
+require "date"
describe LogStash::Inputs::Elasticsearch do
+ let(:plugin) { LogStash::Inputs::Elasticsearch.new(config) }
+ let(:queue) { Queue.new }
+
it_behaves_like "an interruptible input plugin" do
let(:esclient) { double("elasticsearch-client") }
- let(:config) { { } }
+ let(:config) do
+ {
+ "schedule" => "* * * * * UTC"
+ }
+ end
before :each do
allow(Elasticsearch::Client).to receive(:new).and_return(esclient)
hit = {
"_index" => "logstash-2014.10.12",
@@ -269,6 +280,68 @@
expect(event.get("[@metadata][_type]")).to eq(nil)
expect(event.get("[@metadata][_id]")).to eq(nil)
end
end
end
+
+ context "when scheduling" do
+ let(:config) do
+ {
+ "hosts" => ["localhost"],
+ "query" => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }',
+ "schedule" => "* * * * * UTC"
+ }
+ end
+
+ response = {
+ "_scroll_id" => "cXVlcnlUaGVuRmV0Y2g",
+ "took" => 27,
+ "timed_out" => false,
+ "_shards" => {
+ "total" => 169,
+ "successful" => 169,
+ "failed" => 0
+ },
+ "hits" => {
+ "total" => 1,
+ "max_score" => 1.0,
+ "hits" => [ {
+ "_index" => "logstash-2014.10.12",
+ "_type" => "logs",
+ "_id" => "C5b2xLQwTZa76jBmHIbwHQ",
+ "_score" => 1.0,
+ "_source" => { "message" => ["ohayo"] }
+ } ]
+ }
+ }
+
+ scroll_reponse = {
+ "_scroll_id" => "r453Wc1jh0caLJhSDg",
+ "hits" => { "hits" => [] }
+ }
+
+ before do
+ plugin.register
+ end
+
+ it "should properly schedule" do
+
+ Timecop.travel(Time.new(2000))
+ Timecop.scale(60)
+ runner = Thread.new do
+ expect(plugin).to receive(:do_run) {
+ queue << LogStash::Event.new({})
+ }.at_least(:twice)
+
+ plugin.run(queue)
+ end
+ sleep 3
+ plugin.stop
+ runner.kill
+ runner.join
+ expect(queue.size).to eq(2)
+ Timecop.return
+ end
+
+ end
+
end