lib/pgslice.rb in pgslice-0.1.5 vs lib/pgslice.rb in pgslice-0.1.6
- old
+ new
@@ -52,38 +52,47 @@
def prep
table, column, period = arguments
intermediate_table = "#{table}_intermediate"
trigger_name = self.trigger_name(table)
- abort "Usage: pgslice prep <table> <column> <period>" if arguments.length != 3
+ if options[:no_partition]
+ abort "Usage: pgslice prep <table> --no-partition" if arguments.length != 1
+ else
+ abort "Usage: pgslice prep <table> <column> <period>" if arguments.length != 3
+ end
abort "Table not found: #{table}" unless table_exists?(table)
abort "Table already exists: #{intermediate_table}" if table_exists?(intermediate_table)
- abort "Column not found: #{column}" unless columns(table).include?(column)
- abort "Invalid period: #{period}" unless SQL_FORMAT[period.to_sym]
+ unless options[:no_partition]
+ abort "Column not found: #{column}" unless columns(table).include?(column)
+ abort "Invalid period: #{period}" unless SQL_FORMAT[period.to_sym]
+ end
+
queries = []
queries << <<-SQL
CREATE TABLE #{intermediate_table} (LIKE #{table} INCLUDING ALL);
SQL
- sql_format = SQL_FORMAT[period.to_sym]
- queries << <<-SQL
-CREATE FUNCTION #{trigger_name}()
- RETURNS trigger AS $$
- BEGIN
- EXECUTE 'INSERT INTO #{table}_' || to_char(NEW.#{column}, '#{sql_format}') || ' VALUES ($1.*)' USING NEW;
- RETURN NULL;
- END;
- $$ LANGUAGE plpgsql;
- SQL
+ unless options[:no_partition]
+ sql_format = SQL_FORMAT[period.to_sym]
+ queries << <<-SQL
+ CREATE FUNCTION #{trigger_name}()
+ RETURNS trigger AS $$
+ BEGIN
+ EXECUTE 'INSERT INTO #{table}_' || to_char(NEW.#{column}, '#{sql_format}') || ' VALUES ($1.*)' USING NEW;
+ RETURN NULL;
+ END;
+ $$ LANGUAGE plpgsql;
+ SQL
- queries << <<-SQL
-CREATE TRIGGER #{trigger_name}
- BEFORE INSERT ON #{intermediate_table}
- FOR EACH ROW EXECUTE PROCEDURE #{trigger_name}();
- SQL
+ queries << <<-SQL
+ CREATE TRIGGER #{trigger_name}
+ BEFORE INSERT ON #{intermediate_table}
+ FOR EACH ROW EXECUTE PROCEDURE #{trigger_name}();
+ SQL
+ end
run_queries(queries)
end
def unprep
@@ -94,11 +103,11 @@
abort "Usage: pgslice unprep <table>" if arguments.length != 1
abort "Table not found: #{intermediate_table}" unless table_exists?(intermediate_table)
queries = [
"DROP TABLE #{intermediate_table} CASCADE;",
- "DROP FUNCTION #{trigger_name}();"
+ "DROP FUNCTION IF EXISTS #{trigger_name}();"
]
run_queries(queries)
end
def add_partitions
@@ -120,10 +129,11 @@
primary_key = self.primary_key(table)
queries = []
period, field = settings_from_table(original_table, table)
+ abort "Could not read settings" unless period
today = round_date(Date.today, period)
range.each do |n|
day = advance_date(today, period, n)
partition_name = "#{original_table}_#{day.strftime(name_format(period))}"
@@ -160,42 +170,54 @@
abort "Table not found: #{source_table}" unless table_exists?(source_table)
abort "Table not found: #{dest_table}" unless table_exists?(dest_table)
period, field = settings_from_table(table, dest_table)
- name_format = self.name_format(period)
- existing_tables = self.existing_tables(like: "#{table}_%").select { |t| /#{Regexp.escape("#{table}_")}(\d{4,6})/.match(t) }.sort
- starting_time = DateTime.strptime(existing_tables.first.split("_").last, name_format)
- ending_time = advance_date(DateTime.strptime(existing_tables.last.split("_").last, name_format), period, 1)
+ if period
+ name_format = self.name_format(period)
+ existing_tables = self.existing_tables(like: "#{table}_%").select { |t| /#{Regexp.escape("#{table}_")}(\d{4,6})/.match(t) }.sort
+ starting_time = DateTime.strptime(existing_tables.first.split("_").last, name_format)
+ ending_time = advance_date(DateTime.strptime(existing_tables.last.split("_").last, name_format), period, 1)
+ end
+
primary_key = self.primary_key(table)
max_source_id = max_id(source_table, primary_key)
max_dest_id =
if options[:swapped]
max_id(dest_table, primary_key, below: max_source_id)
else
max_id(dest_table, primary_key)
end
if max_dest_id == 0 && !options[:swapped]
- min_source_id = min_id(source_table, primary_key, field, starting_time)
- max_dest_id = min_source_id - 1 if min_source_id
+ if options[:start]
+ max_dest_id = options[:start]
+ else
+ min_source_id = min_id(source_table, primary_key, field, starting_time)
+ max_dest_id = min_source_id - 1 if min_source_id
+ end
end
starting_id = max_dest_id
fields = columns(source_table).join(", ")
batch_size = options[:batch_size]
i = 1
batch_count = ((max_source_id - starting_id) / batch_size.to_f).ceil
while starting_id < max_source_id
+ where = "#{primary_key} > #{starting_id} AND #{primary_key} <= #{starting_id + batch_size}"
+ if period
+ where << " AND #{field} >= #{sql_date(starting_time)} AND #{field} < #{sql_date(ending_time)}"
+ end
+
query = <<-SQL
/* #{i} of #{batch_count} */
INSERT INTO #{dest_table} (#{fields})
SELECT #{fields} FROM #{source_table}
- WHERE #{primary_key} > #{starting_id} AND #{primary_key} <= #{starting_id + batch_size} AND #{field} >= #{sql_date(starting_time)} AND #{field} < #{sql_date(ending_time)}
+ WHERE #{where}
SQL
log_sql(query)
log_sql
execute(query)
@@ -263,10 +285,13 @@
o.float "--sleep"
o.integer "--future", default: 0
o.integer "--past", default: 0
o.integer "--batch-size", default: 10000
o.boolean "--dry-run", default: false
+ o.boolean "--no-partition", default: false
+ o.integer "--start"
+ o.string "--url"
o.on "-v", "--version", "print the version" do
log PgSlice::VERSION
@exit = true
end
end
@@ -292,12 +317,13 @@
# database connection
def connection
@connection ||= begin
- abort "Set PGSLICE_URL" unless ENV["PGSLICE_URL"]
- uri = URI.parse(ENV["PGSLICE_URL"])
+ url = options[:url] || ENV["PGSLICE_URL"]
+ abort "Set PGSLICE_URL or use the --url option" unless url
+ uri = URI.parse(url)
uri_parser = URI::Parser.new
config = {
host: uri.host,
port: uri.port,
dbname: uri.path.sub(/\A\//, ""),
@@ -367,12 +393,13 @@
query << " WHERE #{primary_key} <= #{below}" if below
execute(query)[0]["max"].to_i
end
def min_id(table, primary_key, column, starting_time)
- query = "SELECT MIN(#{primary_key}) FROM #{table} WHERE #{column} >= #{sql_date(starting_time)}"
- execute(query)[0]["min"].to_i
+ query = "SELECT MIN(#{primary_key}) FROM #{table}"
+ query << " WHERE #{column} >= #{sql_date(starting_time)}" if starting_time
+ (execute(query)[0]["min"] || 1).to_i
end
def has_trigger?(trigger_name, table)
execute("SELECT 1 FROM pg_trigger WHERE tgname = $1 AND tgrelid = $2::regclass", [trigger_name, table]).any?
end
@@ -442,12 +469,14 @@
end
end
def settings_from_table(original_table, table)
trigger_name = self.trigger_name(original_table)
- function_def = execute("select pg_get_functiondef(oid) from pg_proc where proname = $1", [trigger_name])[0]["pg_get_functiondef"]
+ function_def = execute("select pg_get_functiondef(oid) from pg_proc where proname = $1", [trigger_name])[0]
+ return [nil, nil] unless function_def
+ function_def = function_def["pg_get_functiondef"]
sql_format = SQL_FORMAT.find { |_, f| function_def.include?("'#{f}'") }
- abort "Could not read settings" unless sql_format
+ return [nil, nil] unless sql_format
period = sql_format[0]
field = /to_char\(NEW\.(\w+),/.match(function_def)[1]
[period, field]
end
end