Sha256: d5e09db858b0d8f52d1a8ca56399ac3b36881b30ebe65ddbf46f20292e476755

Contents?: true

Size: 1.53 KB

Versions: 6

Compression:

Stored size: 1.53 KB

Contents

require_relative 'base'
require 'rabbitmq/http/client'

class RabbitmqProvider < BaseProvider
  class Context
    def self.supports_client_acks?
      false
    end
  end

  def fetch_messages(destination_name, opts = {})
    if read_queues_directly
      super
    else
      destination = fetch_destination(destination_name)
      pause_if_needed
      result = []
      loop do
        msgs = rabbitmq.get_messages(vhost, destination.name, { count: 10, requeue: false, encoding: 'auto' }.merge(opts))
        msgs.each do |msg|
          if msg.nil?
            break
          else
            result << build_message(msg)
          end
        end
        break if msgs.nil? || msgs.empty?
      end
      result
    end
  end

  def purge_destination(destination_name, opts = {})
    not_exists_ok = opts.fetch(:not_exists_ok, true)
    if read_queues_directly
      super
    else
      destination = fetch_destination(destination_name)
      begin
        rabbitmq.purge_queue(vhost, destination.name)
      rescue Faraday::ResourceNotFound
        raise unless not_exists_ok
      end
    end
  end

  private

  def build_message(data)
    props = data.properties.dup
    headers = props.delete(:headers)
    MessageDriver::Message::Base.new(Context, data.payload, headers, props)
  end

  def vhost
    @vhost ||= BrokerConfig.config[:vhost]
  end

  def rabbitmq
    @rabbitmq ||= begin
      endpoint = 'http://127.0.0.1:15672'
      RabbitMQ::HTTP::Client.new(endpoint, username: 'guest', password: 'guest')
    end
  end
end

Provider = RabbitmqProvider

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
message-driver-0.6.1 test_lib/provider/rabbitmq.rb
message-driver-0.6.0 test_lib/provider/rabbitmq.rb
message-driver-0.5.3 test_lib/provider/rabbitmq.rb
message-driver-0.5.2 test_lib/provider/rabbitmq.rb
message-driver-0.5.1 test_lib/provider/rabbitmq.rb
message-driver-0.5.0 test_lib/provider/rabbitmq.rb