lib/gizzard/commands.rb in gizzmo-0.10.0 vs lib/gizzard/commands.rb in gizzmo-0.10.1
- old
+ new
@@ -1,7 +1,8 @@
require "pp"
require "digest/md5"
+
module Gizzard
class Command
include Thrift
attr_reader :buffer
@@ -15,11 +16,11 @@
run(command_name, global_options, command.buffer, OpenStruct.new, log, service)
end
end
def self.classify(string)
- string.split(/\W+/).map{|s| s.capitalize }.join("")
+ string.split(/\W+/).map { |s| s.capitalize }.join("")
end
attr_reader :service, :global_options, :argv, :command_options
def initialize(service, global_options, argv, command_options)
@service = service
@@ -39,17 +40,17 @@
else
puts string
end
end
end
-
+
class RetryProxy
def initialize(retries, object)
@inner = object
@retries_left = retries
end
-
+
def method_missing(*args)
@inner.send(*args)
rescue
if @retries_left > 0
@retries_left -= 1
@@ -61,19 +62,19 @@
end
end
class ShardCommand < Command
def self.make_service(global_options, log)
- RetryProxy.new global_options.retry.to_i,
- Gizzard::Thrift::ShardManager.new(global_options.host, global_options.port, log, global_options.dry)
+ RetryProxy.new global_options.retry.to_i,
+ Gizzard::Thrift::ShardManager.new(global_options.host, global_options.port, log, global_options.framed, global_options.dry)
end
end
class JobCommand < Command
def self.make_service(global_options, log)
RetryProxy.new global_options.retry.to_i ,
- Gizzard::Thrift::JobManager.new(global_options.host, global_options.port + 2, log, global_options.dry)
+ Gizzard::Thrift::JobManager.new(global_options.host, global_options.port + 2, log, global_options.framed, global_options.dry)
end
end
class AddforwardingCommand < ShardCommand
def run
@@ -145,11 +146,22 @@
end
class ReloadCommand < ShardCommand
def run
if global_options.force || ask
- service.reload_forwardings
+ if @argv
+ # allow hosts to be given on the command line
+ @argv.each do |hostname|
+ output hostname
+ opts = global_options.dup
+ opts.host = hostname
+ s = self.class.make_service(opts, global_options.log || "./gizzmo.log")
+ s.reload_forwardings
+ end
+ else
+ service.reload_forwardings
+ end
else
STDERR.puts "aborted"
end
end
@@ -199,11 +211,14 @@
shard_id = ShardId.parse(shard_id_string)
upward_links = service.list_upward_links(shard_id)
downward_links = service.list_downward_links(shard_id)
- help! "Shard must not be a root or leaf" if upward_links.length == 0 or downward_links.length == 0
+ if upward_links.length == 0 or downward_links.length == 0
+ STDERR.puts "Shard #{shard_id_string} must not be a root or leaf"
+ next
+ end
upward_links.each do |uplink|
downward_links.each do |downlink|
service.add_link(uplink.up_id, downlink.down_id, uplink.weight)
new_link = LinkInfo.new(uplink.up_id, downlink.down_id, uplink.weight)
@@ -237,15 +252,19 @@
def run
shard_ids = @argv
shard_ids.each do |shard_id_text|
shard_id = ShardId.parse(shard_id_text)
next if !shard_id
- service.list_upward_links(shard_id).each do |link_info|
- output link_info.to_unix
+ unless command_options.down
+ service.list_upward_links(shard_id).each do |link_info|
+ output command_options.ids ? link_info.up_id.to_unix : link_info.to_unix
+ end
end
- service.list_downward_links(shard_id).each do |link_info|
- output link_info.to_unix
+ unless command_options.up
+ service.list_downward_links(shard_id).each do |link_info|
+ output command_options.ids ? link_info.down_id.to_unix : link_info.to_unix
+ end
end
end
end
end
@@ -404,11 +423,11 @@
new_shards = shards.map{|(old, new)| new }
puts "gizzmo create #{shard_info.class_name} -s '#{shard_info.source_type}' -d '#{shard_info.destination_type}' #{new_shards.join(" ")}"
puts "gizzmo wrap #{command_options.write_only_shard} #{new_shards.join(" ")}"
- shards.map {|(old, new)| puts "gizzmo copy #{old} #{new}" }
+ shards.map { |(old, new)| puts "gizzmo copy #{old} #{new}" }
end
end
class PairCommand < ShardCommand
def run
@@ -431,20 +450,20 @@
ids_by_host[id.hostname] << id
end
overlaps = {}
ids_by_table.values.each do |arr|
- key = arr.map{|id| id.hostname }.sort
+ key = arr.map { |id| id.hostname }.sort
overlaps[key] ||= 0
- overlaps[key] += 1
+ overlaps[key] += 1
end
displayed = {}
- overlaps.sort_by{|hosts, count| count }.reverse.each do |(host_a, host_b), count|
+ overlaps.sort_by { |hosts, count| count }.reverse.each do |(host_a, host_b), count|
next if !host_a || !host_b || displayed[host_a] || displayed[host_b]
- id_a = ids_by_host[host_a].find{|id| service.list_upward_links(id).size > 0 }
- id_b = ids_by_host[host_b].find{|id| service.list_upward_links(id).size > 0 }
+ id_a = ids_by_host[host_a].find { |id| service.list_upward_links(id).size > 0 }
+ id_b = ids_by_host[host_b].find { |id| service.list_upward_links(id).size > 0 }
next unless id_a && id_b
weight_a = service.list_upward_links(id_a).first.weight
weight_b = service.list_upward_links(id_b).first.weight
if weight_a > weight_b
puts "#{host_a}\t#{host_b}"
@@ -464,11 +483,10 @@
end
end
class ReportCommand < ShardCommand
def run
-
things = @argv.map do |shard|
parse(down(ShardId.parse(shard))).join("\n")
end
if command_options.flat
@@ -490,24 +508,24 @@
def group(arr)
arr.inject({}) do |m, e|
m[e] ||= []
m[e] << e
m
- end.to_a.sort_by{|k, v| v.length}.reverse
+ end.to_a.sort_by { |k, v| v.length }.reverse
end
def parse(obj, id = nil, depth = 0, sub = true)
case obj
when Hash
id, prefix = parse(obj.keys.first, id, depth, sub)
- [prefix] + parse(obj.values.first, id, depth + 1, sub)
+ [ prefix ] + parse(obj.values.first, id, depth + 1, sub)
when String
host, prefix = obj.split("/")
host = "db" if host != "localhost" && sub
- id ||= prefix[/(\w+ward_)?\d+_\d+(_\w+ward)?/]
+ id ||= prefix[/(\w+ward_)?n?\d+_\d+(_\w+ward)?/]
prefix = (" " * depth) + host + "/" + ((sub && id) ? prefix.sub(id, "[ID]") : prefix)
- [id, prefix]
+ [ id, prefix ]
when Array
obj.map do |e|
parse e, id, depth, sub
end
end
@@ -515,11 +533,11 @@
def down(id)
vals = service.list_downward_links(id).map do |link|
down(link.down_id)
end
- {id.to_unix => vals}
+ { id.to_unix => vals }
end
end
class DrillCommand < ReportCommand
def run
@@ -570,9 +588,63 @@
end
class BusyCommand < ShardCommand
def run
service.get_busy_shards().each { |shard_info| output shard_info.to_unix }
+ end
+ end
+
+ class SetupReplicaCommand < ShardCommand
+ def run
+ from_shard_id_string, to_shard_id_string = @argv
+ help!("Requires source & destination shard id") unless from_shard_id_string && to_shard_id_string
+ from_shard_id = ShardId.parse(from_shard_id_string)
+ to_shard_id = ShardId.parse(to_shard_id_string)
+
+ if service.list_upward_links(to_shard_id).size > 0
+ STDERR.puts "Destination shard #{to_shard_id} has links to it."
+ exit 1
+ end
+
+ link = service.list_upward_links(from_shard_id)[0]
+ replica_shard_id = link.up_id
+ weight = link.weight
+ write_only_shard_id = ShardId.new("localhost", "#{to_shard_id.table_prefix}_copy_write_only")
+ service.create_shard(ShardInfo.new(write_only_shard_id, "WriteOnlyShard", "", "", 0))
+ service.add_link(replica_shard_id, write_only_shard_id, weight)
+ service.add_link(write_only_shard_id, to_shard_id, 1)
+ output to_shard_id.to_unix
+ end
+ end
+
+ class FinishReplicaCommand < ShardCommand
+ def run
+ from_shard_id_string, to_shard_id_string = @argv
+ help!("Requires source & destination shard id") unless from_shard_id_string && to_shard_id_string
+ from_shard_id = ShardId.parse(from_shard_id_string)
+ to_shard_id = ShardId.parse(to_shard_id_string)
+
+ write_only_shard_id = ShardId.new("localhost", "#{to_shard_id.table_prefix}_copy_write_only")
+ link = service.list_upward_links(write_only_shard_id)[0]
+ replica_shard_id = link.up_id
+ weight = link.weight
+
+ # careful. need to validate some basic assumptions.
+ unless global_options.force
+ if service.list_upward_links(from_shard_id).map { |link| link.up_id }.to_a != [ replica_shard_id ]
+ STDERR.puts "Uplink from #{from_shard_id} is not a migration replica."
+ exit 1
+ end
+ if service.list_upward_links(to_shard_id).map { |link| link.up_id }.to_a != [ write_only_shard_id ]
+ STDERR.puts "Uplink from #{to_shard_id} is not a write-only barrier."
+ exit 1
+ end
+ end
+
+ service.remove_link(write_only_shard_id, to_shard_id)
+ service.remove_link(replica_shard_id, write_only_shard_id)
+ service.add_link(replica_shard_id, to_shard_id, weight)
+ service.delete_shard(write_only_shard_id)
end
end
class SetupMigrateCommand < ShardCommand
def run