Sha256: 9dfb381cfac7aef2c541e321047471d735046b8829e0b8a4eea56a0805f83d2b

Contents?: true

Size: 959 Bytes

Versions: 2

Compression:

Stored size: 959 Bytes

Contents

class MessageQueue::Adapters::Memory::Connection::Consumer < MessageQueue::Consumer
  attr_reader :queue, :block

  def initialize(*args)
    super
    @queue = []
  end

  def subscribe(options = {}, &block)
    producer = options.fetch(:producer)
    producer.add_observer(self)
    @block = block
  end

  def unsubscribe(options = {})
    producer = options.fetch(:producer)
    producer.delete_observer(self)
    @block = nil
  end

  def update(object, options)
    object = load_object(object)
    message = MessageQueue::Message.new(:message_id => options[:message_id],
                                        :type => options[:type],
                                        :timestamp => options[:timestamp],
                                        :routing_key => options[:routing_key],
                                        :payload => load_object(object))
    if block
      block.call(message)
    else
      queue << message
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
message_queue-0.1.1 lib/message_queue/adapters/memory/consumer.rb
message_queue-0.1.0 lib/message_queue/adapters/memory/consumer.rb