Sha256: 7937fb4941b039604a6fa172d680c26bb1541f01f3ab1c9040d8dd6353c3d906
Contents?: true
Size: 1.7 KB
Versions: 1
Compression:
Stored size: 1.7 KB
Contents
# frozen_string_literal: true require 'google/cloud/pubsub' module PubSubModelSync class ServiceGoogle attr_accessor :service, :topic, :subscription, :config, :subscriber def initialize @config = PubSubModelSync::Config @service = Google::Cloud::Pubsub.new(project: config.project, credentials: config.credentials) @topic = service.topic(config.topic_name) || service.create_topic(config.topic_name) end def listen_messages @subscription = subscribe_to_topic @subscriber = subscription.listen(&method(:process_message)) log('Listener starting...') subscriber.start log('Listener started') sleep subscriber.stop.wait! log('Listener stopped') end def publish(data, attributes) log("Publishing message: #{[data, attributes]}") topic.publish(data.to_json, attributes) end def stop log('Listener stopping...') subscriber.stop! end private def subscribe_to_topic topic.subscription(config.subscription_name) || topic.subscribe(config.subscription_name) end def process_message(received_message) message = received_message.message attrs = message.attributes.symbolize_keys return unless attrs[:service_model_sync] data = JSON.parse(message.data).symbolize_keys args = [data, attrs[:klass], attrs[:action], attrs] PubSubModelSync::MessageProcessor.new(*args).process rescue => e log("Error processing message: #{[received_message, e.message]}") ensure received_message.acknowledge! end def log(msg) config.log("Google Service ==> #{msg}") end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
pub_sub_model_sync-0.1.1 | lib/pub_sub_model_sync/service_google.rb |