lib/gizzard/commands.rb in gizzmo-0.12.1 vs lib/gizzard/commands.rb in gizzmo-0.13.0

- old
+ new

@@ -62,10 +62,58 @@ @buffer << string.strip else puts string end end + + def require_tables + if !global_options.tables + puts "Please specify tables to repair with the --tables flag" + exit 1 + end + end + + def require_template_options + options = command_options.template_options || {} + if global_options.template_options && global_options.template_options[:simple] + fail = false + if !options[:concrete] + fail = true + STDERR.puts "Please specify a concrete shard type with --concrete flag when using --simple" + end + if !options[:source_type] + fail = true + STDERR.puts "Please specify a source data type with --source-type flag when using --simple" + end + if !options[:dest_type] + fail = true + STDERR.puts "Please specify a destination data type with --dest-type flag when using --simple" + end + exit 1 if fail + end + end + + # Infer the shard base_name from a list of transformations + def get_base_name(transformations) + # Gets the first valid tree from the map of transformations -> applicable trees + # Trees are maps of forwardings -> shards. Gets the first valid shard from the this tree. + # Gets the ShardId of that shard and pulls the base name out of the table prefix + transformations.values.find {|v| v.is_a?(Hash) && !v.values.empty? }.values.find {|v| !v.nil?}.id.table_prefix.split('_').first + end + + def confirm!(message="Continue?") + unless global_options.force + begin + print "#{message} (y/n) "; $stdout.flush + resp = $stdin.gets.chomp.downcase + puts "" + end while resp != 'y' && resp != 'n' + exit if resp == 'n' + end + end + + end class RetryProxy def initialize(retries, object) @inner = object @@ -493,34 +541,53 @@ end end class CopyCommand < Command def run - from_shard_id_string, to_shard_id_string = @argv - help!("Requires source & destination shard id") unless from_shard_id_string && to_shard_id_string - from_shard_id = ShardId.parse(from_shard_id_string) - to_shard_id = ShardId.parse(to_shard_id_string) - manager.copy_shard(from_shard_id, to_shard_id) - end - end - - class RepairShardsCommand < Command - def run shard_id_strings = @argv help!("Requires at least two shard ids") unless shard_id_strings.size >= 2 shard_ids = shard_id_strings.map{|s| ShardId.parse(s)} - manager.repair_shards(shard_ids) + manager.copy_shard(shard_ids) + sleep 2 + while manager.get_busy_shards().size > 0 + sleep 5 + end end end - class DiffShardsCommand < Command + class RepairTablesCommand < Command def run - shard_id_strings = @argv - help!("Requires at least two shard ids") unless shard_id_strings.size >= 2 - shard_ids = shard_id_strings.map{|s| ShardId.parse(s)} - manager.diff_shards(shard_ids) + require_tables + + table_ids = global_options.tables + manifest = manager.manifest(*table_ids) + num_copies = command_options.num_copies || 100 + shard_sets = [] + manifest.trees.values.each do |tree| + shard_sets << concrete_leaves(tree) + end + shard_sets.each do |shard_ids| + while manager.get_busy_shards().size > num_copies + sleep 1 + end + puts "Repairing " + shard_ids.map {|s| s.to_unix }.join(",") + manager.copy_shard(shard_ids) + end + + while manager.get_busy_shards().size > 0 + sleep 5 + end end + + def concrete_leaves(tree) + list = [] + list << tree.info.id if tree.children.empty? && !tree.info.class_name.include?("BlackHoleShard") + tree.children.each do |child| + list += concrete_leaves(child) + end + list + end end class BusyCommand < Command def run manager.get_busy_shards().each { |shard_info| output shard_info.to_unix } @@ -712,153 +779,245 @@ end end class TablesCommand < Command def run - puts manager.list_tables.join(" ") + puts manager.list_tables.join(",") end end class TopologyCommand < Command def run + require_tables + manifest = manager.manifest(*global_options.tables) - templates = manifest.templates.inject({}) do |h, (t, fs)| - h.update t.to_config => fs - end + templates = manifest.templates if command_options.forwardings templates. - inject([]) { |h, (t, fs)| fs.each { |f| h << [f.inspect, t] }; h }. + inject([]) { |h, (t, fs)| fs.each { |f| h << [f.inspect, t.to_config] }; h }. sort. each { |a| puts "%s\t%s" % a } elsif command_options.root_shards templates. - inject([]) { |a, (t, fs)| fs.each { |f| a << [f.shard_id.inspect, t] }; a }. + inject([]) { |a, (t, fs)| fs.each { |f| a << [f.shard_id.inspect, t.to_config] }; a }. sort. each { |a| puts "%s\t%s" % a } else templates. - map { |(t, fs)| [fs.length, t] }. + map { |(t, fs)| [fs.length, t.to_config] }. sort.reverse. each { |a| puts "%4d %s" % a } end end end - class TransformTreeCommand < Command + class BaseTransformCommand < Command def run - help!("wrong number of arguments") unless @argv.length == 2 - scheduler_options = command_options.scheduler_options || {} - template_s, shard_id_s = @argv + be_quiet = global_options.force && command_options.quiet - to_template = ShardTemplate.parse(template_s) - shard_id = ShardId.parse(shard_id_s) - base_name = shard_id.table_prefix.split('_').first - forwarding = manager.get_forwarding_for_shard(shard_id) - manifest = manager.manifest(forwarding.table_id) - shard = manifest.trees[forwarding] - copy_wrapper = scheduler_options[:copy_wrapper] - be_quiet = global_options.force && command_options.quiet - transformation = Transformation.new(shard.template, to_template, copy_wrapper) - scheduler_options[:quiet] = be_quiet - if transformation.noop? + transformations = get_transformations + transformations.reject! {|t,trees| t.noop? or trees.empty? } + + if transformations.empty? puts "Nothing to do!" exit end + base_name = get_base_name(transformations) + unless be_quiet - puts transformation.inspect + transformations.each do |transformation, trees| + puts transformation.inspect + puts "Applied to #{trees.length} shards" + #trees.keys.sort.each {|f| puts " #{f.inspect}" } + end puts "" end - unless global_options.force - print "Continue? (y/n) "; $stdout.flush - exit unless $stdin.gets.chomp == "y" - puts "" - end + confirm! Gizzard.schedule! manager, base_name, - { transformation => { forwarding => shard } }, + transformations, scheduler_options end end - class TransformCommand < Command - def run + class TransformTreeCommand < BaseTransformCommand + def get_transformations help!("must have an even number of arguments") unless @argv.length % 2 == 0 + require_template_options scheduler_options = command_options.scheduler_options || {} + copy_wrapper = scheduler_options[:copy_wrapper] + skip_copies = scheduler_options[:skip_copies] || false + transformations = {} + + memoized_transforms = {} + @argv.each_slice(2) do |(template_s, shard_id_s)| + to_template = ShardTemplate.parse(template_s) + shard_id = ShardId.parse(shard_id_s) + base_name = shard_id.table_prefix.split('_').first + forwarding = manager.get_forwarding_for_shard(shard_id) + manifest = manager.manifest(forwarding.table_id) + shard = manifest.trees[forwarding] + + transform_args = [shard.template, to_template, copy_wrapper, skip_copies] + transformation = memoized_transforms.fetch(transform_args) do |args| + memoized_transforms[args] = Transformation.new(*args) + end + tree = transformations.fetch(transformation) do |t| + transformations[t] = {} + end + tree[forwarding] = shard + end + + transformations + end + end + + class TransformCommand < BaseTransformCommand + def get_transformations + help!("must have an even number of arguments") unless @argv.length % 2 == 0 + require_tables + require_template_options + + scheduler_options = command_options.scheduler_options || {} manifest = manager.manifest(*global_options.tables) copy_wrapper = scheduler_options[:copy_wrapper] - be_quiet = global_options.force && command_options.quiet + skip_copies = scheduler_options[:skip_copies] || false transformations = {} - scheduler_options[:quiet] = be_quiet - @argv.each_slice(2) do |(from_template_s, to_template_s)| from, to = [from_template_s, to_template_s].map {|s| ShardTemplate.parse(s) } - transformation = Transformation.new(from, to, copy_wrapper) + transformation = Transformation.new(from, to, copy_wrapper, skip_copies) forwardings = Set.new(manifest.templates[from] || []) trees = manifest.trees.reject {|(f, s)| !forwardings.include?(f) } transformations[transformation] = trees end - transformations.reject! {|t,trees| t.noop? or trees.empty? } + transformations + end + end + class RebalanceCommand < BaseTransformCommand + def get_transformations + help!("must have an even number of arguments") unless @argv.length % 2 == 0 + require_tables + require_template_options + + scheduler_options = command_options.scheduler_options || {} + manifest = manager.manifest(*global_options.tables) + copy_wrapper = scheduler_options[:copy_wrapper] + transformations = {} + + dest_templates_and_weights = {} + + @argv.each_slice(2) do |(weight_s, to_template_s)| + to = ShardTemplate.parse(to_template_s) + weight = weight_s.to_i + + dest_templates_and_weights[to] = weight + end + + global_options.tables.inject({}) do |all, table| + trees = manifest.trees.reject {|(f, s)| f.table_id != table } + rebalancer = Rebalancer.new(trees, dest_templates_and_weights, copy_wrapper) + + all.update(rebalancer.transformations) {|t,a,b| a.merge b } + end + end + end + + class AddPartitionCommand < Command + def run + require_tables + require_template_options + + scheduler_options = command_options.scheduler_options || {} + manifest = manager.manifest(*global_options.tables) + copy_wrapper = scheduler_options[:copy_wrapper] + be_quiet = global_options.force && command_options.quiet + transformations = {} + + scheduler_options[:quiet] = be_quiet + + puts "Note: All partitions, including existing ones, will be weighted evenly." unless be_quiet + + add_templates_and_weights = {} + + @argv.each do |template_s| + to = ShardTemplate.parse(template_s) + + add_templates_and_weights[to] = ShardTemplate::DEFAULT_WEIGHT + end + + orig_templates_and_weights = manifest.templates.inject({}) do |h, (template, forwardings)| + h[template] = ShardTemplate::DEFAULT_WEIGHT; h + end + + dest_templates_and_weights = orig_templates_and_weights.merge(add_templates_and_weights) + + transformations = global_options.tables.inject({}) do |all, table| + trees = manifest.trees.reject {|(f, s)| f.table_id != table } + rebalancer = Rebalancer.new(trees, dest_templates_and_weights, copy_wrapper) + + all.update(rebalancer.transformations) {|t,a,b| a.merge b } + end + if transformations.empty? puts "Nothing to do!" exit end - base_name = transformations.values.find {|v| v.is_a?(Hash) && !v.values.empty? }.values.find {|v| !v.nil?}.id.table_prefix.split('_').first + base_name = get_base_name(transformations) unless be_quiet - transformations.sort.each do |transformation, trees| + transformations.each do |transformation, trees| puts transformation.inspect puts "Applied to #{trees.length} shards:" trees.keys.sort.each {|f| puts " #{f.inspect}" } end puts "" end - unless global_options.force - print "Continue? (y/n) "; $stdout.flush - exit unless $stdin.gets.chomp == "y" - puts "" - end + confirm! Gizzard.schedule! manager, base_name, transformations, scheduler_options end end - class RebalanceCommand < Command + class RemovePartitionCommand < Command def run - help!("must have an even number of arguments") unless @argv.length % 2 == 0 + require_tables scheduler_options = command_options.scheduler_options || {} manifest = manager.manifest(*global_options.tables) copy_wrapper = scheduler_options[:copy_wrapper] be_quiet = global_options.force && command_options.quiet transformations = {} scheduler_options[:quiet] = be_quiet - dest_templates_and_weights = {} + puts "Note: All partitions will be weighted evenly." unless be_quiet - @argv.each_slice(2) do |(weight_s, to_template_s)| - to = ShardTemplate.parse(to_template_s) - weight = weight_s.to_i + dest_templates_and_weights = manifest.templates.inject({}) do |h, (template, forwardings)| + h[template] = ShardTemplate::DEFAULT_WEIGHT; h + end - dest_templates_and_weights[to] = weight + @argv.each do |template_s| + t = ShardTemplate.parse(template_s) + + dest_templates_and_weights.delete(t) end transformations = global_options.tables.inject({}) do |all, table| trees = manifest.trees.reject {|(f, s)| f.table_id != table } rebalancer = Rebalancer.new(trees, dest_templates_and_weights, copy_wrapper) @@ -869,35 +1028,30 @@ if transformations.empty? puts "Nothing to do!" exit end - base_name = transformations.values.first.values.first.id.table_prefix.split('_').first + base_name = get_base_name(transformations) unless be_quiet transformations.each do |transformation, trees| puts transformation.inspect puts "Applied to #{trees.length} shards:" trees.keys.sort.each {|f| puts " #{f.inspect}" } end puts "" end - unless global_options.force - print "Continue? (y/n) "; $stdout.flush - exit unless $stdin.gets.chomp == "y" - puts "" - end + confirm! Gizzard.schedule! manager, base_name, transformations, scheduler_options end end - class CreateTableCommand < Command DEFAULT_NUM_SHARDS = 1024 DEFAULT_BASE_NAME = "shard" @@ -932,10 +1086,12 @@ # This is all super hacky but I don't have time to generalize right now def run help!("must have an even number of arguments") unless @argv.length % 2 == 0 + require_tables + require_template_options base_name = command_options.base_name || DEFAULT_BASE_NAME num_shards = (command_options.shards || DEFAULT_NUM_SHARDS).to_i max_id = (command_options.max_id || FORWARDING_SPACE_MAX).to_i min_id = (command_options.min_id || FORWARDING_SPACE_MIN).to_i @@ -977,14 +1133,10 @@ base_ids.each {|(enum, base_id)| puts " #{base_id}" } end puts "" end - unless global_options.force - print "Continue? (y/n) "; $stdout.flush - exit unless $stdin.gets.chomp == "y" - puts "" - end + confirm! global_options.tables.each do |table_id| templates_and_base_ids.each do |template, base_ids| ops = op_sets[template]