Sha256: 851c7866af810ca835394d03f99afcb01335f8a8e522780724ba3bcef166676e

Contents?: true

Size: 1.3 KB

Versions: 1

Compression:

Stored size: 1.3 KB

Contents

module Afterparty
  class Queue
    attr_accessor :options, :temp_namespace, :login_block
    include Afterparty::QueueHelpers

    def initialize options={}, consumer_options={}
      # @consumer = ThreadedQueueConsumer.new(self, consumer_options).start
      @options = options
      @options[:namespace] ||= "default"
      # Afterparty.add_queue @options[:namespace]
      @options[:sleep] ||= 5
      @mutex = Mutex.new
      @options[:logger] ||= Logger.new($stderr)
    end

    def push job
      # @mutex.synchronize do
        return nil if job.nil?
        queue_name = @temp_namespace || @options[:namespace]
        AfterpartyJob.make_with_job job, queue_name
      # end
    end
    alias :<< :push
    alias :eng :push

    def pop
      # @mutex.synchronize do
        while true do
          unless (_job = AfterpartyJob.valid.first).nil?
            ap "poppin job"
            _job.completed = true
            _job.save
            return _job
          end
          sleep(@options[:sleep])
        end
      # end
    end
  end
  
  class TestQueue < Queue
    attr_accessor :completed_jobs
    
    def initialize opts={}, consumer_opts={}
      super
      @completed_jobs = []
      @exceptions = []
    end
    def handle_exception job, exception
      @exceptions << [job, exception]
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
afterparty-0.1.0 lib/afterparty/queue.rb