lib/gizzard/commands.rb in gizzmo-0.10.0 vs lib/gizzard/commands.rb in gizzmo-0.10.1

- old
+ new

@@ -1,7 +1,8 @@ require "pp" require "digest/md5" + module Gizzard class Command include Thrift attr_reader :buffer @@ -15,11 +16,11 @@ run(command_name, global_options, command.buffer, OpenStruct.new, log, service) end end def self.classify(string) - string.split(/\W+/).map{|s| s.capitalize }.join("") + string.split(/\W+/).map { |s| s.capitalize }.join("") end attr_reader :service, :global_options, :argv, :command_options def initialize(service, global_options, argv, command_options) @service = service @@ -39,17 +40,17 @@ else puts string end end end - + class RetryProxy def initialize(retries, object) @inner = object @retries_left = retries end - + def method_missing(*args) @inner.send(*args) rescue if @retries_left > 0 @retries_left -= 1 @@ -61,19 +62,19 @@ end end class ShardCommand < Command def self.make_service(global_options, log) - RetryProxy.new global_options.retry.to_i, - Gizzard::Thrift::ShardManager.new(global_options.host, global_options.port, log, global_options.dry) + RetryProxy.new global_options.retry.to_i, + Gizzard::Thrift::ShardManager.new(global_options.host, global_options.port, log, global_options.framed, global_options.dry) end end class JobCommand < Command def self.make_service(global_options, log) RetryProxy.new global_options.retry.to_i , - Gizzard::Thrift::JobManager.new(global_options.host, global_options.port + 2, log, global_options.dry) + Gizzard::Thrift::JobManager.new(global_options.host, global_options.port + 2, log, global_options.framed, global_options.dry) end end class AddforwardingCommand < ShardCommand def run @@ -145,11 +146,22 @@ end class ReloadCommand < ShardCommand def run if global_options.force || ask - service.reload_forwardings + if @argv + # allow hosts to be given on the command line + @argv.each do |hostname| + output hostname + opts = global_options.dup + opts.host = hostname + s = self.class.make_service(opts, global_options.log || "./gizzmo.log") + s.reload_forwardings + end + else + service.reload_forwardings + end else STDERR.puts "aborted" end end @@ -199,11 +211,14 @@ shard_id = ShardId.parse(shard_id_string) upward_links = service.list_upward_links(shard_id) downward_links = service.list_downward_links(shard_id) - help! "Shard must not be a root or leaf" if upward_links.length == 0 or downward_links.length == 0 + if upward_links.length == 0 or downward_links.length == 0 + STDERR.puts "Shard #{shard_id_string} must not be a root or leaf" + next + end upward_links.each do |uplink| downward_links.each do |downlink| service.add_link(uplink.up_id, downlink.down_id, uplink.weight) new_link = LinkInfo.new(uplink.up_id, downlink.down_id, uplink.weight) @@ -237,15 +252,19 @@ def run shard_ids = @argv shard_ids.each do |shard_id_text| shard_id = ShardId.parse(shard_id_text) next if !shard_id - service.list_upward_links(shard_id).each do |link_info| - output link_info.to_unix + unless command_options.down + service.list_upward_links(shard_id).each do |link_info| + output command_options.ids ? link_info.up_id.to_unix : link_info.to_unix + end end - service.list_downward_links(shard_id).each do |link_info| - output link_info.to_unix + unless command_options.up + service.list_downward_links(shard_id).each do |link_info| + output command_options.ids ? link_info.down_id.to_unix : link_info.to_unix + end end end end end @@ -404,11 +423,11 @@ new_shards = shards.map{|(old, new)| new } puts "gizzmo create #{shard_info.class_name} -s '#{shard_info.source_type}' -d '#{shard_info.destination_type}' #{new_shards.join(" ")}" puts "gizzmo wrap #{command_options.write_only_shard} #{new_shards.join(" ")}" - shards.map {|(old, new)| puts "gizzmo copy #{old} #{new}" } + shards.map { |(old, new)| puts "gizzmo copy #{old} #{new}" } end end class PairCommand < ShardCommand def run @@ -431,20 +450,20 @@ ids_by_host[id.hostname] << id end overlaps = {} ids_by_table.values.each do |arr| - key = arr.map{|id| id.hostname }.sort + key = arr.map { |id| id.hostname }.sort overlaps[key] ||= 0 - overlaps[key] += 1 + overlaps[key] += 1 end displayed = {} - overlaps.sort_by{|hosts, count| count }.reverse.each do |(host_a, host_b), count| + overlaps.sort_by { |hosts, count| count }.reverse.each do |(host_a, host_b), count| next if !host_a || !host_b || displayed[host_a] || displayed[host_b] - id_a = ids_by_host[host_a].find{|id| service.list_upward_links(id).size > 0 } - id_b = ids_by_host[host_b].find{|id| service.list_upward_links(id).size > 0 } + id_a = ids_by_host[host_a].find { |id| service.list_upward_links(id).size > 0 } + id_b = ids_by_host[host_b].find { |id| service.list_upward_links(id).size > 0 } next unless id_a && id_b weight_a = service.list_upward_links(id_a).first.weight weight_b = service.list_upward_links(id_b).first.weight if weight_a > weight_b puts "#{host_a}\t#{host_b}" @@ -464,11 +483,10 @@ end end class ReportCommand < ShardCommand def run - things = @argv.map do |shard| parse(down(ShardId.parse(shard))).join("\n") end if command_options.flat @@ -490,24 +508,24 @@ def group(arr) arr.inject({}) do |m, e| m[e] ||= [] m[e] << e m - end.to_a.sort_by{|k, v| v.length}.reverse + end.to_a.sort_by { |k, v| v.length }.reverse end def parse(obj, id = nil, depth = 0, sub = true) case obj when Hash id, prefix = parse(obj.keys.first, id, depth, sub) - [prefix] + parse(obj.values.first, id, depth + 1, sub) + [ prefix ] + parse(obj.values.first, id, depth + 1, sub) when String host, prefix = obj.split("/") host = "db" if host != "localhost" && sub - id ||= prefix[/(\w+ward_)?\d+_\d+(_\w+ward)?/] + id ||= prefix[/(\w+ward_)?n?\d+_\d+(_\w+ward)?/] prefix = (" " * depth) + host + "/" + ((sub && id) ? prefix.sub(id, "[ID]") : prefix) - [id, prefix] + [ id, prefix ] when Array obj.map do |e| parse e, id, depth, sub end end @@ -515,11 +533,11 @@ def down(id) vals = service.list_downward_links(id).map do |link| down(link.down_id) end - {id.to_unix => vals} + { id.to_unix => vals } end end class DrillCommand < ReportCommand def run @@ -570,9 +588,63 @@ end class BusyCommand < ShardCommand def run service.get_busy_shards().each { |shard_info| output shard_info.to_unix } + end + end + + class SetupReplicaCommand < ShardCommand + def run + from_shard_id_string, to_shard_id_string = @argv + help!("Requires source & destination shard id") unless from_shard_id_string && to_shard_id_string + from_shard_id = ShardId.parse(from_shard_id_string) + to_shard_id = ShardId.parse(to_shard_id_string) + + if service.list_upward_links(to_shard_id).size > 0 + STDERR.puts "Destination shard #{to_shard_id} has links to it." + exit 1 + end + + link = service.list_upward_links(from_shard_id)[0] + replica_shard_id = link.up_id + weight = link.weight + write_only_shard_id = ShardId.new("localhost", "#{to_shard_id.table_prefix}_copy_write_only") + service.create_shard(ShardInfo.new(write_only_shard_id, "WriteOnlyShard", "", "", 0)) + service.add_link(replica_shard_id, write_only_shard_id, weight) + service.add_link(write_only_shard_id, to_shard_id, 1) + output to_shard_id.to_unix + end + end + + class FinishReplicaCommand < ShardCommand + def run + from_shard_id_string, to_shard_id_string = @argv + help!("Requires source & destination shard id") unless from_shard_id_string && to_shard_id_string + from_shard_id = ShardId.parse(from_shard_id_string) + to_shard_id = ShardId.parse(to_shard_id_string) + + write_only_shard_id = ShardId.new("localhost", "#{to_shard_id.table_prefix}_copy_write_only") + link = service.list_upward_links(write_only_shard_id)[0] + replica_shard_id = link.up_id + weight = link.weight + + # careful. need to validate some basic assumptions. + unless global_options.force + if service.list_upward_links(from_shard_id).map { |link| link.up_id }.to_a != [ replica_shard_id ] + STDERR.puts "Uplink from #{from_shard_id} is not a migration replica." + exit 1 + end + if service.list_upward_links(to_shard_id).map { |link| link.up_id }.to_a != [ write_only_shard_id ] + STDERR.puts "Uplink from #{to_shard_id} is not a write-only barrier." + exit 1 + end + end + + service.remove_link(write_only_shard_id, to_shard_id) + service.remove_link(replica_shard_id, write_only_shard_id) + service.add_link(replica_shard_id, to_shard_id, weight) + service.delete_shard(write_only_shard_id) end end class SetupMigrateCommand < ShardCommand def run