lib/onstomp/failover/buffers/written.rb in onstomp-1.0.3 vs lib/onstomp/failover/buffers/written.rb in onstomp-1.0.4
- old
+ new
@@ -2,90 +2,38 @@
# A buffer that ensures frames are at least written to a
# {OnStomp::Client client}'s {OnStomp::Connections::Base connection} and
# replays the ones that were not when the
# {OnStomp::Failover::Client failover} client reconnects.
-class OnStomp::Failover::Buffers::Written
+class OnStomp::Failover::Buffers::Written < OnStomp::Failover::Buffers::Base
def initialize failover
- @failover = failover
- @buffer_mutex = Mutex.new
- @buffer = []
- @txs = {}
-
- failover.before_send &method(:buffer_frame)
- failover.before_commit &method(:buffer_frame)
- failover.before_abort &method(:buffer_frame)
- failover.before_subscribe &method(:buffer_frame)
- failover.before_begin &method(:buffer_transaction)
+ super
+ [:send, :commit, :abort, :subscribe].each do |ev|
+ failover.__send__(:"before_#{ev}") do |f, *_|
+ add_to_buffer f
+ end
+ end
+ # We only want to scrub the transactions if ABORT or COMMIT was
+ # at least written fully to the socket.
+ [:commit, :abort].each do |ev|
+ failover.__send__(:"on_#{ev}") do |f,*_|
+ remove_from_transactions f
+ end
+ end
+ failover.before_begin { |f, *_| add_to_transactions f }
# We can scrub the subscription before UNSUBSCRIBE is fully written
# because if we replay before UNSUBSCRIBE was sent, we still don't
# want to be subscribed when we reconnect.
- failover.before_unsubscribe &method(:debuffer_subscription)
- # We only want to scrub the transactions if ABORT or COMMIT was
- # at least written fully to the socket.
- failover.on_commit &method(:debuffer_transaction)
- failover.on_abort &method(:debuffer_transaction)
+ failover.before_unsubscribe { |f, *_| remove_subscribe_from_buffer f }
failover.on_send &method(:debuffer_non_transactional_frame)
-
- failover.on_failover_connected &method(:replay)
+ failover.on_failover_connected { |f,c,*_| replay_buffer c }
end
-
- # Adds a frame to a buffer so that it may be replayed if the
- # {OnStomp::Failover::Client failover} client re-connects
- def buffer_frame f, *_
- @buffer_mutex.synchronize do
- unless f.header? :'x-onstomp-failover-replay'
- @buffer << f
- end
- end
- end
-
- # Records the start of a transaction so that it may be replayed if the
- # {OnStomp::Failover::Client failover} client re-connects
- def buffer_transaction f, *_
- @txs[f[:transaction]] = true
- buffer_frame f
- end
-
- # Removes the recorded transaction from the buffer after it has been
- # written the broker socket so that it will not be replayed when the
- # {OnStomp::Failover::Client failover} client re-connects
- def debuffer_transaction f, *_
- tx = f[:transaction]
- if @txs.delete tx
- @buffer_mutex.synchronize do
- @buffer.reject! { |bf| bf[:transaction] == tx }
- end
- end
- end
-
- # Removes the matching SUBSCRIBE frame from the buffer after the
- # UNSUBSCRIBE has been added to the connection's write buffer
- # so that it will not be replayed when the
- # {OnStomp::Failover::Client failover} client re-connects
- def debuffer_subscription f, *_
- @buffer_mutex.synchronize do
- @buffer.reject! { |bf| bf.command == 'SUBSCRIBE' && bf[:id] == f[:id] }
- end
- end
-
+
# Removes a frame that is not part of a transaction from the buffer
# after it has been written the broker socket so that it will not be
# replayed when the {OnStomp::Failover::Client failover} client re-connects
def debuffer_non_transactional_frame f, *_
unless @txs.key?(f[:transaction])
@buffer_mutex.synchronize { @buffer.delete f }
- end
- end
-
- # Called when the {OnStomp::Failover::Client failover} client triggers
- # `on_failover_connected` to start replaying any frames in the buffer.
- def replay fail, client, *_
- replay_frames = @buffer_mutex.synchronize do
- @buffer.select { |f| f[:'x-onstomp-failover-replay'] = '1'; true }
- end
-
- replay_frames.each do |f|
- client.transmit f
end
end
end