lib/kafka/producer.rb in ruby-kafka-0.7.0 vs lib/kafka/producer.rb in ruby-kafka-0.7.1.beta1

- old
+ new

@@ -7,11 +7,10 @@ require "kafka/pending_message_queue" require "kafka/pending_message" require "kafka/compressor" module Kafka - # Allows sending messages to a Kafka cluster. # # Typically you won't instantiate this class yourself, but rather have {Kafka::Client} # do it for you, e.g. # @@ -124,13 +123,15 @@ # # producer.shutdown # end # class Producer + class AbortTransaction < StandardError; end - def initialize(cluster:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:) + def initialize(cluster:, transaction_manager:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:) @cluster = cluster + @transaction_manager = transaction_manager @logger = logger @instrumenter = instrumenter @required_acks = required_acks == :all ? -1 : required_acks @ack_timeout = ack_timeout @max_retries = max_retries @@ -199,10 +200,16 @@ if buffer_bytesize + message.bytesize >= @max_buffer_bytesize buffer_overflow topic, "Cannot produce to #{topic}, max buffer bytesize (#{@max_buffer_bytesize} bytes) reached" end + # If the producer is in transactional mode, all the message production + # must be used when the producer is currently in transaction + if @transaction_manager.transactional? && !@transaction_manager.in_transaction? + raise 'You must trigger begin_transaction before producing messages' + end + @target_topics.add(topic) @pending_message_queue.write(message) @instrumenter.instrument("produce_message.producer", { value: value, @@ -265,21 +272,94 @@ # Closes all connections to the brokers. # # @return [nil] def shutdown + @transaction_manager.close @cluster.disconnect end + # Initializes the producer to ready for future transactions. This method + # should be triggered once, before any tranactions are created. + # + # @return [nil] + def init_transactions + @transaction_manager.init_transactions + end + + # Mark the beginning of a transaction. This method transitions the state + # of the transaction trantiions to IN_TRANSACTION. + # + # All producing operations can only be executed while the transation is + # in this state. The records are persisted by Kafka brokers, but not visible + # the consumers until the #commit_transaction method is trigger. After a + # timeout period without committed, the transaction is timeout and + # considered as aborted. + # + # @return [nil] + def begin_transaction + @transaction_manager.begin_transaction + end + + # This method commits the pending transaction, marks all the produced + # records committed. After that, they are visible to the consumers. + # + # This method can only be called if and only if the current transaction + # is at IN_TRANSACTION state. + # + # @return [nil] + def commit_transaction + @transaction_manager.commit_transaction + end + + # This method abort the pending transaction, marks all the produced + # records aborted. All the records will be wiped out by the brokers and the + # cosumers don't have a chance to consume those messages, except they enable + # consuming uncommitted option. + # + # This method can only be called if and only if the current transaction + # is at IN_TRANSACTION state. + # + # @return [nil] + def abort_transaction + @transaction_manager.abort_transaction + end + + # Syntactic sugar to enable easier transaction usage. Do the following steps + # + # - Start the transaction (with Producer#begin_transaction) + # - Yield the given block + # - Commit the transaction (with Producer#commit_transaction) + # + # If the block raises exception, the transaction is automatically aborted + # *before* bubble up the exception. + # + # If the block raises Kafka::Producer::AbortTransaction indicator exception, + # it aborts the transaction silently, without throwing up that exception. + # + # @return [nil] + def transaction + raise 'This method requires a block' unless block_given? + begin_transaction + yield + commit_transaction + rescue Kafka::Producer::AbortTransaction + abort_transaction + rescue + abort_transaction + raise + end + private def deliver_messages_with_retries(notification) attempt = 0 @cluster.add_target_topics(@target_topics) operation = ProduceOperation.new( cluster: @cluster, + transaction_manager: @transaction_manager, buffer: @buffer, required_acks: @required_acks, ack_timeout: @ack_timeout, compressor: @compressor, logger: @logger,