require 'flydata-core/postgresql/pg_client' require 'flydata-core/table_def/sync_redshift_table_def' require 'flydata/command_loggable' module Flydata module Output class RedshiftDDLRunner include CommandLoggable def initialize(dbconf, de) @pg_client = FlydataCore::Postgresql::PGClient.new(dbconf, notice_receiver: Proc.new{|result| log_info_stdout(" #{result.error_message.to_s.strip}") }) @schema_name = de['schema_name'] @schema_name = nil if @schema_name.to_s.strip.empty? end def run_ddls(flydata_tabledefs) @pg_client.establish_connection create_schema create_ctl_tables flydata_tabledefs.each.with_index(1) do |flydata_tabledef, index| run_ddl(flydata_tabledef, index, flydata_tabledefs.size) end ensure @pg_client.close end private def create_schema return unless @schema_name query = FlydataCore::TableDef::SyncRedshiftTableDef.create_schema_sql(@schema_name) begin @pg_client.exec(query) rescue PG::InsufficientPrivilege # Ignore end end def create_ctl_tables query = FlydataCore::TableDef::SyncRedshiftTableDef.create_flydata_ctl_table_sql(@schema_name) log_info_stdout(" -> Creating flydata ctl tables... schema:\"#{@schema_name}\"") log_info("query:\n#{query}") @pg_client.exec(query) end def run_ddl(flydata_tabledef, index, total) ddl = FlydataCore::TableDef::SyncRedshiftTableDef.from_flydata_tabledef( flydata_tabledef, flydata_tabledef[:ddl_options].merge(flydata_ctl_table: false)) log_info_stdout(" -> Creating \"#{flydata_tabledef[:table_name]}\"... (#{index}/#{total})") log_info("query:\n#{ddl}") @pg_client.exec(ddl) rescue PG::DependentObjectsStillExist => e log_error_stderr(" [error] #{e.to_s.strip}") # ignore this error end =begin def wrap(query) query = query.strip query = "BEGIN;\n#{query}" unless /^BEGIN;/i.match(query) query = "#{query}\nEND;" unless /END;$/i.match(query) query end =end end end end