Sha256: 9e25b32fd4a7dac18779be013909c610dc0d37542af7a38dc71c1919acb7283f

Contents?: true

Size: 1.01 KB

Versions: 1

Compression:

Stored size: 1.01 KB

Contents

module Fluent

class DbiOutput < BufferedOutput
  Plugin.register_output('dbi', self)

  config_param :dsn, :string
  config_param :keys, :string
  config_param :db_user, :string
  config_param :db_pass, :string, secret: true
  config_param :query, :string

  def initialize
    super

    require 'dbi'
  end

  def configure(conf)
    super

    @keys = @keys.split(",")
  end

  def format(tag, time, record)
    [tag, time, record].to_msgpack
  end

  def write(chunk)
    begin
      dbh = DBI.connect(@dsn, @db_user, @db_pass)
      dbh['AutoCommit'] = false
      sth = dbh.prepare(@query)
      chunk.msgpack_each { |tag, time, record|
        record.key?('time') || record['time'] = time
        record.key('tag') || record['tag'] = tag
        values = []
        @keys.each { |key|
          values.push(record[key])
        }
        rows = sth.execute(*values)
      }
    rescue
      dbh.rollback if dbh
      raise
    else
      sth.finish
      dbh.commit
    ensure
      dbh.disconnect if dbh
    end
  end
end

end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-dbi-1.0.0 lib/fluent/plugin/out_dbi.rb