Sha256: fd89330f337c10bae26421fee9f6d9ae7cf83a766087fdb16b720fc379731a55
Contents?: true
Size: 1.11 KB
Versions: 11
Compression:
Stored size: 1.11 KB
Contents
# frozen_string_literal: true

module PgEventstore
  # Fetches events from the database and hands them over to the given SubscriptionRunners.
  # @!visibility private
  class SubscriptionRunnersFeeder
    # @param config_name [Symbol] name of the config passed to PgEventstore.connection
    def initialize(config_name)
      @config_name = config_name
    end

    # Picks the runners that are running and report it is time to feed them, requests their next chunks
    # with a single SubscriptionQueries#subscriptions_events call and passes to each runner the rows
    # whose 'runner_id' equals the runner's id. Runners without matching rows are not fed.
    # @param runners [Array<PgEventstore::SubscriptionRunner>]
    # @return [void]
    def feed(runners)
      active = runners.select(&:running?)
      due = active.select(&:time_to_feed?)
      # Nothing to feed - skip the query entirely.
      return if due.empty?

      query_options = due.map do |runner|
        [runner.id, runner.next_chunk_query_opts]
      end
      events_by_runner = subscription_queries.subscriptions_events(query_options).group_by do |attrs|
        attrs['runner_id']
      end
      due.each do |runner|
        events = events_by_runner[runner.id]
        runner.feed(events) if events
      end
    end

    private

    # @return [PgEventstore::Connection]
    def connection
      PgEventstore.connection(@config_name)
    end

    # Builds a fresh queries object on top of the current connection on every call.
    # @return [PgEventstore::SubscriptionQueries]
    def subscription_queries
      SubscriptionQueries.new(connection)
    end
  end
end
Version data entries
11 entries across 11 versions & 1 rubygems