lib/messaging/adapters/postgres/consumer.rb in messaging-4.0.10 vs lib/messaging/adapters/postgres/consumer.rb in messaging-4.0.11

- old
+ new

@@ -21,11 +21,10 @@ shutdown end def stop @running = false - save Messaging.logger.info "[#{name}] Consumer stopping" end def shutdown Messaging.logger.info "[#{name}] Consumer stopped" @@ -41,40 +40,61 @@ def refresh_latest_processed_transaction_id reload return if last_processed_transaction_id.present? self.last_processed_transaction_id = self.class.latest_known_transaction_id + save_position + rescue ActiveRecord::RecordNotFound => e + recreate_consumer_record + retry + end + + def recreate_consumer_record + self.id = nil + @new_record = true save end def process_messages while @running do Meter.histogram('messaging.consumer.lag', unprocessed_messages_count, tags: { consumer: name }) messages = fetch_messages messages.each do |message| - process_message(message) + break unless process_message(message) end - save if messages.any? + save_position pause_polling_for_5_s unless messages.any? end + ensure + save_position end + def save_position + save if last_processed_position_changed? || last_processed_transaction_id_changed? + end + def process_message(message) Middleware.run(Config.consumer.middlewares, message) { handle message } self.last_processed_position = message.global_position self.last_processed_transaction_id = message.transaction_id + true rescue StandardError => e ExceptionHandler.call(e, { consumer: name, transaction_id: message.transaction_id, global_position: message.global_position, message: message.inspect } ) + Messaging.logger.error "[#{name}] Error processing message: #{e.message}" + pause_to_avoid_creating_exception_storm + false + end + + def pause_to_avoid_creating_exception_storm sleep 10 - retry if @running end def pause_polling_for_5_s Messaging.logger.debug "[#{name}] No new messages. Sleeping" sleep 5