require "rdkafka"

require "nulogy_message_bus_consumer/engine"
require "nulogy_message_bus_consumer/config"
require "nulogy_message_bus_consumer/handlers/log_unprocessed_messages"
require "nulogy_message_bus_consumer/kafka_utils"
require "nulogy_message_bus_consumer/message"
require "nulogy_message_bus_consumer/null_logger"
require "nulogy_message_bus_consumer/pipeline"
require "nulogy_message_bus_consumer/processed_message"
require "nulogy_message_bus_consumer/steps/commit_on_success"
require "nulogy_message_bus_consumer/steps/connect_to_message_bus"
require "nulogy_message_bus_consumer/steps/deduplicate_messages"
require "nulogy_message_bus_consumer/steps/log_messages"
require "nulogy_message_bus_consumer/steps/monitor_replication_lag"
require "nulogy_message_bus_consumer/steps/seek_beginning_of_topic"
require "nulogy_message_bus_consumer/steps/stream_messages"
require "nulogy_message_bus_consumer/steps/stream_messages_until_none_are_left"

# Top-level namespace and entry points: module-wide configuration, the
# module-wide logger, and factories for the pipelines built from Steps.
module NulogyMessageBusConsumer
  module_function

  mattr_accessor :config
  mattr_accessor :logger

  # Creates the module-wide Config on first use, then applies overrides.
  #
  # @param options [Hash] passed to Config#update when present
  # @yield [config] the module-wide Config, for block-style configuration
  def configure(options = {})
    self.config ||= Config.new
    config.update(options) if options.present?
    yield(config) if block_given?
  end

  # Returns the logger assigned through `logger=`, or a NullLogger when none
  # has been assigned yet.
  #
  # This reader replaces the one generated by `mattr_accessor :logger`. The
  # generated writer stores its value in the class variable @@logger, so the
  # reader has to memoize into that same variable; reading a module instance
  # variable here would make every `logger = ...` assignment invisible.
  def logger
    @@logger ||= NullLogger.new
  end

  # Wraps the given steps in a Pipeline and invokes it.
  #
  # @param steps [Array] the steps, in the order given
  # @return whatever Pipeline#invoke returns
  def invoke_pipeline(*steps)
    Pipeline.new(steps).invoke
  end

  # Builds the pipeline for consuming messages: connect, monitor replication
  # lag and stream, then log, commit on success and deduplicate.
  #
  # @param config defaults to the module-wide config
  # @param logger defaults to the module-wide logger
  # @return [Pipeline]
  def recommended_consumer_pipeline(config: self.config, logger: self.logger)
    Pipeline.new([
      # The first three are really system processing steps
      Steps::ConnectToMessageBus.new(config, logger),
      Steps::MonitorReplicationLag.new(logger),
      Steps::StreamMessages.new(logger),
      # Message processing steps start here.
      Steps::LogMessages.new(logger),
      Steps::CommitOnSuccess.new,
      Steps::DeduplicateMessages.new(logger),
    ])
  end

  # Builds the audit pipeline: connect, seek to the beginning of the topic,
  # stream until no messages are left, and hand them to
  # Handlers::LogUnprocessedMessages.
  #
  # @param config defaults to the module-wide config
  # @param logger defaults to the module-wide logger
  # @return [Pipeline]
  def consumer_audit_pipeline(config: self.config, logger: self.logger)
    Pipeline.new([
      Steps::ConnectToMessageBus.new(config, logger),
      Steps::SeekBeginningOfTopic.new,
      Steps::StreamMessagesUntilNoneAreLeft.new(logger),
      Handlers::LogUnprocessedMessages.new(logger),
    ])
  end
end