lib/kafka/producer.rb in ruby-kafka-0.1.7 vs lib/kafka/producer.rb in ruby-kafka-0.2.0
- old
+ new
@@ -2,11 +2,11 @@
require "kafka/partitioner"
require "kafka/message_buffer"
require "kafka/produce_operation"
require "kafka/pending_message_queue"
require "kafka/pending_message"
-require "kafka/compression"
+require "kafka/compressor"
module Kafka
# Allows sending messages to a Kafka cluster.
#
@@ -171,13 +171,16 @@
@ack_timeout = ack_timeout
@max_retries = max_retries
@retry_backoff = retry_backoff
@max_buffer_size = max_buffer_size
@max_buffer_bytesize = max_buffer_bytesize
- @compression_codec = Compression.find_codec(compression_codec)
- @compression_threshold = compression_threshold
+ @compressor = Compressor.new(
+ codec_name: @compression_codec,
+ threshold: @compression_threshold,
+ )
+
# The set of topics that are produced to.
@target_topics = Set.new
# A buffer organized by topic/partition.
@buffer = MessageBuffer.new
@@ -301,11 +304,10 @@
operation = ProduceOperation.new(
cluster: @cluster,
buffer: @buffer,
required_acks: @required_acks,
ack_timeout: @ack_timeout,
- compression_codec: @compression_codec,
- compression_threshold: @compression_threshold,
+ compressor: @compressor,
logger: @logger,
)
loop do
attempt += 1