Sha256: 02b49cd4c73e963500ec25f255d7ebc65eb4058042f945827038a916bf09ed59

Contents?: true

Size: 1.3 KB

Versions: 6

Compression:

Stored size: 1.3 KB

Contents

class BaseProvider
  attr_accessor :broker_name, :read_queues_directly

  def initialize
    @read_queues_directly = false
  end

  def broker_name
    @broker_name ||= MessageDriver::Broker::DEFAULT_BROKER_NAME
  end

  def pause_if_needed(seconds = 0.1)
    seconds *= 10 if ENV['CI'] == 'true'
    case BrokerConfig.current_adapter
    when :in_memory
    else
      sleep seconds
    end
  end

  def fetch_messages(destination_name, _opts = {})
    destination = fetch_destination(destination_name)
    pause_if_needed
    result = []
    loop do
      msg = destination.pop_message
      if msg.nil?
        break
      else
        result << msg
      end
    end
    result
  end

  def purge_destination(destination_name, _opts = {})
    destination = fetch_destination(destination_name)
    if destination.respond_to? :purge
      destination.purge
    else
      fetch_messages(destination)
    end
  end

  def fetch_destination(destination)
    case destination
    when String, Symbol
      MessageDriver::Client[broker_name].find_destination(destination)
    when MessageDriver::Destination::Base
      destination
    else
      fail "didn't understand destination #{destination.inspect}"
    end
  end

  def fetch_current_adapter_context
    MessageDriver::Client[broker_name].current_adapter_context
  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
message-driver-0.6.1 test_lib/provider/base.rb
message-driver-0.6.0 test_lib/provider/base.rb
message-driver-0.5.3 test_lib/provider/base.rb
message-driver-0.5.2 test_lib/provider/base.rb
message-driver-0.5.1 test_lib/provider/base.rb
message-driver-0.5.0 test_lib/provider/base.rb