Sha256: d82c49aa56b10b2da55f26f04a07941e567f95e4d9f9d3db49f7d30c206168a5

Contents?: true

Size: 1.86 KB

Versions: 1

Compression:

Stored size: 1.86 KB

Contents

module Fluent

class PgJsonOutput < Fluent::BufferedOutput
  Fluent::Plugin.register_output('pgjson', self)

  config_param :host       , :string  , :default => 'localhost'
  config_param :port       , :integer , :default => 5432
  config_param :database   , :string
  config_param :table      , :string
  config_param :user       , :string  , :default => nil
  config_param :password   , :string  , :default => nil
  config_param :time_col   , :string  , :default => 'time'
  config_param :tag_col    , :string  , :default => 'tag'
  config_param :record_col , :string  , :default => 'record'


  def initialize
    super
    require 'pg'
  end

  def configure(conf)
    super
    @stmt_name = 'insert'
  end

  def start
    super
    init_connection
  end

  def shutdown
    super
    if !@conn.nil? and !@conn.finished?
      @conn.close()
    end
  end

  def format(tag, time, record)
    [tag, time, record].to_msgpack
  end

  def write(chunk)
    begin
      sql = build_sql(chunk)
      @conn.exec(sql)
    rescue
      #TODO
      raise
    end
  end

  private
  def init_connection
    begin
      @conn = PGconn.new(:dbname => @database, :host => @host, :port => @port, :user => @user, :password => @password)
      @conn.setnonblocking(true)
    rescue
      raise Fluent::ConfigError, "failed to connect PostgreSQL Server at #{@host}:#{@port}"
    end
  end

  def prepare_statement
    begin
      @conn.prepare(@stmt_name, sql)
    rescue
      #TODO
      raise
    end
  end

  def build_sql(chunk)
    values = build_values(chunk)
    sql =<<"SQL"
INSERT INTO #{@table} (#{@tag_col}, #{@time_col}, #{@record_col})
VALUES #{values};
SQL
  end

  def build_values(chunk)
    tmp = []
    chunk.msgpack_each do |tag, time, record|
      tmp << ("("+[tag, Time.at(time), record.to_json].map{|s| @conn.escape_literal(s.to_s)}.join(',')+")")
    end
    tmp.join(',')
  end
end

end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
fluent-plugin-pgjson-0.0.1 lib/fluent/plugin/out_pgjson.rb