lib/google_pubsub_enhancer.rb in google-pubsub-enhancer-0.5.4 vs lib/google_pubsub_enhancer.rb in google-pubsub-enhancer-0.5.5
- old
+ new
@@ -45,10 +45,22 @@
break if opts[:shutdown].call || received_messages == nil
next if received_messages.empty?
@logger.debug{"#{received_messages.length} messages received"}
env = {received_messages: received_messages, nacked_messages: []}
@stack.call(env)
- subscription.acknowledge(env[:received_messages] - env[:nacked_messages])
+ acknowledge(subscription,env)
+ end
+ end
+
+ def acknowledge(subscription,env)
+ acked_messages = env[:received_messages] - env[:nacked_messages]
+ begin
+ subscription.acknowledge(acked_messages)
+ rescue
+ acked_messages.each do |msg|
+ @logger.error "Retried acked message was: #{msg}"
+ end
+ retry
end
end
def create_subscription(subscription_short_name)
Google::Cloud::Pubsub.new.subscription(self.class.name_by('subscriptions', subscription_short_name))