lib/kafka/producer.rb in ruby-kafka-0.7.6 vs lib/kafka/producer.rb in ruby-kafka-0.7.7
- old
+ new
@@ -66,10 +66,12 @@
# Compression is enabled by passing the `compression_codec` parameter with the
# name of one of the algorithms allowed by Kafka:
#
# * `:snappy` for [Snappy](http://google.github.io/snappy/) compression.
# * `:gzip` for [gzip](https://en.wikipedia.org/wiki/Gzip) compression.
+ # * `:lz4` for [LZ4](https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)) compression.
+ # * `:zstd` for [zstd](https://facebook.github.io/zstd/) compression.
#
# By default, all message sets will be compressed if you specify a compression
# codec. To increase the compression threshold, set `compression_threshold` to
# an integer value higher than one.
#
@@ -324,9 +326,23 @@
# is at IN_TRANSACTION state.
#
# @return [nil]
def abort_transaction
@transaction_manager.abort_transaction
+ end
+
+ # Sends the batch's last offset to the consumer group coordinator, and also
+ # marks this offset as part of the current transaction. This offset will be
+ # considered committed only if the transaction is committed successfully.
+ #
+ # This method should be used when you need to batch consumed and produced messages
+ # together, typically in a consume-transform-produce pattern. The specified
+ # group_id should therefore match the group_id configuration parameter of the
+ # consumer in use.
+ #
+ # @return [nil]
+ def send_offsets_to_transaction(batch:, group_id:)
+ @transaction_manager.send_offsets_to_txn(offsets: { batch.topic => { batch.partition => { offset: batch.last_offset + 1, leader_epoch: batch.leader_epoch } } }, group_id: group_id)
end
# Syntactic sugar to enable easier transaction usage. Do the following steps
#
# - Start the transaction (with Producer#begin_transaction)