Sha256: 7a04f7f1f313b6eb875a6b0137853bd8bec14ed43da72a2e4d0bb95d6aceef61

Contents?: true

Size: 1.89 KB

Versions: 2

Compression:

Stored size: 1.89 KB

Contents

class Pigeon::Processor
  # == Exceptions ===========================================================
  
  # == Constants ============================================================

  # == Properties ===========================================================
  
  attr_reader :task
  attr_reader :id

  # == Class Methods ========================================================

  # == Instance Methods =====================================================
  
  # Creates a new processor. An optional queue can be specified in which case
  # the processor will register itself as an observer of that queue. A block
  # can be given to filter the tasks contained in the associated queue.
  def initialize(queue = nil, &filter)
    @id = Pigeon::Support.unique_id
    @lock = Mutex.new
    @filter = filter || lambda { |task| true }
    
    if (queue)
      self.queue = queue
    
      switch_to_next_task!
    end
  end
  
  # Assigns this processor to a particular queue. If one is already assigned
  # then the observer callback for that queue will be removed.
  def queue=(queue)
    if (@queue)
      @queue.remove_observer(&@claim)
    end
    
    @queue = queue

    @claim = lambda do |task|
      @lock.synchronize do
        if (!@task and @filter.call(task))
          @task = queue.claim(task)
      
          @task.run! do
            switch_to_next_task!
          end
        end
      end
    end
    
    @queue.observe(&@claim)
  end
  
  # Returns true if the given task would be accepted by the filter defined
  # for this processor.
  def accept?(task)
    @filter.call(task)
  end
  
  # Returns true if a task is currently being processed, false otherwise.
  def task?
    !!@task
  end
  
protected
  def switch_to_next_task!
    @lock.synchronize do
      @task = nil

      if (@task = @queue.pop(&@filter))
        @task.run! do
          switch_to_next_task!
        end
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
pigeon-0.4.4 lib/pigeon/processor.rb
pigeon-0.4.3 lib/pigeon/processor.rb