lib/pgslice.rb in pgslice-0.1.5 vs lib/pgslice.rb in pgslice-0.1.6

- old
+ new

@@ -52,38 +52,47 @@ def prep table, column, period = arguments intermediate_table = "#{table}_intermediate" trigger_name = self.trigger_name(table) - abort "Usage: pgslice prep <table> <column> <period>" if arguments.length != 3 + if options[:no_partition] + abort "Usage: pgslice prep <table> --no-partition" if arguments.length != 1 + else + abort "Usage: pgslice prep <table> <column> <period>" if arguments.length != 3 + end abort "Table not found: #{table}" unless table_exists?(table) abort "Table already exists: #{intermediate_table}" if table_exists?(intermediate_table) - abort "Column not found: #{column}" unless columns(table).include?(column) - abort "Invalid period: #{period}" unless SQL_FORMAT[period.to_sym] + unless options[:no_partition] + abort "Column not found: #{column}" unless columns(table).include?(column) + abort "Invalid period: #{period}" unless SQL_FORMAT[period.to_sym] + end + queries = [] queries << <<-SQL CREATE TABLE #{intermediate_table} (LIKE #{table} INCLUDING ALL); SQL - sql_format = SQL_FORMAT[period.to_sym] - queries << <<-SQL -CREATE FUNCTION #{trigger_name}() - RETURNS trigger AS $$ - BEGIN - EXECUTE 'INSERT INTO #{table}_' || to_char(NEW.#{column}, '#{sql_format}') || ' VALUES ($1.*)' USING NEW; - RETURN NULL; - END; - $$ LANGUAGE plpgsql; - SQL + unless options[:no_partition] + sql_format = SQL_FORMAT[period.to_sym] + queries << <<-SQL + CREATE FUNCTION #{trigger_name}() + RETURNS trigger AS $$ + BEGIN + EXECUTE 'INSERT INTO #{table}_' || to_char(NEW.#{column}, '#{sql_format}') || ' VALUES ($1.*)' USING NEW; + RETURN NULL; + END; + $$ LANGUAGE plpgsql; + SQL - queries << <<-SQL -CREATE TRIGGER #{trigger_name} - BEFORE INSERT ON #{intermediate_table} - FOR EACH ROW EXECUTE PROCEDURE #{trigger_name}(); - SQL + queries << <<-SQL + CREATE TRIGGER #{trigger_name} + BEFORE INSERT ON #{intermediate_table} + FOR EACH ROW EXECUTE PROCEDURE #{trigger_name}(); + SQL + end run_queries(queries) end def unprep @@ -94,11 +103,11 @@ abort "Usage: pgslice unprep <table>" if arguments.length != 1 abort "Table not found: #{intermediate_table}" unless table_exists?(intermediate_table) queries = [ "DROP TABLE #{intermediate_table} CASCADE;", - "DROP FUNCTION #{trigger_name}();" + "DROP FUNCTION IF EXISTS #{trigger_name}();" ] run_queries(queries) end def add_partitions @@ -120,10 +129,11 @@ primary_key = self.primary_key(table) queries = [] period, field = settings_from_table(original_table, table) + abort "Could not read settings" unless period today = round_date(Date.today, period) range.each do |n| day = advance_date(today, period, n) partition_name = "#{original_table}_#{day.strftime(name_format(period))}" @@ -160,42 +170,54 @@ abort "Table not found: #{source_table}" unless table_exists?(source_table) abort "Table not found: #{dest_table}" unless table_exists?(dest_table) period, field = settings_from_table(table, dest_table) - name_format = self.name_format(period) - existing_tables = self.existing_tables(like: "#{table}_%").select { |t| /#{Regexp.escape("#{table}_")}(\d{4,6})/.match(t) }.sort - starting_time = DateTime.strptime(existing_tables.first.split("_").last, name_format) - ending_time = advance_date(DateTime.strptime(existing_tables.last.split("_").last, name_format), period, 1) + if period + name_format = self.name_format(period) + existing_tables = self.existing_tables(like: "#{table}_%").select { |t| /#{Regexp.escape("#{table}_")}(\d{4,6})/.match(t) }.sort + starting_time = DateTime.strptime(existing_tables.first.split("_").last, name_format) + ending_time = advance_date(DateTime.strptime(existing_tables.last.split("_").last, name_format), period, 1) + end + primary_key = self.primary_key(table) max_source_id = max_id(source_table, primary_key) max_dest_id = if options[:swapped] max_id(dest_table, primary_key, below: max_source_id) else max_id(dest_table, primary_key) end if max_dest_id == 0 && !options[:swapped] - min_source_id = min_id(source_table, primary_key, field, starting_time) - max_dest_id = min_source_id - 1 if min_source_id + if options[:start] + max_dest_id = options[:start] + else + min_source_id = min_id(source_table, primary_key, field, starting_time) + max_dest_id = min_source_id - 1 if min_source_id + end end starting_id = max_dest_id fields = columns(source_table).join(", ") batch_size = options[:batch_size] i = 1 batch_count = ((max_source_id - starting_id) / batch_size.to_f).ceil while starting_id < max_source_id + where = "#{primary_key} > #{starting_id} AND #{primary_key} <= #{starting_id + batch_size}" + if period + where << " AND #{field} >= #{sql_date(starting_time)} AND #{field} < #{sql_date(ending_time)}" + end + query = <<-SQL /* #{i} of #{batch_count} */ INSERT INTO #{dest_table} (#{fields}) SELECT #{fields} FROM #{source_table} - WHERE #{primary_key} > #{starting_id} AND #{primary_key} <= #{starting_id + batch_size} AND #{field} >= #{sql_date(starting_time)} AND #{field} < #{sql_date(ending_time)} + WHERE #{where} SQL log_sql(query) log_sql execute(query) @@ -263,10 +285,13 @@ o.float "--sleep" o.integer "--future", default: 0 o.integer "--past", default: 0 o.integer "--batch-size", default: 10000 o.boolean "--dry-run", default: false + o.boolean "--no-partition", default: false + o.integer "--start" + o.string "--url" o.on "-v", "--version", "print the version" do log PgSlice::VERSION @exit = true end end @@ -292,12 +317,13 @@ # database connection def connection @connection ||= begin - abort "Set PGSLICE_URL" unless ENV["PGSLICE_URL"] - uri = URI.parse(ENV["PGSLICE_URL"]) + url = options[:url] || ENV["PGSLICE_URL"] + abort "Set PGSLICE_URL or use the --url option" unless url + uri = URI.parse(url) uri_parser = URI::Parser.new config = { host: uri.host, port: uri.port, dbname: uri.path.sub(/\A\//, ""), @@ -367,12 +393,13 @@ query << " WHERE #{primary_key} <= #{below}" if below execute(query)[0]["max"].to_i end def min_id(table, primary_key, column, starting_time) - query = "SELECT MIN(#{primary_key}) FROM #{table} WHERE #{column} >= #{sql_date(starting_time)}" - execute(query)[0]["min"].to_i + query = "SELECT MIN(#{primary_key}) FROM #{table}" + query << " WHERE #{column} >= #{sql_date(starting_time)}" if starting_time + (execute(query)[0]["min"] || 1).to_i end def has_trigger?(trigger_name, table) execute("SELECT 1 FROM pg_trigger WHERE tgname = $1 AND tgrelid = $2::regclass", [trigger_name, table]).any? end @@ -442,12 +469,14 @@ end end def settings_from_table(original_table, table) trigger_name = self.trigger_name(original_table) - function_def = execute("select pg_get_functiondef(oid) from pg_proc where proname = $1", [trigger_name])[0]["pg_get_functiondef"] + function_def = execute("select pg_get_functiondef(oid) from pg_proc where proname = $1", [trigger_name])[0] + return [nil, nil] unless function_def + function_def = function_def["pg_get_functiondef"] sql_format = SQL_FORMAT.find { |_, f| function_def.include?("'#{f}'") } - abort "Could not read settings" unless sql_format + return [nil, nil] unless sql_format period = sql_format[0] field = /to_char\(NEW\.(\w+),/.match(function_def)[1] [period, field] end end