Sha256: e0cf19036586618f7ed046e001ae86ab26a9e6c25e421dabcea2f2a32b5f866a

Contents?: true

Size: 1.41 KB

Versions: 9

Compression:

Stored size: 1.41 KB

Contents

# -*- encoding: utf-8 -*-

# The base class for all buffers. This class exists mostly as a factoring
# out of the code shared between the {OnStomp::Failover::Buffers::Written}
# and {OnStomp::Failover::Buffers::Receipts} buffers.
class OnStomp::Failover::Buffers::Base
  def initialize failover
    @failover = failover
    @buffer_mutex = Mutex.new
    @buffer = []
    @txs = {}
  end
  
  # Returns the number of frames currently sitting in the buffer.
  # @return [Fixnum]
  def buffered
    @buffer.length
  end
  
  private
  def add_to_buffer f, heads={}
    @buffer_mutex.synchronize do
      unless f.header? :'x-onstomp-failover-replay'
        f.headers.reverse_merge! heads        
        @buffer << f 
      end
    end
  end
  
  def add_to_transactions f, heads={}
    @txs[f[:transaction]] = true
    add_to_buffer f, heads
  end
  
  def remove_from_transactions f
    tx = f[:transaction]
    if @txs.delete tx
      @buffer_mutex.synchronize do
        @buffer.reject! { |bf| bf[:transaction] == tx }
      end
    end
  end
  
  def remove_subscribe_from_buffer f
    @buffer_mutex.synchronize do
      @buffer.reject! { |bf| bf.command == 'SUBSCRIBE' && bf[:id] == f[:id] }
    end
  end
  
  def replay_buffer client
    replay_frames = @buffer_mutex.synchronize do
      @buffer.select { |f| f[:'x-onstomp-failover-replay'] = '1'; true }
    end
    
    replay_frames.each do |f|
      client.transmit f
    end
  end
end

Version data entries

9 entries across 9 versions & 1 rubygems

Version Path
onstomp-1.0.12 lib/onstomp/failover/buffers/base.rb
onstomp-1.0.11 lib/onstomp/failover/buffers/base.rb
onstomp-1.0.10 lib/onstomp/failover/buffers/base.rb
onstomp-1.0.9 lib/onstomp/failover/buffers/base.rb
onstomp-1.0.8 lib/onstomp/failover/buffers/base.rb
onstomp-1.0.7 lib/onstomp/failover/buffers/base.rb
onstomp-1.0.6 lib/onstomp/failover/buffers/base.rb
onstomp-1.0.5 lib/onstomp/failover/buffers/base.rb
onstomp-1.0.4 lib/onstomp/failover/buffers/base.rb