Sha256: a6ef6d8b3a067b2a60137d428025de26a42b93919271b0262b5bf5e7e9b095a8

Contents?: true

Size: 1.53 KB

Versions: 1

Compression:

Stored size: 1.53 KB

Contents

module Noda


class JobWorker
  attr_reader :thread
  attr_accessor :max_retry_connect , :wait_time_to_retry
  def initialize( job_server_addr="localhost",job_server_port="10001" )
    @server_uri  = "druby://#{job_server_addr}:#{job_server_port}"
    @max_retry_connect  = 30
    @wait_time_to_retry =  2
    require "socket" 
    @local_addr = IPSocket::getaddress(Socket::gethostname)
    self.connect
    self
  end
  def connect_job_server
    error_conter = 0
    begin 
      @job =DRbObject.new_with_uri(@server_uri)
      @job.hash_table
      @logger = @job.logger
    rescue DRb::DRbConnError => e
      error_conter +=1
      raise e if error_conter > @max_retry_connect
      sleep @wait_time_to_retry 
      retry
    end
  end
  def handle_task()
    # @logger.info("self.class@#{@local_addr}#{self.object_id}"){"i try to pop a task."} 
    task = @job.input.pop
    if task.class ==  DRb::DRbUnknown
      self.load_class(task.name)
      task = task.reload
    end
    result = task.do_task(@job.hash_table)
    @job.output.push result
  end
  def load_class(name)
    s = @job.task_class(name)
    Noda.module_eval(s)
  end
  def init_thread
    @table = @job.hash_table
    @thread= Thread.new{
        loop{
          self.handle_task()
          sleep 0.001
        }
    }
  end
  def connect
    self.connect_job_server
  end
  def start
    self.init_thread
    @thread.join
  end
  def status
    @thread.status if @thread
  end
  def stop
    @thread.kill
  end
end

end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
noda-0.0.10 lib/noda/job_worker.rb