Sha256: 00859d5501b8a4f1022952b0942ebea251510eafc9e162d49dfed44a1a4ade08

Contents?: true

Size: 1.02 KB

Versions: 2

Compression:

Stored size: 1.02 KB

Contents

# frozen_string_literal: true

module PgEventstore
  # Loads events from the database in one query and distributes them among the given SubscriptionRunners.
  # @!visibility private
  class SubscriptionRunnersFeeder
    # @param config_name [Symbol] name of the config used to look up the connection
    def initialize(config_name)
      @config_name = config_name
    end

    # Feeds every runner that is running and reports it is time to feed. Query options of all such runners are
    # collected into a single Hash keyed by runner id and passed to one SubscriptionQueries#subscriptions_events
    # call. A runner without an entry in the result is fed an empty Array.
    # @param runners [Array<PgEventstore::SubscriptionRunner>]
    # @return [void]
    def feed(runners)
      # Two separate passes: #time_to_feed? is only asked of runners which are running.
      active_runners = runners.select { |runner| runner.running? }
      due_runners = active_runners.select { |runner| runner.time_to_feed? }
      return if due_runners.empty?

      query_options_by_id = due_runners.map { |runner| [runner.id, runner.next_chunk_query_opts] }.to_h
      events_by_id = subscription_queries.subscriptions_events(query_options_by_id)
      due_runners.each do |runner|
        events = events_by_id[runner.id]
        runner.feed(events || [])
      end
    end

    private

    # Looks up the connection registered under the config name given to the constructor.
    # @return [PgEventstore::Connection]
    def connection
      PgEventstore.connection(@config_name)
    end

    # Builds a new queries object on top of the current connection on every call.
    # @return [PgEventstore::SubscriptionQueries]
    def subscription_queries
      SubscriptionQueries.new(connection)
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
pg_eventstore-1.0.0.rc2 lib/pg_eventstore/subscriptions/subscription_runners_feeder.rb
pg_eventstore-1.0.0.rc1 lib/pg_eventstore/subscriptions/subscription_runners_feeder.rb