lib/google_pubsub_enhancer.rb in google-pubsub-enhancer-0.5.0 vs lib/google_pubsub_enhancer.rb in google-pubsub-enhancer-0.5.1
- old
+ new
@@ -17,11 +17,11 @@
def pubsub_config
key = ::Google::Cloud::Pubsub::Credentials::JSON_ENV_VARS.find { |n| !ENV[n].nil? }
@pubsub_config ||= JSON.parse(ENV[key])
rescue => ex
- raise Exception, 'Environment not setted properly'
+ raise Exception, "Environment not setted properly"
end
end
def initialize(logger: Logger.new(STDOUT),&block)
@logger = logger
@@ -43,18 +43,20 @@
return if opts[:shutdown].call
while received_messages = subscription.pull(:max => GooglePubsubEnhancer::Constants::MAX_PULL_SIZE)
break if opts[:shutdown].call || received_messages == nil
next if received_messages.empty?
@logger.debug{"#{received_messages.length} messages received"}
- @stack.call({received_messages: received_messages})
- subscription.acknowledge(received_messages)
+ env = {received_messages: received_messages, nacked_messages: []}
+ @stack.call(env)
+ p env
+ subscription.acknowledge(env[:received_messages] - env[:nacked_messages])
end
end
def create_subscription(subscription_short_name)
Google::Cloud::Pubsub.new.subscription(self.class.name_by('subscriptions', subscription_short_name))
rescue => ex
- raise Exception, 'Environment not setted properly'
+ raise Exception, 'Problem with subscription. Check spelling or permissions!'
end
def configurate_options(opts)
raise unless opts.is_a?(Hash)
opts[:shutdown] ||= proc { }