Sha256: 7ceb41508e164d82dcfef2795a32f7604975f3db3abc5f55fb8a27adb219ef3b

Contents?: true

Size: 1.23 KB

Versions: 1

Compression:

Stored size: 1.23 KB

Contents

# `rd_kafka_offsets_store()` (et.al) returns an error for any
# partition that is not currently assigned (through `rd_kafka_*assign()`).
# This prevents a race condition where an application would store offsets
# after the assigned partitions had been revoked (which resets the stored
# offset), that could cause these old stored offsets to be committed later
# when the same partitions were assigned to this consumer again - effectively
# overwriting any committed offsets by any consumers that were assigned the
# same partitions previously. This would typically result in the offsets
# rewinding and messages to be reprocessed.
# As an extra effort to avoid this situation the stored offset is now
# also reset when partitions are assigned (through `rd_kafka_*assign()`).
module Racecar
  class ErroneousStateError < StandardError
    def initialize(rdkafka_error)
      raise rdkafka_error unless rdkafka_error.is_a?(Rdkafka::RdkafkaError)

      @rdkafka_error = rdkafka_error
    end

    attr_reader :rdkafka_error

    def code
      @rdkafka_error.code
    end

    def to_s
      <<~EOM
        Partition is no longer assigned to this consumer and the offset could not be stored for commit:
        #{@rdkafka_error.to_s}
      EOM
    end

  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
racecar-2.8.1 lib/racecar/erroneous_state_error.rb