spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-4.18.0 vs spec/inputs/elasticsearch_spec.rb in logstash-input-elasticsearch-4.19.0
- old
+ new
@@ -16,11 +16,12 @@
describe LogStash::Inputs::Elasticsearch, :ecs_compatibility_support do
let(:plugin) { described_class.new(config) }
let(:queue) { Queue.new }
let(:build_flavor) { "default" }
- let(:cluster_info) { {"version" => {"number" => "7.5.0", "build_flavor" => build_flavor}, "tagline" => "You Know, for Search"} }
+ let(:es_version) { "7.5.0" }
+ let(:cluster_info) { {"version" => {"number" => es_version, "build_flavor" => build_flavor}, "tagline" => "You Know, for Search"} }
before(:each) do
Elasticsearch::Client.send(:define_method, :ping) { } # define no-action ping method
allow_any_instance_of(Elasticsearch::Client).to receive(:info).and_return(cluster_info)
end
@@ -100,10 +101,30 @@
end
it "should raise an exception with negative number" do
expect { plugin.register }.to raise_error(LogStash::ConfigurationError)
end
end
+
+ context "search_api" do
+ before(:each) do
+ plugin.register
+ end
+
+ context "ES 8" do
+ let(:es_version) { "8.10.0" }
+ it "resolves `auto` to `search_after`" do
+ expect(plugin.instance_variable_get(:@paginated_search)).to be_a LogStash::Inputs::Elasticsearch::SearchAfter
+ end
+ end
+
+ context "ES 7" do
+ let(:es_version) { "7.17.0" }
+ it "resolves `auto` to `scroll`" do
+ expect(plugin.instance_variable_get(:@paginated_search)).to be_a LogStash::Inputs::Elasticsearch::Scroll
+ end
+ end
+ end
end
it_behaves_like "an interruptible input plugin" do
let(:config) do
{
@@ -242,40 +263,43 @@
end
end
context 'with `slices => 1`' do
let(:slices) { 1 }
+ before { plugin.register }
+
it 'runs just one slice' do
- expect(plugin).to receive(:do_run_slice).with(duck_type(:<<))
+ expect(plugin.instance_variable_get(:@paginated_search)).to receive(:search).with(duck_type(:<<), nil)
expect(Thread).to_not receive(:new)
- plugin.register
plugin.run([])
end
end
context 'without slices directive' do
let(:config) { super().tap { |h| h.delete('slices') } }
+ before { plugin.register }
+
it 'runs just one slice' do
- expect(plugin).to receive(:do_run_slice).with(duck_type(:<<))
+ expect(plugin.instance_variable_get(:@paginated_search)).to receive(:search).with(duck_type(:<<), nil)
expect(Thread).to_not receive(:new)
- plugin.register
plugin.run([])
end
end
2.upto(8) do |slice_count|
context "with `slices => #{slice_count}`" do
let(:slices) { slice_count }
+ before { plugin.register }
+
it "runs #{slice_count} independent slices" do
expect(Thread).to receive(:new).and_call_original.exactly(slice_count).times
slice_count.times do |slice_id|
- expect(plugin).to receive(:do_run_slice).with(duck_type(:<<), slice_id)
+ expect(plugin.instance_variable_get(:@paginated_search)).to receive(:search).with(duck_type(:<<), slice_id)
end
- plugin.register
plugin.run([])
end
end
end
@@ -397,12 +421,12 @@
# SLICE1 is a two-page scroll in which the last page has no next scroll id
slice1_query = LogStash::Json.dump(query.merge('slice' => { 'id' => 1, 'max' => 2}))
expect(client).to receive(:search).with(hash_including(:body => slice1_query)).and_return(slice1_response0)
expect(client).to receive(:scroll).with(hash_including(:body => { :scroll_id => slice1_scroll1 })).and_return(slice1_response1)
- synchronize_method!(plugin, :scroll_request)
- synchronize_method!(plugin, :search_request)
+ synchronize_method!(plugin.instance_variable_get(:@paginated_search), :next_page)
+ synchronize_method!(plugin.instance_variable_get(:@paginated_search), :initial_search)
end
let(:client) { Elasticsearch::Client.new }
let(:emitted_events) do
@@ -467,18 +491,18 @@
# SLICE1 is a two-page scroll in which the last page throws exception
slice1_query = LogStash::Json.dump(query.merge('slice' => { 'id' => 1, 'max' => 2}))
expect(client).to receive(:search).with(hash_including(:body => slice1_query)).and_return(slice1_response0)
expect(client).to receive(:scroll).with(hash_including(:body => { :scroll_id => slice1_scroll1 })).and_raise("boom")
- synchronize_method!(plugin, :scroll_request)
- synchronize_method!(plugin, :search_request)
+ synchronize_method!(plugin.instance_variable_get(:@paginated_search), :next_page)
+ synchronize_method!(plugin.instance_variable_get(:@paginated_search), :initial_search)
end
let(:client) { Elasticsearch::Client.new }
it 'insert event to queue without waiting other slices' do
- expect(plugin).to receive(:do_run_slice).twice.and_wrap_original do |m, *args|
+ expect(plugin.instance_variable_get(:@paginated_search)).to receive(:search).twice.and_wrap_original do |m, *args|
q = args[0]
slice_id = args[1]
if slice_id == 0
m.call(*args)
expect(q.size).to eq(3)
@@ -994,11 +1018,11 @@
plugin.register
end
it "should properly schedule" do
begin
- expect(plugin).to receive(:do_run) {
+ expect(plugin.instance_variable_get(:@paginated_search)).to receive(:do_run) {
queue << LogStash::Event.new({})
}.at_least(:twice)
runner = Thread.start { plugin.run(queue) }
expect(queue.pop).not_to be_nil
cron_jobs = plugin.instance_variable_get(:@_scheduler).instance_variable_get(:@impl).jobs
@@ -1011,80 +1035,110 @@
end
end
context "retries" do
- let(:mock_response) do
- {
- "_scroll_id" => "cXVlcnlUaGVuRmV0Y2g",
- "took" => 27,
- "timed_out" => false,
- "_shards" => {
- "total" => 169,
- "successful" => 169,
- "failed" => 0
- },
- "hits" => {
- "total" => 1,
- "max_score" => 1.0,
- "hits" => [ {
- "_index" => "logstash-2014.10.12",
- "_type" => "logs",
- "_id" => "C5b2xLQwTZa76jBmHIbwHQ",
- "_score" => 1.0,
- "_source" => { "message" => ["ohayo"] }
- } ]
- }
- }
- end
-
- let(:mock_scroll_response) do
- {
- "_scroll_id" => "r453Wc1jh0caLJhSDg",
- "hits" => { "hits" => [] }
- }
- end
-
- before(:each) do
- client = Elasticsearch::Client.new
- allow(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client)
- allow(client).to receive(:search).with(any_args).and_return(mock_response)
- allow(client).to receive(:scroll).with({ :body => { :scroll_id => "cXVlcnlUaGVuRmV0Y2g" }, :scroll=> "1m" }).and_return(mock_scroll_response)
- allow(client).to receive(:clear_scroll).and_return(nil)
- allow(client).to receive(:ping)
- end
-
+ let(:client) { Elasticsearch::Client.new }
let(:config) do
{
"hosts" => ["localhost"],
"query" => '{ "query": { "match": { "city_name": "Okinawa" } }, "fields": ["message"] }',
"retries" => 1
}
end
- it "retry and log error when all search request fail" do
- expect(plugin.logger).to receive(:error).with(/Tried .* unsuccessfully/,
- hash_including(:message => 'Manticore::UnknownException'))
- expect(plugin.logger).to receive(:warn).twice.with(/Attempt to .* but failed/,
- hash_including(:exception => "Manticore::UnknownException"))
- expect(plugin).to receive(:search_request).with(instance_of(Hash)).and_raise(Manticore::UnknownException).at_least(:twice)
+ shared_examples "a retryable plugin" do
+ it "retry and log error when all search request fail" do
+ expect_any_instance_of(LogStash::Helpers::LoggableTry).to receive(:log_failure).with(instance_of(Manticore::UnknownException), instance_of(Integer), instance_of(String)).twice
+ expect(client).to receive(:search).with(instance_of(Hash)).and_raise(Manticore::UnknownException).at_least(:twice)
- plugin.register
+ plugin.register
- expect{ plugin.run(queue) }.not_to raise_error
- expect(queue.size).to eq(0)
+ expect{ plugin.run(queue) }.not_to raise_error
+ end
+
+ it "retry successfully when search request fail for one time" do
+ expect_any_instance_of(LogStash::Helpers::LoggableTry).to receive(:log_failure).with(instance_of(Manticore::UnknownException), 1, instance_of(String))
+ expect(client).to receive(:search).with(instance_of(Hash)).once.and_raise(Manticore::UnknownException)
+ expect(client).to receive(:search).with(instance_of(Hash)).once.and_return(search_response)
+
+ plugin.register
+
+ expect{ plugin.run(queue) }.not_to raise_error
+ end
end
- it "retry successfully when search request fail for one time" do
- expect(plugin.logger).to receive(:warn).once.with(/Attempt to .* but failed/,
- hash_including(:exception => "Manticore::UnknownException"))
- expect(plugin).to receive(:search_request).with(instance_of(Hash)).once.and_raise(Manticore::UnknownException)
- expect(plugin).to receive(:search_request).with(instance_of(Hash)).once.and_call_original
+ describe "scroll" do
+ let(:search_response) do
+ {
+ "_scroll_id" => "cXVlcnlUaGVuRmV0Y2g",
+ "took" => 27,
+ "timed_out" => false,
+ "_shards" => {
+ "total" => 169,
+ "successful" => 169,
+ "failed" => 0
+ },
+ "hits" => {
+ "total" => 1,
+ "max_score" => 1.0,
+ "hits" => [ {
+ "_index" => "logstash-2014.10.12",
+ "_type" => "logs",
+ "_id" => "C5b2xLQwTZa76jBmHIbwHQ",
+ "_score" => 1.0,
+ "_source" => { "message" => ["ohayo"] }
+ } ]
+ }
+ }
+ end
- plugin.register
+ let(:empty_scroll_response) do
+ {
+ "_scroll_id" => "r453Wc1jh0caLJhSDg",
+ "hits" => { "hits" => [] }
+ }
+ end
- expect{ plugin.run(queue) }.not_to raise_error
- expect(queue.size).to eq(1)
+ before(:each) do
+ allow(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client)
+ allow(client).to receive(:scroll).with({ :body => { :scroll_id => "cXVlcnlUaGVuRmV0Y2g" }, :scroll=> "1m" }).and_return(empty_scroll_response)
+ allow(client).to receive(:clear_scroll).and_return(nil)
+ allow(client).to receive(:ping)
+ end
+
+ it_behaves_like "a retryable plugin"
+ end
+
+ describe "search_after" do
+ let(:es_version) { "8.10.0" }
+ let(:config) { super().merge({ "search_api" => "search_after" }) }
+
+ let(:search_response) do
+ {
+ "took" => 27,
+ "timed_out" => false,
+ "_shards" => {
+ "total" => 169,
+ "successful" => 169,
+ "failed" => 0
+ },
+ "hits" => {
+ "total" => 1,
+ "max_score" => 1.0,
+ "hits" => [ ] # empty hits to break the loop
+ }
+ }
+ end
+
+ before(:each) do
+ expect(Elasticsearch::Client).to receive(:new).with(any_args).and_return(client)
+ expect(client).to receive(:open_point_in_time).once.and_return({ "id" => "cXVlcnlUaGVuRmV0Y2g"})
+ expect(client).to receive(:close_point_in_time).once.and_return(nil)
+ expect(client).to receive(:ping)
+ end
+
+ it_behaves_like "a retryable plugin"
end
end
# @note can be removed once we depends on elasticsearch gem >= 6.x
def extract_transport(client) # on 7.x client.transport is a ES::Transport::Client