lib/kafka/producer.rb in ruby-kafka-0.7.6 vs lib/kafka/producer.rb in ruby-kafka-0.7.7

- old
+ new

@@ -66,10 +66,12 @@ # Compression is enabled by passing the `compression_codec` parameter with the # name of one of the algorithms allowed by Kafka: # # * `:snappy` for [Snappy](http://google.github.io/snappy/) compression. # * `:gzip` for [gzip](https://en.wikipedia.org/wiki/Gzip) compression. + # * `:lz4` for [LZ4](https://en.wikipedia.org/wiki/LZ4_(compression_algorithm)) compression. + # * `:zstd` for [zstd](https://facebook.github.io/zstd/) compression. # # By default, all message sets will be compressed if you specify a compression # codec. To increase the compression threshold, set `compression_threshold` to # an integer value higher than one. # @@ -324,9 +326,23 @@ # is at IN_TRANSACTION state. # # @return [nil] def abort_transaction @transaction_manager.abort_transaction + end + + # Sends batch last offset to the consumer group coordinator, and also marks + # this offset as part of the current transaction. This offset will be considered + # committed only if the transaction is committed successfully. + # + # This method should be used when you need to batch consumed and produced messages + # together, typically in a consume-transform-produce pattern. Thus, the specified + # group_id should be the same as config parameter group_id of the used + # consumer. + # + # @return [nil] + def send_offsets_to_transaction(batch:, group_id:) + @transaction_manager.send_offsets_to_txn(offsets: { batch.topic => { batch.partition => { offset: batch.last_offset + 1, leader_epoch: batch.leader_epoch } } }, group_id: group_id) end # Syntactic sugar to enable easier transaction usage. Do the following steps # # - Start the transaction (with Producer#begin_transaction)