Sha256: ed9b35554229d7d9d64009cdbca988ac98c7d4542bf2bd90748ced86592e0b6a

Contents?: true

Size: 1.71 KB

Versions: 8

Compression:

Stored size: 1.71 KB

Contents

# frozen_string_literal: true

begin
  require 'google/cloud/pubsub'
rescue LoadError # rubocop:disable Lint/SuppressedException
end

module PubSubModelSync
  class ServiceGoogle < ServiceBase
    attr_accessor :service, :topic, :subscription, :config, :subscriber

    def initialize
      @config = PubSubModelSync::Config
      @service = Google::Cloud::Pubsub.new(project: config.project,
                                           credentials: config.credentials)
      @topic = service.topic(config.topic_name) ||
               service.create_topic(config.topic_name)
    end

    def listen_messages
      @subscription = subscribe_to_topic
      @subscriber = subscription.listen(&method(:process_message))
      log('Listener starting...')
      subscriber.start
      log('Listener started')
      sleep
      subscriber.stop.wait!
      log('Listener stopped')
    end

    def publish(data, attributes)
      log("Publishing message: #{[data, attributes]}")

      payload = { data: data, attributes: attributes }.to_json
      topic.publish(payload, { SERVICE_KEY => true })
    end

    def stop
      log('Listener stopping...')
      subscriber.stop!
    end

    private

    def subscribe_to_topic
      topic.subscription(config.subscription_name) ||
        topic.subscribe(config.subscription_name)
    end

    def process_message(received_message)
      message = received_message.message
      return unless message.attributes[SERVICE_KEY]

      perform_message(message.data)
    rescue => e
      log("Error processing message: #{[received_message, e.message]}", :error)
    ensure
      received_message.acknowledge!
    end

    def log(msg, kind = :info)
      config.log("Google Service ==> #{msg}", kind)
    end
  end
end

Version data entries

8 entries across 8 versions & 1 rubygems

Version Path
pub_sub_model_sync-0.3.1 lib/pub_sub_model_sync/service_google.rb
pub_sub_model_sync-0.3.0 lib/pub_sub_model_sync/service_google.rb
pub_sub_model_sync-0.2.4 lib/pub_sub_model_sync/service_google.rb
pub_sub_model_sync-0.2.3 lib/pub_sub_model_sync/service_google.rb
pub_sub_model_sync-0.2.2 lib/pub_sub_model_sync/service_google.rb
pub_sub_model_sync-0.2.1 lib/pub_sub_model_sync/service_google.rb
pub_sub_model_sync-0.2.0 lib/pub_sub_model_sync/service_google.rb
pub_sub_model_sync-0.1.5 lib/pub_sub_model_sync/service_google.rb