Sha256: b5438313d4742bca0327426b70cf511a57dfe1174047fdf0e2a823602033a389

Contents?: true

Size: 1.9 KB

Versions: 11

Compression:

Stored size: 1.9 KB

Contents

# frozen_string_literal: true

module PgEventstore
  # Holds a queue of raw events and hands them, one at a time, to the given handler. Lifecycle control
  # (start, stop, restore, etc.) is delegated to an internal BasicRunner instance.
  # @!visibility private
  class EventsProcessor
    include Extensions::CallbacksExtension
    extend Forwardable

    def_delegators :@basic_runner, :state, :start, :stop, :wait_for_finish, :stop_async, :restore, :running?

    # @param handler [#call] object which is called with each raw event
    def initialize(handler)
      @handler = handler
      @raw_events = []
      # NOTE(review): the meaning of the two numeric arguments is defined by BasicRunner, which is not visible
      # here - confirm against BasicRunner#initialize before changing them.
      @basic_runner = BasicRunner.new(0, 5)
      attach_runner_callbacks
    end

    # Runs the :feed callbacks with the global position of the last event of the given batch (nil if the batch
    # is empty) and then appends the batch to the end of the queue.
    # @param raw_events [Array<Hash>]
    # @return [void]
    def feed(raw_events)
      callbacks.run_callbacks(:feed, raw_events.last&.dig('global_position'))
      # Array#concat appends the batch without expanding it into an argument list the way push(*raw_events)
      # does, so the cost of the call does not depend on the batch size.
      @raw_events.concat(raw_events)
    end

    # Number of unprocessed events which are currently in a queue
    # @return [Integer]
    def events_left_in_chunk
      @raw_events.size
    end

    private

    # Calls the handler with the given event inside the :process callbacks. The event's global position is
    # passed to the callbacks.
    # @param raw_event [Hash]
    # @return [void]
    def process_event(raw_event)
      callbacks.run_callbacks(:process, raw_event['global_position']) do
        @handler.call(raw_event)
      end
    end

    # Subscribes this object's private methods to the runner's callbacks of the same name.
    # @return [void]
    def attach_runner_callbacks
      @basic_runner.define_callback(:process_async, :before, method(:process_async))
      @basic_runner.define_callback(:after_runner_died, :before, method(:after_runner_died))
      @basic_runner.define_callback(:before_runner_restored, :before, method(:before_runner_restored))
      @basic_runner.define_callback(:change_state, :before, method(:change_state))
    end

    # Takes the next event from the head of the queue and processes it. Sleeps for half a second when the queue
    # is empty. If an error is raised, the dequeued event is put back at the head of the queue before the error
    # is re-raised, so the event is not lost.
    # @return [void]
    def process_async
      raw_event = @raw_events.shift
      return sleep 0.5 if raw_event.nil?

      process_event(raw_event)
    rescue
      # Only a real event is returned to the queue. Putting nil back would add a bogus entry and inflate
      # #events_left_in_chunk.
      @raw_events.unshift(raw_event) unless raw_event.nil?
      raise
    end

    # Forwards all given arguments to the :error callbacks.
    # @return [void]
    def after_runner_died(...)
      callbacks.run_callbacks(:error, ...)
    end

    # Runs the :restart callbacks.
    # @return [void]
    def before_runner_restored
      callbacks.run_callbacks(:restart)
    end

    # Forwards all given arguments to the :change_state callbacks.
    # @return [void]
    def change_state(...)
      callbacks.run_callbacks(:change_state, ...)
    end
  end
end

Version data entries

11 entries across 11 versions & 1 rubygems

Version Path
pg_eventstore-0.10.1 lib/pg_eventstore/subscriptions/events_processor.rb
pg_eventstore-0.9.0 lib/pg_eventstore/subscriptions/events_processor.rb
pg_eventstore-0.8.0 lib/pg_eventstore/subscriptions/events_processor.rb
pg_eventstore-0.7.2 lib/pg_eventstore/subscriptions/events_processor.rb
pg_eventstore-0.7.1 lib/pg_eventstore/subscriptions/events_processor.rb
pg_eventstore-0.7.0 lib/pg_eventstore/subscriptions/events_processor.rb
pg_eventstore-0.6.0 lib/pg_eventstore/subscriptions/events_processor.rb
pg_eventstore-0.5.3 lib/pg_eventstore/subscriptions/events_processor.rb
pg_eventstore-0.5.2 lib/pg_eventstore/subscriptions/events_processor.rb
pg_eventstore-0.5.0 lib/pg_eventstore/subscriptions/events_processor.rb
pg_eventstore-0.4.0 lib/pg_eventstore/subscriptions/events_processor.rb