lib/messaging/adapters/postgres/consumer.rb in messaging-4.0.10 vs lib/messaging/adapters/postgres/consumer.rb in messaging-4.0.11
- old
+ new
@@ -21,11 +21,10 @@
shutdown
end
def stop
@running = false
- save
Messaging.logger.info "[#{name}] Consumer stopping"
end
def shutdown
Messaging.logger.info "[#{name}] Consumer stopped"
@@ -41,40 +40,61 @@
def refresh_latest_processed_transaction_id
reload
return if last_processed_transaction_id.present?
self.last_processed_transaction_id = self.class.latest_known_transaction_id
+ save_position
+ rescue ActiveRecord::RecordNotFound => e
+ recreate_consumer_record
+ retry
+ end
+
+ def recreate_consumer_record
+ self.id = nil
+ @new_record = true
save
end
def process_messages
while @running do
Meter.histogram('messaging.consumer.lag', unprocessed_messages_count, tags: { consumer: name })
messages = fetch_messages
messages.each do |message|
- process_message(message)
+ break unless process_message(message)
end
- save if messages.any?
+ save_position
pause_polling_for_5_s unless messages.any?
end
+ ensure
+ save_position
end
+ def save_position
+ save if last_processed_position_changed? || last_processed_transaction_id_changed?
+ end
+
def process_message(message)
Middleware.run(Config.consumer.middlewares, message) { handle message }
self.last_processed_position = message.global_position
self.last_processed_transaction_id = message.transaction_id
+ true
rescue StandardError => e
ExceptionHandler.call(e,
{
consumer: name,
transaction_id: message.transaction_id,
global_position: message.global_position,
message: message.inspect
}
)
+ Messaging.logger.error "[#{name}] Error processing message: #{e.message}"
+ pause_to_avoid_creating_exception_storm
+ false
+ end
+
+ def pause_to_avoid_creating_exception_storm
sleep 10
- retry if @running
end
def pause_polling_for_5_s
Messaging.logger.debug "[#{name}] No new messages. Sleeping"
sleep 5