Sha256: e3298f88d1b5077564b8e79a87a4ced679eb68b9517ab7b91b33030f710f9283
Contents?: true
Size: 1.21 KB
Versions: 1
Compression:
Stored size: 1.21 KB
Contents
module Fluent
  class RedisPubsubOutput < BufferedOutput
    Plugin.register_output('redis_pubsub', self)
    attr_reader :host, :port, :channel, :redis

    def initialize
      super
      require 'redis'
      require 'msgpack'
    end

    def configure(config)
      super
      @host = config.has_key?('host') ? config['host'] : 'localhost'
      @port = config.has_key?('port') ? config['port'].to_i : 6379
      raise Fluent::ConfigError, "need channel" if not config.has_key?('channel') or config['channel'].empty?
      @channel = config['channel'].to_s
    end

    def start
      super
      @redis = Redis.new(:host => @host, :port => @port, :thread_safe => true)
    end

    def shutdown
      @redis.quit
    end

    def format(tag, time, record)
      record['__tag__'] = tag
      record['__time__'] = time
      record.to_msgpack
    end

    def write(chunk)
      @redis.pipelined do
        chunk.msgpack_each do |record|
          @redis.publish @channel, record.to_json
        end
      end
    end
  end
end
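The plugin's `format` method adds `__tag__` and `__time__` keys to each record, and `write` publishes every record to the configured channel as a JSON string. The following is a minimal sketch of that message shape and of how a subscriber might unpack it. It uses only the standard `json` library and does not talk to Redis. The helpers `build_payload` and `parse_payload` and the sample tag, time, and record are hypothetical and are not part of the gem. Unlike the plugin, which mutates the record in place, this sketch uses `merge` so the input hash is left untouched.

```ruby
require 'json'

# Hypothetical helper: builds the same JSON shape the plugin publishes,
# i.e. the original record plus '__tag__' and '__time__' keys.
def build_payload(tag, time, record)
  record.merge('__tag__' => tag, '__time__' => time).to_json
end

# Hypothetical helper: splits a received message back into
# tag, time, and the remaining record fields.
def parse_payload(message)
  record = JSON.parse(message)
  tag = record.delete('__tag__')
  time = record.delete('__time__')
  [tag, time, record]
end

# Sample values are assumptions chosen for illustration only.
payload = build_payload('app.access', 1_300_000_000, 'path' => '/index.html')
tag, time, record = parse_payload(payload)
```

In a real consumer, the string passed to `parse_payload` would presumably be the message delivered by a Redis subscription on the same channel name given in the plugin configuration.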
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
fluent-plugin-redis-pubsub-0.0.1 | lib/fluent/plugin/out_redis_pubsub.rb |