Sha256: 56f47184a1113fc9012fcf40ae3acf60513b27b9e8b1d7ebfe64a4d5fa19285f
Contents?: true
Size: 1.38 KB
Versions: 2
Compression:
Stored size: 1.38 KB
Contents
require 'beanstalk-client'

module RosettaQueue
  module Gateway
    # Gateway adapter that sends and receives messages through a
    # Beanstalk::Pool connection.
    #
    # Helpers such as +destination_for+ and +filter_receiving+ are not
    # defined in this class; presumably they come from BaseAdapter
    # (verify against the base class).
    class BeanstalkAdapter < BaseAdapter
      # Opens a pool against a single "host:port" address.
      #
      # adapter_settings - Hash read for the :host and :port keys.
      def initialize(adapter_settings = {})
        @host = adapter_settings[:host]
        @port = adapter_settings[:port]
        @conn = Beanstalk::Pool.new(["#{@host}:#{@port}"])
      end

      # Passes the message's "message-id" header to the connection's +ack+.
      #
      # NOTE(review): +receive+ already deletes every job it reserves, so
      # confirm that the pool really responds to +ack+ and that callers
      # still need this.
      def ack(msg)
        message_id = msg.headers["message-id"]
        @conn.ack(message_id)
      end

      # Intentionally empty: this adapter performs no work on disconnect.
      def disconnect
      end

      # Reserves the next job, deletes it from the queue right away and
      # returns the job object.
      #
      # options - currently ignored.
      #
      # TODO: support options[:timeout] ?
      def receive(options = nil)
        @conn.reserve.tap(&:delete)
      end

      # Returns the body of the next job obtained through +receive+.
      # Both arguments are accepted but unused.
      def receive_once(destination = nil, opts = {})
        job = receive
        job.body
      end

      # Keeps receiving job bodies, logging each one and handing the
      # filtered body to +message_handler.on_message+.
      def receive_with(message_handler)
        # Note that, while we call destination_for (to comply with
        # Rosetta's generic specs), beanstalk doesn't actually support
        # destinations. This is just for compatibility.
        destination = destination_for(message_handler)

        running do
          body = receive.body
          RosettaQueue.logger.info("Receiving from #{destination} :: #{body}")
          filtered = filter_receiving(body)
          message_handler.on_message(filtered)
        end
      end

      # Logs the publish and puts +message+ on the connection.
      # +destination+ is only used in the log line; +options+ is unused.
      def send_message(destination, message, options)
        RosettaQueue.logger.info("Publishing to #{destination} :: #{message}")
        @conn.put(message)
      end

      private

      # Runs the given block repeatedly via Kernel#loop.
      def running(&work)
        loop(&work)
      end
    end
  end
end
Version data entries
2 entries across 2 versions & 2 rubygems
Version | Path |
---|---|
bmabey-rosetta_queue-0.2.0 | lib/rosetta_queue/adapters/beanstalk.rb |
cwyckoff-rosetta_queue-0.3.3 | lib/rosetta_queue/adapters/beanstalk.rb |