Sha256: 84cd415180a113dbc4dea6a2bf81fb519486f8cc9153ece1c7dc883421f0d0da

Contents?: true

Size: 1.46 KB

Versions: 10

Compression:

Stored size: 1.46 KB

Contents

# frozen_string_literal: true

module Rabbit
  module Publishing
    class ChannelsPool
      class BaseQueue < Queue
        def initialize(session, max_size)
          super()

          @session    = session
          @max_size   = max_size - 1
          @ch_size    = 0
          @create_mon = Mutex.new
          @ch_dec_mon = Mutex.new
        end

        def pop
          add_channel if size.zero?

          super
        end
        alias_method :deq, :pop

        def push(channel)
          return @ch_dec_mon.synchronize { @ch_size -= 1 } unless channel&.open?

          super
        end
        alias_method :enq, :push

        def add_channel
          @create_mon.synchronize do
            return unless @ch_size < @max_size

            push create_channel
            @ch_size += 1
          end
        end

        protected

        def create_channel
          @session.create_channel
        end
      end

      class ConfirmQueue < BaseQueue
        def create_channel
          ch = @session.create_channel
          ch.confirm_select

          ch
        end
      end

      def initialize(session)
        max_size = session.channel_max

        @pools = {
          true => ConfirmQueue.new(session, max_size / 2),
          false => BaseQueue.new(session, max_size / 2),
        }.freeze
      end

      def with_channel(confirm)
        pool = @pools[confirm]
        ch = pool.deq
        yield ch
      ensure
        pool.enq ch
      end
    end
  end
end

Version data entries

10 entries across 10 versions & 1 rubygems

Version Path
rabbit_messaging-1.1.0 lib/rabbit/publishing/channels_pool.rb
rabbit_messaging-1.0.0 lib/rabbit/publishing/channels_pool.rb
rabbit_messaging-0.15.0 lib/rabbit/publishing/channels_pool.rb
rabbit_messaging-0.13.0 lib/rabbit/publishing/channels_pool.rb
rabbit_messaging-0.12.1 lib/rabbit/publishing/channels_pool.rb
rabbit_messaging-0.12.0 lib/rabbit/publishing/channels_pool.rb
rabbit_messaging-0.11.0 lib/rabbit/publishing/channels_pool.rb
rabbit_messaging-0.10.0 lib/rabbit/publishing/channels_pool.rb
rabbit_messaging-0.9.0 lib/rabbit/publishing/channels_pool.rb
rabbit_messaging-0.8.1 lib/rabbit/publishing/channels_pool.rb