Sha256: e184baa1dd2e2407d5d610f608b331c5375d0c2cfbeb0a2d437cabac697d2312

Contents?: true

Size: 1.27 KB

Versions: 5

Compression:

Stored size: 1.27 KB

Contents

# Shared helpers for fetching and purging messages from MessageDriver
# destinations on a named broker.
class BaseProvider
  # Only a writer is generated for +broker_name+: the reader is defined
  # explicitly below so it can fall back to the default broker name.
  # (Generating it with attr_accessor as well would just be redefined.)
  attr_writer :broker_name
  attr_accessor :read_queues_directly

  def initialize
    @read_queues_directly = false
  end

  # Name of the broker this provider talks to.
  #
  # @return the assigned broker name, or
  #   MessageDriver::Broker::DEFAULT_BROKER_NAME when none (or a nil/false
  #   value) has been assigned; the fallback is memoized on first read.
  def broker_name
    @broker_name ||= MessageDriver::Broker::DEFAULT_BROKER_NAME
  end

  # Sleeps for +seconds+ unless the current adapter is :in_memory.
  # The delay is multiplied by ten when ENV['CI'] is the string 'true'.
  #
  # NOTE(review): presumably gives a real broker time to deliver
  # messages before they are read -- confirm against callers.
  #
  # @param seconds [Numeric] base delay in seconds
  def pause_if_needed(seconds = 0.1)
    seconds *= 10 if ENV['CI'] == 'true'
    sleep seconds unless BrokerConfig.current_adapter == :in_memory
  end

  # Drains a destination and returns everything that was popped.
  #
  # @param destination_name [String, Symbol, MessageDriver::Destination::Base]
  #   anything #fetch_destination accepts
  # @param _opts [Hash] accepted for interface compatibility; unused here
  # @return [Array] the popped messages, in the order they were popped
  def fetch_messages(destination_name, _opts = {})
    destination = fetch_destination(destination_name)
    pause_if_needed
    messages = []
    # Only nil ends the drain; any other value (even false) is a message.
    until (message = destination.pop_message).nil?
      messages << message
    end
    messages
  end

  # Empties a destination: uses the destination's own #purge when it has
  # one, otherwise drains it by popping every message.
  #
  # @param destination_name [String, Symbol, MessageDriver::Destination::Base]
  #   anything #fetch_destination accepts
  # @param _opts [Hash] accepted for interface compatibility; unused here
  # @return the result of +destination.purge+, or the array of drained
  #   messages when the destination does not respond to #purge
  def purge_destination(destination_name, _opts = {})
    destination = fetch_destination(destination_name)
    return destination.purge if destination.respond_to?(:purge)

    fetch_messages(destination)
  end

  # Resolves a destination reference to a destination object.
  #
  # @param destination [String, Symbol, MessageDriver::Destination::Base]
  #   a name to look up on this provider's broker client, or an existing
  #   destination, which is returned unchanged
  # @raise [RuntimeError] when +destination+ is of any other type
  def fetch_destination(destination)
    case destination
    when String, Symbol
      MessageDriver::Client[broker_name].find_destination(destination)
    when MessageDriver::Destination::Base
      destination
    else
      raise "didn't understand destination #{destination.inspect}"
    end
  end

  # @return the current adapter context of this provider's broker client
  def fetch_current_adapter_context
    MessageDriver::Client[broker_name].current_adapter_context
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
message-driver-1.0.1 test_lib/provider/base.rb
message-driver-1.0.0 test_lib/provider/base.rb
message-driver-0.7.2 test_lib/provider/base.rb
message-driver-0.7.1 test_lib/provider/base.rb
message-driver-0.7.0 test_lib/provider/base.rb