lib/kafka/producer.rb in ruby-kafka-0.3.8 vs lib/kafka/producer.rb in ruby-kafka-0.3.9
- old
+ new
@@ -132,11 +132,11 @@
def initialize(cluster:, logger:, instrumenter:, compressor:, ack_timeout:, required_acks:, max_retries:, retry_backoff:, max_buffer_size:, max_buffer_bytesize:)
@cluster = cluster
@logger = logger
@instrumenter = instrumenter
- @required_acks = required_acks
+ @required_acks = required_acks == :all ? -1 : required_acks
@ack_timeout = ack_timeout
@max_retries = max_retries
@retry_backoff = retry_backoff
@max_buffer_size = max_buffer_size
@max_buffer_bytesize = max_buffer_bytesize
@@ -183,15 +183,16 @@
# @return [nil]
def produce(value, key: nil, topic:, partition: nil, partition_key: nil)
create_time = Time.now
message = PendingMessage.new(
- value: value,
- key: key,
- topic: topic,
- partition: partition,
- partition_key: partition_key,
- create_time: create_time,
+ value,
+ key,
+ topic,
+ partition,
+ partition_key,
+ create_time,
+ key.to_s.bytesize + value.to_s.bytesize
)
if buffer_size >= @max_buffer_size
buffer_overflow topic, "Max buffer size (#{@max_buffer_size} messages) exceeded"
end