Sha256: dbc94f2fbf18fdd67ddd569e540f91da17965f9ecc2e6f3e3152f956a39698af

Contents?: true

Size: 1.68 KB

Versions: 1

Compression:

Stored size: 1.68 KB

Contents

module PhobosDBCheckpoint
  # Handler mixin layered on top of Phobos::Handler. Including it extends the
  # host class with ClassMethods, whose around_consume wraps each consumed
  # message in a PhobosDBCheckpoint::Event and only acknowledges it when the
  # handler's own logic returns a PhobosDBCheckpoint::Ack.
  module Handler
    include Phobos::Handler

    # Inclusion hook: adds the class-level API (ClassMethods) to +base+.
    def self.included(base)
      base.extend(ClassMethods)
    end

    # Convenience constructor for the value around_consume checks for.
    #
    # @param entity_id forwarded as the first argument to PhobosDBCheckpoint::Ack.new
    # @param event_time forwarded as the second argument
    # @param event_type forwarded as the third argument (defaults to nil)
    # @param event_version forwarded as the fourth argument (defaults to nil)
    # @return [PhobosDBCheckpoint::Ack]
    def ack(entity_id, event_time, event_type = nil, event_version = nil)
      PhobosDBCheckpoint::Ack.new(entity_id, event_time, event_type, event_version)
    end

    # Class-level behaviour added to every class that includes Handler.
    module ClassMethods
      include Phobos::Instrumentation
      include Phobos::Handler::ClassMethods

      # Wraps the consumption of one message.
      #
      # Builds a PhobosDBCheckpoint::Event from the payload plus the :topic and
      # :group_id entries of +metadata+. When `event.exists?` is truthy the
      # method returns early without yielding. Otherwise it yields, and if the
      # yielded result is a PhobosDBCheckpoint::Ack it is passed to
      # `event.acknowledge!`; any other result is only instrumented as skipped.
      #
      # @param payload the message payload handed to the Event
      # @param metadata [Hash] read for :topic and :group_id; also merged into
      #   the instrumentation payload (its keys take precedence over :checksum)
      # @yield runs the wrapped consume logic
      def around_consume(payload, metadata)
        checkpoint = PhobosDBCheckpoint::Event.new(
          topic: metadata[:topic], group_id: metadata[:group_id], payload: payload
        )
        tags = { checksum: checkpoint.checksum }.merge(metadata)

        instrument('db_checkpoint.around_consume', tags) do
          already_stored = instrument('db_checkpoint.event_already_exists_check', tags) do
            checkpoint.exists?
          end

          if already_stored
            instrument('db_checkpoint.event_already_consumed', tags)
            # Leaves around_consume entirely; the ensure clause below still runs.
            return
          end

          outcome = instrument('db_checkpoint.event_action', tags) { yield }

          case outcome
          when PhobosDBCheckpoint::Ack
            instrument('db_checkpoint.event_acknowledged', tags) { checkpoint.acknowledge!(outcome) }
          else
            instrument('db_checkpoint.event_skipped', tags)
          end
        end
      ensure
        # Hand back to the pool every connection the current thread is using, along with
        # connections still cached for threads that are no longer alive.
        ActiveRecord::Base.clear_active_connections!
      end
    end
  end
end

Version data entries

1 entry across 1 version & 1 rubygem

Version Path
phobos_db_checkpoint-0.5.0 lib/phobos_db_checkpoint/handler.rb