Sha256: a1a22e6d94fe776136ad37df5dc841e928a7f8da7cb57c08f9f61cd176a1f8ff

Contents?: true

Size: 1.33 KB

Versions: 4

Compression:

Stored size: 1.33 KB

Contents

require 'weakref'

# Base class for a message queue that polls for messages and hands them to a
# broker for processing.
#
# This class calls +pick+ but does not define it; a derived class is expected
# to supply it. Judging from how +start+ destructures its result, +pick+
# should return a two-element array of +[message, original_message]+, or
# nil (or an array whose first element is nil) when nothing is available.
class Arsenicum::Queue

  # Seconds to wait before polling again when no message was available or
  # when picking a message raised.
  POLL_INTERVAL = 0.5

  attr_reader   :name,  :worker_count,  :router
  attr_reader   :broker

  # @param name [Object] identifier of the queue; only interpolated into log
  #   messages by this class.
  # @param options [Hash] configuration. NOTE: the +:worker_count+ and
  #   +:router_class+ keys are *removed* from this hash (the caller's hash is
  #   mutated); any remaining keys are left in place.
  def initialize(name, options)
    @name           = name
    @worker_count   = options.delete(:worker_count)
    @router         = build_router options.delete(:router_class)
    @broker         = Arsenicum::Core::Broker.new worker_count: worker_count, router: router
    @stop_requested = false
  end

  # Runs the broker and then polls for messages until +stop+ is called.
  #
  # This method blocks the calling thread; use +start_async+ to run it in a
  # background thread. A stop request issued before this method begins
  # executing is discarded, because the flag is reset here so that a queue
  # can be started again after being stopped.
  #
  # @return [nil] once a stop has been requested.
  def start
    @stop_requested = false

    Arsenicum::Logger.info{"[queue]Queue #{name} is now starting"}
    broker.run
    Arsenicum::Logger.info{"[queue]Queue #{name} start-up completed"}

    # Keep the block form of +loop+: block-local variables are fresh on every
    # iteration, so each pair of callbacks below closes over its own
    # +original_message+ rather than sharing one binding.
    loop do
      break if @stop_requested

      begin
        message, original_message = pick
      rescue => e
        # The assignment above never completed, so there is no message to
        # report alongside the error.
        handle_failure e, nil
        # Back off instead of retrying immediately, so that a persistent
        # failure does not turn into a busy loop.
        sleep POLL_INTERVAL
        next
      end

      unless message
        sleep POLL_INTERVAL
        next
      end

      on_success = -> { handle_success(original_message) }
      on_failure = ->(e) { handle_failure(e, original_message) }
      broker.delegate message, on_success, on_failure
    end
  end

  # Asks the polling loop in +start+ to finish and stops the broker.
  #
  # The loop notices the request at the top of its next iteration, i.e. after
  # the current +pick+ / sleep / delegate step has returned.
  def stop
    @stop_requested = true
    broker.stop
  end

  # Registers +task+ with the broker under +task.id+.
  #
  # @param task [#id] the task to register.
  def register(task)
    broker[task.id] = task
  end

  # Hook passed to the broker as the success callback for a delegated message.
  # The default implementation does nothing; derived classes should override
  # it (for example to acknowledge the message).
  #
  # @param original_message [Object] second element of what +pick+ returned.
  def handle_success(original_message)
  end

  # Hook called when +pick+ raises, and passed to the broker as the failure
  # callback for a delegated message. The default implementation does nothing;
  # derived classes should override it.
  #
  # @param e [StandardError] the error that was raised.
  # @param original_message [Object, nil] second element of what +pick+
  #   returned, or nil when the failure happened inside +pick+ itself.
  def handle_failure(e, original_message)
  end

  # Runs +start+ in a new thread.
  #
  # @return [Thread] the thread executing the polling loop.
  def start_async
    Thread.new { start }
  end

  private

  # Instantiates the router for this queue.
  #
  # @param router_class [Class, nil] class whose constructor accepts the queue.
  # @return [Object, nil] +router_class.new(self)+, or nil when no class given.
  def build_router(router_class)
    return unless router_class
    router_class.new self
  end

  autoload  :Sqs, 'arsenicum/queue/sqs'
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
arsenicum-0.3.1.2 lib/arsenicum/queue.rb
arsenicum-0.3.1.1 lib/arsenicum/queue.rb
arsenicum-0.3.1 lib/arsenicum/queue.rb
arsenicum-0.3.0 lib/arsenicum/queue.rb