Sha256: 84ef9d6a1a5944ed8978f118dfd948760197ef14589b5f14a8ba8606ee1dcb8e
Contents?: true
Size: 1.57 KB
Versions: 1
Compression:
Stored size: 1.57 KB
Contents
module PhobosDBCheckpoint module Handler include Phobos::Handler def self.included(base) base.extend(ClassMethods) end def ack(entity_id, event_time, event_type = nil, event_version = nil) PhobosDBCheckpoint::Ack.new(entity_id, event_time, event_type, event_version) end module ClassMethods include Phobos::Instrumentation include Phobos::Handler::ClassMethods def around_consume(payload, metadata) event = PhobosDBCheckpoint::Event.new( topic: metadata[:topic], group_id: metadata[:group_id], payload: payload ) event_metadata = { checksum: event.checksum }.merge(metadata) event_exists = instrument('db_checkpoint.event_already_exists_check', event_metadata) { event.exists? } if event_exists instrument('db_checkpoint.event_already_consumed', event_metadata) return end event_action = instrument('db_checkpoint.event_action', event_metadata) do yield end case event_action when PhobosDBCheckpoint::Ack instrument('db_checkpoint.event_acknowledged', event_metadata) do event.acknowledge!(event_action) end else instrument('db_checkpoint.event_skipped', event_metadata) end ensure # Returns any connections in use by the current thread back to the pool, and also returns # connections to the pool cached by threads that are no longer alive. ActiveRecord::Base.clear_active_connections! end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
phobos_db_checkpoint-0.4.0 | lib/phobos_db_checkpoint/handler.rb |