Sha256: 06dcfa0be1bc217bd35fd6625775aadc055599341effff1478e8352a359531cc
Contents?: true
Size: 1.34 KB
Versions: 3
Compression:
Stored size: 1.34 KB
Contents
require 'beanstalk-client'

module RosettaQueue
  module Gateway

    # Gateway adapter that talks to beanstalkd through a Beanstalk::Pool.
    #
    # Beanstalk has no notion of named destinations here: every message is
    # put to, and reserved from, whatever the pool connection is using.
    # Destination arguments are accepted only to keep the adapter interface
    # uniform, and are used for logging.
    class BeanstalkAdapter < BaseAdapter

      # Builds a pool with a single "host:port" address.
      #
      # adapter_settings - Hash read for the :host and :port keys.
      def initialize(adapter_settings = {})
        @host = adapter_settings[:host]
        @port = adapter_settings[:port]
        @conn = Beanstalk::Pool.new(["#{@host}:#{@port}"])
      end

      # Passes the "message-id" header of msg to the connection's ack.
      #
      # NOTE(review): this assumes msg responds to #headers and that the
      # pool responds to #ack; neither is visible in this file -- confirm
      # against the beanstalk-client API before relying on it.
      def ack(msg)
        @conn.ack(msg.headers["message-id"])
      end

      # Intentionally a no-op; nothing is closed on the pool.
      def disconnect
      end

      # Reserves the next job, deletes it from the queue straight away and
      # returns the job object.
      #
      # options - accepted for interface compatibility; currently ignored.
      #
      # TODO: support options[:timeout] ?
      def receive(options=nil)
        job = @conn.reserve
        job.delete
        job
      end

      # Receives a single job and returns its body. Both arguments are
      # accepted for interface compatibility and are not used.
      def receive_once(destination=nil, opts={})
        receive.body
      end

      # Repeatedly receives job bodies and passes each one to
      # message_handler.handle_message, logging every body received.
      def receive_with(message_handler)
        # Note that, while we call destination_for (to comply with
        # Rosetta's generic specs), beanstalk doesn't actually support
        # destinations. This is just for compatibility.
        destination = destination_for(message_handler)

        running do
          body = receive.body
          RosettaQueue.logger.info("Receiving from #{destination} :: #{body}")
          message_handler.handle_message(body)
        end
      end

      # Logs the publication and puts message on the connection.
      # destination is used for the log line only; options is not used.
      def send_message(destination, message, options)
        RosettaQueue.logger.info("Publishing to #{destination} :: #{message}")
        @conn.put(message)
      end

      private

      # Runs the given block in an endless loop. Kept as its own method so
      # the looping behaviour has a single seam.
      def running(&block)
        loop(&block)
      end

    end

  end
end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
rosetta_queue-0.5.2 | lib/rosetta_queue/adapters/beanstalk.rb |
rosetta_queue-0.5.0 | lib/rosetta_queue/adapters/beanstalk.rb |
rosetta_queue-0.4.0 | lib/rosetta_queue/adapters/beanstalk.rb |