lib/embulk/input/elasticsearch.rb in embulk-input-elasticsearch-0.3.1 vs lib/embulk/input/elasticsearch.rb in embulk-input-elasticsearch-0.3.2
- old
+ new
@@ -59,10 +59,11 @@
@limit_size = task['limit_size']
@fields = task['fields']
@sort = task['sort']
@add_query_to_record = task['add_query_to_record']
@scroll = task['scroll']
+ @retry_on_failure = task['retry_on_failure']
end
def run
search(@index_type, @per_size, @routing, @fields, @sort)
page_builder.finish
@@ -80,21 +81,37 @@
end
def search_with_query(query, type, size, routing, fields, sort)
search_option = get_search_option(type, query, size, fields, sort)
Embulk.logger.info("#{search_option}")
- r = @client.search(search_option)
+ r = search_with_retry { @client.search(search_option) }
i = 0
get_sources(r, fields).each do |result|
result_proc(result, query)
return if @limit_size == (i += 1)
end
- while r = @client.scroll(scroll_id: r['_scroll_id'], scroll: @scroll) and (not r['hits']['hits'].empty?) do
+ while r = (search_with_retry { @client.scroll(scroll_id: r['_scroll_id'], scroll: @scroll) }) and (not r['hits']['hits'].empty?) do
get_sources(r, fields).each do |result|
result_proc(result, query)
return if @limit_size == (i += 1)
end
+ end
+ end
+
+ def search_with_retry
+ retries = 0
+ begin
+ yield if block_given?
+ rescue => e
+ if retries < @retry_on_failure
+ retries += 1
+ Embulk.logger.warn "Could not search to Elasticsearch, resetting connection and trying again. #{e.message}"
+ sleep 2**retries
+ retry
+ end
+ Embulk.logger.error "Could not search to Elasticsearch after #{retries} retries. #{e.message}"
+ raise
end
end
def result_proc(result, query)
if @add_query_to_record