Sha256: 0ac5bdae457eec59fe19c6160ea9c8cab5d1c779be1f5426f3b51a869fb7306d

Contents?: true

Size: 1.75 KB

Versions: 1

Compression:

Stored size: 1.75 KB

Contents

module Fluent

class RiakOutput < BufferedOutput

  Fluent::Plugin.register_output('riak', self)
  include SetTimeKeyMixin
  config_set_default :include_tag_key, true
  include SetTagKeyMixin
  config_set_default :include_time_key, true

  config_param :bucket_name, :string, :default => "fluentlog"
  config_param :nodes, :string, :default => "localhost:8087"

  def initialize
    super
    require 'riak'
    require 'msgpack'
    require 'uuidtools'
  end

  def configure(conf)
    super

    @nodes = @nodes.split(',').map{ |s|
      ip,port = s.split(':')
      {:host => ip, :pb_port => port.to_i}
    }
    $log.info "riak nodes=#{@nodes}"
  end

  def start
    $log.debug " => #{@buffer.chunk_limit} #{@buffer.queue_limit} "
    @conn = Riak::Client.new(:nodes => @nodes, :protocol => "pbc")
    @bucket = @conn.bucket(@bucket_name)
    @buf = {}

    super
  end

  def format(tag, time, record)
    [time, tag, record].to_msgpack
  end

  def write(chunk)
    $log.debug " <<<<<===========\n"

    records  = []
    tags = []
    chunk.msgpack_each { |time, tag, record|
      record[@tag_key] = tag
      tags << tag
      records << record
      $log.debug record
    }
    put_now(records, tags)
  end

  private

  # TODO: add index for some analysis
  def put_now(records, tags)
    if not records.empty? then
      today = Date.today
      key = "#{today.to_s}-#{UUIDTools::UUID.random_create.to_s}"
      robj = Riak::RObject.new(@bucket, key)
      robj.raw_data = records.to_json
      robj.indexes['year_int'] << today.year
      robj.indexes['month_bin'] << "#{today.year}-#{"%02d" % today.month}"
      tags.each do |tag|
        robj.indexes['tag_bin'] << tag
      end
      robj.content_type = 'application/json'
      robj.store
      robj
    end
  end

end

end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-riak-0.0.3 lib/fluent/plugin/out_riak.rb