Sha256: b46949607d4a1a7732a8096dd0f234e3a515792a9de074f8f9f1d02e8d70808d

Contents?: true

Size: 1.37 KB

Versions: 1

Compression:

Stored size: 1.37 KB

Contents

require 'beanstalk-client'

module RosettaQueue
  module Gateway
  
    class BeanstalkAdapter < BaseAdapter

      def ack(msg)
        @conn.ack(msg.headers["message-id"])
      end

      def initialize(user=nil, password=nil, host="localhost", port=11300)
        @host, @port = host, port
        @conn = Beanstalk::Pool.new(["#{host}:#{port}"])
      end

      def disconnect; end

      # TODO: support options[:timeout] ?
      def receive(options=nil)
        msg = @conn.reserve
        msg.delete
        msg
      end
      
      def receive_once(destination=nil, opts={})
        receive.body
      end

      def receive_with(message_handler)
        # Note that, while we call destination_for (to comply with
        # Rosetta's generic specs), beanstalk doesn't actually support
        # destinations. This is just for compatibility.
        destination = destination_for(message_handler)

        running do
          msg = receive.body
          RosettaQueue.logger.info("Receiving from #{destination} :: #{msg}")
          message_handler.on_message(filter_receiving(msg))
        end
      end
      
      def send_message(destination, message, options)
        RosettaQueue.logger.info("Publishing to #{destination} :: #{message}")        
        @conn.put(message)
      end

      private
      
        def running(&block)
          loop(&block)
        end

    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
bmabey-rosetta_queue-0.1.3 lib/rosetta_queue/adapters/beanstalk.rb