Sha256: 83110d4787c477ceb4bdb9826b7e6d35d4c583587ccd8356fd4ea083c9d9531b
Contents?: true
Size: 1.8 KB
Versions: 1
Compression:
Stored size: 1.8 KB
Contents
module DRbQS
  # Tracks the single task a node is currently working on and moves data
  # between a shared task source (+queue+) and a shared result sink (+result+).
  #
  # +queue+ must respond to +take(template, timeout)+ and +result+ to
  # +write(tuple)+. The rescue of Rinda::RequestExpiredError suggests both are
  # Rinda tuple spaces -- NOTE(review): confirm against the callers.
  #
  # Tasks and results are buffered locally in two Queue objects so that the
  # code fetching/sending tuples and the code executing tasks can hand values
  # to each other.
  class TaskClient
    attr_reader :node_id, :calculating_task

    # @param node_id [Object] identifier included in every tuple written to +result+
    # @param queue [#take] source of task tuples
    # @param result [#write] sink for +:accept+ and +:result+ tuples
    # @param logger [#info, nil] optional logger; nothing is logged when nil
    def initialize(node_id, queue, result, logger = nil)
      @node_id = node_id
      @queue = queue
      @result = result
      @calculating_task = nil
      @task_queue = Queue.new
      @result_queue = Queue.new
      @logger = logger
    end

    # @return [Boolean] true while a task id is recorded as being calculated
    def calculating?
      !!@calculating_task
    end

    # @return [Boolean] true when no fetched task is waiting in the local buffer
    def task_empty?
      @task_queue.empty?
    end

    # @return [Boolean] true when no result is waiting in the local buffer
    def result_empty?
      @result_queue.empty?
    end

    # Removes and returns the oldest buffered result.
    def dequeue_result
      @result_queue.deq
    end
    private :dequeue_result

    # Buffers the task payload and records +task_id+ as the task in progress.
    #
    # @param task_id [Integer] id of the task being accepted
    # @param ary [Array] payload handed to whoever calls #dequeue_task
    def queue_task(task_id, ary)
      @task_queue.enq(ary)
      @calculating_task = task_id
    end

    # Removes and returns the oldest buffered task payload.
    def dequeue_task
      @task_queue.deq
    end

    # Tries to take one task tuple of the form
    # +[task_id, obj, method_sym, args]+ from +queue+.
    #
    # The template uses Integer rather than Fixnum: Fixnum was deprecated in
    # Ruby 2.4 and removed in Ruby 3.2, where referencing it raises NameError.
    # On older Rubies Fixnum is a subclass of Integer, so every id that matched
    # before still matches.
    #
    # @return [Array, nil] the tuple, or nil when the take request expires
    def get_task
      # The 0 is passed as the timeout argument of +take+.
      @queue.take([Integer, nil, Symbol, nil], 0)
    rescue Rinda::RequestExpiredError
      nil
    end

    # Fetches a new task unless one is already in progress, buffers it, and
    # writes an +[:accept, task_id, node_id]+ tuple to +result+.
    # Does nothing when a task is in progress or no task could be taken.
    def add_new_task
      return if @calculating_task

      ary = get_task
      return unless ary

      task_id, obj, method_sym, args = ary
      queue_task(task_id, [obj, method_sym, args])
      @logger.info("Send accept signal: node #{@node_id} caluclating #{@calculating_task}") if @logger
      @result.write([:accept, @calculating_task, @node_id])
    end

    # Writes one buffered result to +result+ as
    # +[:result, task_id, node_id, value]+ and clears the task in progress.
    # Does nothing when no result is buffered.
    def send_result
      return if result_empty?

      result = dequeue_result
      @logger.info("Send result: #{@calculating_task}") { result.inspect } if @logger
      @result.write([:result, @calculating_task, @node_id, result])
      @calculating_task = nil
    end

    # Buffers a computed result until #send_result or #dump_result_queue
    # picks it up.
    def queue_result(result)
      @result_queue.enq(result)
    end

    # Drains every buffered result, oldest first.
    #
    # @return [String, nil] Marshal dump of the drained results, or nil when
    #   nothing was buffered
    def dump_result_queue
      results = []
      results << dequeue_result until result_empty?
      results.empty? ? nil : Marshal.dump(results)
    end
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
drbqs-0.0.10 | lib/drbqs/task_client.rb |