lib/pione/agent/broker.rb in pione-0.2.2 vs lib/pione/agent/broker.rb in pione-0.3.0

- old
+ new

@@ -1,288 +1,304 @@ module Pione module Agent + # Broker is an agent for providing task workers to tuple space. class Broker < BasicAgent - # Balancer is a base class for balancing task workers. - class Balancer < PioneObject - # Create a new balancer. - def initialize(broker) - @broker = broker - end + set_agent_type :broker, self - def balance - raise NotImplementedError - end - end + # + # instance methods + # - # EasyBalancer is a balancer by ratios of tuple space server and task worker. - class EasyBalancer < Balancer - # see Balancer.new - def initialize(broker) - @broker = broker - end + attr_reader :task_worker_resource # resource size of task worker + attr_reader :tuple_space_lock # lock for tuple space table - # Balances by killing a task worker in max tuple server. - def balance - ratios = calc_resource_ratios - min = ratios.values.min - max = ratios.values.max - min_server = ratios.key(min) - max_server = ratios.key(max) + def initialize(option={}) + super() - return unless min_server - return unless max_server + @task_workers = Array.new # known task worker fronts + @tuple_space = Hash.new # known tuple space table + @task_worker_resource = option[:task_worker_resource] || 1 + @sleeping_time = option[:sleeping_time] || 1 + @spawnings = 0 # number of current spawning task worker + @tuple_space_lock = Monitor.new + @task_worker_lock = Monitor.new # lock for task worker table - if @broker.excess_task_workers > 0 and min_server - create_task_worker(min_server) - else - adjust_task_worker(min_server, max_server) - end - end + @option = option + @option[:spawn_task_worker] = true unless @option.has_key?(:spawn_task_worker) - # Calculates resource ratios of tuple space servers. - def calc_resource_ratios(revision={}) - ratio = {} - # make ratio table - @broker.tuple_space_servers.each do |ts| - begin - rev = revision.has_key?(ts) ? revision[ts] : 0 - current = timeout(1){ts.current_task_worker_size} + rev - resource = ts.task_worker_resource - # minimum resource is 1 - resource = 1.0 unless resource > 0 - ratio[ts] = current / resource.to_f - rescue Exception - # ignore - end - end - return ratio - end + # balancer + @balancer = Global.broker_task_worker_balancer.new(self) + end - # Creates a new task worker. - def create_task_worker(min_server) - @broker.create_task_worker(min_server) + # Return number of task workers the broker manages. + def quantity + @task_worker_lock.synchronize {@task_workers.size} + end - if Pione.debug_mode? - puts "create a new task worker in #{min_server}" - end - end + # Add the tuple space. + def add_tuple_space(tuple_space) + uuid = tuple_space.uuid - # Adjusts task worker size between tuple space servers. - def adjust_task_worker(min_server, max_server) - revision = {min_server => 1, max_server => -1} - new_ratios = calc_resource_ratios(revision) + # update tuple space table with the id + @tuple_space_lock.synchronize {@tuple_space[uuid] = tuple_space} - return unless new_ratios.has_key?(min_server) - return unless new_ratios.has_key?(max_server) - - if new_ratios[min_server] < new_ratios[max_server] - # move worker from max server to min server - max_workers = @broker.task_workers.select do |worker| - worker.tuple_space_server == max_server && worker.task_waiting? - end - if not(max_workers.empty?) - max_workers.first.terminate - - # for degging - if Pione.debug_mode? - puts "worker #{worker.uuid} moved from #{max_server} to #{min_server}" - end - end + # wakeup chain thread if it sleeps + @chain_threads.list.each do |thread| + if thread[:agent_state] and thread[:agent_state].current?(:sleep) + thread.run end end end - module BrokerMethod - # Adds the tuple space server. - def add_tuple_space_server(tuple_space_server) - @tuple_space_server_lock.synchronize do - @tuple_space_servers << tuple_space_server - end - end + # Get the tuple space. + def get_tuple_space(tuple_space_id) + @tuple_space_lock.synchronize {@tuple_space[tuple_space_id]} + end - # Gets a tuple space server by connection id. - def get_tuple_space_server(connection_id) - @assignment_table[connection_id] - end + # Return known tuple spaces. + def tuple_spaces + @tuple_space_lock.synchronize {@tuple_space.values} + end - # Return excess number of workers belongs to this broker. - def excess_task_workers + # Return excess number of workers belongs to this broker. + def excess_task_workers + @task_worker_lock.synchronize do @task_worker_resource - @task_workers.size - @spawnings end + end - # Return task wainting workers. - def task_waiting_workers - @task_workers.select {|worker| worker.status.task_waiting?} - end + # Create a task worker for the server. This method returns true if we + # suceeded to spawn the task worker, or returns false. + def create_task_worker(tuple_space) + res = true - # Return task processing workers. - def task_processing_workers - @task_workers.select {|worker| worker.status.task_processing?} - end + @task_worker_lock.synchronize do + @spawnings += 1 - # Return terminated task workers. - def terminated_task_workers - @task_workers.select {|worker| worker.status.terminated?} - end - - # Create a task worker for the server. - def create_task_worker(tuple_space_server) - connection_id = Util::UUID.generate - @assignment_table[connection_id] = tuple_space_server - Thread.new do + # spawn a new process of pione-task-worker command + if @option[:spawn_task_worker] begin - @spawnings += 1 - Agent[:task_worker].spawn(Global.front, connection_id, @features) - ensure - @spawnings -= 1 + spawner = Command::PioneTaskWorker.spawn(Global.features, tuple_space.uuid) + @task_workers << spawner.child_front + spawner.when_terminated {delete_task_worker(spawner.child_front)} + rescue Command::SpawnError => e + Log::Debug.system("broker agent failed to spawn a task worker.") + res = false end + else + @task_workers << Agent::TaskWorker.start(tuple_space, Global.expressional_features, @env) end + + @spawnings -= 1 end - # Deletes unavilable tuple space servers. - def check_tuple_space_servers - @tuple_space_server_lock.synchronize do - @tuple_space_servers.select! do |ts| - begin - timeout(1) { ts.ping } - rescue Exception - false - end + return res + end + + def delete_task_worker(worker) + @task_worker_lock.synchronize {@task_workers.delete(worker)} + end + + # Terminate first task worker that satisfies the condition. Return true if + def terminate_task_worker_if(&condition) + @task_worker_lock.synchronize do + @task_workers.each do |worker| + if condition.call(worker) + worker.terminate + @task_workers.delete(worker) + return true end end end + return false + end - # Update tuple space server list. - def update_tuple_space_servers(tuple_space_servers) - @tuple_space_server_lock.synchronize do - del_targets = @tuple_space_servers - tuple_space_servers - add_targets = tuple_space_servers - @tuple_space_servers + # Delete unavilable tuple space servers. + def check_tuple_space + @tuple_space_lock.synchronize do + @tuple_space.delete_if do |_, space| + not(Util.ignore_exception {timeout(1) {space.ping}}) + end + end + end - # bye - #del_targets.each do |ts_server| - # ts_server.write(Tuple[:bye].new(agent_type: agent_type, uuid: uuid)) - #end - # hello - #add_targets.each do |ts_server| - # ts_server.write(Tuple[:agent].new(agent_type: agent_type, uuid: uuid)) - #end + # Update tuple space list. + def update_tuple_space_list(tuple_spaces) + Thread.new do + begin + @tuple_space_lock.synchronize do + # clear and update tuple space list + @tuple_space = {} + tuple_spaces.each do |tuple_space| + Util.ignore_exception {timeout(1) {add_tuple_space(tuple_space)}} + end - # update - @tuple_space_servers = tuple_space_servers - - if Global.show_presence_notifier timeout(1) do - puts "broker's tuple space servers: %s" % [@tuple_space_servers] + Log::Debug.presence_notification do + "broker agent updated tuple space table: %s" % [@tuple_space.values.map{|space| space.__drburi}] + end end end + rescue Exception => e + check_tuple_space end - rescue Exception - check_tuple_space_servers end + return true end - include BrokerMethod + # + # agent activities + # - set_agent_type :broker + define_transition :count_tuple_space + define_transition :balance_task_worker + define_transition :sleep + define_transition :check_tuple_space + define_transition :check_task_worker_life - define_state :count_tuple_space_servers - define_state :creating_task_worker - define_state :balancing_task_worker - define_state :sleeping - define_state :checking_tuple_space_servers - - define_state_transition :initialized => :count_tuple_space_servers - define_state_transition :count_tuple_space_servers => lambda {|agent, res| - res > 0 ? :balancing_task_worker : :sleeping + chain :init => [:count_tuple_space, :check_task_worker_life] + chain :count_tuple_space => lambda {|agent, res| + res > 0 ? :balance_task_worker : :sleep } - define_state_transition :balancing_task_worker => :sleeping - define_state_transition :sleeping => :count_tuple_space_servers - define_state_transition :checking_tuple_space_servers => :count_tuple_space_servers + chain :balance_task_worker => lambda {|agent, rebalance| + rebalance ? :balance_task_worker : :sleep + } + chain :sleep => :count_tuple_space + chain :check_tuple_space => :count_tuple_space + chain :check_task_worker_life => :check_task_worker_life - define_exception_handler Exception => :checking_tuple_space_servers + define_exception_handler Restart => :check_tuple_space - attr_accessor :task_workers - attr_reader :tuple_space_servers - attr_reader :task_worker_resource + # + # transitions + # - # current spawning task worker number - attr_reader :spawnings + def transit_to_count_tuple_space + @tuple_space.size + end - # @api private - def initialize(features, data={}) - super() - @task_workers = [] - @tuple_space_servers = [] - @task_worker_resource = data[:task_worker_resource] || 1 - @sleeping_time = data[:sleeping_time] || 1 - @assignment_table = {} - @tuple_space_server_lock = Mutex.new - @spawnings = 0 - @features = features + def transit_to_balance_task_worker + @balancer.balance + end - # balancer - @balancer = EasyBalancer.new(self) - - # start agents - @task_worker_checker = Agent::TrivialRoutineWorker.new( - Proc.new do - @task_workers.delete_if do |worker| - begin - timeout(3) { worker.terminated? } - rescue Exception - true - end - end - sleep 1 - end - ) + def transit_to_check_tuple_space + check_tuple_space end - # @api private - def start - super - @task_worker_checker.start + def transit_to_sleep + if @tuple_space.size == 0 or excess_task_workers == 0 + sleep 3 + else + sleep 1 + end end - # Sends bye message to tuple space servers when the broker is destroyed. - def finalize - @tuple_space_server_lock.synchronize do - @tuple_space_servers.each do |ts_server| + def transit_to_check_task_worker_life + @task_worker_lock.synchronize do + @task_workers.delete_if do |worker| begin - ts_server.bye - rescue Exception - # ignore + timeout(1) { worker.ping } + false + rescue Exception => e + true end end end - super + sleep 1 end - private + # Send bye message to tuple spaces when the broker is destroyed. + def transit_to_terminate + @tuple_space_lock.synchronize do + @tuple_space.each do |_, tuple_space| + Util.ignore_exception {timeout(1) {tuple_space.bye}} + end + end + end + end - # @api private - def transit_to_initialized + # TaskWorkerBalancer is a base class for balancing task workers. + class TaskWorkerBalancer + # Create a new balancer. + def initialize(broker) + @broker = broker end - def transit_to_count_tuple_space_servers - @tuple_space_servers.size + # Execute task worker balancing. If this method returned true, broker + # executes rebalance chain with no span. If false, broker sleeps a little. + def balance + raise NotImplementedError end + end - def transit_to_balancing_task_worker - @balancer.balance + # EasyBalancer is a balancer by ratios of tuple space and task worker. + class EasyTaskWorkerBalancer < TaskWorkerBalancer + # see Balancer.new + def initialize(broker) + @broker = broker end - def transit_to_checking_tuple_space_servers - check_tuple_space_servers + # Balance task worker ratio by creating a new task worker in minimum + # tuple space or killing a task worker in maximum. + def balance + ratios = calc_resource_ratios + min = ratios.values.min + max = ratios.values.max + min_server = ratios.key(min) + max_server = ratios.key(max) + + return false unless min_server + return false unless max_server + + if @broker.excess_task_workers > 0 and min_server + return create_task_worker(min_server) + else + return adjust_task_worker(min_server, max_server) + end end - # Transits to the state +sleeping+. - def transit_to_sleeping - if @tuple_space_servers.size == 0 or excess_task_workers == 0 - sleep 1 + # Calculate resource ratios of tuple space servers. + def calc_resource_ratios(revision={}) + ratio = {} + # make ratio table + @broker.tuple_space_lock.synchronize do + @broker.tuple_spaces.each do |tuple_space| + rev = revision.has_key?(tuple_space) ? revision[tuple_space] : 0 + current = timeout(1){tuple_space.current_task_worker_size} + rev + resource = tuple_space.task_worker_resource + # minimum resource is 1 + resource = 1 unless resource > 0 + ratio[tuple_space] = current / resource.to_f + end end + return ratio end - end - set_agent Broker + # Creates a new task worker. + def create_task_worker(min_server) + return @broker.create_task_worker(min_server) + end + + # Adjusts task worker size between tuple space servers. + def adjust_task_worker(min_server, max_server) + revision = {min_server => 1, max_server => -1} + new_ratios = calc_resource_ratios(revision) + + # failed to calculate tuple space ratio + return unless new_ratios.has_key?(min_server) + return unless new_ratios.has_key?(max_server) + + # kill a task worker for moving worker from max server to min server + if new_ratios[min_server] < new_ratios[max_server] + if @broker.terminate_task_worker_if do |worker| + worker.tuple_space == max_server && worker.states.any?{|s| s.current?(:take_task)} + end + return true + end + end + + # failed to adjust task workers + return false + end + end end end