lib/cuboid/rpc/server/dispatcher.rb in cuboid-0.0.3 vs lib/cuboid/rpc/server/dispatcher.rb in cuboid-0.0.4
- old
+ new
@@ -29,10 +29,12 @@
include Utilities
include UI::Output
SERVICE_NAMESPACE = Service
+ PREFERENCE_STRATEGIES = Set.new([:horizontal, :vertical])
+
def initialize( options = Options.instance )
@options = options
@options.snapshot.path ||= @options.paths.snapshots
@@ -75,27 +77,48 @@
# true
def alive?
@server.alive?
end
+ # @param [Symbol] strategy
+ # `:horizontal` -- Pick the Dispatcher with the least amount of workload.
+ # `:vertical` -- Pick the Dispatcher with the most amount of workload.
+ #
# @return [String, nil]
- # Depending on availability:
+ # Depending on strategy and availability:
#
- # * URL of the least burdened Dispatcher. If not a grid member it will
- # return this Dispatcher's URL.
- # * `nil` if all nodes are at max utilization.
- def preferred( &block )
+ # * URL of the preferred Dispatcher. If not a grid member it will return
+ # this Dispatcher's URL.
+ # * `nil` if all nodes are at max utilization or on error.
+ # * `ArgumentError` -- On invalid `strategy`.
+ def preferred( strategy = :horizontal, &block )
+ strategy = strategy.to_sym
+ if !PREFERENCE_STRATEGIES.include? strategy
+ block.call :error_unknown_strategy
+ raise ArgumentError, "Unknown strategy: #{strategy}"
+ end
+
if !@node.grid_member?
block.call( self.utilization == 1 ? nil : @url )
return
end
pick_utilization = proc do |url, utilization|
(utilization == 1 || utilization.rpc_exception?) ?
nil : [url, utilization]
end
+ adjust_score_by_strategy = proc do |score|
+ case strategy
+ when :horizontal
+ score
+
+ when :vertical
+ -score
+ end
+ end
+
each = proc do |neighbour, iter|
connect_to_peer( neighbour ).utilization do |utilization|
iter.return pick_utilization.call( neighbour, utilization )
end
end
@@ -108,11 +131,11 @@
if nodes.empty?
block.call
next
end
- block.call nodes.sort_by { |_, score| score }[0][0]
+ block.call nodes.sort_by { |_, score| adjust_score_by_strategy.call score }[0][0]
end
Arachni::Reactor.global.create_iterator( @node.neighbours ).map( each, after )
end
@@ -131,17 +154,23 @@
#
# * `Hash`: Connection and proc info.
# * `nil`: Max utilization, wait for one of the instances to finish and retry.
def dispatch( options = {}, &block )
options = options.my_symbolize_keys
+ strategy = options.delete(:strategy)
owner = options[:owner]
helpers = options[:helpers] || {}
load_balance = options[:load_balance].nil? ? true : options[:load_balance]
if load_balance && @node.grid_member?
- preferred do |url|
+ preferred *[strategy].compact do |url|
if !url
block.call
+ next
+ end
+
+ if url == :error_unknown_strategy
+ block.call :error_unknown_strategy
next
end
connect_to_peer( url ).dispatch( options.merge(
helpers: helpers.merge( via: @url ),