lib/kafka/producer.rb in ruby-kafka-0.7.0 vs lib/kafka/producer.rb in ruby-kafka-0.7.1.beta1
- old
+ new
@@ -7,11 +7,10 @@
require "kafka/pending_message_queue"
require "kafka/pending_message"
require "kafka/compressor"
module Kafka
-
# Allows sending messages to a Kafka cluster.
#
# Typically you won't instantiate this class yourself, but rather have {Kafka::Client}
# do it for you, e.g.
#
@@ -124,13 +123,15 @@
#
# producer.shutdown
# end
#
class Producer
+ class AbortTransaction < StandardError; end
- def initialize(cluster:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)
+ def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)
@cluster = cluster
+ @transaction_manager = transaction_manager
@logger = logger
@instrumenter = instrumenter
@required_acks = required_acks == :all ? -1 : required_acks
@ack_timeout = ack_timeout
@max_retries = max_retries
@@ -199,10 +200,16 @@
if buffer_bytesize + message.bytesize >= @max_buffer_bytesize
buffer_overflow topic,
"Cannot produce to #{topic}, max buffer bytesize (#{@max_buffer_bytesize} bytes) reached"
end
+ # If the producer is in transactional mode, all the message production
+ # must be used when the producer is currently in transaction
+ if @transaction_manager.transactional? && !@transaction_manager.in_transaction?
+ raise 'You must trigger begin_transaction before producing messages'
+ end
+
@target_topics.add(topic)
@pending_message_queue.write(message)
@instrumenter.instrument("produce_message.producer", {
value: value,
@@ -265,21 +272,94 @@
# Closes all connections to the brokers.
#
# @return [nil]
def shutdown
+ @transaction_manager.close
@cluster.disconnect
end
+ # Initializes the producer to ready for future transactions. This method
+ # should be triggered once, before any tranactions are created.
+ #
+ # @return [nil]
+ def init_transactions
+ @transaction_manager.init_transactions
+ end
+
+ # Mark the beginning of a transaction. This method transitions the state
+ # of the transaction trantiions to IN_TRANSACTION.
+ #
+ # All producing operations can only be executed while the transation is
+ # in this state. The records are persisted by Kafka brokers, but not visible
+ # the consumers until the #commit_transaction method is trigger. After a
+ # timeout period without committed, the transaction is timeout and
+ # considered as aborted.
+ #
+ # @return [nil]
+ def begin_transaction
+ @transaction_manager.begin_transaction
+ end
+
+ # This method commits the pending transaction, marks all the produced
+ # records committed. After that, they are visible to the consumers.
+ #
+ # This method can only be called if and only if the current transaction
+ # is at IN_TRANSACTION state.
+ #
+ # @return [nil]
+ def commit_transaction
+ @transaction_manager.commit_transaction
+ end
+
+ # This method abort the pending transaction, marks all the produced
+ # records aborted. All the records will be wiped out by the brokers and the
+ # cosumers don't have a chance to consume those messages, except they enable
+ # consuming uncommitted option.
+ #
+ # This method can only be called if and only if the current transaction
+ # is at IN_TRANSACTION state.
+ #
+ # @return [nil]
+ def abort_transaction
+ @transaction_manager.abort_transaction
+ end
+
+ # Syntactic sugar to enable easier transaction usage. Do the following steps
+ #
+ # - Start the transaction (with Producer#begin_transaction)
+ # - Yield the given block
+ # - Commit the transaction (with Producer#commit_transaction)
+ #
+ # If the block raises exception, the transaction is automatically aborted
+ # *before* bubble up the exception.
+ #
+ # If the block raises Kafka::Producer::AbortTransaction indicator exception,
+ # it aborts the transaction silently, without throwing up that exception.
+ #
+ # @return [nil]
+ def transaction
+ raise 'This method requires a block' unless block_given?
+ begin_transaction
+ yield
+ commit_transaction
+ rescue Kafka::Producer::AbortTransaction
+ abort_transaction
+ rescue
+ abort_transaction
+ raise
+ end
+
private
def deliver_messages_with_retries(notification)
attempt = 0
@cluster.add_target_topics(@target_topics)
operation = ProduceOperation.new(
cluster: @cluster,
+ transaction_manager: @transaction_manager,
buffer: @buffer,
required_acks: @required_acks,
ack_timeout: @ack_timeout,
compressor: @compressor,
logger: @logger,