Sha256: 223f3aae36281d171b804a223773c0673d42c32f69b802db0c8ff429182c770e
Contents?: true
Size: 1.11 KB
Versions: 1
Compression:
Stored size: 1.11 KB
Contents
module Fluent class CassandraOutput < BufferedOutput Fluent::Plugin.register_output('cassandra', self) def initialize super require 'cassandra' require 'msgpack' end def configure(conf) super raise ConfigError, "'Keyspace' parameter is required on file output" unless @keyspace = conf['keyspace'] raise ConfigError, "'ColumnFamily' parameter is required on file output" unless @columnfamily = conf['columnfamily'] @host = conf.has_key?('host') ? conf['host'] : 'localhost' @port = conf.has_key?('port') ? conf['port'] : 9160 end def start super @connection = Cassandra.new(@keyspace, @host + ':' + @port ) end def shutdown super end def format(tag, event) [tag,event.time,event.record].to_msgpack end def write(chunk) chunk.open { |io| begin MessagePack::Unpacker.new(io).each { |record| c_key = record[0] + "_" + record[1].to_s @connection.insert(@columnfamily,c_key,record[2]) } rescue EOFError # EOFError always occured when reached end of chunk. end } end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
fluent-plugin-cassandra-0.0.1 | lib/fluent/plugin/out_cassandra.rb |