Sha256: e3298f88d1b5077564b8e79a87a4ced679eb68b9517ab7b91b33030f710f9283

Contents?: true

Size: 1.21 KB

Versions: 1

Compression:

Stored size: 1.21 KB

Contents

module Fluent
    class RedisPubsubOutput < BufferedOutput
        Plugin.register_output('redis_pubsub', self)
        attr_reader :host, :port, :channel, :redis

        def initialize
            super
            require 'redis'
            require 'msgpack'
        end

        def configure(config)
            super
            @host    = config.has_key?('host')    ? config['host']         : 'localhost'
            @port    = config.has_key?('port')    ? config['port'].to_i    : 6379
            raise Fluent::ConfigError, "need channel" if not config.has_key?('channel') or config['channel'].empty?
            @channel = config['channel'].to_s
        end

        def start
            super
            @redis = Redis.new(:host => @host, :port => @port ,:thread_safe => true)
        end

        def shutdown
            @redis.quit
        end

        def format(tag, time, record)
            record['__tag__']  = tag
            record['__time__'] = time
            record.to_msgpack
        end

        def write(chunk)
            @redis.pipelined do
                chunk.msgpack_each do |record|
                    @redis.publish @channel, record.to_json
                end
            end
        end
    end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-redis-pubsub-0.0.1 lib/fluent/plugin/out_redis_pubsub.rb