Sha256: 480c8f590bae5d5d7b3bf5f376311966433f46f467c324e8c73b36e07dd6fddf
Contents?: true
Size: 1.26 KB
Versions: 2
Compression:
Stored size: 1.26 KB
Contents
module EventSourcery module EventStore class Subscription def initialize(event_store:, poll_waiter:, from_event_id:, event_types: nil, on_new_events:, subscription_master:, events_table_name: :events) @event_store = event_store @from_event_id = from_event_id @poll_waiter = poll_waiter @event_types = event_types @on_new_events = on_new_events @subscription_master = subscription_master @current_event_id = from_event_id - 1 end def start catch(:stop) do @poll_waiter.poll do read_events end end end private def read_events loop do @subscription_master.shutdown_if_requested events = @event_store.get_next_from(@current_event_id + 1, event_types: @event_types) break if events.empty? EventSourcery.logger.debug { "New events in subscription: #{events.inspect}" } @on_new_events.call(events) @current_event_id = events.last.id EventSourcery.logger.debug { "Position in stream: #{@current_event_id}" } end end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
event_sourcery-0.14.0 | lib/event_sourcery/event_store/subscription.rb |
event_sourcery-0.13.0 | lib/event_sourcery/event_store/subscription.rb |