Sha256: 0806ece016e887cd9b77996b8cbcc6acd7521d0c7ebf33c2a077b078eb5ede62

Contents?: true

Size: 1.26 KB

Versions: 3

Compression:

Stored size: 1.26 KB

Contents

require 'timeout'

module Msgr
  # In-memory stand-in for a worker pool, intended for use in tests.
  #
  # Posted messages are buffered in an array instead of being dispatched
  # right away; a later call to {#run} invokes the stored handler blocks on
  # the calling thread. The class is a process-wide singleton: every call to
  # {TestPool.new} returns the same instance.
  class TestPool
    # Sets up the empty buffer and its synchronization primitives.
    # Any arguments are accepted and ignored.
    def initialize(*)
      @queue = []
      @mutex = Mutex.new
      @event = ConditionVariable.new
    end

    # Buffers a message together with the block that should handle it and
    # wakes up a thread that may be waiting inside {#run}.
    #
    # @param message [Object] value later passed to the block
    # @yieldparam message [Object] the posted message
    def post(message, &block)
      @mutex.synchronize do
        @queue << [block, message]
        @event.signal
      end
    end

    # Dispatches buffered messages on the calling thread.
    #
    # The internal mutex is held while handlers execute (it is only released
    # while waiting for new messages).
    #
    # @param kwargs [Hash] forwarded to {#ns_run}: +count:+ (default 1) and
    #   +timeout:+ in seconds (default 5)
    # @raise [Timeout::Error] if fewer than +count+ messages were handled
    #   before the timeout elapsed
    def run(**kwargs)
      @mutex.synchronize do
        ns_run(**kwargs)
      end
    end

    # Drops all buffered messages without dispatching them.
    def clear
      @mutex.synchronize do
        @queue.clear
      end
    end

    alias_method :reset, :clear

    private

    # Handles +count+ messages; must be called with +@mutex+ held.
    #
    # @param count [Integer] number of messages to handle before returning
    # @param timeout [Numeric] total number of seconds to spend waiting for
    #   messages across the whole call
    # @raise [Timeout::Error] when the waiting budget is used up
    def ns_run(count: 1, timeout: 5)
      received = 0
      remaining = timeout

      while received < count
        # NOTE(review): `pop` takes the most recently posted entry first
        # (LIFO). Kept as is; confirm whether FIFO (`shift`) was intended.
        if (item = @queue.pop)
          handler, message = item
          handler.call message
          received += 1
        else
          # A monotonic clock is used so wall-clock adjustments cannot
          # stretch or shrink the waiting budget.
          started = Process.clock_gettime(Process::CLOCK_MONOTONIC)

          @event.wait(@mutex, remaining)

          remaining -= Process.clock_gettime(Process::CLOCK_MONOTONIC) - started

          if remaining <= 0
            raise Timeout::Error,
              "Expected to receive #{count} messages but received #{received}."
          end
        end
      end
    end

    class << self
      # Returns the shared instance, creating it on first use. Arguments
      # only matter for the first call.
      def new(*args)
        @instance ||= super(*args)
      end

      # Runs the shared instance; see {TestPool#run}.
      #
      # Options are forwarded as keywords so that calls such as
      # +TestPool.run(count: 2)+ reach the keyword-only instance method.
      def run(**kwargs)
        new.run(**kwargs)
      end

      # Clears the shared instance if one has been created.
      #
      # @return [nil] when no instance exists yet
      def clear
        @instance.clear if @instance
      end

      alias_method :reset, :clear
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
msgr-0.14.1.1.b125 lib/msgr/test_pool.rb
msgr-0.14.1.1.b124 lib/msgr/test_pool.rb
msgr-0.14.1.1.b112 lib/msgr/test_pool.rb