lib/drbqs/queue.rb in drbqs-0.0.9 vs lib/drbqs/queue.rb in drbqs-0.0.10
- old
+ new
@@ -1,16 +1,21 @@
+require 'drbqs/history'
+
module DRbQS
class QueueServer
- attr_reader :calculating
+ include HistoryUtils
+ attr_reader :calculating, :history
+
def initialize(queue, result, logger = nil)
@queue = queue
@result = result
@task_id = 0
@cache = {}
@calculating = Hash.new { |hash, key| hash[key] = Array.new }
+ @history = DRbQS::History.new
@logger = logger
end
def queue_task(task_id)
@queue.write(@cache[task_id].drb_args(task_id))
@@ -22,20 +27,22 @@
def add(task)
@task_id += 1
@logger.info("New task: #{@task_id}") if @logger
@cache[@task_id] = task
queue_task(@task_id)
+ @history.set(@task_id, :add)
@task_id
end
def get_accept_signal
count = 0
begin
loop do
sym, task_id, node_id = @result.take([:accept, Fixnum, Fixnum], 0)
count += 1
@calculating[node_id] << task_id
+ @history.set(task_id, :calculate, node_id)
@logger.info("Accept: task #{task_id} by node #{node_id}.") if @logger
end
rescue Rinda::RequestExpiredError
@logger.debug("Accept: #{count} signals.") if @logger
end
@@ -45,35 +52,51 @@
def requeue_for_deleted_node_id(deleted)
deleted.each do |node_id|
if task_id_ary = @calculating[node_id]
task_id_ary.each do |task_id|
queue_task(task_id)
+ @history.set(task_id, :requeue)
@logger.info("Requeue: task #{task_id}.") if @logger
end
@calculating.delete(node_id)
end
end
end
+ def delete_task_id(node_id, task_id)
+ unless @calculating[node_id].delete(task_id)
+ @logger.error("Task #{task_id} does not exist in list of calculating tasks.") if @logger
+ end
+ if ary = @calculating.find { |k, v| v.include?(task_id) }
+ @logger.error("Node #{ary[0]} is calculating task #{task_id}, too.") if @logger
+ end
+ end
+ private :delete_task_id
+
+ def exec_task_hook(task_id, result)
+ if task = @cache.delete(task_id)
+ if hook = task.hook
+ @history.set(task_id, :hook)
+ hook.call(self, result)
+ end
+ else
+ @logger.error("Task #{task_id} is not cached.") if @logger
+ end
+ end
+ private :exec_task_hook
+
def get_result
count = 0
begin
loop do
get_accept_signal
- sym, task_id, result = @result.take([:result, Fixnum, nil], 0)
+ sym, task_id, node_id, result = @result.take([:result, Fixnum, Fixnum, nil], 0)
count += 1
- if ary = @calculating.find { |k, v| v.include?(task_id) }
- node_id = ary[0]
- @calculating[node_id].delete(task_id)
- else
- node_id = nil
- end
+ @history.set(task_id, :result, node_id)
@logger.info("Get: result of #{task_id} from node #{node_id}.") if @logger
- task = @cache.delete(task_id)
- if hook = task.hook
- hook.call(self, result)
- end
+ delete_task_id(node_id, task_id)
+ exec_task_hook(task_id, result)
end
rescue Rinda::RequestExpiredError
@logger.debug("Get: #{count} results.") if @logger
end
count
@@ -92,9 +115,25 @@
# If there are no tasks in queue and calculating,
# return true. Otherwise, false.
def finished?
@cache.size == 0
+ end
+
+ def all_logs
+ s = ''
+ @history.each do |task_id, events|
+ s << "Task #{task_id}\n"
+ events.each do |ev|
+ case ev[1]
+ when :add, :requeue, :hook
+ s << " #{time_to_string(ev[0])}\t#{ev[1]}\n"
+ when :calculate, :result
+ s << " #{time_to_string(ev[0])}\t#{ev[1]} (node #{ev[2]})\n"
+ end
+ end
+ end
+ s
end
end
end