Sha256: 06dcfa0be1bc217bd35fd6625775aadc055599341effff1478e8352a359531cc

Contents?: true

Size: 1.34 KB

Versions: 3

Compression:

Stored size: 1.34 KB

Contents

require 'beanstalk-client'

module RosettaQueue
  module Gateway

    class BeanstalkAdapter < BaseAdapter

      def ack(msg)
        @conn.ack(msg.headers["message-id"])
      end

      def initialize(adapter_settings = {})
        @host, @port = adapter_settings[:host], adapter_settings[:port]
        @conn = Beanstalk::Pool.new(["#{@host}:#{@port}"])
      end

      def disconnect; end

      # TODO: support options[:timeout] ?
      def receive(options=nil)
        msg = @conn.reserve
        msg.delete
        msg
      end

      def receive_once(destination=nil, opts={})
        receive.body
      end

      def receive_with(message_handler)
        # Note that, while we call destination_for (to comply with
        # Rosetta's generic specs), beanstalk doesn't actually support
        # destinations. This is just for compatibility.
        destination = destination_for(message_handler)

        running do
          msg = receive.body
          RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
          message_handler.handle_message(msg)
        end
      end

      def send_message(destination, message, options)
        RosettaQueue.logger.info("Publishing to #{destination} :: #{message}")
        @conn.put(message)
      end

      private

        def running(&block)
          loop(&block)
        end

    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
rosetta_queue-0.5.2 lib/rosetta_queue/adapters/beanstalk.rb
rosetta_queue-0.5.0 lib/rosetta_queue/adapters/beanstalk.rb
rosetta_queue-0.4.0 lib/rosetta_queue/adapters/beanstalk.rb