lib/gizzard/commands.rb in gizzmo-0.12.1 vs lib/gizzard/commands.rb in gizzmo-0.13.0
- old
+ new
@@ -62,10 +62,58 @@
@buffer << string.strip
else
puts string
end
end
+
+ def require_tables
+ if !global_options.tables
+ puts "Please specify tables to repair with the --tables flag"
+ exit 1
+ end
+ end
+
+ def require_template_options
+ options = command_options.template_options || {}
+ if global_options.template_options && global_options.template_options[:simple]
+ fail = false
+ if !options[:concrete]
+ fail = true
+ STDERR.puts "Please specify a concrete shard type with --concrete flag when using --simple"
+ end
+ if !options[:source_type]
+ fail = true
+ STDERR.puts "Please specify a source data type with --source-type flag when using --simple"
+ end
+ if !options[:dest_type]
+ fail = true
+ STDERR.puts "Please specify a destination data type with --dest-type flag when using --simple"
+ end
+ exit 1 if fail
+ end
+ end
+
+ # Infer the shard base_name from a list of transformations
+ def get_base_name(transformations)
+ # Gets the first valid tree from the map of transformations -> applicable trees
+ # Trees are maps of forwardings -> shards. Gets the first valid shard from the this tree.
+ # Gets the ShardId of that shard and pulls the base name out of the table prefix
+ transformations.values.find {|v| v.is_a?(Hash) && !v.values.empty? }.values.find {|v| !v.nil?}.id.table_prefix.split('_').first
+ end
+
+ def confirm!(message="Continue?")
+ unless global_options.force
+ begin
+ print "#{message} (y/n) "; $stdout.flush
+ resp = $stdin.gets.chomp.downcase
+ puts ""
+ end while resp != 'y' && resp != 'n'
+ exit if resp == 'n'
+ end
+ end
+
+
end
class RetryProxy
def initialize(retries, object)
@inner = object
@@ -493,34 +541,53 @@
end
end
class CopyCommand < Command
def run
- from_shard_id_string, to_shard_id_string = @argv
- help!("Requires source & destination shard id") unless from_shard_id_string && to_shard_id_string
- from_shard_id = ShardId.parse(from_shard_id_string)
- to_shard_id = ShardId.parse(to_shard_id_string)
- manager.copy_shard(from_shard_id, to_shard_id)
- end
- end
-
- class RepairShardsCommand < Command
- def run
shard_id_strings = @argv
help!("Requires at least two shard ids") unless shard_id_strings.size >= 2
shard_ids = shard_id_strings.map{|s| ShardId.parse(s)}
- manager.repair_shards(shard_ids)
+ manager.copy_shard(shard_ids)
+ sleep 2
+ while manager.get_busy_shards().size > 0
+ sleep 5
+ end
end
end
- class DiffShardsCommand < Command
+ class RepairTablesCommand < Command
def run
- shard_id_strings = @argv
- help!("Requires at least two shard ids") unless shard_id_strings.size >= 2
- shard_ids = shard_id_strings.map{|s| ShardId.parse(s)}
- manager.diff_shards(shard_ids)
+ require_tables
+
+ table_ids = global_options.tables
+ manifest = manager.manifest(*table_ids)
+ num_copies = command_options.num_copies || 100
+ shard_sets = []
+ manifest.trees.values.each do |tree|
+ shard_sets << concrete_leaves(tree)
+ end
+ shard_sets.each do |shard_ids|
+ while manager.get_busy_shards().size > num_copies
+ sleep 1
+ end
+ puts "Repairing " + shard_ids.map {|s| s.to_unix }.join(",")
+ manager.copy_shard(shard_ids)
+ end
+
+ while manager.get_busy_shards().size > 0
+ sleep 5
+ end
end
+
+ def concrete_leaves(tree)
+ list = []
+ list << tree.info.id if tree.children.empty? && !tree.info.class_name.include?("BlackHoleShard")
+ tree.children.each do |child|
+ list += concrete_leaves(child)
+ end
+ list
+ end
end
class BusyCommand < Command
def run
manager.get_busy_shards().each { |shard_info| output shard_info.to_unix }
@@ -712,153 +779,245 @@
end
end
class TablesCommand < Command
def run
- puts manager.list_tables.join(" ")
+ puts manager.list_tables.join(",")
end
end
class TopologyCommand < Command
def run
+ require_tables
+
manifest = manager.manifest(*global_options.tables)
- templates = manifest.templates.inject({}) do |h, (t, fs)|
- h.update t.to_config => fs
- end
+ templates = manifest.templates
if command_options.forwardings
templates.
- inject([]) { |h, (t, fs)| fs.each { |f| h << [f.inspect, t] }; h }.
+ inject([]) { |h, (t, fs)| fs.each { |f| h << [f.inspect, t.to_config] }; h }.
sort.
each { |a| puts "%s\t%s" % a }
elsif command_options.root_shards
templates.
- inject([]) { |a, (t, fs)| fs.each { |f| a << [f.shard_id.inspect, t] }; a }.
+ inject([]) { |a, (t, fs)| fs.each { |f| a << [f.shard_id.inspect, t.to_config] }; a }.
sort.
each { |a| puts "%s\t%s" % a }
else
templates.
- map { |(t, fs)| [fs.length, t] }.
+ map { |(t, fs)| [fs.length, t.to_config] }.
sort.reverse.
each { |a| puts "%4d %s" % a }
end
end
end
- class TransformTreeCommand < Command
+ class BaseTransformCommand < Command
def run
- help!("wrong number of arguments") unless @argv.length == 2
-
scheduler_options = command_options.scheduler_options || {}
- template_s, shard_id_s = @argv
+ be_quiet = global_options.force && command_options.quiet
- to_template = ShardTemplate.parse(template_s)
- shard_id = ShardId.parse(shard_id_s)
- base_name = shard_id.table_prefix.split('_').first
- forwarding = manager.get_forwarding_for_shard(shard_id)
- manifest = manager.manifest(forwarding.table_id)
- shard = manifest.trees[forwarding]
- copy_wrapper = scheduler_options[:copy_wrapper]
- be_quiet = global_options.force && command_options.quiet
- transformation = Transformation.new(shard.template, to_template, copy_wrapper)
-
scheduler_options[:quiet] = be_quiet
- if transformation.noop?
+ transformations = get_transformations
+ transformations.reject! {|t,trees| t.noop? or trees.empty? }
+
+ if transformations.empty?
puts "Nothing to do!"
exit
end
+ base_name = get_base_name(transformations)
+
unless be_quiet
- puts transformation.inspect
+ transformations.each do |transformation, trees|
+ puts transformation.inspect
+ puts "Applied to #{trees.length} shards"
+ #trees.keys.sort.each {|f| puts " #{f.inspect}" }
+ end
puts ""
end
- unless global_options.force
- print "Continue? (y/n) "; $stdout.flush
- exit unless $stdin.gets.chomp == "y"
- puts ""
- end
+ confirm!
Gizzard.schedule! manager,
base_name,
- { transformation => { forwarding => shard } },
+ transformations,
scheduler_options
end
end
- class TransformCommand < Command
- def run
+ class TransformTreeCommand < BaseTransformCommand
+ def get_transformations
help!("must have an even number of arguments") unless @argv.length % 2 == 0
+ require_template_options
scheduler_options = command_options.scheduler_options || {}
+ copy_wrapper = scheduler_options[:copy_wrapper]
+ skip_copies = scheduler_options[:skip_copies] || false
+ transformations = {}
+
+ memoized_transforms = {}
+ @argv.each_slice(2) do |(template_s, shard_id_s)|
+ to_template = ShardTemplate.parse(template_s)
+ shard_id = ShardId.parse(shard_id_s)
+ base_name = shard_id.table_prefix.split('_').first
+ forwarding = manager.get_forwarding_for_shard(shard_id)
+ manifest = manager.manifest(forwarding.table_id)
+ shard = manifest.trees[forwarding]
+
+ transform_args = [shard.template, to_template, copy_wrapper, skip_copies]
+ transformation = memoized_transforms.fetch(transform_args) do |args|
+ memoized_transforms[args] = Transformation.new(*args)
+ end
+ tree = transformations.fetch(transformation) do |t|
+ transformations[t] = {}
+ end
+ tree[forwarding] = shard
+ end
+
+ transformations
+ end
+ end
+
+ class TransformCommand < BaseTransformCommand
+ def get_transformations
+ help!("must have an even number of arguments") unless @argv.length % 2 == 0
+ require_tables
+ require_template_options
+
+ scheduler_options = command_options.scheduler_options || {}
manifest = manager.manifest(*global_options.tables)
copy_wrapper = scheduler_options[:copy_wrapper]
- be_quiet = global_options.force && command_options.quiet
+ skip_copies = scheduler_options[:skip_copies] || false
transformations = {}
- scheduler_options[:quiet] = be_quiet
-
@argv.each_slice(2) do |(from_template_s, to_template_s)|
from, to = [from_template_s, to_template_s].map {|s| ShardTemplate.parse(s) }
- transformation = Transformation.new(from, to, copy_wrapper)
+ transformation = Transformation.new(from, to, copy_wrapper, skip_copies)
forwardings = Set.new(manifest.templates[from] || [])
trees = manifest.trees.reject {|(f, s)| !forwardings.include?(f) }
transformations[transformation] = trees
end
- transformations.reject! {|t,trees| t.noop? or trees.empty? }
+ transformations
+ end
+ end
+ class RebalanceCommand < BaseTransformCommand
+ def get_transformations
+ help!("must have an even number of arguments") unless @argv.length % 2 == 0
+ require_tables
+ require_template_options
+
+ scheduler_options = command_options.scheduler_options || {}
+ manifest = manager.manifest(*global_options.tables)
+ copy_wrapper = scheduler_options[:copy_wrapper]
+ transformations = {}
+
+ dest_templates_and_weights = {}
+
+ @argv.each_slice(2) do |(weight_s, to_template_s)|
+ to = ShardTemplate.parse(to_template_s)
+ weight = weight_s.to_i
+
+ dest_templates_and_weights[to] = weight
+ end
+
+ global_options.tables.inject({}) do |all, table|
+ trees = manifest.trees.reject {|(f, s)| f.table_id != table }
+ rebalancer = Rebalancer.new(trees, dest_templates_and_weights, copy_wrapper)
+
+ all.update(rebalancer.transformations) {|t,a,b| a.merge b }
+ end
+ end
+ end
+
+ class AddPartitionCommand < Command
+ def run
+ require_tables
+ require_template_options
+
+ scheduler_options = command_options.scheduler_options || {}
+ manifest = manager.manifest(*global_options.tables)
+ copy_wrapper = scheduler_options[:copy_wrapper]
+ be_quiet = global_options.force && command_options.quiet
+ transformations = {}
+
+ scheduler_options[:quiet] = be_quiet
+
+ puts "Note: All partitions, including existing ones, will be weighted evenly." unless be_quiet
+
+ add_templates_and_weights = {}
+
+ @argv.each do |template_s|
+ to = ShardTemplate.parse(template_s)
+
+ add_templates_and_weights[to] = ShardTemplate::DEFAULT_WEIGHT
+ end
+
+ orig_templates_and_weights = manifest.templates.inject({}) do |h, (template, forwardings)|
+ h[template] = ShardTemplate::DEFAULT_WEIGHT; h
+ end
+
+ dest_templates_and_weights = orig_templates_and_weights.merge(add_templates_and_weights)
+
+ transformations = global_options.tables.inject({}) do |all, table|
+ trees = manifest.trees.reject {|(f, s)| f.table_id != table }
+ rebalancer = Rebalancer.new(trees, dest_templates_and_weights, copy_wrapper)
+
+ all.update(rebalancer.transformations) {|t,a,b| a.merge b }
+ end
+
if transformations.empty?
puts "Nothing to do!"
exit
end
- base_name = transformations.values.find {|v| v.is_a?(Hash) && !v.values.empty? }.values.find {|v| !v.nil?}.id.table_prefix.split('_').first
+ base_name = get_base_name(transformations)
unless be_quiet
- transformations.sort.each do |transformation, trees|
+ transformations.each do |transformation, trees|
puts transformation.inspect
puts "Applied to #{trees.length} shards:"
trees.keys.sort.each {|f| puts " #{f.inspect}" }
end
puts ""
end
- unless global_options.force
- print "Continue? (y/n) "; $stdout.flush
- exit unless $stdin.gets.chomp == "y"
- puts ""
- end
+ confirm!
Gizzard.schedule! manager,
base_name,
transformations,
scheduler_options
end
end
- class RebalanceCommand < Command
+ class RemovePartitionCommand < Command
def run
- help!("must have an even number of arguments") unless @argv.length % 2 == 0
+ require_tables
scheduler_options = command_options.scheduler_options || {}
manifest = manager.manifest(*global_options.tables)
copy_wrapper = scheduler_options[:copy_wrapper]
be_quiet = global_options.force && command_options.quiet
transformations = {}
scheduler_options[:quiet] = be_quiet
- dest_templates_and_weights = {}
+ puts "Note: All partitions will be weighted evenly." unless be_quiet
- @argv.each_slice(2) do |(weight_s, to_template_s)|
- to = ShardTemplate.parse(to_template_s)
- weight = weight_s.to_i
+ dest_templates_and_weights = manifest.templates.inject({}) do |h, (template, forwardings)|
+ h[template] = ShardTemplate::DEFAULT_WEIGHT; h
+ end
- dest_templates_and_weights[to] = weight
+ @argv.each do |template_s|
+ t = ShardTemplate.parse(template_s)
+
+ dest_templates_and_weights.delete(t)
end
transformations = global_options.tables.inject({}) do |all, table|
trees = manifest.trees.reject {|(f, s)| f.table_id != table }
rebalancer = Rebalancer.new(trees, dest_templates_and_weights, copy_wrapper)
@@ -869,35 +1028,30 @@
if transformations.empty?
puts "Nothing to do!"
exit
end
- base_name = transformations.values.first.values.first.id.table_prefix.split('_').first
+ base_name = get_base_name(transformations)
unless be_quiet
transformations.each do |transformation, trees|
puts transformation.inspect
puts "Applied to #{trees.length} shards:"
trees.keys.sort.each {|f| puts " #{f.inspect}" }
end
puts ""
end
- unless global_options.force
- print "Continue? (y/n) "; $stdout.flush
- exit unless $stdin.gets.chomp == "y"
- puts ""
- end
+ confirm!
Gizzard.schedule! manager,
base_name,
transformations,
scheduler_options
end
end
-
class CreateTableCommand < Command
DEFAULT_NUM_SHARDS = 1024
DEFAULT_BASE_NAME = "shard"
@@ -932,10 +1086,12 @@
# This is all super hacky but I don't have time to generalize right now
def run
help!("must have an even number of arguments") unless @argv.length % 2 == 0
+ require_tables
+ require_template_options
base_name = command_options.base_name || DEFAULT_BASE_NAME
num_shards = (command_options.shards || DEFAULT_NUM_SHARDS).to_i
max_id = (command_options.max_id || FORWARDING_SPACE_MAX).to_i
min_id = (command_options.min_id || FORWARDING_SPACE_MIN).to_i
@@ -977,14 +1133,10 @@
base_ids.each {|(enum, base_id)| puts " #{base_id}" }
end
puts ""
end
- unless global_options.force
- print "Continue? (y/n) "; $stdout.flush
- exit unless $stdin.gets.chomp == "y"
- puts ""
- end
+ confirm!
global_options.tables.each do |table_id|
templates_and_base_ids.each do |template, base_ids|
ops = op_sets[template]