Sha256: 4b5eb3bb3c3086e11bb930c0e1401e8a5b69cb8547e5801f5e87460d8dd96b21

Contents?: true

Size: 1.79 KB

Versions: 4

Compression:

Stored size: 1.79 KB

Contents

# frozen_string_literal: true

begin
  require 'google/cloud/pubsub'
rescue LoadError # rubocop:disable Lint/SuppressedException
end

module PubSubModelSync
  class ServiceGoogle < ServiceBase
    LISTEN_SETTINGS = { threads: { callback: 1 }, message_ordering: true }.freeze
    TOPIC_SETTINGS = {}.freeze
    SUBSCRIPTION_SETTINGS = { message_ordering: true }.freeze
    attr_accessor :service, :topic, :subscription, :subscriber

    def initialize
      @service = Google::Cloud::Pubsub.new(project: config.project,
                                           credentials: config.credentials)
      @topic = service.topic(config.topic_name) ||
               service.create_topic(config.topic_name, TOPIC_SETTINGS)
      topic.enable_message_ordering!
    end

    def listen_messages
      @subscription = subscribe_to_topic
      @subscriber = subscription.listen(LISTEN_SETTINGS, &method(:process_message))
      log('Listener starting...')
      subscriber.start
      log('Listener started')
      sleep
      subscriber.stop.wait!
      log('Listener stopped')
    end

    def publish(payload)
      topic.publish_async(payload.to_json, message_headers) do |res|
        raise 'Failed to publish the message.' unless res.succeeded?
      end
    end

    def stop
      log('Listener stopping...')
      subscriber.stop!
    end

    private

    def message_headers
      { SERVICE_KEY => true, ordering_key: SERVICE_KEY }.merge(PUBLISH_SETTINGS)
    end

    def subscribe_to_topic
      topic.subscription(config.subscription_key) ||
        topic.subscribe(config.subscription_key, SUBSCRIPTION_SETTINGS)
    end

    def process_message(received_message)
      message = received_message.message
      super(message.data) if message.attributes[SERVICE_KEY]
    ensure
      received_message.acknowledge!
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
pub_sub_model_sync-0.5.10 lib/pub_sub_model_sync/service_google.rb
pub_sub_model_sync-0.5.9.1 lib/pub_sub_model_sync/service_google.rb
pub_sub_model_sync-0.5.9 lib/pub_sub_model_sync/service_google.rb
pub_sub_model_sync-0.5.8.2 lib/pub_sub_model_sync/service_google.rb