lib/drbqs/queue.rb in drbqs-0.0.1 vs lib/drbqs/queue.rb in drbqs-0.0.2

- old
+ new

@@ -26,11 +26,11 @@ def initialize(queue, result, logger = nil) @queue = queue @result = result @task_id = 0 @cache = {} - @calculating = {} + @calculating = Hash.new { |hash, key| hash[key] = Array.new } @logger = logger end def queue_task(task_id) @queue.write(@cache[task_id].drb_args(task_id)) @@ -44,56 +44,72 @@ @cache[@task_id] = task queue_task(@task_id) end def get_accept_signal + count = 0 begin - sym, task_id, node_id = @result.take([:accept, Fixnum, Fixnum], 0) - @calculating[node_id] = task_id - @logger.info("Accept: task #{task_id} by node #{node_id}.") if @logger - rescue + loop do + sym, task_id, node_id = @result.take([:accept, Fixnum, Fixnum], 0) + count += 1 + @calculating[node_id] << task_id + @logger.info("Accept: task #{task_id} by node #{node_id}.") if @logger + end + rescue Rinda::RequestExpiredError + @logger.debug("Accept: #{count} signals.") if @logger end + count end def requeue_for_deleted_node_id(deleted) deleted.each do |node_id| - if task_id = @calculating[node_id] - queue_task(task_id) - @calculating.delete(task_id) + if task_id_ary = @calculating[node_id] + task_id_ary.each { |task_id| queue_task(task_id) } + @calculating.delete(node_id) end end end def get_result + count = 0 begin loop do + get_accept_signal sym, task_id, result = @result.take([:result, Fixnum, nil], 0) - @logger.info("Get: result of #{task_id}.") if @logger - @calculating.delete(task_id) + count += 1 + if ary = @calculating.find { |k, v| v.include?(task_id) } + node_id = ary[0] + @calculating[node_id].delete(task_id) + else + node_id = nil + end + @logger.info("Get: result of #{task_id} from node #{node_id}.") if @logger task = @cache.delete(task_id) if hook = task.hook hook.call(self, result) end end - rescue + rescue Rinda::RequestExpiredError + @logger.debug("Get: #{count} results.") if @logger end + count end + def calculating_task_number + @calculating.inject(0) { |s, key_val| s + key_val[1].size } + end + # If queue is empty, return true. Otherwise, false. # Even if there are calculating tasks, # the method can return true. def empty? - @cache.size - @calculating.size == 0 + @cache.size - calculating_task_number == 0 end # If there are no tasks in queue and calculating, # return true. Otherwise, false. def finished? @cache.size == 0 - end - - def calculating_task_number - @calculating.size end end end