Sha256: 7a04f7f1f313b6eb875a6b0137853bd8bec14ed43da72a2e4d0bb95d6aceef61
Contents?: true
Size: 1.89 KB
Versions: 2
Compression:
Stored size: 1.89 KB
Contents
# Pulls tasks from a queue one at a time and runs them. At most one task is
# held in @task at any moment; when the completion block passed to a task's
# run! fires, the processor asks the queue for the next task accepted by its
# filter.
class Pigeon::Processor
  # == Exceptions ===========================================================

  # == Constants ============================================================

  # == Properties ===========================================================

  # The task currently assigned to this processor, or nil when idle.
  attr_reader :task

  # Identifier obtained from Pigeon::Support.unique_id at construction.
  attr_reader :id

  # == Class Methods ========================================================

  # == Instance Methods =====================================================

  # Creates a new processor. An optional queue can be specified in which case
  # the processor will register itself as an observer of that queue and then
  # immediately try to pull a first task from it. A block can be given to
  # filter the tasks contained in the associated queue; without a block every
  # task is accepted.
  def initialize(queue = nil, &filter)
    @id = Pigeon::Support.unique_id
    @lock = Mutex.new
    @filter = filter || lambda { |_task| true }

    if queue
      self.queue = queue

      switch_to_next_task!
    end
  end

  # Assigns this processor to a particular queue. If one is already assigned
  # then the observer callback for that queue will be removed. Passing nil
  # detaches the processor from its current queue without attaching a new one.
  def queue=(queue)
    @queue.remove_observer(&@claim) if @queue

    @queue = queue
    @claim = nil

    # Nothing to observe when detaching.
    return unless queue

    # Keep this a one-argument lambda: it is handed to the queue as the
    # observer callback.
    @claim = lambda do |task|
      # Only the bookkeeping happens under the lock. Mutex is not reentrant,
      # so the task must be started after the lock is released: if run!
      # invoked its completion block before returning, the nested
      # switch_to_next_task! would otherwise raise ThreadError.
      claimed = @lock.synchronize do
        @task = queue.claim(task) if !@task && @filter.call(task)
      end

      start_task(claimed) if claimed
    end

    @queue.observe(&@claim)
  end

  # Returns true if the given task would be accepted by the filter defined
  # for this processor.
  def accept?(task)
    @filter.call(task)
  end

  # Returns true if a task is currently being processed, false otherwise.
  def task?
    !!@task
  end

protected

  # Drops the current task and pops the next task accepted by the filter from
  # the queue, running it if one was returned. Leaves the processor idle when
  # the queue yields nothing or when no queue is attached.
  def switch_to_next_task!
    upcoming = @lock.synchronize do
      # Cleared first so the processor reads as idle if the pop raises.
      @task = nil
      @task = @queue && @queue.pop(&@filter)
    end

    # Started outside the lock for the same reason as in the claim callback.
    start_task(upcoming) if upcoming
  end

  # Runs the given task, moving on to the next queued task when the task
  # invokes the completion block.
  def start_task(task)
    task.run! do
      switch_to_next_task!
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
pigeon-0.4.4 | lib/pigeon/processor.rb |
pigeon-0.4.3 | lib/pigeon/processor.rb |