lib/racecar/runner.rb in racecar-1.3.0 vs lib/racecar/runner.rb in racecar-2.0.0.alpha1
- old
+ new
@@ -1,153 +1,222 @@
-require "kafka"
+require "rdkafka"
+require "racecar/pause"
module Racecar
class Runner
- attr_reader :processor, :config, :logger, :consumer
+ attr_reader :processor, :config, :logger
def initialize(processor, config:, logger:, instrumenter: NullInstrumenter)
@processor, @config, @logger = processor, config, logger
@instrumenter = instrumenter
+ @stop_requested = false
+ Rdkafka::Config.logger = logger
+
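+ # Forward librdkafka statistics to the processor, if it exposes a statistics_callback method.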
+ if processor.respond_to?(:statistics_callback)
+ Rdkafka::Config.statistics_callback = processor.method(:statistics_callback).to_proc
+ end
+
+ setup_pauses
end
- def stop
- Thread.new do
- processor.teardown
- consumer.stop unless consumer.nil?
- end.join
+ def setup_pauses
+ timeout = if config.pause_timeout == -1
+ nil
+ elsif config.pause_timeout == 0
+ # Pausing is disabled; with_pause and resume_paused_partitions short-circuit when pause_timeout is 0.
+ elsif config.pause_timeout > 0
+ config.pause_timeout
+ else
+ raise ArgumentError, "Invalid value for pause_timeout: must be an integer greater than or equal to -1"
+ end
+
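+ # pauses[topic][partition] lazily builds a Pause tracker per partition, all sharing the configured timeout and backoff settings.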
+ @pauses = Hash.new {|h, k|
+ h[k] = Hash.new {|h2, k2|
+ h2[k2] = ::Racecar::Pause.new(
+ timeout: timeout,
+ max_timeout: config.max_pause_timeout,
+ exponential_backoff: config.pause_with_exponential_backoff
+ )
+ }
+ }
end
def run
- kafka = Kafka.new(
- client_id: config.client_id,
- seed_brokers: config.brokers,
- logger: logger,
- connect_timeout: config.connect_timeout,
- socket_timeout: config.socket_timeout,
- ssl_ca_cert: config.ssl_ca_cert,
- ssl_ca_cert_file_path: config.ssl_ca_cert_file_path,
- ssl_client_cert: config.ssl_client_cert,
- ssl_client_cert_key: config.ssl_client_cert_key,
- ssl_client_cert_key_password: config.ssl_client_cert_key_password,
- sasl_plain_username: config.sasl_plain_username,
- sasl_plain_password: config.sasl_plain_password,
- sasl_scram_username: config.sasl_scram_username,
- sasl_scram_password: config.sasl_scram_password,
- sasl_scram_mechanism: config.sasl_scram_mechanism,
- sasl_oauth_token_provider: config.sasl_oauth_token_provider,
- sasl_over_ssl: config.sasl_over_ssl,
- ssl_ca_certs_from_system: config.ssl_ca_certs_from_system,
- ssl_verify_hostname: config.ssl_verify_hostname,
- partitioner: Kafka::Partitioner.new(hash_function: config.partitioner)
- )
+ install_signal_handlers
+ @stop_requested = false
- @consumer = kafka.consumer(
- group_id: config.group_id,
- offset_commit_interval: config.offset_commit_interval,
- offset_commit_threshold: config.offset_commit_threshold,
- session_timeout: config.session_timeout,
- heartbeat_interval: config.heartbeat_interval,
- offset_retention_time: config.offset_retention_time,
- fetcher_max_queue_size: config.max_fetch_queue_size,
+ # Configure the processor with a producer so it can produce messages, and
+ # with a consumer so it can support advanced use-cases.
+ processor.configure(
+ producer: producer,
+ consumer: consumer,
+ instrumenter: @instrumenter,
)
- # Stop the consumer on SIGINT, SIGQUIT or SIGTERM.
- trap("QUIT") { stop }
- trap("INT") { stop }
- trap("TERM") { stop }
+ instrument_payload = { consumer_class: processor.class.to_s, consumer_set: consumer }
- # Print the consumer config to STDERR on USR1.
- trap("USR1") { $stderr.puts config.inspect }
-
- config.subscriptions.each do |subscription|
- consumer.subscribe(
- subscription.topic,
- start_from_beginning: subscription.start_from_beginning,
- max_bytes_per_partition: subscription.max_bytes_per_partition,
- )
+ # Main loop
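+ # Each iteration resumes any expired pauses, then polls either a batch or a single message depending on the processor's interface.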
+ loop do
+ break if @stop_requested
+ resume_paused_partitions
+ @instrumenter.instrument("main_loop.racecar", instrument_payload) do
+ case process_method
+ when :batch then
+ msg_per_part = consumer.batch_poll(config.max_wait_time).group_by(&:partition)
+ msg_per_part.each_value do |messages|
+ process_batch(messages)
+ end
+ when :single then
+ message = consumer.poll(config.max_wait_time)
+ process(message) if message
+ end
+ end
end
- # Configure the consumer with a producer so it can produce messages.
- producer = kafka.producer(
- compression_codec: config.producer_compression_codec,
- )
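+ # Shutdown sequence: flush any pending produced messages, run the processor's teardown hook, commit stored offsets, then close the consumer.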
+ logger.info "Gracefully shutting down"
+ processor.deliver!
+ processor.teardown
+ consumer.commit
+ consumer.close
+ end
- processor.configure(consumer: consumer, producer: producer)
+ def stop
+ @stop_requested = true
+ end
- begin
- if processor.respond_to?(:process)
- consumer.each_message(max_wait_time: config.max_wait_time, max_bytes: config.max_bytes) do |message|
- payload = {
- consumer_class: processor.class.to_s,
- topic: message.topic,
- partition: message.partition,
- offset: message.offset,
- }
+ private
- @instrumenter.instrument("process_message.racecar", payload) do
- processor.process(message)
- producer.deliver_messages
- end
- end
- elsif processor.respond_to?(:process_batch)
- consumer.each_batch(max_wait_time: config.max_wait_time, max_bytes: config.max_bytes) do |batch|
- payload = {
- consumer_class: processor.class.to_s,
- topic: batch.topic,
- partition: batch.partition,
- first_offset: batch.first_offset,
- message_count: batch.messages.count,
- }
+ attr_reader :pauses
- @instrumenter.instrument("process_batch.racecar", payload) do
- processor.process_batch(batch)
- producer.deliver_messages
- end
- end
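+ # Determine the processor's interface once: a process_batch method takes precedence over process.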
+ def process_method
+ @process_method ||= begin
+ case
+ when processor.respond_to?(:process_batch) then :batch
+ when processor.respond_to?(:process) then :single
else
raise NotImplementedError, "Consumer class must implement process or process_batch method"
end
- rescue Kafka::ProcessingError => e
- @logger.error "Error processing partition #{e.topic}/#{e.partition} at offset #{e.offset}"
+ end
+ end
- if config.pause_timeout > 0
- # Pause fetches from the partition. We'll continue processing the other partitions in the topic.
- # The partition is automatically resumed after the specified timeout, and will continue where we
- # left off.
- @logger.warn "Pausing partition #{e.topic}/#{e.partition} for #{config.pause_timeout} seconds"
- consumer.pause(
- e.topic,
- e.partition,
- timeout: config.pause_timeout,
- max_timeout: config.max_pause_timeout,
- exponential_backoff: config.pause_with_exponential_backoff?,
- )
- elsif config.pause_timeout == -1
- # A pause timeout of -1 means indefinite pausing, which in ruby-kafka is done by passing nil as
- # the timeout.
- @logger.warn "Pausing partition #{e.topic}/#{e.partition} indefinitely, or until the process is restarted"
- consumer.pause(e.topic, e.partition, timeout: nil)
+ def consumer
+ @consumer ||= begin
+ # Manually store offsets only after messages have been processed successfully,
+ # so that failed messages are never marked as committed. store_offset just
+ # updates a value inside librdkafka; the stored offsets are then committed
+ # asynchronously via auto commits.
+ config.consumer << "enable.auto.offset.store=false"
+ ConsumerSet.new(config, logger)
+ end
+ end
+
+ def producer
+ @producer ||= Rdkafka::Config.new(producer_config).producer.tap do |producer|
+ producer.delivery_callback = delivery_callback
+ end
+ end
+
+ def producer_config
+ # https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
+ producer_config = {
+ "bootstrap.servers" => config.brokers.join(","),
+ "client.id" => config.client_id,
+ "statistics.interval.ms" => 1000,
+ }
+ producer_config["compression.codec"] = config.producer_compression_codec.to_s unless config.producer_compression_codec.nil?
+ producer_config.merge!(config.rdkafka_producer)
+ producer_config
+ end
+
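+ # Called by rdkafka for each delivered message; emits an acknowledged_message.racecar event with the report's partition and offset.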
+ def delivery_callback
+ ->(delivery_report) do
+ data = {offset: delivery_report.offset, partition: delivery_report.partition}
+ @instrumenter.instrument("acknowledged_message.racecar", data)
+ end
+ end
+
+ def install_signal_handlers
+ # Stop the consumer on SIGINT, SIGQUIT or SIGTERM.
+ trap("QUIT") { stop }
+ trap("INT") { stop }
+ trap("TERM") { stop }
+
+ # Print the consumer config to STDERR on USR1.
+ trap("USR1") { $stderr.puts config.inspect }
+ end
+
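+ # Process a single message, deliver anything it produced, and only then store its offset (see enable.auto.offset.store=false above).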
+ def process(message)
+ payload = {
+ consumer_class: processor.class.to_s,
+ topic: message.topic,
+ partition: message.partition,
+ offset: message.offset,
+ }
+
+ @instrumenter.instrument("process_message.racecar", payload) do
+ with_pause(message.topic, message.partition, message.offset..message.offset) do
+ processor.process(message)
+ processor.deliver!
+ consumer.store_offset(message)
end
+ end
+ end
- config.error_handler.call(e.cause, {
- topic: e.topic,
- partition: e.partition,
- offset: e.offset,
- })
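+ # Process a batch from a single partition; the last message's offset is stored only after the whole batch has succeeded.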
+ def process_batch(messages)
+ payload = {
+ consumer_class: processor.class.to_s,
+ topic: messages.first.topic,
+ partition: messages.first.partition,
+ first_offset: messages.first.offset,
+ message_count: messages.size,
+ }
- # Restart the consumer loop.
- retry
- rescue Kafka::InvalidSessionTimeout
- raise ConfigError, "`session_timeout` is set either too high or too low"
- rescue Kafka::Error => e
- error = "#{e.class}: #{e.message}\n" + e.backtrace.join("\n")
- @logger.error "Consumer thread crashed: #{error}"
+ @instrumenter.instrument("process_batch.racecar", payload) do
+ first, last = messages.first, messages.last
+ with_pause(first.topic, first.partition, first.offset..last.offset) do
+ processor.process_batch(messages)
+ processor.deliver!
+ consumer.store_offset(messages.last)
+ end
+ end
+ end
- config.error_handler.call(e)
+ def with_pause(topic, partition, offsets)
+ return yield if config.pause_timeout == 0
- raise
- else
- @logger.info "Gracefully shutting down"
+ begin
+ yield
+ # Processing succeeded, so reset any accumulated pause backoff for this partition.
+ pauses[topic][partition].reset!
+ rescue => e
+ desc = "#{topic}/#{partition}"
+ logger.error "Failed to process #{desc} at #{offsets}: #{e}"
+
+ pause = pauses[topic][partition]
+ logger.warn "Pausing partition #{desc} for #{pause.backoff_interval} seconds"
+ consumer.pause(topic, partition, offsets.first)
+ pause.pause!
+ end
+ end
+
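+ # On every main loop tick: report pause status via instrumentation and resume any partition whose backoff has expired.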
+ def resume_paused_partitions
+ return if config.pause_timeout == 0
+
+ pauses.each do |topic, partitions|
+ partitions.each do |partition, pause|
+ @instrumenter.instrument("pause_status.racecar", {
+ topic: topic,
+ partition: partition,
+ duration: pause.pause_duration,
+ })
+
+ if pause.paused? && pause.expired?
+ logger.info "Automatically resuming partition #{topic}/#{partition}, pause timeout expired"
+ consumer.resume(topic, partition)
+ pause.resume!
+ # TODO: During a rebalance we might have lost the paused partition. Check that the partition is still assigned to this consumer before seeking.
+ end
+ end
end
end
end
end