lib/irrc/runner.rb in irrc-0.1.0 vs lib/irrc/runner.rb in irrc-0.2.0

- old
+ new

@@ -1,37 +1,63 @@ module Irrc module Runner - def run + def run(threads) done = [] loop do - if queue.empty? - close + # NOTE: trick to avoid dead lock + if last_thread_of?(threads) && @queue.empty? + terminate + logger.debug "Queue #{threads - 1} guard objects" + (threads - 1).times { @queue.push nil } return done end - query = queue.pop + query = @queue.pop + + # NOTE: trick to avoid dead lock + if query.nil? + terminate + return done + end + connect unless established? begin - process query + logger.info "Processing #{query.object}" + query = process(query) query.success + logger.debug "Queue new #{query.children.size} queries" + query.children.each {|q| @queue << q } rescue - logger.error $!.message + logger.error "#{$!.message} when processing #{query.object} for #{query.root.object}" query.fail end - done << query + done << query if query.root? end end private + def cache(object, sources, &block) + @cache["#{object}:#{sources}"] ||= yield + end + def execute(command) return if command.nil? || command == '' - logger.debug "Executing: #{command}" - connection.cmd(command).tap {|result| logger.debug "Returned: #{result}" } + logger.debug %(Executing "#{command}") + connection.cmd(command).tap {|result| logger.debug %(Got "#{result}") } + end + + def last_thread_of?(threads) + Thread.list.reject(&:stop?).size == 1 && Thread.list.size == threads+1 + end + + def terminate + logger.info "No more queries" + close end end end