Sha256: 119ca01fa40b927c07df5121ed4cc49616b4907b5cf943d5c5191fbba27791da

Contents?: true

Size: 1.2 KB

Versions: 6

Compression:

Stored size: 1.2 KB

Contents

require 'weakref'

# A named queue that repeatedly picks messages and delegates each of them to
# an Arsenicum::Core::Broker configured from the queue's options.
#
# NOTE(review): +pick+ is called by #start but is not defined in this class;
# presumably derived classes (such as the autoloaded Sqs) provide it, the same
# way they are expected to provide #handle_success / #handle_failure --
# confirm against the subclasses.
class Arsenicum::Queue
  # Lazily loads Arsenicum::Queue::Sqs on first reference.
  autoload :Sqs, 'arsenicum/queue/sqs'

  attr_reader :name, :worker_count, :router, :broker

  # Builds the queue together with its router and broker.
  #
  # name    - identifier of the queue; interpolated into the log lines
  #           written by #start.
  # options - Hash of settings. The :worker_count and :router_class entries
  #           are removed from it with Hash#delete, so the hash passed in by
  #           the caller is modified.
  def initialize(name, options)
    @name = name
    @worker_count = options.delete(:worker_count)

    router_class = options.delete(:router_class)
    @router = build_router(router_class)

    @broker = Arsenicum::Core::Broker.new(worker_count: worker_count, router: router)
  end

  # Runs the broker and then polls for messages forever; this method does not
  # return on its own.
  #
  # Every picked message is handed to the broker along with two callbacks that
  # forward to #handle_success and #handle_failure. When nothing is picked the
  # loop sleeps for half a second before polling again.
  def start
    Arsenicum::Logger.info "[queue]Queue #{name} is now starting"
    broker.run
    Arsenicum::Logger.info "[queue]Queue #{name} start-up completed"

    loop do
      message, original_message = pick

      unless message
        # Nothing to process right now: back off briefly, then poll again.
        sleep 0.5
        next
      end

      on_success = -> { handle_success(original_message) }
      on_failure = ->(error) { handle_failure(error, original_message) }
      broker.delegate message, on_success, on_failure
    end
  end

  # Stores +task+ in the broker under the task's id.
  def register(task)
    task_id = task.id
    broker[task_id] = task
  end

  # Callback invoked (through the lambda built in #start) for a message that
  # was delegated to the broker. Does nothing in this base class.
  #
  # TODO: implement correctly in your derived classes.
  def handle_success(original_message)
  end

  # Callback invoked (through the lambda built in #start) with an error +e+
  # for a message that was delegated to the broker. Does nothing in this base
  # class.
  #
  # TODO: implement correctly in your derived classes.
  def handle_failure(e, original_message)
  end

  # Runs #start in a new Thread and returns that Thread.
  def start_async
    Thread.new do
      start
    end
  end

  private

  # Instantiates +router_class+ with this queue as its only argument.
  # Returns nil when no router class was given.
  def build_router(router_class)
    router_class.new(self) if router_class
  end
end

Version data entries

6 entries across 6 versions & 1 rubygem

Version Path
arsenicum-0.2.1.1 lib/arsenicum/queue.rb
arsenicum-0.2.1 lib/arsenicum/queue.rb
arsenicum-0.2 lib/arsenicum/queue.rb
arsenicum-0.1.3 lib/arsenicum/queue.rb
arsenicum-0.1.2 lib/arsenicum/queue.rb
arsenicum-0.1.1 lib/arsenicum/queue.rb