Sha256: abddd36043e70426752ea8226baaf1a19d2264529024ba071cbb7fa7645ff882

Contents?: true

Size: 1.14 KB

Versions: 1

Compression:

Stored size: 1.14 KB

Contents

module PhobosDBCheckpoint
  module Handler
    include Phobos::Handler

    def self.included(base)
      base.extend(ClassMethods)
    end

    def ack(entity_id, event_time, event_type = nil, event_version = nil)
      PhobosDBCheckpoint::Ack.new(entity_id, event_time, event_type, event_version)
    end

    module ClassMethods
      include Phobos::Instrumentation
      include Phobos::Handler::ClassMethods

      def around_consume(payload, metadata)
        event = PhobosDBCheckpoint::Event.new(
          topic: metadata[:topic],
          group_id: metadata[:group_id],
          payload: payload
        )

        event_metadata = {checksum: event.checksum}.merge(metadata)
        if event.exists?
          instrument('db_checkpoint.event_already_consumed', event_metadata)
          return
        end

        event_action = yield
        case event_action
        when PhobosDBCheckpoint::Ack
          instrument('db_checkpoint.event_acknowledged', event_metadata) do
            event.acknowledge!(event_action)
          end
        else
          instrument('db_checkpoint.event_skipped', event_metadata)
        end
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
phobos_db_checkpoint-0.1.1 lib/phobos_db_checkpoint/handler.rb