require "active_record/railtie"
require "active_support/core_ext/time/zones"
require "rdkafka"

require "nulogy_message_bus_consumer/engine"
require "nulogy_message_bus_consumer/clock"
require "nulogy_message_bus_consumer/config"
require "nulogy_message_bus_consumer/deployment/ecs"
require "nulogy_message_bus_consumer/handlers/log_unprocessed_messages"
require "nulogy_message_bus_consumer/kafka_utils"
require "nulogy_message_bus_consumer/lag_tracker"
require "nulogy_message_bus_consumer/message"
require "nulogy_message_bus_consumer/null_logger"
require "nulogy_message_bus_consumer/pipeline"
require "nulogy_message_bus_consumer/processed_message"
require "nulogy_message_bus_consumer/steps/commit_on_success"
require "nulogy_message_bus_consumer/steps/connect_to_message_bus"
require "nulogy_message_bus_consumer/steps/deduplicate_messages"
require "nulogy_message_bus_consumer/steps/log_consumer_lag"
require "nulogy_message_bus_consumer/steps/log_messages"
require "nulogy_message_bus_consumer/steps/seek_beginning_of_topic"
require "nulogy_message_bus_consumer/steps/stream_messages"
require "nulogy_message_bus_consumer/steps/stream_messages_until_none_are_left"
require "nulogy_message_bus_consumer/steps/supervise_consumer_lag"

# Entry point of the gem: holds the module-wide configuration and logger, and
# builds the pipelines of steps used to consume messages from the message bus.
module NulogyMessageBusConsumer
  module_function

  mattr_accessor :config
  mattr_accessor :logger

  # Creates the module-wide Config on first use and updates it.
  #
  # @param options [Hash] settings passed to `Config#update`; skipped when blank.
  # @yieldparam config [Config] the module-wide configuration, for block-style setup.
  # @return the block's result when a block is given, otherwise nil.
  def configure(options = {})
    self.config ||= Config.new
    config.update(options) if options.present?
    yield(config) if block_given?
  end

  # Returns the module-wide logger, falling back to a NullLogger when none
  # has been assigned.
  #
  # This overrides the reader generated by `mattr_accessor :logger`. The
  # generated writer stores into the class variable `@@logger`, so the reader
  # must memoize that same variable; memoizing an instance variable here would
  # make `NulogyMessageBusConsumer.logger = ...` have no effect on this reader.
  def logger
    @@logger ||= NullLogger.new
  end

  # Builds a Pipeline from the given steps and invokes it immediately.
  #
  # @param steps [Array] pipeline steps, in the order they should run.
  # @return the result of `Pipeline#invoke`.
  def invoke_pipeline(*steps)
    Pipeline.new(steps).invoke
  end

  # Builds (without invoking) the pipeline recommended for regular consumers.
  #
  # @param config [Config] defaults to the module-wide configuration.
  # @param logger defaults to the module-wide logger.
  # @return [Pipeline]
  def recommended_consumer_pipeline(config: self.config, logger: self.logger)
    Pipeline.new([
      # System processing/health steps.
      # Note: because these steps come before `StreamMessages`, they are only
      # called once, without any messages.
      Steps::ConnectToMessageBus.new(config, logger),
      Steps::LogConsumerLag.new(logger),
      Steps::SuperviseConsumerLag.new(
        logger,
        check_interval_seconds: config.lag_check_interval_seconds,
        tracker: LagTracker.new(failing_checks: config.lag_checks)
      ),
      Steps::StreamMessages.new(logger),
      # Message processing steps start here.
      Steps::LogMessages.new(logger),
      Steps::CommitOnSuccess.new,
      Steps::DeduplicateMessages.new(logger)
    ])
  end

  # Builds (without invoking) a pipeline that connects to the message bus,
  # seeks to the beginning of the topic, streams messages until none are left,
  # and hands them to the LogUnprocessedMessages handler.
  #
  # @param config [Config] defaults to the module-wide configuration.
  # @param logger defaults to the module-wide logger.
  # @return [Pipeline]
  def consumer_audit_pipeline(config: self.config, logger: self.logger)
    Pipeline.new([
      Steps::ConnectToMessageBus.new(config, logger),
      Steps::SeekBeginningOfTopic.new,
      Steps::StreamMessagesUntilNoneAreLeft.new(logger),
      Handlers::LogUnprocessedMessages.new(logger)
    ])
  end
end