Sha256: 83110d4787c477ceb4bdb9826b7e6d35d4c583587ccd8356fd4ea083c9d9531b

Contents?: true

Size: 1.8 KB

Versions: 1

Compression:

Stored size: 1.8 KB

Contents

module DRbQS
  # Node-side bookkeeping for a single task at a time.
  #
  # A TaskClient takes a task tuple from +queue+, buffers its payload in a
  # local task queue, buffers computed results in a local result queue, and
  # reports acceptance and results by writing tuples to +result+.
  #
  # NOTE(review): +queue+ and +result+ are presumably Rinda tuple spaces
  # (get_task rescues Rinda::RequestExpiredError); only #take(template, timeout)
  # and #write(tuple) are actually required by this class.
  class TaskClient
    attr_reader :node_id, :calculating_task

    # node_id:: identifier of this node, included in every tuple written.
    # queue::   object responding to take(template, timeout); source of tasks.
    # result::  object responding to write(tuple); sink for signals/results.
    # logger::  optional logger responding to info; no logging when nil.
    def initialize(node_id, queue, result, logger = nil)
      @node_id = node_id
      @queue = queue
      @result = result
      @calculating_task = nil
      @task_queue = Queue.new
      @result_queue = Queue.new
      @logger = logger
    end

    # True while a task has been accepted and its result has not been sent.
    def calculating?
      !!@calculating_task
    end

    # True when no accepted task payload is waiting in the local task queue.
    def task_empty?
      @task_queue.empty?
    end

    # True when no computed result is waiting in the local result queue.
    def result_empty?
      @result_queue.empty?
    end

    # Removes and returns the oldest buffered result (blocks when empty).
    def dequeue_result
      @result_queue.deq
    end
    private :dequeue_result

    # Buffers the payload +ary+ and marks +task_id+ as the task in progress.
    def queue_task(task_id, ary)
      @task_queue.enq(ary)
      @calculating_task = task_id
    end

    # Removes and returns the oldest buffered task payload (blocks when empty).
    def dequeue_task
      @task_queue.deq
    end

    # Takes one tuple matching [task_id, obj, method_sym, args] from the
    # queue with a timeout of 0.
    #
    # Returns the tuple, or nil when the take expires.
    def get_task
      # Integer replaces Fixnum: Fixnum was deprecated in Ruby 2.4 and removed
      # in Ruby 3.2, where referencing it raises NameError. Integer matches
      # every id that Fixnum matched.
      @queue.take([Integer, nil, Symbol, nil], 0)
    rescue Rinda::RequestExpiredError
      nil
    end

    # When idle, tries to fetch one task; on success buffers it locally and
    # writes [:accept, task_id, node_id] to the result object.
    #
    # Returns the value of the write call, or nil when nothing was accepted.
    def add_new_task
      return nil if @calculating_task

      tuple = get_task
      return nil unless tuple

      task_id, obj, method_sym, args = tuple
      queue_task(task_id, [obj, method_sym, args])
      @logger.info("Send accept signal: node #{@node_id} caluclating #{@calculating_task}") if @logger
      @result.write([:accept, @calculating_task, @node_id])
    end

    # When a result is buffered, writes [:result, task_id, node_id, result]
    # to the result object and clears the task in progress. Returns nil.
    def send_result
      return nil if result_empty?

      result = dequeue_result
      @logger.info("Send result: #{@calculating_task}") { result.inspect } if @logger
      @result.write([:result, @calculating_task, @node_id, result])
      @calculating_task = nil
    end

    # Buffers a computed +result+ until send_result or dump_result_queue.
    def queue_result(result)
      @result_queue.enq(result)
    end

    # Drains the local result queue.
    #
    # Returns the drained results serialized with Marshal.dump (oldest
    # first), or nil when the queue was empty.
    def dump_result_queue
      results = []
      results << dequeue_result until result_empty?
      results.empty? ? nil : Marshal.dump(results)
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
drbqs-0.0.10 lib/drbqs/task_client.rb