Sha256: 05be179408223dc8c6e65cc29da34d90ef9cbb0ced10bd7b051a3a69569d9721
Contents?: true
Size: 1.29 KB
Versions: 1
Compression:
Stored size: 1.29 KB
Contents
# frozen_string_literal: true require 'eventsimple/outbox/models/cursor' module Eventsimple module Outbox module Consumer def self.extended(klass) klass.class_exec do class_attribute :_event_klass class_attribute :_processor_klass class_attribute :stop_consumer, default: false Signal.trap('SIGINT') do self.stop_consumer = true STDOUT.puts('SIGINT received, stopping consumer') end end end def consumes_event(event_klass, concurrency: 1) event_klass._outbox_mode = true event_klass._outbox_concurrency = concurrency self._event_klass = event_klass end def processor(processor_klass) self._processor_klass = processor_klass end def start # rubocop:disable Metrics/AbcSize cursor = Outbox::Cursor.fetch(_event_klass, 0) until stop_consumer _event_klass.unscoped.in_batches(start: cursor + 1, load: true).each do |batch| batch.each do |event| _processor_klass.new(event).call break if stop_consumer end cursor = batch.last.id Outbox::Cursor.set(_event_klass, 0, cursor) end sleep(1) end end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
eventsimple-1.0.0 | lib/eventsimple/outbox/consumer.rb |