Sha256: 8feb0afe1652b12c20425e1eab192fe28f19446b4ec49af97d68a47cf42c47ef

Contents?: true

Size: 1.81 KB

Versions: 7

Compression:

Stored size: 1.81 KB

Contents

require 'net/http'
require 'date'

# Buffered Fluentd output that forwards replicated row events to the
# Elasticsearch bulk endpoint (+/_bulk+) over HTTP.
#
# Routing information is not carried in the record itself: it is parsed out of
# the event tag with +tag_format+, a regular expression expected to provide the
# named captures +index_name+, +type_name+, +event+ and +primary_key+.
#
# Configuration parameters:
#   host       - host name of the bulk endpoint (default: 'localhost')
#   port       - TCP port of the bulk endpoint (default: 9200)
#   tag_format - optional regular expression source overriding
#                DEFAULT_TAG_FORMAT
class Fluent::MysqlReplicatorElasticsearchOutput < Fluent::BufferedOutput
  Fluent::Plugin.register_output('mysql_replicator_elasticsearch', self)

  config_param :host, :string,  :default => 'localhost'
  config_param :port, :integer, :default => 9200
  config_param :tag_format, :string, :default => nil

  # Matches the last four dot-separated tag segments:
  # <index_name>.<type_name>.<event>.<primary_key>
  DEFAULT_TAG_FORMAT = /(?<index_name>[^\.]+)\.(?<type_name>[^\.]+)\.(?<event>[^\.]+)\.(?<primary_key>[^\.]+)$/

  # Compiles the tag pattern used by #write.
  #
  # conf - the plugin configuration; its 'tag_format' entry, when present, is
  #        compiled with Regexp.new (which raises RegexpError on bad syntax).
  def configure(conf)
    super

    # @tag_format is either nil or a String at this point, so it can never be
    # equal to the DEFAULT_TAG_FORMAT Regexp; only the nil case needs the
    # default.
    @tag_format = @tag_format.nil? ? DEFAULT_TAG_FORMAT : Regexp.new(conf['tag_format'])
  end

  # Serializes one event for the buffer as a msgpack [tag, time, record]
  # triple; #write reads the same triple back via msgpack_each.
  def format(tag, time, record)
    [tag, time, record].to_msgpack
  end

  # Converts every event of a buffer chunk into bulk actions and sends them in
  # a single request.
  #
  # Events that cannot be turned into a valid action (tag does not match
  # +tag_format+, or a delete without a primary-key value) are logged and
  # skipped so that one bad event does not fail the whole chunk on every
  # retry. Nothing is sent when no action remains.
  #
  # Raises whatever Net::HTTP raises on connection failure, and the error
  # raised by Net::HTTPResponse#value when the response is not 2xx.
  def write(chunk)
    bulk_lines = []

    chunk.msgpack_each do |tag, _time, record|
      bulk_lines.concat(bulk_lines_for(tag, record))
    end

    # An empty body is not a valid bulk request.
    return if bulk_lines.empty?

    post_bulk(bulk_lines)
  end

  private

  # Builds the JSON lines for a single event.
  #
  # Returns an Array of encoded lines: one line for a delete, two (action
  # metadata followed by the document) for anything else, or an empty Array
  # when the event has to be skipped.
  def bulk_lines_for(tag, record)
    tag_parts = tag.match(@tag_format)
    unless tag_parts
      log.warn "mysql_replicator_elasticsearch: skipping event, tag '#{tag}' does not match tag_format"
      return []
    end

    location = { "_index" => tag_parts['index_name'], "_type" => tag_parts['type_name'] }
    id_key = tag_parts['primary_key']
    # id_key is nil when the capture did not participate in the match.
    document_id = id_key && record[id_key]

    if tag_parts['event'] == 'delete'
      # A delete action cannot address a document without an id.
      if document_id.nil?
        log.warn "mysql_replicator_elasticsearch: skipping delete for tag '#{tag}', record has no '#{id_key}' value"
        return []
      end
      [Yajl::Encoder.encode("delete" => location.merge("_id" => document_id))]
    else
      # Without an id the target is left to assign one itself.
      location['_id'] = document_id if document_id
      [Yajl::Encoder.encode("index" => location), Yajl::Encoder.encode(record)]
    end
  end

  # POSTs the given lines to /_bulk, each terminated by a newline (the final
  # line included), and raises unless the response status is 2xx.
  #
  # NOTE(review): only the HTTP status is checked here; per-item failures
  # reported inside a successful response body are not inspected.
  def post_bulk(bulk_lines)
    http = Net::HTTP.new(@host, @port.to_i)
    request = Net::HTTP::Post.new('/_bulk', {'content-type' => 'application/json; charset=utf-8'})
    request.body = bulk_lines.map { |line| "#{line}\n" }.join
    http.request(request).value
  end
end

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
fluent-plugin-mysql-replicator-0.4.3 lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb
fluent-plugin-mysql-replicator-0.4.2 lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb
fluent-plugin-mysql-replicator-0.4.1 lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb
fluent-plugin-mysql-replicator-0.4.0 lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb
fluent-plugin-mysql-replicator-0.3.1 lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb
fluent-plugin-mysql-replicator-0.3.0 lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb
fluent-plugin-mysql-replicator-0.2.3 lib/fluent/plugin/out_mysql_replicator_elasticsearch.rb