lib/pione/agent/broker.rb in pione-0.2.2 vs lib/pione/agent/broker.rb in pione-0.3.0
- old
+ new
@@ -1,288 +1,304 @@
module Pione
module Agent
+ # Broker is an agent for providing task workers to tuple space.
class Broker < BasicAgent
- # Balancer is a base class for balancing task workers.
- class Balancer < PioneObject
- # Create a new balancer.
- def initialize(broker)
- @broker = broker
- end
+ set_agent_type :broker, self
- def balance
- raise NotImplementedError
- end
- end
+ #
+ # instance methods
+ #
- # EasyBalancer is a balancer by ratios of tuple space server and task worker.
- class EasyBalancer < Balancer
- # see Balancer.new
- def initialize(broker)
- @broker = broker
- end
+ attr_reader :task_worker_resource # resource size of task worker
+ attr_reader :tuple_space_lock # lock for tuple space table
- # Balances by killing a task worker in max tuple server.
- def balance
- ratios = calc_resource_ratios
- min = ratios.values.min
- max = ratios.values.max
- min_server = ratios.key(min)
- max_server = ratios.key(max)
+ def initialize(option={})
+ super()
- return unless min_server
- return unless max_server
+ @task_workers = Array.new # known task worker fronts
+ @tuple_space = Hash.new # known tuple space table
+ @task_worker_resource = option[:task_worker_resource] || 1
+ @sleeping_time = option[:sleeping_time] || 1
+ @spawnings = 0 # number of current spawning task worker
+ @tuple_space_lock = Monitor.new
+ @task_worker_lock = Monitor.new # lock for task worker table
- if @broker.excess_task_workers > 0 and min_server
- create_task_worker(min_server)
- else
- adjust_task_worker(min_server, max_server)
- end
- end
+ @option = option
+ @option[:spawn_task_worker] = true unless @option.has_key?(:spawn_task_worker)
- # Calculates resource ratios of tuple space servers.
- def calc_resource_ratios(revision={})
- ratio = {}
- # make ratio table
- @broker.tuple_space_servers.each do |ts|
- begin
- rev = revision.has_key?(ts) ? revision[ts] : 0
- current = timeout(1){ts.current_task_worker_size} + rev
- resource = ts.task_worker_resource
- # minimum resource is 1
- resource = 1.0 unless resource > 0
- ratio[ts] = current / resource.to_f
- rescue Exception
- # ignore
- end
- end
- return ratio
- end
+ # balancer
+ @balancer = Global.broker_task_worker_balancer.new(self)
+ end
- # Creates a new task worker.
- def create_task_worker(min_server)
- @broker.create_task_worker(min_server)
+ # Return number of task workers the broker manages.
+ def quantity
+ @task_worker_lock.synchronize {@task_workers.size}
+ end
- if Pione.debug_mode?
- puts "create a new task worker in #{min_server}"
- end
- end
+ # Add the tuple space.
+ def add_tuple_space(tuple_space)
+ uuid = tuple_space.uuid
- # Adjusts task worker size between tuple space servers.
- def adjust_task_worker(min_server, max_server)
- revision = {min_server => 1, max_server => -1}
- new_ratios = calc_resource_ratios(revision)
+ # update tuple space table with the id
+ @tuple_space_lock.synchronize {@tuple_space[uuid] = tuple_space}
- return unless new_ratios.has_key?(min_server)
- return unless new_ratios.has_key?(max_server)
-
- if new_ratios[min_server] < new_ratios[max_server]
- # move worker from max server to min server
- max_workers = @broker.task_workers.select do |worker|
- worker.tuple_space_server == max_server && worker.task_waiting?
- end
- if not(max_workers.empty?)
- max_workers.first.terminate
-
- # for degging
- if Pione.debug_mode?
- puts "worker #{worker.uuid} moved from #{max_server} to #{min_server}"
- end
- end
+ # wakeup chain thread if it sleeps
+ @chain_threads.list.each do |thread|
+ if thread[:agent_state] and thread[:agent_state].current?(:sleep)
+ thread.run
end
end
end
- module BrokerMethod
- # Adds the tuple space server.
- def add_tuple_space_server(tuple_space_server)
- @tuple_space_server_lock.synchronize do
- @tuple_space_servers << tuple_space_server
- end
- end
+ # Get the tuple space.
+ def get_tuple_space(tuple_space_id)
+ @tuple_space_lock.synchronize {@tuple_space[tuple_space_id]}
+ end
- # Gets a tuple space server by connection id.
- def get_tuple_space_server(connection_id)
- @assignment_table[connection_id]
- end
+ # Return known tuple spaces.
+ def tuple_spaces
+ @tuple_space_lock.synchronize {@tuple_space.values}
+ end
- # Return excess number of workers belongs to this broker.
- def excess_task_workers
+ # Return excess number of workers belongs to this broker.
+ def excess_task_workers
+ @task_worker_lock.synchronize do
@task_worker_resource - @task_workers.size - @spawnings
end
+ end
- # Return task wainting workers.
- def task_waiting_workers
- @task_workers.select {|worker| worker.status.task_waiting?}
- end
+ # Create a task worker for the server. This method returns true if we
+ # suceeded to spawn the task worker, or returns false.
+ def create_task_worker(tuple_space)
+ res = true
- # Return task processing workers.
- def task_processing_workers
- @task_workers.select {|worker| worker.status.task_processing?}
- end
+ @task_worker_lock.synchronize do
+ @spawnings += 1
- # Return terminated task workers.
- def terminated_task_workers
- @task_workers.select {|worker| worker.status.terminated?}
- end
-
- # Create a task worker for the server.
- def create_task_worker(tuple_space_server)
- connection_id = Util::UUID.generate
- @assignment_table[connection_id] = tuple_space_server
- Thread.new do
+ # spawn a new process of pione-task-worker command
+ if @option[:spawn_task_worker]
begin
- @spawnings += 1
- Agent[:task_worker].spawn(Global.front, connection_id, @features)
- ensure
- @spawnings -= 1
+ spawner = Command::PioneTaskWorker.spawn(Global.features, tuple_space.uuid)
+ @task_workers << spawner.child_front
+ spawner.when_terminated {delete_task_worker(spawner.child_front)}
+ rescue Command::SpawnError => e
+ Log::Debug.system("broker agent failed to spawn a task worker.")
+ res = false
end
+ else
+ @task_workers << Agent::TaskWorker.start(tuple_space, Global.expressional_features, @env)
end
+
+ @spawnings -= 1
end
- # Deletes unavilable tuple space servers.
- def check_tuple_space_servers
- @tuple_space_server_lock.synchronize do
- @tuple_space_servers.select! do |ts|
- begin
- timeout(1) { ts.ping }
- rescue Exception
- false
- end
+ return res
+ end
+
+ def delete_task_worker(worker)
+ @task_worker_lock.synchronize {@task_workers.delete(worker)}
+ end
+
+ # Terminate first task worker that satisfies the condition. Return true if
+ def terminate_task_worker_if(&condition)
+ @task_worker_lock.synchronize do
+ @task_workers.each do |worker|
+ if condition.call(worker)
+ worker.terminate
+ @task_workers.delete(worker)
+ return true
end
end
end
+ return false
+ end
- # Update tuple space server list.
- def update_tuple_space_servers(tuple_space_servers)
- @tuple_space_server_lock.synchronize do
- del_targets = @tuple_space_servers - tuple_space_servers
- add_targets = tuple_space_servers - @tuple_space_servers
+ # Delete unavilable tuple space servers.
+ def check_tuple_space
+ @tuple_space_lock.synchronize do
+ @tuple_space.delete_if do |_, space|
+ not(Util.ignore_exception {timeout(1) {space.ping}})
+ end
+ end
+ end
- # bye
- #del_targets.each do |ts_server|
- # ts_server.write(Tuple[:bye].new(agent_type: agent_type, uuid: uuid))
- #end
- # hello
- #add_targets.each do |ts_server|
- # ts_server.write(Tuple[:agent].new(agent_type: agent_type, uuid: uuid))
- #end
+ # Update tuple space list.
+ def update_tuple_space_list(tuple_spaces)
+ Thread.new do
+ begin
+ @tuple_space_lock.synchronize do
+ # clear and update tuple space list
+ @tuple_space = {}
+ tuple_spaces.each do |tuple_space|
+ Util.ignore_exception {timeout(1) {add_tuple_space(tuple_space)}}
+ end
- # update
- @tuple_space_servers = tuple_space_servers
-
- if Global.show_presence_notifier
timeout(1) do
- puts "broker's tuple space servers: %s" % [@tuple_space_servers]
+ Log::Debug.presence_notification do
+ "broker agent updated tuple space table: %s" % [@tuple_space.values.map{|space| space.__drburi}]
+ end
end
end
+ rescue Exception => e
+ check_tuple_space
end
- rescue Exception
- check_tuple_space_servers
end
+ return true
end
- include BrokerMethod
+ #
+ # agent activities
+ #
- set_agent_type :broker
+ define_transition :count_tuple_space
+ define_transition :balance_task_worker
+ define_transition :sleep
+ define_transition :check_tuple_space
+ define_transition :check_task_worker_life
- define_state :count_tuple_space_servers
- define_state :creating_task_worker
- define_state :balancing_task_worker
- define_state :sleeping
- define_state :checking_tuple_space_servers
-
- define_state_transition :initialized => :count_tuple_space_servers
- define_state_transition :count_tuple_space_servers => lambda {|agent, res|
- res > 0 ? :balancing_task_worker : :sleeping
+ chain :init => [:count_tuple_space, :check_task_worker_life]
+ chain :count_tuple_space => lambda {|agent, res|
+ res > 0 ? :balance_task_worker : :sleep
}
- define_state_transition :balancing_task_worker => :sleeping
- define_state_transition :sleeping => :count_tuple_space_servers
- define_state_transition :checking_tuple_space_servers => :count_tuple_space_servers
+ chain :balance_task_worker => lambda {|agent, rebalance|
+ rebalance ? :balance_task_worker : :sleep
+ }
+ chain :sleep => :count_tuple_space
+ chain :check_tuple_space => :count_tuple_space
+ chain :check_task_worker_life => :check_task_worker_life
- define_exception_handler Exception => :checking_tuple_space_servers
+ define_exception_handler Restart => :check_tuple_space
- attr_accessor :task_workers
- attr_reader :tuple_space_servers
- attr_reader :task_worker_resource
+ #
+ # transitions
+ #
- # current spawning task worker number
- attr_reader :spawnings
+ def transit_to_count_tuple_space
+ @tuple_space.size
+ end
- # @api private
- def initialize(features, data={})
- super()
- @task_workers = []
- @tuple_space_servers = []
- @task_worker_resource = data[:task_worker_resource] || 1
- @sleeping_time = data[:sleeping_time] || 1
- @assignment_table = {}
- @tuple_space_server_lock = Mutex.new
- @spawnings = 0
- @features = features
+ def transit_to_balance_task_worker
+ @balancer.balance
+ end
- # balancer
- @balancer = EasyBalancer.new(self)
-
- # start agents
- @task_worker_checker = Agent::TrivialRoutineWorker.new(
- Proc.new do
- @task_workers.delete_if do |worker|
- begin
- timeout(3) { worker.terminated? }
- rescue Exception
- true
- end
- end
- sleep 1
- end
- )
+ def transit_to_check_tuple_space
+ check_tuple_space
end
- # @api private
- def start
- super
- @task_worker_checker.start
+ def transit_to_sleep
+ if @tuple_space.size == 0 or excess_task_workers == 0
+ sleep 3
+ else
+ sleep 1
+ end
end
- # Sends bye message to tuple space servers when the broker is destroyed.
- def finalize
- @tuple_space_server_lock.synchronize do
- @tuple_space_servers.each do |ts_server|
+ def transit_to_check_task_worker_life
+ @task_worker_lock.synchronize do
+ @task_workers.delete_if do |worker|
begin
- ts_server.bye
- rescue Exception
- # ignore
+ timeout(1) { worker.ping }
+ false
+ rescue Exception => e
+ true
end
end
end
- super
+ sleep 1
end
- private
+ # Send bye message to tuple spaces when the broker is destroyed.
+ def transit_to_terminate
+ @tuple_space_lock.synchronize do
+ @tuple_space.each do |_, tuple_space|
+ Util.ignore_exception {timeout(1) {tuple_space.bye}}
+ end
+ end
+ end
+ end
- # @api private
- def transit_to_initialized
+ # TaskWorkerBalancer is a base class for balancing task workers.
+ class TaskWorkerBalancer
+ # Create a new balancer.
+ def initialize(broker)
+ @broker = broker
end
- def transit_to_count_tuple_space_servers
- @tuple_space_servers.size
+ # Execute task worker balancing. If this method returned true, broker
+ # executes rebalance chain with no span. If false, broker sleeps a little.
+ def balance
+ raise NotImplementedError
end
+ end
- def transit_to_balancing_task_worker
- @balancer.balance
+ # EasyBalancer is a balancer by ratios of tuple space and task worker.
+ class EasyTaskWorkerBalancer < TaskWorkerBalancer
+ # see Balancer.new
+ def initialize(broker)
+ @broker = broker
end
- def transit_to_checking_tuple_space_servers
- check_tuple_space_servers
+ # Balance task worker ratio by creating a new task worker in minimum
+ # tuple space or killing a task worker in maximum.
+ def balance
+ ratios = calc_resource_ratios
+ min = ratios.values.min
+ max = ratios.values.max
+ min_server = ratios.key(min)
+ max_server = ratios.key(max)
+
+ return false unless min_server
+ return false unless max_server
+
+ if @broker.excess_task_workers > 0 and min_server
+ return create_task_worker(min_server)
+ else
+ return adjust_task_worker(min_server, max_server)
+ end
end
- # Transits to the state +sleeping+.
- def transit_to_sleeping
- if @tuple_space_servers.size == 0 or excess_task_workers == 0
- sleep 1
+ # Calculate resource ratios of tuple space servers.
+ def calc_resource_ratios(revision={})
+ ratio = {}
+ # make ratio table
+ @broker.tuple_space_lock.synchronize do
+ @broker.tuple_spaces.each do |tuple_space|
+ rev = revision.has_key?(tuple_space) ? revision[tuple_space] : 0
+ current = timeout(1){tuple_space.current_task_worker_size} + rev
+ resource = tuple_space.task_worker_resource
+ # minimum resource is 1
+ resource = 1 unless resource > 0
+ ratio[tuple_space] = current / resource.to_f
+ end
end
+ return ratio
end
- end
- set_agent Broker
+ # Creates a new task worker.
+ def create_task_worker(min_server)
+ return @broker.create_task_worker(min_server)
+ end
+
+ # Adjusts task worker size between tuple space servers.
+ def adjust_task_worker(min_server, max_server)
+ revision = {min_server => 1, max_server => -1}
+ new_ratios = calc_resource_ratios(revision)
+
+ # failed to calculate tuple space ratio
+ return unless new_ratios.has_key?(min_server)
+ return unless new_ratios.has_key?(max_server)
+
+ # kill a task worker for moving worker from max server to min server
+ if new_ratios[min_server] < new_ratios[max_server]
+ if @broker.terminate_task_worker_if do |worker|
+ worker.tuple_space == max_server && worker.states.any?{|s| s.current?(:take_task)}
+ end
+ return true
+ end
+ end
+
+ # failed to adjust task workers
+ return false
+ end
+ end
end
end