lib/rdkafka/consumer.rb in karafka-rdkafka-0.14.5 vs lib/rdkafka/consumer.rb in karafka-rdkafka-0.14.6
- old
+ new
@@ -241,11 +241,11 @@
# @param list [TopicPartitionList, nil] The topic with partitions to get the offsets for or nil
# to use the current subscription.
# @param timeout_ms [Integer] The timeout for fetching this information.
# @return [TopicPartitionList]
# @raise [RdkafkaError] When getting the committed positions fails.
- def committed(list=nil, timeout_ms=1200)
+ def committed(list=nil, timeout_ms=2_000)
closed_consumer_check(__method__)
if list.nil?
list = assignment
elsif !list.is_a?(TopicPartitionList)
@@ -381,19 +381,37 @@
# Store offset of a message to be used in the next commit of this consumer
#
# When using this `enable.auto.offset.store` should be set to `false` in the config.
#
# @param message [Rdkafka::Consumer::Message] The message which offset will be stored
+ # @param metadata [String, nil] commit metadata string or nil if none
# @return [nil]
# @raise [RdkafkaError] When storing the offset fails
- def store_offset(message)
+ def store_offset(message, metadata = nil)
closed_consumer_check(__method__)
list = TopicPartitionList.new
- list.add_topic_and_partitions_with_offsets(
- message.topic,
- message.partition => message.offset + 1
- )
+
+ # For metadata aware commits we build the partition reference directly to save on
+ # objects allocations
+ if metadata
+ list.add_topic_and_partitions_with_offsets(
+ message.topic,
+ [
+ Consumer::Partition.new(
+ message.partition,
+ message.offset + 1,
+ 0,
+ metadata
+ )
+ ]
+ )
+ else
+ list.add_topic_and_partitions_with_offsets(
+ message.topic,
+ message.partition => message.offset + 1
+ )
+ end
tpl = list.to_native_tpl
response = @native_kafka.with_inner do |inner|
Rdkafka::Bindings.rd_kafka_offsets_store(