Sha256: 26d452f39509c72e92dd67b8ac1a39e9b8af1d850515037cdc59b6d72dcf02dd
Contents?: true
Size: 1.86 KB
Versions: 9
Compression:
Stored size: 1.86 KB
Contents
# -*- encoding: utf-8 -*- # A buffer that ensures frames are RECEIPTed against a # {OnStomp::Client client}'s {OnStomp::Connections::Base connection} and # replays the ones that were not when the # {OnStomp::Failover::Client failover} client reconnects. # @todo Quite a lot of this code is shared between Written and Receipts, # we'll want to factor the common stuff out. class OnStomp::Failover::Buffers::Receipts < OnStomp::Failover::Buffers::Base def initialize failover super [:send, :commit, :abort, :subscribe].each do |ev| failover.__send__(:"before_#{ev}") do |f, *_| add_to_buffer f, {:receipt => OnStomp.next_serial} end end failover.before_begin do |f, *_| add_to_transactions f, {:receipt => OnStomp.next_serial} end # We can scrub the subscription before UNSUBSCRIBE is fully written # because if we replay before UNSUBSCRIBE was sent, we still don't # want to be subscribed when we reconnect. failover.before_unsubscribe do |f, *_| remove_subscribe_from_buffer f end failover.on_receipt { |r, *_| debuffer_frame r } failover.on_failover_connected { |f,c,*_| replay_buffer c } end # Removes frames that neither transactional nor SUBSCRIBEs from the buffer # by looking the buffered frames up by their `receipt` header. def debuffer_frame r orig = @buffer_mutex.synchronize do @buffer.detect { |f| f[:receipt] == r[:'receipt-id'] } end if orig # COMMIT and ABORT debuffer the whole transaction sequence if ['COMMIT', 'ABORT'].include? orig.command remove_from_transactions orig # Otherwise, if this isn't part of a transaction, debuffer the # particular frame (if it's not a SUBSCRIBE) elsif orig.command != 'SUBSCRIBE' && !orig.header?(:transaction) @buffer_mutex.synchronize { @buffer.delete orig } end end end end
Version data entries
9 entries across 9 versions & 1 rubygems