Sha256: 9dfb381cfac7aef2c541e321047471d735046b8829e0b8a4eea56a0805f83d2b
Contents?: true
Size: 959 Bytes
Versions: 2
Compression:
Stored size: 959 Bytes
Contents
# In-memory consumer that receives messages from a producer through an
# observer-style protocol: it registers itself with +add_observer+, is removed
# with +delete_observer+, and is notified through #update.
#
# While a block is subscribed, each incoming message is passed to that block.
# When no block is set, incoming messages are appended to #queue instead.
# Note that #subscribe does not replay messages already sitting in #queue.
class MessageQueue::Adapters::Memory::Connection::Consumer < MessageQueue::Consumer
  # queue: messages received while no block was subscribed.
  # block: the currently subscribed handler, or nil when there is none.
  attr_reader :queue, :block

  # Forwards all arguments to the superclass initializer and starts with an
  # empty message queue.
  def initialize(*args)
    super
    @queue = []
  end

  # Registers this consumer as an observer of the given producer and stores
  # the block that will handle incoming messages.
  #
  # @param options [Hash] must contain +:producer+, an object responding to
  #   +add_observer+
  # @yield [message] called from #update for every message received
  # @return [Proc, nil] the stored block
  # @raise [KeyError] if +options+ has no +:producer+ key
  def subscribe(options = {}, &block)
    producer = options.fetch(:producer)
    producer.add_observer(self)
    @block = block
  end

  # Removes this consumer from the given producer's observers and clears the
  # stored block, so later messages (if any still arrive) go to #queue.
  #
  # @param options [Hash] must contain +:producer+, an object responding to
  #   +delete_observer+
  # @return [nil]
  # @raise [KeyError] if +options+ has no +:producer+ key
  def unsubscribe(options = {})
    producer = options.fetch(:producer)
    producer.delete_observer(self)
    @block = nil
  end

  # Observer callback: wraps the received object in a MessageQueue::Message
  # and either hands it to the subscribed block or stores it in #queue.
  #
  # +load_object+ is not defined in this class; presumably it is inherited
  # from MessageQueue::Consumer and deserializes the object - TODO confirm.
  #
  # @param object the raw object delivered by the producer
  # @param options [Hash] message attributes; +:message_id+, +:type+,
  #   +:timestamp+ and +:routing_key+ are copied onto the message
  # @return the block's return value when a block is subscribed, otherwise
  #   the queue
  def update(object, options)
    # Decode exactly once. Running load_object on an already-decoded value
    # would decode it a second time, which is wrong for any loader that is
    # not idempotent.
    payload = load_object(object)

    message = MessageQueue::Message.new(:message_id  => options[:message_id],
                                        :type        => options[:type],
                                        :timestamp   => options[:timestamp],
                                        :routing_key => options[:routing_key],
                                        :payload     => payload)

    if block
      block.call(message)
    else
      queue << message
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
message_queue-0.1.1 | lib/message_queue/adapters/memory/consumer.rb |
message_queue-0.1.0 | lib/message_queue/adapters/memory/consumer.rb |