lib/gamma/command/apply.rb in gamma-0.1.9 vs lib/gamma/command/apply.rb in gamma-0.2.0

- old
+ new

@@ -1,29 +1,37 @@ +require 'parallel' + class Gamma::Command::Apply < Gamma::Command def initialize(opts) @database_settings = Gamma::DatabaseSettings.new(opts[:settings]) # TODO: support postgres adapter @in_client = Gamma::DatabaseConnector::MysqlConnector.new(@database_settings.in_database) @out_client = Gamma::DatabaseConnector::MysqlConnector.new(@database_settings.out_database) - @hook_root_dir = opts[:hook_dir] || "." - @syncdb = Gamma::SyncDatabase.new(opts[:sync_history] || "./history.json") + @hook_root_dir = opts[:hook_dir] || '.' + @syncdb = Gamma::SyncDatabase.new(opts[:sync_history] || './history.json') @data_parser = Gamma::Parser::DataParser.new(opts[:data], @hook_root_dir, @in_client, @out_client, apply: true) end def execute tables = @data_parser.gamma_tables output_setting_warning(tables) - tables.each do |t| + sync = lambda do |t| logger.info("[#{t.sync_mode}] Sync Start #{t.table_name}".green) case t.sync_mode - when "replace" + when 'replace' Gamma::Importer::Replace.new(@in_client, @out_client, t, apply: true).execute - when "force_replace" + when 'force_replace' Gamma::Importer::Replace.new(@in_client, @out_client, t, apply: true, ignore_error: true).execute else logger.info("[#{t.sync_mode}] Sync Failed #{t.table_name}. Unknown Sync mode".red) end + end + + if ENV['PARALLEL_COUNT'] + Parallel.each(tables, in_processes: ENV['PARALLEL_COUNT'].to_i) { |t| sync.call(t) } + else + tables.each { |t| sync.call(t) } end end end