Sha256: dc089c9f5a2b4d7395c8d615e397ce8d0f9eca62722b8bacdfa4c09253ad5768

Contents?: true

Size: 1.43 KB

Versions: 3

Compression:

Stored size: 1.43 KB

Contents

module Qwirk
  module Adapter
    module InMemory

      # In-memory FIFO queue: objects are appended with #write and removed
      # oldest-first with #timeout_read.  All access to the backing array is
      # serialized through a single mutex, and a condition variable lets a
      # reader block until a writer adds something.
      class ReplyQueue
        # @param name [Object] label for this queue; only used by #to_s
        def initialize(name)
          @name                   = name
          @outstanding_hash_mutex = Mutex.new
          @read_condition         = ConditionVariable.new
          @array                  = []
        end

        # Remove and return the oldest queued object, waiting up to +timeout+
        # seconds for one to arrive if the queue is currently empty.
        #
        # @param timeout [Numeric] maximum seconds to wait; a value <= 0 means
        #   "do not wait at all"
        # @return [Object, nil] the oldest queued object, or nil if nothing was
        #   available within the timeout
        def timeout_read(timeout)
          @outstanding_hash_mutex.synchronize do
            return @array.shift unless @array.empty?
            return nil unless timeout > 0

            # A condition variable may wake up without an item being available
            # (spurious wakeup, or another reader grabbed the item first), so
            # keep waiting until either an item shows up or the deadline passes.
            deadline = monotonic_now + timeout
            while @array.empty?
              remaining = deadline - monotonic_now
              return nil unless remaining > 0
              timed_read_condition_wait(remaining)
            end
            @array.shift
          end
        end

        # Append +obj+ to the queue and wake one waiting reader, if any.
        #
        # @param obj [Object] the object to enqueue
        # @return [nil]
        def write(obj)
          @outstanding_hash_mutex.synchronize do
            @array << obj
            @read_condition.signal
          end
          nil
        end

        # @return [String] "reply_queue:" followed by the queue's name
        def to_s
          "reply_queue:#{@name}"
        end

        #######
        private
        #######

        # JRuby reports RUBY_PLATFORM as 'java', so it is detected through the
        # JRUBY_VERSION constant instead.
        if defined?(JRUBY_VERSION) || RUBY_VERSION[0,3] != '1.8'
          # Wait on the read condition for at most +timeout+ seconds.
          # Must be called with @outstanding_hash_mutex held.
          def timed_read_condition_wait(timeout)
            # The timeout argument is not available in MRI 1.8
            @read_condition.wait(@outstanding_hash_mutex, timeout)
          end
        else
          require 'timeout'
          # MRI 1.8 fallback: bound an untimed wait with Timeout.
          # Must be called with @outstanding_hash_mutex held.
          def timed_read_condition_wait(timeout)
            Timeout.timeout(timeout) do
              @read_condition.wait(@outstanding_hash_mutex)
            end
          rescue Timeout::Error
            nil
          end
        end

        if defined?(Process::CLOCK_MONOTONIC)
          # Current time in seconds, used only to compute wait deadlines.
          # The monotonic clock is unaffected by wall-clock adjustments.
          def monotonic_now
            Process.clock_gettime(Process::CLOCK_MONOTONIC)
          end
        else
          # Current time in seconds, used only to compute wait deadlines.
          # Falls back to the wall clock where no monotonic clock is exposed.
          def monotonic_now
            Time.now.to_f
          end
        end

      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
qwirk-0.2.4 lib/qwirk/adapter/in_memory/reply_queue.rb
qwirk-0.2.3 lib/qwirk/adapter/in_memory/reply_queue.rb
qwirk-0.2.2 lib/qwirk/adapter/in_memory/reply_queue.rb