lib/google_pubsub_enhancer.rb in google-pubsub-enhancer-0.5.0 vs lib/google_pubsub_enhancer.rb in google-pubsub-enhancer-0.5.1

- old
+ new

@@ -17,11 +17,11 @@ def pubsub_config key = ::Google::Cloud::Pubsub::Credentials::JSON_ENV_VARS.find { |n| !ENV[n].nil? } @pubsub_config ||= JSON.parse(ENV[key]) rescue => ex - raise Exception, 'Environment not setted properly' + raise Exception, "Environment not setted properly" end end def initialize(logger: Logger.new(STDOUT),&block) @logger = logger @@ -43,18 +43,20 @@ return if opts[:shutdown].call while received_messages = subscription.pull(:max => GooglePubsubEnhancer::Constants::MAX_PULL_SIZE) break if opts[:shutdown].call || received_messages == nil next if received_messages.empty? @logger.debug{"#{received_messages.length} messages received"} - @stack.call({received_messages: received_messages}) - subscription.acknowledge(received_messages) + env = {received_messages: received_messages, nacked_messages: []} + @stack.call(env) + p env + subscription.acknowledge(env[:received_messages] - env[:nacked_messages]) end end def create_subscription(subscription_short_name) Google::Cloud::Pubsub.new.subscription(self.class.name_by('subscriptions', subscription_short_name)) rescue => ex - raise Exception, 'Environment not setted properly' + raise Exception, 'Problem with subscription. Check spelling or permissions!' end def configurate_options(opts) raise unless opts.is_a?(Hash) opts[:shutdown] ||= proc { }