Sha256: 73c2c9b63213bc4807ed38534058138878a5673147a209753d941fae075c0e5e
Contents?: true
Size: 1.23 KB
Versions: 1
Compression:
Stored size: 1.23 KB
Contents
module Fluent
  # Buffered Fluentd output plugin that appends each event, serialized as
  # JSON, to a Redis list whose key is the event's tag.
  #
  # Configuration keys read in #configure:
  #   host     - Redis host name       (default: 'localhost')
  #   port     - Redis TCP port        (default: 6379)
  #   database - Redis database number (default: 0)
  class RedisListOutput < BufferedOutput
    Fluent::Plugin.register_output('redislist', self)

    attr_reader :host, :port, :database, :redis

    # Loads the libraries the plugin needs. They are required here rather
    # than at file level, so they are only loaded once the plugin is
    # actually instantiated.
    def initialize
      super
      require 'redis'
      require 'msgpack'
      require 'socket'
      require 'json'
    end

    # Reads the connection settings from the plugin configuration.
    #
    # @param conf [#[]] configuration element, indexed by string keys
    def configure(conf)
      super
      @host = conf['host'] || 'localhost'
      @port = (conf['port'] || '6379').to_i
      @database = (conf['database'] || '0').to_i
    end

    # Records the local host name (attached to every event as "@node")
    # and creates the Redis client.
    def start
      super
      @whereami = Socket.gethostname
      @redis = Redis.new(:host => @host, :port => @port, :db => @database)
    end

    # Shuts the buffered output down and then closes the Redis connection.
    #
    # `super` must run first: the parent shutdown may still flush buffered
    # chunks through #write, which needs the connection to be open.
    def shutdown
      super
      # @redis is nil when #start never ran (or failed before creating it).
      @redis.quit if @redis
    end

    # Serializes one event into the buffer as a msgpack-encoded hash.
    #
    # The event is decorated with "@node" (local host name), "@timestamp"
    # and "@key" (the tag, later used by #write as the Redis list key).
    # The decoration is applied to a copy so the caller's record hash is
    # left untouched.
    #
    # @param tag [String] event tag
    # @param time [Integer] event time, passed to Time.at
    # @param record [Hash] event payload
    # @return [String] msgpack-encoded event
    def format(tag, time, record)
      record.merge(
        '@node' => @whereami,
        '@timestamp' => Time.at(time).to_s, # to avoid LOGSTASH-1340
        '@key' => tag
      ).to_msgpack
    end

    # Pushes every event of a buffer chunk onto its Redis list, using a
    # single pipeline for the whole chunk.
    #
    # @param chunk [#open] buffer chunk yielding an IO of msgpack records
    def write(chunk)
      @redis.pipelined do |pipeline|
        # Newer redis-rb versions expect commands to be issued on the
        # yielded pipeline object; very old versions yield nothing, in
        # which case commands go through the client itself.
        connection = pipeline || @redis
        chunk.open do |io|
          begin
            MessagePack::Unpacker.new(io).each do |record|
              key = record.delete('@key')
              connection.rpush(key, record.to_json)
            end
          rescue EOFError
            # EOFError is always raised when the end of the chunk is reached.
          end
        end
      end
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
fluent-redislist-0.1.0 | lib/fluent/plugin/out_redislist.rb |