lib/noda/job_worker.rb in noda-0.0.10 vs lib/noda/job_worker.rb in noda-0.0.11
- old
+ new
@@ -1,20 +1,33 @@
module Noda
+# ジョブワーカー
+#
+# ジョブを待ち受けるスレッドです。
+# Taskを取りだして実行します.
+# ip=127.0.0.1
+# w=Noda::JobWorker.new("#{ip}", "10001")
+# t = DRb.start_service("druby://#{ip}:10101",w)
+# w.start
+#
+#
class JobWorker
attr_reader :thread
attr_accessor :max_retry_connect , :wait_time_to_retry
- def initialize( job_server_addr="localhost",job_server_port="10001" )
- @server_uri = "druby://#{job_server_addr}:#{job_server_port}"
+ # * server_addr ジョブサーバーアドレス、またはホスト名
+ # * server_port ジョブサーバーポート
+ def initialize( server_addr="localhost",server_port="10001",q="" )
+ @server_uri = "druby://#{server_addr}:#{server_port}/?#{q}"
@max_retry_connect = 30
@wait_time_to_retry = 2
require "socket"
@local_addr = IPSocket::getaddress(Socket::gethostname)
self.connect
self
end
+ # 内部的に使います。ジョブサーバーへ接続
def connect_job_server
error_conter = 0
begin
@job =DRbObject.new_with_uri(@server_uri)
@job.hash_table
@@ -24,42 +37,64 @@
raise e if error_conter > @max_retry_connect
sleep @wait_time_to_retry
retry
end
end
+ # 担当ジョブからタスクを実行します.
+ #
+ # タスクは do_task(hash)実装が必須
+ # タスクのクラス定義はrequire必須.(start前にrequire)
+ # タスクのクラス定義はサーバー側から自動ロード(eval)します.
def handle_task()
# @logger.info("self.class@#{@local_addr}#{self.object_id}"){"i try to pop a task."}
task = @job.input.pop
if task.class == DRb::DRbUnknown
self.load_class(task.name)
task = task.reload
end
result = task.do_task(@job.hash_table)
@job.output.push result
end
+ # クラス定義をEvalする。クラス定義はサーバーから取り出す.
+ # ワーカー側にクラス定義を動的に渡すときに使います.
+ # *name クラス名
def load_class(name)
s = @job.task_class(name)
Noda.module_eval(s)
end
+ # ワーカーのメインスレッドを起動します.start で使います.
def init_thread
@table = @job.hash_table
@thread= Thread.new{
loop{
self.handle_task()
sleep 0.001
}
}
end
+
+ # サーバーに接続します
+ #
def connect
self.connect_job_server
end
+
+ # 処理を開始します.
+ #
+ # threadを返します. worker を起動しっぱなしにするなら thread.joinしてください
def start
self.init_thread
@thread.join
end
+
+ # ワーカースレッドの状態を取り出します.
+ #
+ # マルチスレッドでブロックされてるとsleep になります
def status
@thread.status if @thread
end
+
+ # スレッド停止します.このインスタンスは死にません.start で再起動します.
def stop
@thread.kill
end
end