lib/deimos/utils/db_producer.rb in deimos-ruby-1.0.0.pre.beta25 vs lib/deimos/utils/db_producer.rb in deimos-ruby-1.0.0.pre.beta26
- old
+ new
@@ -62,26 +62,21 @@
@current_topic = topic
messages = retrieve_messages
while messages.any?
@logger.debug do
- producer = Deimos::Producer.descendants.find { |c| c.topic == topic }
- decoded_messages = if producer
- consumer = Class.new(Deimos::Consumer)
- consumer.config.merge!(producer.config)
- messages.map do |message|
- {
- key: message[:key].present? ? consumer.new.decode_key(message[:key]) : nil,
- message: consumer.decoder.decode(message[:payload])
- }
- end
- else
- messages
- end
- "DB producer: Topic #{topic} Producing messages: #{decoded_messages}"
+ decoder = messages.first.decoder
+ "DB producer: Topic #{topic} Producing messages: #{messages.map { |m| m.decoded_message(decoder) }}"
end
- produce_messages(messages.map(&:phobos_message))
+ Deimos.instrument('db_producer.produce', topic: topic, messages: messages) do
+ begin
+ produce_messages(messages.map(&:phobos_message))
+ rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge, Kafka::RecordListTooLarge
+ messages.each(&:delete)
+ raise
+ end
+ end
messages.first.class.where(id: messages.map(&:id)).delete_all
break if messages.size < BATCH_SIZE
KafkaTopicInfo.heartbeat(@current_topic, @id) # keep alive
send_pending_metrics
@@ -103,10 +98,19 @@
first_message = KafkaMessage.first
time_diff = first_message ? Time.zone.now - KafkaMessage.first.created_at : 0
Deimos.config.metrics&.gauge('pending_db_messages_max_wait', time_diff)
end
+ # Shut down the sync producer if we have to. Phobos will automatically
+ # create a new one. We should call this if the producer can be in a bad
+ # state and e.g. we need to clear the buffer.
+ def shutdown_producer
+ if self.class.producer.respond_to?(:sync_producer_shutdown) # Phobos 1.8.3
+ self.class.producer.sync_producer_shutdown
+ end
+ end
+
# @param batch [Array<Hash>]
def produce_messages(batch)
batch_size = batch.size
begin
batch.in_groups_of(batch_size, false).each do |group|
@@ -117,21 +121,23 @@
tags: %W(status:success topic:#{@current_topic}),
by: group.size
)
@logger.info("Sent #{group.size} messages to #{@current_topic}")
end
- rescue Kafka::BufferOverflow
- raise if batch_size == 1
+ rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge,
+ Kafka::RecordListTooLarge => e
+ if batch_size == 1
+ shutdown_producer
+ raise
+ end
- @logger.error("Buffer overflow when publishing #{batch.size} in groups of #{batch_size}, retrying...")
+ @logger.error("Got error #{e.class.name} when publishing #{batch.size} in groups of #{batch_size}, retrying...")
if batch_size < 10
batch_size = 1
else
batch_size /= 10
end
- if self.class.producer.respond_to?(:sync_producer_shutdown) # Phobos 1.8.3
- self.class.producer.sync_producer_shutdown
- end
+ shutdown_producer
retry
end
end
end
end