lib/cuboid/rpc/server/dispatcher.rb in cuboid-0.0.3 vs lib/cuboid/rpc/server/dispatcher.rb in cuboid-0.0.4

- old
+ new

@@ -29,10 +29,12 @@ include Utilities include UI::Output SERVICE_NAMESPACE = Service + PREFERENCE_STRATEGIES = Set.new([:horizontal, :vertical]) + def initialize( options = Options.instance ) @options = options @options.snapshot.path ||= @options.paths.snapshots @@ -75,27 +77,48 @@ # true def alive? @server.alive? end + # @param [Symbol] strategy + # `:horizontal` -- Pick the Dispatcher with the least amount of workload. + # `:vertical` -- Pick the Dispatcher with the most amount of workload. + # # @return [String, nil] - # Depending on availability: + # Depending on strategy and availability: # - # * URL of the least burdened Dispatcher. If not a grid member it will - # return this Dispatcher's URL. - # * `nil` if all nodes are at max utilization. - def preferred( &block ) + # * URL of the preferred Dispatcher. If not a grid member it will return + # this Dispatcher's URL. + # * `nil` if all nodes are at max utilization or on error. + # * `ArgumentError` -- On invalid `strategy`. + def preferred( strategy = :horizontal, &block ) + strategy = strategy.to_sym + if !PREFERENCE_STRATEGIES.include? strategy + block.call :error_unknown_strategy + raise ArgumentError, "Unknown strategy: #{strategy}" + end + if !@node.grid_member? block.call( self.utilization == 1 ? nil : @url ) return end pick_utilization = proc do |url, utilization| (utilization == 1 || utilization.rpc_exception?) ? nil : [url, utilization] end + adjust_score_by_strategy = proc do |score| + case strategy + when :horizontal + score + + when :vertical + -score + end + end + each = proc do |neighbour, iter| connect_to_peer( neighbour ).utilization do |utilization| iter.return pick_utilization.call( neighbour, utilization ) end end @@ -108,11 +131,11 @@ if nodes.empty? block.call next end - block.call nodes.sort_by { |_, score| score }[0][0] + block.call nodes.sort_by { |_, score| adjust_score_by_strategy.call score }[0][0] end Arachni::Reactor.global.create_iterator( @node.neighbours ).map( each, after ) end @@ -131,17 +154,23 @@ # # * `Hash`: Connection and proc info. # * `nil`: Max utilization, wait for one of the instances to finish and retry. def dispatch( options = {}, &block ) options = options.my_symbolize_keys + strategy = options.delete(:strategy) owner = options[:owner] helpers = options[:helpers] || {} load_balance = options[:load_balance].nil? ? true : options[:load_balance] if load_balance && @node.grid_member? - preferred do |url| + preferred *[strategy].compact do |url| if !url block.call + next + end + + if url == :error_unknown_strategy + block.call :error_unknown_strategy next end connect_to_peer( url ).dispatch( options.merge( helpers: helpers.merge( via: @url ),