Sha256: 7ceb41508e164d82dcfef2795a32f7604975f3db3abc5f55fb8a27adb219ef3b
Contents?: true
Size: 1.23 KB
Versions: 1
Compression:
Stored size: 1.23 KB
Contents
# `rd_kafka_offsets_store()` (et.al) returns an error for any
# partition that is not currently assigned (through `rd_kafka_*assign()`).
# This prevents a race condition where an application would store offsets
# after the assigned partitions had been revoked (which resets the stored
# offset), that could cause these old stored offsets to be committed later
# when the same partitions were assigned to this consumer again - effectively
# overwriting any committed offsets by any consumers that were assigned the
# same partitions previously. This would typically result in the offsets
# rewinding and messages to be reprocessed.
# As an extra effort to avoid this situation the stored offset is now
# also reset when partitions are assigned (through `rd_kafka_*assign()`).
module Racecar
  class ErroneousStateError < StandardError
    def initialize(rdkafka_error)
      raise rdkafka_error unless rdkafka_error.is_a?(Rdkafka::RdkafkaError)

      @rdkafka_error = rdkafka_error
    end

    attr_reader :rdkafka_error

    def code
      @rdkafka_error.code
    end

    def to_s
      <<~EOM
        Partition is no longer assigned to this consumer and the offset could not be stored for commit:
        #{@rdkafka_error.to_s}
      EOM
    end
  end
end
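A minimal sketch of how this wrapper behaves. It runs without the rdkafka gem by defining a stand-in `Rdkafka::RdkafkaError`. The stand-in's constructor, the `:state` code, and the message text are assumptions made for illustration, not the gem's actual API. The `Racecar::ErroneousStateError` definition is copied from the file shown here.

```ruby
# Stand-in for the rdkafka gem's error class, defined only so this sketch
# runs on its own. The real class has a different constructor.
module Rdkafka
  class RdkafkaError < StandardError
    attr_reader :code

    def initialize(code, message)
      @code = code
      super(message)
    end
  end
end

# Copied from lib/racecar/erroneous_state_error.rb.
module Racecar
  class ErroneousStateError < StandardError
    def initialize(rdkafka_error)
      # Anything that is not an Rdkafka::RdkafkaError is re-raised untouched.
      raise rdkafka_error unless rdkafka_error.is_a?(Rdkafka::RdkafkaError)

      @rdkafka_error = rdkafka_error
    end

    attr_reader :rdkafka_error

    def code
      @rdkafka_error.code
    end

    def to_s
      <<~EOM
        Partition is no longer assigned to this consumer and the offset could not be stored for commit:
        #{@rdkafka_error.to_s}
      EOM
    end
  end
end

# Wrapping an rdkafka error keeps its code and appends its message.
inner = Rdkafka::RdkafkaError.new(:state, "Local: Erroneous state") # assumed values
wrapped = Racecar::ErroneousStateError.new(inner)
puts wrapped.code
puts wrapped.to_s

# Passing any other exception raises that exception instead of wrapping it.
begin
  Racecar::ErroneousStateError.new(ArgumentError.new("not an rdkafka error"))
rescue ArgumentError => e
  puts "re-raised: #{e.message}"
end
```

The guard clause in `initialize` means the class can only wrap rdkafka errors. A caller that rescues `Racecar::ErroneousStateError` can therefore rely on `code` and `rdkafka_error` being present.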
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
racecar-2.8.1 | lib/racecar/erroneous_state_error.rb |