lib/messaging/adapters/postgres/consumer.rb in messaging-4.0.11 vs lib/messaging/adapters/postgres/consumer.rb in messaging-4.0.12
- old
+ new
@@ -49,10 +49,11 @@
end
def recreate_consumer_record
self.id = nil
@new_record = true
+ @attributes = @attributes.except('id') if ::Rails::VERSION::MAJOR == 2
save
end
def process_messages
while @running do
@@ -67,11 +68,13 @@
ensure
save_position
end
def save_position
- save if last_processed_position_changed? || last_processed_transaction_id_changed?
+ return unless last_processed_position_changed? || last_processed_transaction_id_changed?
+ recreate_consumer_record unless self.class.exists? id
+ save
end
def process_message(message)
Middleware.run(Config.consumer.middlewares, message) { handle message }
self.last_processed_position = message.global_position
@@ -84,10 +87,9 @@
transaction_id: message.transaction_id,
global_position: message.global_position,
message: message.inspect
}
)
- Messaging.logger.error "[#{name}] Error processing message: #{e.message}"
pause_to_avoid_creating_exception_storm
false
end