Sha256: 8968cce3b75265c9682ca8db56ecb32faf092c7c21d4210fd962a4006fdc151a
Contents?: true
Size: 1.66 KB
Versions: 4
Compression:
Stored size: 1.66 KB
Contents
module DRbQS
  # Node-side bookkeeping for a single worker: it takes one task at a time
  # from +queue+, buffers it locally until the worker picks it up, and
  # reports acceptance and results back through +result+.
  #
  # NOTE(review): +queue+ and +result+ are presumably Rinda tuple spaces
  # (the code only relies on +take(pattern, timeout)+ and +write(tuple)+,
  # and rescues Rinda::RequestExpiredError) -- confirm against the caller.
  class TaskClient
    attr_reader :node_id, :calculating_task

    # @param node_id [Object] identifier of this node, echoed in accept tuples
    # @param queue [#take] source of task tuples
    # @param result [#write] sink for +:accept+ and +:result+ tuples
    # @param logger [#info, nil] optional logger; nothing is logged when nil
    def initialize(node_id, queue, result, logger = nil)
      @node_id = node_id
      @queue = queue
      @result = result
      @calculating_task = nil
      @task_queue = Queue.new
      @result_queue = Queue.new
      @logger = logger
    end

    # @return [Boolean] true when no task is buffered locally
    def task_empty?
      @task_queue.empty?
    end

    # @return [Boolean] true when no result is buffered locally
    def result_empty?
      @result_queue.empty?
    end

    # Buffers +ary+ in the local task queue and records +task_id+ as the
    # task currently being calculated.
    #
    # @param task_id [Object] id stored in +calculating_task+
    # @param ary [Array] payload later returned by #dequeue_task
    def queue_task(task_id, ary)
      @task_queue.enq(ary)
      @calculating_task = task_id
    end

    # Removes and returns the oldest buffered task payload.
    # Delegates to Queue#deq, which blocks while the queue is empty.
    def dequeue_task
      @task_queue.deq
    end

    # Tries to take one task tuple from +queue+ unless a task is already in
    # progress. On success the task is buffered locally and an
    # +[:accept, task_id, node_id]+ tuple is written to +result+.
    # A Rinda::RequestExpiredError from +take+ is swallowed, i.e. "no task
    # available right now" is not treated as an error.
    #
    # @return [Object, nil] the value of +result.write+, or nil when nothing
    #   was taken
    def add_new_task
      return if @calculating_task

      # Integer instead of Fixnum: Fixnum was deprecated in Ruby 2.4 and
      # removed in Ruby 3.2, where referencing it raises NameError.
      # The timeout of 0 makes the take non-blocking.
      task_id, obj, method_sym, args = @queue.take([Integer, nil, Symbol, nil], 0)
      queue_task(task_id, [obj, method_sym, args])
      @logger.info("Send accept signal: node #{@node_id} caluclating #{@calculating_task}") if @logger
      @result.write([:accept, @calculating_task, @node_id])
    rescue Rinda::RequestExpiredError
      # Nothing could be taken; stay idle and let the caller retry later.
      nil
    end

    # Sends one buffered result, if any, as +[:result, task_id, value]+ to
    # +result+ and marks this client as idle again.
    #
    # @return [nil]
    def send_result
      return if result_empty?

      result = dequeue_result
      # The block form defers building the inspected string to the logger.
      @logger.info("Send result: #{@calculating_task}") { result.inspect } if @logger
      @result.write([:result, @calculating_task, result])
      @calculating_task = nil
    end

    # Buffers a finished result until #send_result or #dump_result_queue
    # consumes it.
    #
    # @param result [Object] value to buffer
    def queue_result(result)
      @result_queue.enq(result)
    end

    # Drains every buffered result and serializes them with Marshal.
    #
    # @return [String, nil] marshalled Array of results, or nil when the
    #   buffer was empty
    def dump_result_queue
      results = []
      results << dequeue_result until result_empty?
      Marshal.dump(results) unless results.empty?
    end

    private

    # Removes and returns the oldest buffered result (blocks when empty).
    def dequeue_result
      @result_queue.deq
    end
  end
end
Version data entries
4 entries across 4 versions & 1 rubygems
Version | Path |
---|---|
drbqs-0.0.9 | lib/drbqs/task_client.rb |
drbqs-0.0.8 | lib/drbqs/task_client.rb |
drbqs-0.0.7 | lib/drbqs/task_client.rb |
drbqs-0.0.6 | lib/drbqs/task_client.rb |