Sha256: 73c2c9b63213bc4807ed38534058138878a5673147a209753d941fae075c0e5e

Contents?: true

Size: 1.23 KB

Versions: 1

Compression:

Stored size: 1.23 KB

Contents

module Fluent
  # Buffered output plugin, registered as 'redislist', that appends every
  # event to a Redis list. The list key is the event tag and the list value
  # is the event record serialized as JSON.
  #
  # Configuration keys read in #configure:
  #   host     - Redis host            (default: 'localhost')
  #   port     - Redis port            (default: 6379)
  #   database - Redis database index  (default: 0)
  class RedisListOutput < BufferedOutput
    Fluent::Plugin.register_output('redislist', self)
    attr_reader :host, :port, :database, :redis

    # Loads the libraries the plugin depends on at instantiation time.
    def initialize
      super
      require 'redis'
      require 'msgpack'
      require 'socket'
      require 'json'
    end

    # Reads connection settings from the plugin configuration, falling back
    # to a local Redis on the default port and database 0.
    def configure(conf)
      super

      @host = conf['host'] || 'localhost'
      @port = ( conf['port'] || '6379' ).to_i
      @database = ( conf['database'] || '0' ).to_i
    end

    # Records the local hostname (added to every event as "@node") and
    # creates the Redis client.
    def start
      super

      @whereami = Socket.gethostname
      @redis = Redis.new(:host => @host, :port => @port, :db => @database)
    end

    # Shuts the plugin down.
    #
    # The parent shutdown runs first, while the Redis connection is still
    # open, so any writes it triggers can still reach Redis.
    # NOTE(review): presumably the base class flushes buffered chunks here;
    # confirm against the Fluentd version in use.
    def shutdown
      super
      # @redis is nil when #start never ran (or failed before connecting).
      @redis.quit if @redis
    end

    # Serializes one event into the buffer as a MessagePack-encoded hash.
    #
    # The event is annotated with "@node" (this host), "@timestamp" and
    # "@key" (the tag, later used as the Redis list key in #write). The
    # annotations go onto a copy, so the caller's record hash is left
    # untouched.
    def format(tag, time, record)
      record.merge(
        "@node" => @whereami,
        "@timestamp" => Time.at(time).to_s, # to avoid LOGSTASH-1340
        "@key" => tag
      ).to_msgpack
    end

    # Pushes every record of the chunk onto its Redis list inside a single
    # pipeline.
    def write(chunk)
      @redis.pipelined do |pipeline|
        # Commands must be issued on the object yielded by #pipelined:
        # calling them on @redis inside the block is deprecated in
        # redis-rb 4.6 and is no longer pipelined in 5.x. Fall back to
        # @redis in case an old client yields nothing to the block.
        conn = pipeline || @redis
        chunk.open do |io|
          begin
            MessagePack::Unpacker.new(io).each do |record|
              key = record.delete('@key')
              conn.rpush(key, record.to_json)
            end
          rescue EOFError
            # EOFError is always raised when the end of the chunk is reached.
          end
        end
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
fluent-redislist-0.1.0 lib/fluent/plugin/out_redislist.rb