Sha256: 07ad37a1a033c2fbb9939fd9879447ff7be6e708260a16476104a19ea3ad7a32
Contents?: true
Size: 1.62 KB
Versions: 1
Compression:
Stored size: 1.62 KB
Contents
module Fluent class RiakOutput < BufferedOutput Fluent::Plugin.register_output('riak', self) include SetTimeKeyMixin config_set_default :include_tag_key, true include SetTagKeyMixin config_set_default :include_time_key, true config_param :nodes, :string, :default => "localhost:8087" def initialize super require 'riak' require 'msgpack' require 'uuidtools' end def configure(conf) super @nodes = @nodes.split(',').map{ |s| ip,port = s.split(':') {:host => ip, :pb_port => port.to_i} } $log.info "riak nodes=#{@nodes}" end def start $log.debug " => #{@buffer.chunk_limit} #{@buffer.queue_limit} " @conn = Riak::Client.new(:nodes => @nodes, :protocol => "pbc") @bucket = @conn.bucket("fluentlog") @buf = {} super end def format(tag, time, record) [time, tag, record].to_msgpack end def write(chunk) $log.debug " <<<<<===========\n" records = [] tags = [] chunk.msgpack_each { |time, tag, record| record[@tag_key] = tag tags << tag records << record $log.debug record } put_now(records, tags) end private # TODO: add index for some analysis def put_now(records, tags) today = Date.today key = "#{today.to_s}-#{UUIDTools::UUID.random_create.to_s}" robj = Riak::RObject.new(@bucket, key) robj.raw_data = records.to_json robj.indexes['year_int'] << today.year robj.indexes['month_bin'] << "#{today.year}-#{today.month}" tags.each do |tag| robj.indexes['tag_bin'] << tag end robj.content_type = 'application/json' robj.store robj end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
fluent-plugin-riak-0.0.2 | lib/fluent/plugin/out_riak.rb |