lib/rdkafka/consumer.rb in karafka-rdkafka-0.14.5 vs lib/rdkafka/consumer.rb in karafka-rdkafka-0.14.6

- old
+ new

@@ -241,11 +241,11 @@ # @param list [TopicPartitionList, nil] The topic with partitions to get the offsets for or nil # to use the current subscription. # @param timeout_ms [Integer] The timeout for fetching this information. # @return [TopicPartitionList] # @raise [RdkafkaError] When getting the committed positions fails. - def committed(list=nil, timeout_ms=1200) + def committed(list=nil, timeout_ms=2_000) closed_consumer_check(__method__) if list.nil? list = assignment elsif !list.is_a?(TopicPartitionList) @@ -381,19 +381,37 @@ # Store offset of a message to be used in the next commit of this consumer # # When using this `enable.auto.offset.store` should be set to `false` in the config. # # @param message [Rdkafka::Consumer::Message] The message which offset will be stored + # @param metadata [String, nil] commit metadata string or nil if none # @return [nil] # @raise [RdkafkaError] When storing the offset fails - def store_offset(message) + def store_offset(message, metadata = nil) closed_consumer_check(__method__) list = TopicPartitionList.new - list.add_topic_and_partitions_with_offsets( - message.topic, - message.partition => message.offset + 1 - ) + + # For metadata aware commits we build the partition reference directly to save on + # objects allocations + if metadata + list.add_topic_and_partitions_with_offsets( + message.topic, + [ + Consumer::Partition.new( + message.partition, + message.offset + 1, + 0, + metadata + ) + ] + ) + else + list.add_topic_and_partitions_with_offsets( + message.topic, + message.partition => message.offset + 1 + ) + end tpl = list.to_native_tpl response = @native_kafka.with_inner do |inner| Rdkafka::Bindings.rd_kafka_offsets_store(