Sha256: 04687181a6b0ec6b2614da3c472ebf6f178fe4a50d5df6e0c7d940122f5b51c5

Contents?: true

Size: 1.62 KB

Versions: 2

Compression:

Stored size: 1.62 KB

Contents

module Fluent
  class RedisOutput < BufferedOutput
    Fluent::Plugin.register_output('redis', self)
    attr_reader :redis

    config_param :host, :string, :default => 'localhost'
    config_param :port, :integer, :default => 6379
    config_param :db_number, :integer, :default => 0
    config_param :password, :string, :default => nil, :secret => true

    # To support log_level option implemented by Fluentd v0.10.43
    unless method_defined?(:log)
      define_method("log") { $log }
    end

    def initialize
      super
      require 'redis'
      require 'msgpack'
    end

    def configure(conf)
      super

      if conf.has_key?('namespace')
        log.warn "namespace option has been removed from fluent-plugin-redis 0.1.3. Please add or remove the namespace '#{conf['namespace']}' manually."
      end
    end

    def start
      super

      options = {
        :host => @host,
        :port => @port,
        :thread_safe => true,
        :db => @db_number
      }
      options[:password] = @password if @password

      @redis = Redis.new(options)
    end

    def shutdown
      @redis.quit
      super
    end

    def format(tag, time, record)
      identifier = [tag, time].join(".")
      [identifier, record].to_msgpack
    end

    def write(chunk)
      @redis.pipelined {
        chunk.open { |io|
          begin
            MessagePack::Unpacker.new(io).each.each_with_index { |record, index|
              @redis.mapped_hmset "#{record[0]}.#{index}", record[1]
            }
          rescue EOFError
            # EOFError always occured when reached end of chunk.
          end
        }
      }
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
fluent-plugin-redis-0.2.3 lib/fluent/plugin/out_redis.rb
fluent-plugin-redis-0.2.2 lib/fluent/plugin/out_redis.rb