require File.join(File.dirname(__FILE__), "create_table")
require File.join(File.dirname(__FILE__), "alter_table")
require File.join(File.dirname(__FILE__), "service")

module Charrington
  # Inserts one event into a table named after the event's "event" field.
  #
  # When PostgreSQL reports that the table or a column is missing, the work is
  # delegated to CreateTable / AlterTable and their result is returned from
  # #call (and stored in #should_retry).
  class Insert
    # This service assumes that the data is already clean and in a flattened hash format.
    # The Transform service should be called before calling this.
    include Service

    attr_accessor :event, :should_retry
    attr_reader :connection, :schema, :table_name, :columns
    attr_reader :event_as_json_keyword, :enable_event_as_json_keyword

    Error = Class.new(StandardError)
    EventNil = Class.new(Error)
    TableNameNil = Class.new(Error)
    InsertFailed = Class.new(Error)

    # @param connection a JDBC-style connection responding to +prepareStatement+
    # @param event [#to_hash] flattened event; must respond to +[]+ and +keys+
    #   and carry a non-blank "event" entry, which becomes the table name
    # @param opts [Hash] optional settings: +:schema+,
    #   +:enable_event_as_json_keyword+, +:event_as_json_keyword+
    # @raise [EventNil] when +event+ is nil
    # @raise [TableNameNil] when the "event" entry is missing or blank
    def initialize(connection, event, opts = {})
      raise EventNil, "Event is nil" if event.nil?

      @event = event.to_hash
      event_name = event["event"].to_s.downcase.strip
      raise TableNameNil, "Table name is nil" if event_name.empty?

      @connection = connection

      # opts[:schema] may be absent (nil); to_s keeps the emptiness check from
      # raising NoMethodError and yields an unqualified table name.
      schema_name = opts[:schema].to_s
      @schema = schema_name.empty? ? '' : "#{schema_name}."

      # Every run of characters outside [a-z0-9] collapses to a single "_".
      @table_name = "#{@schema}#{event_name.gsub(/[^a-z0-9]+/, "_")}"
      @columns = event.keys
      @should_retry = false
      @enable_event_as_json_keyword = opts[:enable_event_as_json_keyword]
      @event_as_json_keyword = opts[:event_as_json_keyword]
    end

    # Prepares and executes the INSERT statement for the event.
    #
    # @return the current +should_retry+ value: +false+ after a successful
    #   insert, otherwise whatever CreateTable.call / AlterTable.call returned
    # @raise [InsertFailed] for any other SQLSTATE or any other StandardError
    def call
      stmt = connection.prepareStatement(insert_statement)
      stmt = add_statement_event_params(stmt)
      stmt.execute
      should_retry
    rescue Java::OrgPostgresqlUtil::PSQLException => e
      # Assign through the accessor: a bare `should_retry = ...` would only
      # create a method-local variable and leave the attribute stale.
      self.should_retry =
        case e.getSQLState
        when "42P01" # PostgreSQL: undefined_table
          Charrington::CreateTable.call(connection, event, table_name, columns)
        when "42703" # PostgreSQL: undefined_column
          Charrington::AlterTable.call(connection, event, table_name, columns)
        else
          raise InsertFailed, "Charrington: Rescue from SQLException #{e.message}"
        end
      should_retry
    rescue => e
      raise InsertFailed, "Charrington: Rescue from SQLException #{e.message}"
    ensure
      stmt.close unless stmt.nil?
      cleanup
    end

    private

    # Empties the column list once the insert attempt has finished.
    def cleanup
      @columns.clear if clearable(@columns)
    end

    ### Set Variables

    # Memoized "(col_a, col_b, ...)" column list.
    # NOTE(review): column names are interpolated into the SQL text verbatim;
    # this relies on upstream sanitising of the event keys - confirm.
    def columns_text
      @columns_text ||= arr_to_csv(columns)
    end

    # One "?" bind marker per column.
    def value_placeholders
      Array.new(columns.length, '?')
    end

    # "(?, ?, ...)" with as many markers as there are columns.
    def insert_values
      arr_to_csv(value_placeholders)
    end

    def insert_statement
      "INSERT INTO #{table_name} #{columns_text} VALUES #{insert_values}"
    end

    # Column names wrapped in square brackets.
    def prepared_statement
      columns.map { |column| "[#{column}]" }
    end

    # Binds every column's value to its 1-based position in +stmt+, choosing
    # the setter from the value's Ruby class.
    #
    # @return the same statement, with parameters bound
    def add_statement_event_params(stmt)
      columns.each_with_index do |key, idx|
        pos = idx + 1 # bind positions start at 1
        value = event[key]

        case value
        when Time
          stmt.setString(pos, value.strftime(STRFTIME_FMT))
        when LogStash::Timestamp
          stmt.setString(pos, value.time.strftime(STRFTIME_FMT))
        when Integer
          # Values outside the signed 32-bit range need setLong.
          if value > 2147483647 || value < -2147483648
            stmt.setLong(pos, value)
          else
            stmt.setInt(pos, value)
          end
        when BigDecimal
          stmt.setBigDecimal(pos, value.to_java)
        when Float
          # NOTE(review): Ruby Floats are double precision, so setFloat likely
          # narrows the value; setDouble may be preferable - confirm against
          # the column types produced by CreateTable/AlterTable.
          stmt.setFloat(pos, value)
        when String
          stmt.setString(pos, value)
        when Array, Hash
          stmt.setString(pos, value.to_json)
        when true, false
          stmt.setBoolean(pos, value)
        else
          # nil and any class not listed above are bound as a nil string.
          stmt.setString(pos, nil)
        end
      end
      stmt
    end

    ### Helpers

    # Joins +arr+ with ", " and wraps the result in parentheses.
    def arr_to_csv(arr)
      "(#{arr.join(', ')})"
    end

    # True when +obj+ is a Hash or an Array.
    def clearable(obj)
      obj.is_a?(Hash) || obj.is_a?(Array)
    end

    ### SQL

    # Runs +sql+ (whitespace-normalised) on +connection+, logging PostgreSQL
    # errors instead of raising them.
    # NOTE(review): @logger is never assigned in this class; presumably the
    # Service mixin provides it - confirm, otherwise the rescue itself fails.
    def execute(connection, sql)
      statement = connection.prepareStatement(sql.gsub(/\s+/, " ").strip)
      statement.execute()
    rescue Java::OrgPostgresqlUtil::PSQLException => e
      @logger.error("#{e.message}")
    ensure
      statement.close unless statement.nil?
    end
  end
end