lib/onstomp/failover/buffers/written.rb in onstomp-1.0.3 vs lib/onstomp/failover/buffers/written.rb in onstomp-1.0.4

- old
+ new

@@ -2,90 +2,38 @@ # A buffer that ensures frames are at least written to a # {OnStomp::Client client}'s {OnStomp::Connections::Base connection} and # replays the ones that were not when the # {OnStomp::Failover::Client failover} client reconnects. -class OnStomp::Failover::Buffers::Written +class OnStomp::Failover::Buffers::Written < OnStomp::Failover::Buffers::Base def initialize failover - @failover = failover - @buffer_mutex = Mutex.new - @buffer = [] - @txs = {} - - failover.before_send &method(:buffer_frame) - failover.before_commit &method(:buffer_frame) - failover.before_abort &method(:buffer_frame) - failover.before_subscribe &method(:buffer_frame) - failover.before_begin &method(:buffer_transaction) + super + [:send, :commit, :abort, :subscribe].each do |ev| + failover.__send__(:"before_#{ev}") do |f, *_| + add_to_buffer f + end + end + # We only want to scrub the transactions if ABORT or COMMIT was + # at least written fully to the socket. + [:commit, :abort].each do |ev| + failover.__send__(:"on_#{ev}") do |f,*_| + remove_from_transactions f + end + end + failover.before_begin { |f, *_| add_to_transactions f } # We can scrub the subscription before UNSUBSCRIBE is fully written # because if we replay before UNSUBSCRIBE was sent, we still don't # want to be subscribed when we reconnect. - failover.before_unsubscribe &method(:debuffer_subscription) - # We only want to scrub the transactions if ABORT or COMMIT was - # at least written fully to the socket. - failover.on_commit &method(:debuffer_transaction) - failover.on_abort &method(:debuffer_transaction) + failover.before_unsubscribe { |f, *_| remove_subscribe_from_buffer f } failover.on_send &method(:debuffer_non_transactional_frame) - - failover.on_failover_connected &method(:replay) + failover.on_failover_connected { |f,c,*_| replay_buffer c } end - - # Adds a frame to a buffer so that it may be replayed if the - # {OnStomp::Failover::Client failover} client re-connects - def buffer_frame f, *_ - @buffer_mutex.synchronize do - unless f.header? :'x-onstomp-failover-replay' - @buffer << f - end - end - end - - # Records the start of a transaction so that it may be replayed if the - # {OnStomp::Failover::Client failover} client re-connects - def buffer_transaction f, *_ - @txs[f[:transaction]] = true - buffer_frame f - end - - # Removes the recorded transaction from the buffer after it has been - # written the broker socket so that it will not be replayed when the - # {OnStomp::Failover::Client failover} client re-connects - def debuffer_transaction f, *_ - tx = f[:transaction] - if @txs.delete tx - @buffer_mutex.synchronize do - @buffer.reject! { |bf| bf[:transaction] == tx } - end - end - end - - # Removes the matching SUBSCRIBE frame from the buffer after the - # UNSUBSCRIBE has been added to the connection's write buffer - # so that it will not be replayed when the - # {OnStomp::Failover::Client failover} client re-connects - def debuffer_subscription f, *_ - @buffer_mutex.synchronize do - @buffer.reject! { |bf| bf.command == 'SUBSCRIBE' && bf[:id] == f[:id] } - end - end - + # Removes a frame that is not part of a transaction from the buffer # after it has been written the broker socket so that it will not be # replayed when the {OnStomp::Failover::Client failover} client re-connects def debuffer_non_transactional_frame f, *_ unless @txs.key?(f[:transaction]) @buffer_mutex.synchronize { @buffer.delete f } - end - end - - # Called when the {OnStomp::Failover::Client failover} client triggers - # `on_failover_connected` to start replaying any frames in the buffer. - def replay fail, client, *_ - replay_frames = @buffer_mutex.synchronize do - @buffer.select { |f| f[:'x-onstomp-failover-replay'] = '1'; true } - end - - replay_frames.each do |f| - client.transmit f end end end