lib/pione/command/pione-task-worker.rb in pione-0.3.2 vs lib/pione/command/pione-task-worker.rb in pione-0.4.0
- old
+ new
@@ -1,172 +1,222 @@
module Pione
module Command
- # This is a body for +pione-task-worker+ command.
+ # `PioneTaskWorker` is a command that runs pione task worker agents.
class PioneTaskWorker < BasicCommand
#
- # basic informations
- #
-
- command_name "pione-task-worker" do |cmd|
- "front: %s, parent: %s" % [Global.front.uri, cmd.option[:parent_front].uri]
- end
-
- command_banner(Util::Indentation.cut(<<-TEXT))
- Run a task worker process. This command assumes to be launched by
- pione-client or pione-broker, so you should not execute this by hand.
- TEXT
-
- command_front Front::TaskWorkerFront
-
- #
- # options
- #
-
- use_option :color
- use_option :debug
- use_option :my_ip_address
- use_option :parent_front
- use_option :features
-
- define_option(:tuple_space_id) do |item|
- item.long = '--tuple-space-id=UUID'
- item.desc = 'tuple space id that the worker joins'
- item.requisite = true
- item.value = proc {|id| id}
- end
-
- #
# class methods
#
- # Create a new process of +pione-task-worker+ command.
- def self.spawn(features, tuple_space_id)
- spawner = Spawner.new("pione-task-worker")
+ # Create a new process of `pione-task-worker` command.
+ def self.spawn(model, features, tuple_space_id)
+ spawner = Spawner.new(model, "pione-task-worker")
# debug options
spawner.option("--debug=system") if Global.debug_system
spawner.option("--debug=ignored_exception") if Global.debug_ignored_exception
spawner.option("--debug=rule_engine") if Global.debug_rule_engine
spawner.option("--debug=communication") if Global.debug_communication
- spawner.option("--debug=presence_notification") if Global.debug_presence_notification
+ spawner.option("--debug=notification") if Global.debug_notification
# requisite options
- spawner.option("--parent-front", Global.front.uri)
+ spawner.option("--parent-front", model[:front].uri.to_s)
spawner.option("--tuple-space-id", tuple_space_id)
spawner.option("--features", features) if features
- # optionals
- spawner.option("--color") if Global.color_enabled
+ # others
+ spawner.option("--color", Global.color_enabled)
+ spawner.option("--file-cache-method", System::FileCache.cache_method.name.to_s)
+ spawner.option("--file-sliding", Global.file_sliding)
spawner.spawn # this method returns child front
end
#
+ # informations
+ #
+
+ define(:toplevel, true)
+ define(:name, "pione-task-worker")
+ define(:desc, "process tasks")
+ define(:front, Front::TaskWorkerFront)
+
+ #
+ # options
+ #
+
+ option CommonOption.color
+ option CommonOption.debug
+ option CommonOption.communication_address
+ option CommonOption.parent_front
+ option CommonOption.features
+ option CommonOption.file_cache_method
+ option CommonOption.file_sliding
+
+ option(:tuple_space_id) do |item|
+ item.type = :string
+ item.long = '--tuple-space-id'
+ item.arg = 'UUID'
+ item.desc = 'Tuple space ID that the worker joins'
+ item.requisite = true
+ end
+
+ #
# instance methods
#
attr_reader :agent
attr_reader :tuple_space_server
#
# command lifecycle: setup phase
#
- setup_phase :timeout => 15
- setup :parent_process_connection, :module => CommonCommandAction
- setup :tuple_space
- setup :agent
- setup :base_location
- setup :job_terminator
+ phase(:setup) do |item|
+ item.configure(:timeout => 15)
- # Get tuple space from parent process and test the connection.
- def setup_tuple_space
- @tuple_space = option[:parent_front].get_tuple_space(option[:tuple_space_id])
+ item << ProcessAction.connect_parent
+ item << :tuple_space
+ item << :task_worker_agent
+ item << :job_terminator
+ item << :base_location
+ end
- unless @tuple_space
- abort("%s cannot get tuple space \"%s\"." % [command_name, option[:tuple_space_id]])
+ setup(:tuple_space) do |item|
+ item.desc = "Get a tuple space from parent"
+
+ item.assign(:tuple_space) do
+ model[:parent_front].get_tuple_space(model[:tuple_space_id])
end
- if Util.error?(:timeout => 3) {@tuple_space.uuid}
- abort("%s cannot connect to tuple space." % command_name)
+ item.process do
+ test(not(model[:tuple_space]))
+
+ arg = {name: cmd.name, id: option[:tuple_space_id]}
+ cmd.abort('"%{name}" cannot get the tuple space "%{id}".' % arg)
end
+
+ item.process do
+ if Util.error?(:timeout => 3) {model[:tuple_space].uuid}
+ cmd.abort('"%{name}" cannot connect to tuple space.' % {name: cmd.name})
+ end
+ end
end
- # Create a task worker agent.
- def setup_agent
- @agent = Agent::TaskWorker.new(@tuple_space, Global.expressional_features)
- rescue Agent::TupleSpaceError => e
- abort(e.message)
+ setup(:task_worker_agent) do |item|
+ item.desc = "Create a task worker agent"
+
+ item.assign(:task_worker_agent) do
+ Agent::TaskWorker.new(model[:tuple_space], Global.expressional_features)
+ end
+
+ item.exception(Agent::TupleSpaceError) do |e|
+ cmd.abort(e)
+ end
end
- # Setup base location.
- def setup_base_location
- if @tuple_space.base_location.kind_of?(Location::DropboxLocation)
- Location::Dropbox.init(@tuple_space)
- unless Location::Dropbox.ready?
- abort("You aren't ready to access Dropbox.")
+ setup(:base_location) do |item|
+ item.desc = "Get a base location"
+
+ item.assign(:base_location) do
+ model[:tuple_space].base_location
+ end
+
+ item.process do
+ # enable Dropbox location
+ if model[:base_location].is_a?(Location::DropboxLocation)
+ begin
+ Location::DropboxLocation.enable(model[:tuple_space])
+ rescue Location::DropboxLocationUnavailable => e
+ arg = {addr: model[:base_location].address}
+ cmd.abort('Base location "%{addr}" is on Dropbox, but it is not ready.' % arg)
+ end
end
end
end
- # Create a job terminator and setup the action.
- def setup_job_terminator
- @job_terminator = Agent::JobTerminator.new(@tuple_space) do |status|
- if status.error?
- abort("pione-task-worker catched the error: %s" % status.exception.message)
- else
- terminate
+ setup(:job_terminator) do |item|
+ item.desc = "Create a job terminator and setup the action"
+
+ item.assign(:job_terminator) do
+ Agent::JobTerminator.new(model[:tuple_space]) do |status|
+ if status.error?
+ cmd.abort('"%s" catched the error: %s' % [cmd.name, status.message])
+ else
+ cmd.terminate
+ end
end
end
end
#
# command lifecycle: execution phase
#
- handle_execution_exception(DRb::DRbConnError) do |cmd, e|
- Log::Debug.system do
- "%s goes termination phase because the exception was catched: %s" % [cmd.command_name, e.message]
+ phase(:execution) do |item|
+ item << :job_terminator
+ item << :task_worker_agent
+
+ item.exception(DRb::DRbConnError) do |e|
+ Log::Debug.system do
+ "%s goes termination phase because the exception was catched: %s" % [cmd.name, e.message]
+ end
+ cmd.terminate
end
- cmd.terminate
end
- execute :job_terminator
- execute :agent
- # Start the job terminator.
- def execute_job_terminator
- @job_terminator.start
+ execution(:job_terminator) do |item|
+ item.desc = "Start the job terminator"
+
+ item.process do
+ model[:job_terminator].start
+ end
end
- # Start task worker activity and wait the termination.
- def execute_agent
- @agent.start
- @agent.wait_until_terminated(nil)
+ execution(:task_worker_agent) do |item|
+ item.desc = "Start task worker activity and wait the termination"
+
+ item.process do
+ model[:task_worker_agent].start
+ model[:task_worker_agent].wait_until_terminated(nil)
+ end
end
#
# command lifecycle: termination phase
#
- termination_phase :timeout => 10
- terminate :job_terminator
- terminate :agent
- terminate :parent_process_connection, :module => CommonCommandAction
+ phase(:termination) do |item|
+ item.configure(:timeout => 10)
+ item << ProcessAction.disconnect_parent
+ item << :job_terminator
+ item << :task_worker_agent
+ end
- # Terminate job terminator.
- def terminate_job_terminator
- if @job_terminator and not(@job_terminator.terminated?)
- @job_terminator.terminate
+ termination(:job_terminator) do |item|
+ item.desc = "Terminate job terminator agent"
+
+ item.condition do
+ test(model[:job_terminator])
+ test(not(model[:job_terminator].terminated?))
end
+
+ item.process do
+ model[:job_terminator].terminate
+ end
end
- # Terminate task worker agent.
- def terminate_agent
- if @agent
- @agent.terminate
- @agent.wait_until_terminated(nil)
+ termination(:task_worker_agent) do |item|
+ item.desc = "Terminate task worker agent"
+
+ item.condition do
+ test(model[:task_worker_agent])
+ test(not(model[:task_worker_agent].terminated?))
end
+
+ item.process do
+ model[:task_worker_agent].terminate
+ model[:task_worker_agent].wait_until_terminated(nil)
+ end
end
end
end
end
-