lib/racecar/runner.rb in racecar-1.3.0 vs lib/racecar/runner.rb in racecar-2.0.0.alpha1

- old
+ new

@@ -1,153 +1,222 @@
-require "kafka"
+require "rdkafka"
+require "racecar/pause"
 
 module Racecar
   class Runner
-    attr_reader :processor, :config, :logger, :consumer
+    attr_reader :processor, :config, :logger
 
     def initialize(processor, config:, logger:, instrumenter: NullInstrumenter)
       @processor, @config, @logger = processor, config, logger
       @instrumenter = instrumenter
+      @stop_requested = false
+      Rdkafka::Config.logger = logger
+
+      if processor.respond_to?(:statistics_callback)
+        Rdkafka::Config.statistics_callback = processor.method(:statistics_callback).to_proc
+      end
+
+      setup_pauses
     end
 
-    def stop
-      Thread.new do
-        processor.teardown
-        consumer.stop unless consumer.nil?
-      end.join
+    def setup_pauses
+      timeout = if config.pause_timeout == -1
+        nil
+      elsif config.pause_timeout == 0
+        # no op, handled elsewhere
+      elsif config.pause_timeout > 0
+        config.pause_timeout
+      else
+        raise ArgumentError, "Invalid value for pause_timeout: must be integer greater or equal -1"
+      end
+
+      @pauses = Hash.new {|h, k|
+        h[k] = Hash.new {|h2, k2|
+          h2[k2] = ::Racecar::Pause.new(
+            timeout: timeout,
+            max_timeout: config.max_pause_timeout,
+            exponential_backoff: config.pause_with_exponential_backoff
+          )
+        }
+      }
    end
 
     def run
-      kafka = Kafka.new(
-        client_id: config.client_id,
-        seed_brokers: config.brokers,
-        logger: logger,
-        connect_timeout: config.connect_timeout,
-        socket_timeout: config.socket_timeout,
-        ssl_ca_cert: config.ssl_ca_cert,
-        ssl_ca_cert_file_path: config.ssl_ca_cert_file_path,
-        ssl_client_cert: config.ssl_client_cert,
-        ssl_client_cert_key: config.ssl_client_cert_key,
-        ssl_client_cert_key_password: config.ssl_client_cert_key_password,
-        sasl_plain_username: config.sasl_plain_username,
-        sasl_plain_password: config.sasl_plain_password,
-        sasl_scram_username: config.sasl_scram_username,
-        sasl_scram_password: config.sasl_scram_password,
-        sasl_scram_mechanism: config.sasl_scram_mechanism,
-        sasl_oauth_token_provider: config.sasl_oauth_token_provider,
-        sasl_over_ssl: config.sasl_over_ssl,
-        ssl_ca_certs_from_system: config.ssl_ca_certs_from_system,
-        ssl_verify_hostname: config.ssl_verify_hostname,
-        partitioner: Kafka::Partitioner.new(hash_function: config.partitioner)
-      )
+      install_signal_handlers
+      @stop_requested = false
 
-      @consumer = kafka.consumer(
-        group_id: config.group_id,
-        offset_commit_interval: config.offset_commit_interval,
-        offset_commit_threshold: config.offset_commit_threshold,
-        session_timeout: config.session_timeout,
-        heartbeat_interval: config.heartbeat_interval,
-        offset_retention_time: config.offset_retention_time,
-        fetcher_max_queue_size: config.max_fetch_queue_size,
+      # Configure the consumer with a producer so it can produce messages and
+      # with a consumer so that it can support advanced use-cases.
+      processor.configure(
+        producer: producer,
+        consumer: consumer,
+        instrumenter: @instrumenter,
       )
 
-      # Stop the consumer on SIGINT, SIGQUIT or SIGTERM.
-      trap("QUIT") { stop }
-      trap("INT") { stop }
-      trap("TERM") { stop }
+      instrument_payload = { consumer_class: processor.class.to_s, consumer_set: consumer }
 
-      # Print the consumer config to STDERR on USR1.
-      trap("USR1") { $stderr.puts config.inspect }
-
-      config.subscriptions.each do |subscription|
-        consumer.subscribe(
-          subscription.topic,
-          start_from_beginning: subscription.start_from_beginning,
-          max_bytes_per_partition: subscription.max_bytes_per_partition,
-        )
+      # Main loop
+      loop do
+        break if @stop_requested
+        resume_paused_partitions
+        @instrumenter.instrument("main_loop.racecar", instrument_payload) do
+          case process_method
+          when :batch then
+            msg_per_part = consumer.batch_poll(config.max_wait_time).group_by(&:partition)
+            msg_per_part.each_value do |messages|
+              process_batch(messages)
+            end
+          when :single then
+            message = consumer.poll(config.max_wait_time)
+            process(message) if message
+          end
+        end
       end
 
-      # Configure the consumer with a producer so it can produce messages.
-      producer = kafka.producer(
-        compression_codec: config.producer_compression_codec,
-      )
+      logger.info "Gracefully shutting down"
+      processor.deliver!
+      processor.teardown
+      consumer.commit
+      consumer.close
+    end
 
-      processor.configure(consumer: consumer, producer: producer)
+    def stop
+      @stop_requested = true
+    end
 
-      begin
-        if processor.respond_to?(:process)
-          consumer.each_message(max_wait_time: config.max_wait_time, max_bytes: config.max_bytes) do |message|
-            payload = {
-              consumer_class: processor.class.to_s,
-              topic: message.topic,
-              partition: message.partition,
-              offset: message.offset,
-            }
+    private
 
-            @instrumenter.instrument("process_message.racecar", payload) do
-              processor.process(message)
-              producer.deliver_messages
-            end
-          end
-        elsif processor.respond_to?(:process_batch)
-          consumer.each_batch(max_wait_time: config.max_wait_time, max_bytes: config.max_bytes) do |batch|
-            payload = {
-              consumer_class: processor.class.to_s,
-              topic: batch.topic,
-              partition: batch.partition,
-              first_offset: batch.first_offset,
-              message_count: batch.messages.count,
-            }
+    attr_reader :pauses
 
-            @instrumenter.instrument("process_batch.racecar", payload) do
-              processor.process_batch(batch)
-              producer.deliver_messages
-            end
-          end
+    def process_method
+      @process_method ||= begin
+        case
+        when processor.respond_to?(:process_batch) then :batch
+        when processor.respond_to?(:process) then :single
         else
           raise NotImplementedError, "Consumer class must implement process or process_batch method"
         end
-      rescue Kafka::ProcessingError => e
-        @logger.error "Error processing partition #{e.topic}/#{e.partition} at offset #{e.offset}"
+      end
+    end
 
-        if config.pause_timeout > 0
-          # Pause fetches from the partition. We'll continue processing the other partitions in the topic.
-          # The partition is automatically resumed after the specified timeout, and will continue where we
-          # left off.
-          @logger.warn "Pausing partition #{e.topic}/#{e.partition} for #{config.pause_timeout} seconds"
-          consumer.pause(
-            e.topic,
-            e.partition,
-            timeout: config.pause_timeout,
-            max_timeout: config.max_pause_timeout,
-            exponential_backoff: config.pause_with_exponential_backoff?,
-          )
-        elsif config.pause_timeout == -1
-          # A pause timeout of -1 means indefinite pausing, which in ruby-kafka is done by passing nil as
-          # the timeout.
-          @logger.warn "Pausing partition #{e.topic}/#{e.partition} indefinitely, or until the process is restarted"
-          consumer.pause(e.topic, e.partition, timeout: nil)
+    def consumer
+      @consumer ||= begin
+        # Manually store offset after messages have been processed successfully
+        # to avoid marking failed messages as committed. The call just updates
+        # a value within librdkafka and is asynchronously written to proper
+        # storage through auto commits.
+        config.consumer << "enable.auto.offset.store=false"
+        ConsumerSet.new(config, logger)
+      end
+    end
+
+    def producer
+      @producer ||= Rdkafka::Config.new(producer_config).producer.tap do |producer|
+        producer.delivery_callback = delivery_callback
+      end
+    end
+
+    def producer_config
+      # https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
+      producer_config = {
+        "bootstrap.servers" => config.brokers.join(","),
+        "client.id" => config.client_id,
+        "statistics.interval.ms" => 1000,
+      }
+      producer_config["compression.codec"] = config.producer_compression_codec.to_s unless config.producer_compression_codec.nil?
+      producer_config.merge!(config.rdkafka_producer)
+      producer_config
+    end
+
+    def delivery_callback
+      ->(delivery_report) do
+        data = {offset: delivery_report.offset, partition: delivery_report.partition}
+        @instrumenter.instrument("acknowledged_message.racecar", data)
+      end
+    end
+
+    def install_signal_handlers
+      # Stop the consumer on SIGINT, SIGQUIT or SIGTERM.
+      trap("QUIT") { stop }
+      trap("INT") { stop }
+      trap("TERM") { stop }
+
+      # Print the consumer config to STDERR on USR1.
+      trap("USR1") { $stderr.puts config.inspect }
+    end
+
+    def process(message)
+      payload = {
+        consumer_class: processor.class.to_s,
+        topic: message.topic,
+        partition: message.partition,
+        offset: message.offset,
+      }
+
+      @instrumenter.instrument("process_message.racecar", payload) do
+        with_pause(message.topic, message.partition, message.offset..message.offset) do
+          processor.process(message)
+          processor.deliver!
+          consumer.store_offset(message)
         end
+      end
+    end
 
-        config.error_handler.call(e.cause, {
-          topic: e.topic,
-          partition: e.partition,
-          offset: e.offset,
-        })
+    def process_batch(messages)
+      payload = {
+        consumer_class: processor.class.to_s,
+        topic: messages.first.topic,
+        partition: messages.first.partition,
+        first_offset: messages.first.offset,
+        message_count: messages.size,
+      }
 
-        # Restart the consumer loop.
-        retry
-      rescue Kafka::InvalidSessionTimeout
-        raise ConfigError, "`session_timeout` is set either too high or too low"
-      rescue Kafka::Error => e
-        error = "#{e.class}: #{e.message}\n" + e.backtrace.join("\n")
-        @logger.error "Consumer thread crashed: #{error}"
+      @instrumenter.instrument("process_batch.racecar", payload) do
+        first, last = messages.first, messages.last
+        with_pause(first.topic, first.partition, first.offset..last.offset) do
+          processor.process_batch(messages)
+          processor.deliver!
+          consumer.store_offset(messages.last)
+        end
+      end
+    end
 
-        config.error_handler.call(e)
+    def with_pause(topic, partition, offsets)
+      return yield if config.pause_timeout == 0
 
-        raise
-      else
-        @logger.info "Gracefully shutting down"
+      begin
+        yield
+        # We've successfully processed a batch from the partition, so we can clear the pause.
+        pauses[topic][partition].reset!
+      rescue => e
+        desc = "#{topic}/#{partition}"
+        logger.error "Failed to process #{desc} at #{offsets}: #{e}"
+
+        pause = pauses[topic][partition]
+        logger.warn "Pausing partition #{desc} for #{pause.backoff_interval} seconds"
+        consumer.pause(topic, partition, offsets.first)
+        pause.pause!
+      end
+    end
+
+    def resume_paused_partitions
+      return if config.pause_timeout == 0
+
+      pauses.each do |topic, partitions|
+        partitions.each do |partition, pause|
+          @instrumenter.instrument("pause_status.racecar", {
+            topic: topic,
+            partition: partition,
+            duration: pause.pause_duration,
+          })
+
+          if pause.paused? && pause.expired?
+            logger.info "Automatically resuming partition #{topic}/#{partition}, pause timeout expired"
+            consumer.resume(topic, partition)
+            pause.resume!
+            # TODO: During re-balancing we might have lost the paused partition. Check if partition is still in group before seek.
+          end
+        end
       end
     end
   end
 end
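
For reference, this is roughly what a processor looks like against the new runner. A minimal sketch, not part of the diff: it assumes Racecar's public Racecar::Consumer base class and subscribes_to DSL, and the topic name is made up.

require "racecar"

class GreetingsConsumer < Racecar::Consumer
  subscribes_to "greetings" # hypothetical topic

  # process_method resolves to :single because this class responds to
  # #process but not #process_batch; the runner stores the offset only
  # after this method returns without raising.
  def process(message)
    puts "#{message.topic}/#{message.partition}@#{message.offset}: #{message.value}"
  end

  # Optional: if defined, initialize wires this up as
  # Rdkafka::Config.statistics_callback, so it receives librdkafka's
  # periodic statistics as a Hash.
  def statistics_callback(stats)
    puts "stats for #{stats["name"]}" # "name" identifies the client instance
  end
end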
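
The pause machinery is driven by three configuration keys the runner reads above: pause_timeout, pause_with_exponential_backoff and max_pause_timeout. A sketch of setting them through Racecar.configure; the values are illustrative only.

Racecar.configure do |config|
  config.pause_timeout = 10                    # seconds; 0 disables pausing, -1 pauses indefinitely
  config.pause_with_exponential_backoff = true # grow the pause after each consecutive failure
  config.max_pause_timeout = 300               # cap for the exponential backoff, in seconds
end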
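
The newly required racecar/pause is not part of this file's diff. The sketch below is a hypothetical reconstruction of the interface the runner exercises (pause!, resume!, reset!, paused?, expired?, backoff_interval, pause_duration), inferred from the call sites above rather than copied from the gem.

module Racecar
  class Pause
    def initialize(timeout: nil, max_timeout: nil, exponential_backoff: false)
      @timeout = timeout # nil means "pause indefinitely" (pause_timeout == -1)
      @max_timeout = max_timeout
      @exponential_backoff = exponential_backoff
      @started_at = nil
      @ends_at = nil
      @pauses_count = 0
    end

    def pause!
      @started_at = Time.now
      @ends_at = @started_at + backoff_interval unless @timeout.nil?
      @pauses_count += 1
    end

    def resume!
      @started_at = nil
      @ends_at = nil
    end

    def paused?
      !@started_at.nil?
    end

    # Reported by the pause_status.racecar instrumentation.
    def pause_duration
      paused? ? Time.now - @started_at : 0
    end

    # An indefinite pause never expires; the partition stays paused until
    # the process restarts.
    def expired?
      return false if @timeout.nil? || @ends_at.nil?
      Time.now >= @ends_at
    end

    # Called from with_pause after a successful batch to clear the backoff.
    def reset!
      @pauses_count = 0
    end

    # Doubles per consecutive pause when exponential backoff is enabled,
    # capped at max_timeout when one is set.
    def backoff_interval
      return Float::INFINITY if @timeout.nil?
      interval = @timeout * (@exponential_backoff ? 2**@pauses_count : 1)
      @max_timeout && interval > @max_timeout ? @max_timeout : interval
    end
  end
end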