Sha256: 480c8f590bae5d5d7b3bf5f376311966433f46f467c324e8c73b36e07dd6fddf

Contents?: true

Size: 1.26 KB

Versions: 2

Compression:

Stored size: 1.26 KB

Contents

module EventSourcery
  module EventStore
    class Subscription
      def initialize(event_store:,
                     poll_waiter:,
                     from_event_id:,
                     event_types: nil,
                     on_new_events:,
                     subscription_master:,
                     events_table_name: :events)
        @event_store = event_store
        @from_event_id = from_event_id
        @poll_waiter = poll_waiter
        @event_types = event_types
        @on_new_events = on_new_events
        @subscription_master = subscription_master
        @current_event_id = from_event_id - 1
      end

      def start
        catch(:stop) do
          @poll_waiter.poll do
            read_events
          end
        end
      end

      private

      def read_events
        loop do
          @subscription_master.shutdown_if_requested
          events = @event_store.get_next_from(@current_event_id + 1, event_types: @event_types)
          break if events.empty?
          EventSourcery.logger.debug { "New events in subscription: #{events.inspect}" }
          @on_new_events.call(events)
          @current_event_id = events.last.id
          EventSourcery.logger.debug { "Position in stream: #{@current_event_id}" }
        end
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
event_sourcery-0.14.0 lib/event_sourcery/event_store/subscription.rb
event_sourcery-0.13.0 lib/event_sourcery/event_store/subscription.rb