Sha256: 4c116299e4f991a6e6237624824b6192a3f4e3bbdb50be5b6b3703f4765fd806

Contents?: true

Size: 990 Bytes

Versions: 1

Compression:

Stored size: 990 Bytes

Contents

module Fluent

# Buffered output plugin, registered under the name 'dbi', that writes each
# buffered event to a database through Ruby/DBI by executing a configured
# prepared statement once per event.
class DbiOutput < BufferedOutput
  Plugin.register_output('dbi', self)

  # Data source name passed to DBI.connect.
  config_param :dsn, :string
  # Comma-separated list of record keys. Their values are bound, in this
  # order, to the placeholders of :query.
  config_param :keys, :string
  # User name passed to DBI.connect.
  config_param :db_user, :string
  # Password passed to DBI.connect.
  config_param :db_pass, :string
  # SQL statement that is prepared once per chunk and executed per event.
  config_param :query, :string

  def initialize
    super

    # Deliberately loaded lazily: 'dbi' is only required once the plugin is
    # actually instantiated.
    require 'dbi'
  end

  # Applies the configuration, then turns the comma-separated :keys string
  # into an Array of key names.
  def configure(conf)
    super

    @keys = @keys.split(",")
  end

  # Serializes one event as a msgpack-encoded [tag, time, record] triple;
  # #write reads the same triple back via chunk.msgpack_each.
  def format(tag, time, record)
    [tag, time, record].to_msgpack
  end

  # Writes every event of +chunk+ to the database.
  #
  # For each event the record is given a 'time' and a 'tag' entry unless it
  # already has one, and the values of the configured keys are bound to the
  # prepared statement. On any StandardError the connection is rolled back
  # and the error is re-raised to the caller; otherwise the work is
  # committed. The statement and the connection are released in either case.
  def write(chunk)
    dbh = nil
    sth = nil
    begin
      dbh = DBI.connect(@dsn, @db_user, @db_pass)
      sth = dbh.prepare(@query)
      chunk.msgpack_each { |tag, time, record|
        # Only fill in defaults; never clobber fields the record carries.
        # (Hash#key? tests for a key, whereas Hash#key looks up by value.)
        record['time'] = time unless record.key?('time')
        record['tag'] = tag unless record.key?('tag')
        sth.execute(*@keys.map { |key| record[key] })
      }
    rescue
      dbh.rollback if dbh
      raise
    else
      # Clear the local first so the ensure clause never finishes twice.
      stmt, sth = sth, nil
      stmt.finish
      dbh.commit
    ensure
      begin
        # Still set only on the error path: release the statement there too.
        sth.finish if sth
      ensure
        dbh.disconnect if dbh
      end
    end
  end
end

end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-dbi-0.0.1 lib/fluent/plugin/out_dbi.rb