Sha256: ecd687ec1f68f7c513481bfbc55418152c0387be10ab95f5576d13e2399eecb1

Contents?: true

Size: 2 KB

Versions: 4

Compression:

Stored size: 2 KB

Contents

require File.join(File.dirname(__FILE__), "service")

module Charrington
  # Service object that issues a +CREATE TABLE IF NOT EXISTS+ statement whose
  # column list is derived from the values found in an event.
  #
  # This is potentially called from Insert when an insert fails.
  class CreatePostgresTable
    include Service

    attr_reader :connection, :event, :table_name, :columns, :schema
    attr_accessor :column_types

    Error = Class.new(StandardError)
    CreateFailed = Class.new(Error)

    # connection - object responding to +prepareStatement(sql)+.
    # event      - object responding to +to_hash+; the resulting hash is
    #              indexed by column name to pick each column's SQL type.
    # schema     - interpolated directly in front of +table_name+ with no
    #              separator added here.
    # table_name - name of the table to create.
    # columns    - enumerable of column names to add after the built-in ones.
    def initialize(connection, event, schema, table_name, columns)
      @connection = connection
      @schema = schema
      @table_name = table_name
      @columns = columns
      @event = event.to_hash
      @column_types = initial_columns
    end

    # Builds the column definitions and runs the CREATE TABLE statement.
    #
    # Returns true.
    # Raises CreateFailed (carrying the original message) for any
    # StandardError that reaches this method. A PSQLException is rescued
    # inside #execute and therefore does not surface here.
    def call
      set_column_types
      create_table
      true
    rescue StandardError => error
      raise CreateFailed, error.message
    ensure
      # The accumulated definitions are emptied whether or not the call worked.
      @column_types.clear if @column_types.is_a?(Array)
    end

    private

    # Appends one "<name> <SQL type>" definition per requested column.
    def set_column_types
      columns.each do |column|
        column_types << "#{column} #{postgres_type_for(event[column])}"
      end
    end

    # Picks the SQL type for a single event value; anything that is not
    # recognised (including nil) is stored as VARCHAR.
    def postgres_type_for(value)
      case value
      when Time, LogStash::Timestamp then "TIMESTAMP"
      when Date then "DATE"
      when Integer then "BIGINT"
      when BigDecimal then "DECIMAL"
      when Float then "DOUBLE PRECISION"
      when true, false then "BOOLEAN"
      else "VARCHAR"
      end
    end

    # Column definitions every created table starts with (Postgres syntax).
    # A fresh array is returned on each call because #call mutates it.
    def initial_columns
      [
        "id SERIAL PRIMARY KEY",
        "inserted_at TIMESTAMP DEFAULT NOW()"
      ]
    end

    # Assembles the CREATE TABLE statement and hands it to #execute.
    def create_table
      column_definitions = column_types.join(", ")
      execute("CREATE TABLE IF NOT EXISTS #{schema}#{table_name} (#{column_definitions})")
    end

    # Collapses runs of whitespace in +sql+, prepares it on the connection and
    # executes it. A PSQLException is written to stdout instead of being
    # re-raised; the statement is closed in every case where it was created.
    def execute(sql)
      normalized_sql = sql.gsub(/\s+/, " ").strip
      statement = connection.prepareStatement(normalized_sql)
      statement.execute
    rescue Java::OrgPostgresqlUtil::PSQLException => e
      puts "PSQLException: #{e.message}"
    ensure
      statement.close unless statement.nil?
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
logstash-output-charrington-0.3.3 lib/logstash/outputs/charrington/create_postgres_table.rb
logstash-output-charrington-0.3.2 lib/logstash/outputs/charrington/create_postgres_table.rb
logstash-output-charrington-0.3.1 lib/logstash/outputs/charrington/create_postgres_table.rb
logstash-output-charrington-0.3.0 lib/logstash/outputs/charrington/create_postgres_table.rb