lib/deimos/utils/db_producer.rb in deimos-ruby-1.0.0.pre.beta25 vs lib/deimos/utils/db_producer.rb in deimos-ruby-1.0.0.pre.beta26

- old
+ new

@@ -62,26 +62,21 @@ @current_topic = topic messages = retrieve_messages while messages.any? @logger.debug do - producer = Deimos::Producer.descendants.find { |c| c.topic == topic } - decoded_messages = if producer - consumer = Class.new(Deimos::Consumer) - consumer.config.merge!(producer.config) - messages.map do |message| - { - key: message[:key].present? ? consumer.new.decode_key(message[:key]) : nil, - message: consumer.decoder.decode(message[:payload]) - } - end - else - messages - end - "DB producer: Topic #{topic} Producing messages: #{decoded_messages}" + decoder = messages.first.decoder + "DB producer: Topic #{topic} Producing messages: #{messages.map { |m| m.decoded_message(decoder) }}" end - produce_messages(messages.map(&:phobos_message)) + Deimos.instrument('db_producer.produce', topic: topic, messages: messages) do + begin + produce_messages(messages.map(&:phobos_message)) + rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge, Kafka::RecordListTooLarge + messages.each(&:delete) + raise + end + end messages.first.class.where(id: messages.map(&:id)).delete_all break if messages.size < BATCH_SIZE KafkaTopicInfo.heartbeat(@current_topic, @id) # keep alive send_pending_metrics @@ -103,10 +98,19 @@ first_message = KafkaMessage.first time_diff = first_message ? Time.zone.now - KafkaMessage.first.created_at : 0 Deimos.config.metrics&.gauge('pending_db_messages_max_wait', time_diff) end + # Shut down the sync producer if we have to. Phobos will automatically + # create a new one. We should call this if the producer can be in a bad + # state and e.g. we need to clear the buffer. + def shutdown_producer + if self.class.producer.respond_to?(:sync_producer_shutdown) # Phobos 1.8.3 + self.class.producer.sync_producer_shutdown + end + end + # @param batch [Array<Hash>] def produce_messages(batch) batch_size = batch.size begin batch.in_groups_of(batch_size, false).each do |group| @@ -117,21 +121,23 @@ tags: %W(status:success topic:#{@current_topic}), by: group.size ) @logger.info("Sent #{group.size} messages to #{@current_topic}") end - rescue Kafka::BufferOverflow - raise if batch_size == 1 + rescue Kafka::BufferOverflow, Kafka::MessageSizeTooLarge, + Kafka::RecordListTooLarge => e + if batch_size == 1 + shutdown_producer + raise + end - @logger.error("Buffer overflow when publishing #{batch.size} in groups of #{batch_size}, retrying...") + @logger.error("Got error #{e.class.name} when publishing #{batch.size} in groups of #{batch_size}, retrying...") if batch_size < 10 batch_size = 1 else batch_size /= 10 end - if self.class.producer.respond_to?(:sync_producer_shutdown) # Phobos 1.8.3 - self.class.producer.sync_producer_shutdown - end + shutdown_producer retry end end end end