lib/embulk/input/elasticsearch.rb in embulk-input-elasticsearch-0.3.1 vs lib/embulk/input/elasticsearch.rb in embulk-input-elasticsearch-0.3.2

- old
+ new

@@ -59,10 +59,11 @@ @limit_size = task['limit_size'] @fields = task['fields'] @sort = task['sort'] @add_query_to_record = task['add_query_to_record'] @scroll = task['scroll'] + @retry_on_failure = task['retry_on_failure'] end def run search(@index_type, @per_size, @routing, @fields, @sort) page_builder.finish @@ -80,21 +81,37 @@ end def search_with_query(query, type, size, routing, fields, sort) search_option = get_search_option(type, query, size, fields, sort) Embulk.logger.info("#{search_option}") - r = @client.search(search_option) + r = search_with_retry { @client.search(search_option) } i = 0 get_sources(r, fields).each do |result| result_proc(result, query) return if @limit_size == (i += 1) end - while r = @client.scroll(scroll_id: r['_scroll_id'], scroll: @scroll) and (not r['hits']['hits'].empty?) do + while r = (search_with_retry { @client.scroll(scroll_id: r['_scroll_id'], scroll: @scroll) }) and (not r['hits']['hits'].empty?) do get_sources(r, fields).each do |result| result_proc(result, query) return if @limit_size == (i += 1) end + end + end + + def search_with_retry + retries = 0 + begin + yield if block_given? + rescue => e + if retries < @retry_on_failure + retries += 1 + Embulk.logger.warn "Could not search to Elasticsearch, resetting connection and trying again. #{e.message}" + sleep 2**retries + retry + end + Embulk.logger.error "Could not search to Elasticsearch after #{retries} retries. #{e.message}" + raise end end def result_proc(result, query) if @add_query_to_record