Sha256: 56f47184a1113fc9012fcf40ae3acf60513b27b9e8b1d7ebfe64a4d5fa19285f

Contents?: true

Size: 1.38 KB

Versions: 2

Compression:

Stored size: 1.38 KB

Contents

require 'beanstalk-client'

module RosettaQueue
  module Gateway

    # Gateway adapter that talks to beanstalk through a Beanstalk::Pool.
    class BeanstalkAdapter < BaseAdapter

      # Opens a pool with a single "host:port" address taken from
      # adapter_settings[:host] and adapter_settings[:port].
      def initialize(adapter_settings = {})
        @host = adapter_settings[:host]
        @port = adapter_settings[:port]
        @conn = Beanstalk::Pool.new(["#{@host}:#{@port}"])
      end

      # Acknowledges msg by handing its "message-id" header to the
      # connection.
      def ack(msg)
        message_id = msg.headers["message-id"]
        @conn.ack(message_id)
      end

      # Intentionally a no-op for this adapter.
      def disconnect
      end

      # Reserves the next job from the pool, deletes it, and returns it.
      # The options argument is accepted but not used.
      #
      # NOTE(review): the job is deleted as soon as it is reserved, before
      # any handler sees it -- confirm that is the intended behaviour.
      #
      # TODO: support options[:timeout] ?
      def receive(options=nil)
        @conn.reserve.tap { |job| job.delete }
      end

      # Receives a single job and returns its body. Both arguments are
      # accepted but not used.
      def receive_once(destination=nil, opts={})
        job = receive
        job.body
      end

      # Repeatedly receives job bodies, logs each one, and passes the
      # filtered body to message_handler.on_message.
      def receive_with(message_handler)
        # Note that, while we call destination_for (to comply with
        # Rosetta's generic specs), beanstalk doesn't actually support
        # destinations. This is just for compatibility.
        destination = destination_for(message_handler)

        running do
          body = receive.body
          RosettaQueue.logger.info("Receiving from #{destination} :: #{body}")
          filtered = filter_receiving(body)
          message_handler.on_message(filtered)
        end
      end

      # Logs the outgoing message and puts it on the connection. The
      # destination is only used in the log line; options is not used.
      def send_message(destination, message, options)
        RosettaQueue.logger.info("Publishing to #{destination} :: #{message}")
        @conn.put(message)
      end

      private

        # Runs the given block repeatedly via Kernel#loop. Kept as its own
        # method so the receive loop has a single seam.
        def running(&block)
          loop(&block)
        end

    end
  end
end

Version data entries

2 entries across 2 versions & 2 rubygems

Version Path
bmabey-rosetta_queue-0.2.0 lib/rosetta_queue/adapters/beanstalk.rb
cwyckoff-rosetta_queue-0.3.3 lib/rosetta_queue/adapters/beanstalk.rb