module Embulk
  module Output
    # Embulk output plugin that sends records to Elasticsearch through the
    # bulk API, addressing the cluster nodes by URL.
    #
    # Records are buffered as (action metadata, source) pairs and flushed
    # every `bulk_actions` records, and once more on `finish`.
    class ElasticsearchUsingUrl < OutputPlugin
      Plugin.register_output("elasticsearch_using_url", self)

      # Accepted values for the `mode` option.
      ENABLE_MODE = %w[normal update].freeze

      # Builds the task hash from the plugin configuration, validates it,
      # optionally registers an index template, and runs the output tasks.
      #
      # @param config  plugin configuration (read through `config.param`)
      # @param schema  schema of the incoming records
      # @param count   passed through to `resume` unchanged
      # @param control block that executes the tasks
      # @return [Hash] the config diff returned by `resume`
      # @raise [ConfigError] if `mode` is not one of ENABLE_MODE, or if
      #   `id_keys` is given without `id_format`
      def self.transaction(config, schema, count, &control)
        task = {
          "nodes" => config.param("nodes", :array, default: [{ 'url' => 'http://localhost:9200' }]),
          "request_timeout" => config.param("request_timeout", :integer, default: 60),
          "index" => config.param("index", :string, default: 'logstash-%Y.%m.%d'),
          "mode" => config.param("mode", :string, default: 'normal'),
          "reload_connections" => config.param("reload_connections", :bool, default: true),
          "reload_on_failure" => config.param("reload_on_failure", :bool, default: false),
          # NOTE(review): `delete_old_index` is read here but not referenced
          # anywhere else in this class.
          "delete_old_index" => config.param("delete_old_index", :bool, default: false),
          "index_type" => config.param("index_type", :string),
          "id_keys" => config.param("id_keys", :array, default: nil),
          "id_format" => config.param("id_format", :string, default: nil),
          "array_columns" => config.param("array_columns", :array, default: nil),
          "bulk_actions" => config.param("bulk_actions", :integer, default: 1000),
          "retry_on_failure" => config.param("retry_on_failure", :integer, default: 5),
          "before_template_name" => config.param("before_template_name", :string, default: nil),
          "before_template" => config.param("before_template", :hash, default: nil),
        }

        # Sample the clock once so `time_value` and the expanded index name
        # always refer to the same instant.
        now = Time.now
        task['time_value'] = now.strftime('%Y.%m.%d.%H.%M.%S')
        task['index'] = now.strftime(task['index'])

        unless ENABLE_MODE.include?(task['mode'])
          raise ConfigError.new "`mode` must be one of #{ENABLE_MODE.join(', ')}"
        end

        # `generate_id` applies `id_format % values`; without a format that
        # would fail on the first record, so reject the configuration up front.
        if task['id_keys'] && task['id_format'].nil?
          raise ConfigError.new "`id_format` is required when `id_keys` is set"
        end

        Embulk.logger.info("mode => #{task['mode']}")

        if task['before_template_name'] && task['before_template']
          client = create_client(task)
          Embulk.logger.info("put template => #{task['before_template_name']}")
          client.indices.put_template name: task['before_template_name'], body: task['before_template']
        end

        resume(task, schema, count, &control)
      end

      # Runs the output tasks by yielding the task hash to the given block.
      # The value returned by the block is not used.
      #
      # @return [Hash] an empty config diff
      def self.resume(task, schema, count, &control)
        yield(task)
        {}
      end

      # Creates an Elasticsearch client from the task settings.
      #
      # Defined on the class because `transaction` (a class method) needs a
      # client before any plugin instance exists.
      #
      # @param task [Hash] task hash built in `transaction`
      # @return [::Elasticsearch::Client]
      def self.create_client(task)
        ::Elasticsearch::Client.new(
          urls: task['nodes'].map { |node| node['url'] }.join(','),
          reload_connections: task['reload_connections'],
          reload_on_failure: task['reload_on_failure'],
          retry_on_failure: task['retry_on_failure'],
          transport_options: {
            request: { timeout: task['request_timeout'] }
          }
        )
      end

      # Copies the task settings into instance state, creates the client and
      # starts with an empty bulk buffer.
      def init
        @nodes = task["nodes"]
        @index_type = task["index_type"]
        @id_keys = task["id_keys"]
        @id_format = task["id_format"]
        @bulk_actions = task["bulk_actions"]
        @array_columns = task["array_columns"]
        @retry_on_failure = task["retry_on_failure"]
        @mode = task["mode"]
        @index = task['index']

        @client = create_client(task)
        @bulk_message = []
      end

      def close
      end

      # Buffers every record of the page as an (action metadata, source) pair
      # and flushes the buffer once it holds `bulk_actions` records.
      #
      # @param page enumerable of records; each record is zipped with
      #   `schema.names` to build a column-name => value hash
      def add(page)
        page.each do |record|
          hash = Hash[schema.names.zip(record)]
          action = (@mode == 'update') ? :update : :index
          meta = {}
          meta[action] = { _index: @index, _type: @index_type }
          meta[action][:_id] = generate_id(@id_format, hash, @id_keys) unless @id_keys.nil?
          source = generate_array(hash)
          @bulk_message << meta
          @bulk_message << source
          # Two buffer entries (metadata + source) are stored per record.
          flush_bulk if @bulk_actions * 2 <= @bulk_message.size
        end
      end

      # Flushes whatever is still buffered.
      def finish
        flush_bulk if @bulk_message.size > 0
      end

      def abort
      end

      # @return [Hash] an empty task report
      def commit
        {}
      end

      private

      # Instance-level convenience wrapper around the class-level factory.
      def create_client(task)
        self.class.create_client(task)
      end

      # Builds the document body for one record.
      #
      # For every `array_columns` entry whose 'name' equals a column name, the
      # (non-nil) value is split on the entry's 'delimiter', empty pieces are
      # dropped, and the pieces are converted with `to_i` when 'is_integer' is
      # set. All other values are copied unchanged.
      #
      # @param record [Hash] column name => value
      # @return [Hash] the document, wrapped as `{doc: ...}` in update mode
      def generate_array(record)
        result = {}

        record.each do |key, value|
          result[key] = value
          next if value.nil? || !@array_columns

          @array_columns.each do |array_column|
            next unless array_column['name'] == key

            array_value = value.split(array_column['delimiter']).reject(&:empty?)
            array_value = array_value.map(&:to_i) if array_column['is_integer']
            result[key] = array_value
          end
        end

        (@mode == 'update') ? { doc: result } : result
      end

      # Builds a document id by applying the format string to the values of
      # the id key columns, in `id_keys` order.
      #
      # @param template [String] format string used with `String#%`
      # @param record   [Hash] column name => value
      # @param id_keys  [Array] column names whose values feed the format
      # @return [String]
      def generate_id(template, record, id_keys)
        template % id_keys.map { |key| record[key] }
      end

      # Sends the buffered bulk message and clears the buffer.
      #
      # On failure the request is retried up to `@retry_on_failure` times,
      # sleeping 2**attempt seconds before each retry.
      #
      # Deliberately not named `send`: that would shadow `Object#send` and
      # break dynamic dispatch on plugin instances.
      #
      # @raise [RuntimeError] when the request still fails after all retries
      def flush_bulk
        retries = 0
        begin
          @client.bulk body: @bulk_message
          Embulk.logger.info "bulk: #{@bulk_message.size/2} success."
        rescue => e
          if retries < @retry_on_failure
            retries += 1
            Embulk.logger.warn "Could not push logs to Elasticsearch, resetting connection and trying again. #{e.message}"
            sleep 2**retries
            retry
          end
          raise "Could not push logs to Elasticsearch after #{retries} retries. #{e.message}"
        end
        @bulk_message.clear
      end
    end
  end
end