lib/onstomp/failover/buffers/receipts.rb in onstomp-1.0.3 vs lib/onstomp/failover/buffers/receipts.rb in onstomp-1.0.4
- old
+ new
@@ -4,98 +4,45 @@
# {OnStomp::Client client}'s {OnStomp::Connections::Base connection} and
# replays the ones that were not when the
# {OnStomp::Failover::Client failover} client reconnects.
# @todo Quite a lot of this code is shared between Written and Receipts,
# we'll want to factor the common stuff out.
-class OnStomp::Failover::Buffers::Receipts
+class OnStomp::Failover::Buffers::Receipts < OnStomp::Failover::Buffers::Base
def initialize failover
- @failover = failover
- @buffer_mutex = Mutex.new
- @buffer = []
- @txs = {}
-
- failover.before_send &method(:buffer_frame)
- failover.before_commit &method(:buffer_frame)
- failover.before_abort &method(:buffer_frame)
- failover.before_subscribe &method(:buffer_frame)
- failover.before_begin &method(:buffer_transaction)
+ super
+ [:send, :commit, :abort, :subscribe].each do |ev|
+ failover.__send__(:"before_#{ev}") do |f, *_|
+ add_to_buffer f, {:receipt => OnStomp.next_serial}
+ end
+ end
+ failover.before_begin do |f, *_|
+ add_to_transactions f, {:receipt => OnStomp.next_serial}
+ end
# We can scrub the subscription before UNSUBSCRIBE is fully written
# because if we replay before UNSUBSCRIBE was sent, we still don't
# want to be subscribed when we reconnect.
- failover.before_unsubscribe &method(:debuffer_subscription)
- failover.on_receipt &method(:debuffer_frame)
-
- failover.on_failover_connected &method(:replay)
- end
-
- # Adds a frame to a buffer so that it may be replayed if the
- # {OnStomp::Failover::Client failover} client re-connects
- def buffer_frame f, *_
- @buffer_mutex.synchronize do
- # Don't re-buffer frames that are being replayed.
- unless f.header? :'x-onstomp-failover-replay'
- # Create a receipt header, unless the frame already has one.
- f[:receipt] = OnStomp.next_serial unless f.header?(:receipt)
- @buffer << f
- end
+ failover.before_unsubscribe do |f, *_|
+ remove_subscribe_from_buffer f
end
+ failover.on_receipt { |r, *_| debuffer_frame r }
+ failover.on_failover_connected { |f,c,*_| replay_buffer c }
end
- # Records the start of a transaction so that it may be replayed if the
- # {OnStomp::Failover::Client failover} client re-connects
- def buffer_transaction f, *_
- @txs[f[:transaction]] = true
- buffer_frame f
- end
- # Removes the recorded transaction from the buffer after it has been
- # written the broker socket so that it will not be replayed when the
- # {OnStomp::Failover::Client failover} client re-connects
- def debuffer_transaction f
- tx = f[:transaction]
- if @txs.delete tx
- @buffer_mutex.synchronize do
- @buffer.reject! { |bf| bf[:transaction] == tx }
- end
- end
- end
-
- # Removes the matching SUBSCRIBE frame from the buffer after the
- # UNSUBSCRIBE has been added to the connection's write buffer
- # so that it will not be replayed when the
- # {OnStomp::Failover::Client failover} client re-connects
- def debuffer_subscription f, *_
- @buffer_mutex.synchronize do
- @buffer.reject! { |bf| bf.command == 'SUBSCRIBE' && bf[:id] == f[:id] }
- end
- end
-
# Removes frames that neither transactional nor SUBSCRIBEs from the buffer
# by looking the buffered frames up by their `receipt` header.
- def debuffer_frame r, *_
+ def debuffer_frame r
orig = @buffer_mutex.synchronize do
@buffer.detect { |f| f[:receipt] == r[:'receipt-id'] }
end
if orig
# COMMIT and ABORT debuffer the whole transaction sequence
if ['COMMIT', 'ABORT'].include? orig.command
- debuffer_transaction orig
+ remove_from_transactions orig
# Otherwise, if this isn't part of a transaction, debuffer the
# particular frame (if it's not a SUBSCRIBE)
elsif orig.command != 'SUBSCRIBE' && !orig.header?(:transaction)
@buffer_mutex.synchronize { @buffer.delete orig }
end
- end
- end
-
- # Called when the {OnStomp::Failover::Client failover} client triggers
- # `on_failover_connected` to start replaying any frames in the buffer.
- def replay fail, client, *_
- replay_frames = @buffer_mutex.synchronize do
- @buffer.select { |f| f[:'x-onstomp-failover-replay'] = '1'; true }
- end
-
- replay_frames.each do |f|
- client.transmit f
end
end
end