Sha256: ef9a05091eed22c6f724b693f94ea27557aa5305676b849158b7d0acf73e9b4b

Contents?: true

Size: 1.48 KB

Versions: 1

Compression:

Stored size: 1.48 KB

Contents

module Irrc
  # Worker loop mixed into the objects that process queued queries.
  #
  # This module defines none of its collaborators. The including object is
  # expected to supply +@queue+, +@cache+, +logger+, +connection+, +connect+,
  # +established?+, +close+ and +process+.
  module Runner
    # Pops queries off +@queue+ and processes them until no work is left.
    #
    # Shutdown protocol: the worker that finds the queue empty while every
    # other worker is blocked on it pushes one +nil+ guard per other worker;
    # a worker that pops +nil+ closes its connection and returns.
    #
    # Exceptions raised by +connect+ are not rescued here and propagate to
    # the caller.
    #
    # @param threads [Integer] number of workers sharing +@queue+
    # @return [Array] root queries handled by this worker, whether they
    #   succeeded or failed
    def run(threads)
      done = []

      loop do
        # Trick to avoid deadlock: if all other workers are already waiting
        # and nothing is queued, nobody can produce more work, so popping
        # here would block forever.
        if last_thread_of?(threads) && @queue.empty?
          terminate

          # Queue guard objects notifying other threads to return results
          logger.debug "Queue #{threads - 1} guard objects"
          (threads - 1).times { @queue.push nil }

          return done
        end

        query = @queue.pop

        # A nil is a guard object pushed by the last active worker (see above).
        if query.nil?
          terminate
          return done
        end

        connect unless established?

        begin
          logger.info "Processing #{query.object}"
          query = process(query)
          query.success
          logger.debug "Queue new #{query.children.size} queries"
          query.children.each { |child| @queue << child }
        rescue StandardError => e
          # Best effort: a failing query is logged and marked as failed, and
          # the worker carries on with the next one.
          logger.error "#{e.message} when processing #{query.object} for #{query.root.object}"
          query.fail
        end

        # Only root queries are reported back; children are reachable
        # through them.
        done << query if query.root?
      end
    end


    private

    # Memoizes the block's result in +@cache+ under "object:sources".
    #
    # Because this relies on +||=+, a nil or false result is not retained and
    # the block runs again on the next call with the same key.
    #
    # @param object [#to_s] first half of the cache key
    # @param sources [#to_s] second half of the cache key
    # @yieldreturn [Object] value to store on a cache miss
    # @return [Object] the cached or freshly computed value
    def cache(object, sources, &block)
      @cache["#{object}:#{sources}"] ||= yield
    end

    # Sends +command+ over +connection+, logging the command and its result.
    #
    # @param command [String, nil] command to send
    # @return [Object, nil] whatever +connection.cmd+ returns, or nil when
    #   +command+ is nil or an empty string (nothing is sent in that case)
    def execute(command)
      return if command.nil? || command == ''

      logger.debug %(Executing "#{command}")
      connection.cmd(command).tap { |result| logger.debug %(Got "#{result}") }
    end

    # Tells whether all the other workers are currently waiting on +@queue+.
    #
    # @param num_threads [Integer] total number of workers
    # @return [Boolean] true when +@queue.num_waiting+ equals +num_threads - 1+
    def last_thread_of?(num_threads)
      @queue.num_waiting == num_threads - 1
    end

    # Logs that the work is exhausted and closes this worker's connection.
    def terminate
      logger.info "No more queries"
      close
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
irrc-0.2.1 lib/irrc/runner.rb