Sha256: 9da9db2761f233e7d333be380e7b453e22ce63db1108860c8a2fa3dac27af647

Contents?: true

Size: 1.38 KB

Versions: 2

Compression:

Stored size: 1.38 KB

Contents

module PhobosDBCheckpoint
  module Handler
    include Phobos::Handler

    def self.included(base)
      base.extend(ClassMethods)
    end

    def ack(entity_id, event_time, event_type = nil, event_version = nil)
      PhobosDBCheckpoint::Ack.new(entity_id, event_time, event_type, event_version)
    end

    module ClassMethods
      include Phobos::Instrumentation
      include Phobos::Handler::ClassMethods

      def around_consume(payload, metadata)
        event = PhobosDBCheckpoint::Event.new(
          topic: metadata[:topic],
          group_id: metadata[:group_id],
          payload: payload
        )

        event_metadata = {checksum: event.checksum}.merge(metadata)
        if event.exists?
          instrument('db_checkpoint.event_already_consumed', event_metadata)
          return
        end

        event_action = yield
        case event_action
        when PhobosDBCheckpoint::Ack
          instrument('db_checkpoint.event_acknowledged', event_metadata) do
            event.acknowledge!(event_action)
          end
        else
          instrument('db_checkpoint.event_skipped', event_metadata)
        end
      ensure
        # Returns any connections in use by the current thread back to the pool, and also returns
        # connections to the pool cached by threads that are no longer alive.
        ActiveRecord::Base.clear_active_connections!
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
phobos_db_checkpoint-0.3.0 lib/phobos_db_checkpoint/handler.rb
phobos_db_checkpoint-0.2.0 lib/phobos_db_checkpoint/handler.rb