Sha256: dabd0fe3afe1bea58056db0d69b7ee1f75222dc9944fe74f47b255ae65be3769

Contents?: true

Size: 1.66 KB

Versions: 1

Compression:

Stored size: 1.66 KB

Contents

require 'json'
require 'middleware'
require 'google/cloud/pubsub'

# Pulls batches of messages from a Google Cloud Pub/Sub subscription, passes
# each batch through a middleware stack and acknowledges the batch afterwards.
#
# The stack is built from the block given to {#initialize}; the block is handed
# to `::Middleware::Builder.new` unchanged.
class GooglePubsubEnhancer

  require 'google_pubsub_enhancer/constants'
  require 'google_pubsub_enhancer/middleware'
  require 'google_pubsub_enhancer/spec'

  class << self
    # Builds the fully qualified Pub/Sub resource path for a short name.
    #
    # @param type [String] either "topics" or "subscriptions"
    # @param name [String] short name of the resource
    # @return [String] "projects/<project_id>/<type>/<name>"
    # @raise [RuntimeError] when +type+ is not one of the supported values
    # @raise [Exception] when the credentials cannot be loaded (see {pubsub_config})
    def name_by(type, name)
      unless %w[topics subscriptions].include?(type)
        # Still a RuntimeError (as the original bare `raise`), now with a message.
        raise "unsupported resource type #{type.inspect}, expected \"topics\" or \"subscriptions\""
      end

      "projects/#{pubsub_config['project_id']}/#{type}/#{name}"
    end

    # Parses the JSON credentials stored in the first environment variable
    # listed in Google::Cloud::Pubsub::Credentials::JSON_ENV_VARS that is set.
    # The parsed value is memoized on the class, so the environment is only
    # inspected until the first successful parse.
    #
    # @return [Object] the parsed JSON document (expected to respond to
    #   +['project_id']+, see {name_by})
    # @raise [Exception] when no variable is set or its content is not valid JSON
    def pubsub_config
      @pubsub_config ||= begin
        # ENV values are Strings or nil, so the truthiness test is a presence test.
        key = ::Google::Cloud::Pubsub::Credentials::JSON_ENV_VARS.find { |env_name| ENV[env_name] }
        # ENV[nil] raises TypeError when nothing is set; handled by the rescue below.
        JSON.parse(ENV[key])
      end
    rescue StandardError
      # NOTE: Exception (not a StandardError subclass) is kept for backward
      # compatibility. Because of that, the retry loop in #run does not catch it
      # and a misconfigured environment aborts the worker instead of being retried.
      raise Exception, 'Environment not setted properly'
    end
  end

  # @yield block forwarded to `::Middleware::Builder.new`; it defines the stack
  #   that every pulled batch is passed through
  def initialize(&block)
    # `to_app` is invoked through __send__, i.e. regardless of its visibility.
    @stack = ::Middleware::Builder.new(&block).__send__(:to_app)
  end

  # Processes the subscription until the pull returns nil/false or the
  # shutdown callback returns a truthy value.
  #
  # Any StandardError raised while connecting, pulling, running the stack or
  # acknowledging restarts the cycle (subscription lookup included), unless a
  # shutdown has been requested in the meantime. There is no delay between
  # attempts.
  #
  # @param subscription_short_name [String] short name of the subscription
  # @param opts [Hash] options; the hash itself is not modified
  # @option opts [#call] :shutdown callable polled after every pull and before
  #   every retry; a truthy result stops the worker (defaults to a no-op)
  # @return [nil]
  # @raise [ArgumentError] when +opts+ is not a Hash
  # @raise [Exception] when the environment is not configured properly
  def run(subscription_short_name, opts = {})
    # Validated once, outside of the retried section: an invalid argument can
    # never succeed on a later attempt and used to cause an endless busy loop.
    configurated_options = configurate_options(opts)

    begin
      subscription = create_subscription(subscription_short_name)
      work(subscription, configurated_options)
    rescue StandardError
      # Keep the worker alive on failures, but let it stop when asked to.
      retry unless configurated_options[:shutdown].call
    end
  end

  private

  # Pull / process / acknowledge loop.
  #
  # @param subscription [#pull, #acknowledge] subscription object returned by
  #   {#create_subscription}
  # @param opts [Hash] normalized options containing a callable :shutdown
  # @return [nil]
  def work(subscription, opts)
    # The loop ends as soon as the pull returns nil or false.
    while (received_messages = subscription.pull(max: GooglePubsubEnhancer::Constants::MAX_PULL_SIZE))
      # A batch pulled right before shutdown is neither processed nor acknowledged.
      break if opts[:shutdown].call
      next if received_messages.empty?

      @stack.call({received_messages: received_messages})
      # Acknowledged only after the whole stack finished without raising.
      subscription.acknowledge(received_messages)
    end
  end

  # Looks up the subscription by its fully qualified name.
  #
  # @param subscription_short_name [String]
  # @return [Object] whatever `Google::Cloud::Pubsub#subscription` returns
  # @raise [Exception] for every StandardError raised during the lookup
  def create_subscription(subscription_short_name)
    Google::Cloud::Pubsub.new.subscription(self.class.name_by('subscriptions', subscription_short_name))
  rescue StandardError
    # NOTE: every lookup failure is reported as a configuration problem and,
    # being a plain Exception, is not retried by #run.
    raise Exception, 'Environment not setted properly'
  end

  # Returns a normalized copy of the options with a callable :shutdown entry.
  #
  # @param opts [Hash]
  # @return [Hash] shallow copy of +opts+; a missing, nil or false :shutdown is
  #   replaced by a proc returning nil
  # @raise [ArgumentError] when +opts+ is not a Hash
  def configurate_options(opts)
    raise ArgumentError, "opts must be a Hash, got #{opts.class}" unless opts.is_a?(Hash)

    # Work on a copy so the caller's hash (possibly frozen) is left untouched.
    options = opts.dup
    options[:shutdown] ||= proc {}
    options
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
google-pubsub-enhancer-0.3.0 lib/google_pubsub_enhancer.rb