# Copyright 2016 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


require "google/cloud/errors"
require "google/cloud/pubsub/credentials"
require "google/cloud/pubsub/convert"
require "google/cloud/pubsub/version"
require "google/cloud/pubsub/v1"
require "google/gax/errors"

module Google
  module Cloud
    module PubSub
      ##
      # @private Represents the GAX Pub/Sub service, including all the API
      # methods.
      class Service
        attr_accessor :project, :credentials, :host, :timeout, :client_config

        ##
        # Creates a new Service instance.
        #
        # @param project Project identifier; interpolated into resource paths
        #   and into the default request headers.
        # @param credentials Either the symbol `:this_channel_is_insecure` or
        #   an object responding to `client.updater_proc` (see {#chan_creds}).
        # @param host Service address. Defaults to
        #   `V1::PublisherClient::SERVICE_ADDRESS` when nil.
        # @param timeout Passed through unchanged to the generated clients.
        # @param client_config Passed through to the generated clients.
        #   Defaults to an empty hash when nil.
        def initialize project, credentials, host: nil, timeout: nil,
                       client_config: nil
          @project = project
          @credentials = credentials
          @host = host || V1::PublisherClient::SERVICE_ADDRESS
          @timeout = timeout
          @client_config = client_config || {}
        end

        ##
        # Builds a new gRPC channel to {#host} using {#chan_args} and
        # {#chan_creds}. The grpc library is required lazily, on first call.
        def channel
          require "grpc"
          GRPC::Core::Channel.new host, chan_args, chan_creds
        end

        ##
        # Channel arguments handed to `GRPC::Core::Channel.new`.
        #
        # NOTE(review): -1 presumably means "no limit" for the message-length
        # settings; confirm against the gRPC channel-argument documentation.
        def chan_args
          { "grpc.max_send_message_length"    => -1,
            "grpc.max_receive_message_length" => -1,
            "grpc.keepalive_time_ms"          => 300000 }
        end

        ##
        # Channel credentials handed to `GRPC::Core::Channel.new`.
        #
        # Returns {#credentials} untouched when the channel is insecure;
        # otherwise composes fresh channel credentials with call credentials
        # built from `credentials.client.updater_proc`.
        def chan_creds
          return credentials if insecure?
          require "grpc"
          GRPC::Core::ChannelCredentials.new.compose \
            GRPC::Core::CallCredentials.new credentials.client.updater_proc
        end

        ##
        # The subscriber client. Returns {#mocked_subscriber} when one has
        # been assigned; otherwise builds a `V1::SubscriberClient` on first
        # use and memoizes it.
        def subscriber
          return mocked_subscriber if mocked_subscriber
          @subscriber ||= V1::SubscriberClient.new(
            credentials:   channel,
            timeout:       timeout,
            client_config: client_config,
            lib_name:      "gccl",
            lib_version:   Google::Cloud::PubSub::VERSION
          )
        end
        attr_accessor :mocked_subscriber

        ##
        # The publisher client. Returns {#mocked_publisher} when one has been
        # assigned; otherwise builds a `V1::PublisherClient` on first use and
        # memoizes it.
        def publisher
          return mocked_publisher if mocked_publisher
          @publisher ||= V1::PublisherClient.new(
            credentials:   channel,
            timeout:       timeout,
            client_config: client_config,
            lib_name:      "gccl",
            lib_version:   Google::Cloud::PubSub::VERSION
          )
        end
        attr_accessor :mocked_publisher

        ##
        # Whether {#credentials} is the `:this_channel_is_insecure` marker.
        def insecure?
          credentials == :this_channel_is_insecure
        end

        ##
        # Gets the configuration of a topic.
        # Since the topic only has the name attribute,
        # this method is only useful to check the existence of a topic.
        # If other attributes are added in the future,
        # they will be returned here.
        #
        # `options[:project]`, when given, overrides the project used to
        # build the topic path.
        def get_topic topic_name, options = {}
          execute do
            publisher.get_topic topic_path(topic_name, options),
                                options: default_options
          end
        end

        ##
        # Lists matching topics.
        #
        # Reads `:token` (page token), `:max` (page size) and `:project` from
        # options and returns the response of the first page fetched.
        def list_topics options = {}
          call_options = paged_call_options options[:token]

          execute do
            paged_enum = publisher.list_topics project_path(options),
                                               page_size: options[:max],
                                               options: call_options

            paged_enum.page.response
          end
        end

        ##
        # Creates the given topic with the given name.
        #
        # `labels` and `kms_key_name` are forwarded unchanged to the
        # publisher client; `options[:project]` overrides the project used to
        # build the topic path.
        def create_topic topic_name, labels: nil, kms_key_name: nil,
                         options: {}
          execute do
            publisher.create_topic \
              topic_path(topic_name, options),
              labels: labels,
              kms_key_name: kms_key_name,
              options: default_options
          end
        end

        ##
        # Updates a topic. The given field names are converted to strings and
        # sent as the paths of a `Google::Protobuf::FieldMask`.
        def update_topic topic_obj, *fields
          mask = Google::Protobuf::FieldMask.new paths: fields.map(&:to_s)

          execute do
            publisher.update_topic \
              topic_obj, mask, options: default_options
          end
        end

        ##
        # Deletes the topic with the given name. All subscriptions to this
        # topic are also deleted. Raises GRPC status code 5 if the topic does
        # not exist. After a topic is deleted, a new topic may be created with
        # the same name.
        def delete_topic topic_name
          execute do
            publisher.delete_topic topic_path(topic_name),
                                   options: default_options
          end
        end

        ##
        # Adds one or more messages to the topic.
        # Raises GRPC status code 5 if the topic does not exist.
        # The messages parameter is an array of arrays.
        # The first element is the data, second is attributes hash.
        def publish topic, messages
          execute do
            publisher.publish topic_path(topic), messages,
                              options: default_options
          end
        end

        ##
        # Gets the details of a subscription.
        #
        # `options[:project]`, when given, overrides the project used to
        # build the subscription path.
        def get_subscription subscription_name, options = {}
          subscription = subscription_path subscription_name, options

          execute do
            subscriber.get_subscription subscription,
                                        options: default_options
          end
        end

        ##
        # Lists matching subscriptions by project and topic.
        #
        # Reads `:token` (page token), `:max` (page size) and `:project` from
        # options and returns the response of the first page fetched.
        def list_topics_subscriptions topic, options = {}
          call_options = paged_call_options options[:token]

          execute do
            paged_enum = publisher.list_topic_subscriptions \
              topic_path(topic, options),
              page_size: options[:max],
              options: call_options

            paged_enum.page.response
          end
        end

        ##
        # Lists matching subscriptions by project.
        #
        # Reads `:token` (page token), `:max` (page size) and `:project` from
        # options and returns the response of the first page fetched.
        def list_subscriptions options = {}
          call_options = paged_call_options options[:token]

          execute do
            paged_enum = subscriber.list_subscriptions project_path(options),
                                                       page_size: options[:max],
                                                       options: call_options

            paged_enum.page.response
          end
        end

        ##
        # Creates a subscription on a given topic for a given subscriber.
        #
        # Reads `:endpoint`, `:attributes`, `:deadline`, `:retain_acked`,
        # `:retention`, `:labels` and `:project` from options. A push config
        # is only built when `:endpoint` is present. `:project` applies to the
        # subscription name only; the topic path is resolved against the
        # service's own project.
        def create_subscription topic, subscription_name, options = {}
          name = subscription_path subscription_name, options
          topic = topic_path topic
          push_config = if options[:endpoint]
                          Google::Cloud::PubSub::V1::PushConfig.new \
                            push_endpoint: options[:endpoint],
                            attributes: (options[:attributes] || {}).to_h
                        end
          deadline = options[:deadline]
          retain_acked = options[:retain_acked]
          mrd = Convert.number_to_duration options[:retention]
          labels = options[:labels]

          execute do
            subscriber.create_subscription \
              name, topic,
              push_config: push_config,
              ack_deadline_seconds: deadline,
              retain_acked_messages: retain_acked,
              message_retention_duration: mrd,
              labels: labels,
              options: default_options
          end
        end

        ##
        # Updates a subscription. The given field names are converted to
        # strings and sent as the paths of a `Google::Protobuf::FieldMask`.
        def update_subscription subscription_obj, *fields
          mask = Google::Protobuf::FieldMask.new paths: fields.map(&:to_s)

          execute do
            subscriber.update_subscription \
              subscription_obj, mask, options: default_options
          end
        end

        ##
        # Deletes an existing subscription.
        # All pending messages in the subscription are immediately dropped.
        def delete_subscription subscription
          execute do
            subscriber.delete_subscription subscription_path(subscription),
                                           options: default_options
          end
        end

        ##
        # Pulls messages from the server.
        #
        # Requests at most `options[:max]` messages (default 100, coerced
        # with `to_i`). `options[:immediate]` (default true) is coerced to a
        # boolean and sent as `return_immediately`.
        def pull subscription, options = {}
          subscription = subscription_path subscription, options
          max_messages = options.fetch(:max, 100).to_i
          # Any truthy/falsy value is normalized to a strict true/false.
          return_immediately = options.fetch(:immediate, true) ? true : false

          execute do
            subscriber.pull subscription,
                            max_messages,
                            return_immediately: return_immediately,
                            options: default_options
          end
        end

        ##
        # Opens a streaming pull, forwarding the given request enumerable to
        # the subscriber client.
        def streaming_pull request_enum
          execute do
            subscriber.streaming_pull request_enum, options: default_options
          end
        end

        ##
        # Acknowledges receipt of a message.
        #
        # All remaining arguments are collected and sent as the ack IDs.
        def acknowledge subscription, *ack_ids
          execute do
            subscriber.acknowledge subscription_path(subscription), ack_ids,
                                   options: default_options
          end
        end

        ##
        # Modifies the PushConfig for a specified subscription.
        #
        # A nil `attributes` is treated as an empty hash.
        def modify_push_config subscription, endpoint, attributes
          subscription = subscription_path subscription
          # Convert attributes to strings to match the protobuf definition
          attributes = Hash[
            (attributes || {}).map { |k, v| [String(k), String(v)] }
          ]
          push_config = Google::Cloud::PubSub::V1::PushConfig.new(
            push_endpoint: endpoint,
            attributes:    attributes
          )

          execute do
            subscriber.modify_push_config subscription, push_config,
                                          options: default_options
          end
        end

        ##
        # Modifies the ack deadline for a specific message.
        #
        # `ids` may be a single ID or a collection; it is wrapped with
        # `Array()` before being sent.
        def modify_ack_deadline subscription, ids, deadline
          execute do
            subscriber.modify_ack_deadline subscription_path(subscription),
                                           Array(ids),
                                           deadline, options: default_options
          end
        end

        ##
        # Lists snapshots by project.
        #
        # Reads `:token` (page token), `:max` (page size) and `:project` from
        # options and returns the response of the first page fetched.
        def list_snapshots options = {}
          call_options = paged_call_options options[:token]

          execute do
            paged_enum = subscriber.list_snapshots project_path(options),
                                                   page_size: options[:max],
                                                   options: call_options

            paged_enum.page.response
          end
        end

        ##
        # Creates a snapshot on a given subscription.
        #
        # A nil `snapshot_name` is passed through as nil (see
        # {#snapshot_path}).
        def create_snapshot subscription, snapshot_name, labels: nil
          name = snapshot_path snapshot_name

          execute do
            subscriber.create_snapshot name,
                                       subscription_path(subscription),
                                       labels: labels,
                                       options: default_options
          end
        end

        ##
        # Updates a snapshot. The given field names are converted to strings
        # and sent as the paths of a `Google::Protobuf::FieldMask`.
        def update_snapshot snapshot_obj, *fields
          mask = Google::Protobuf::FieldMask.new paths: fields.map(&:to_s)

          execute do
            subscriber.update_snapshot \
              snapshot_obj, mask, options: default_options
          end
        end

        ##
        # Deletes an existing snapshot.
        # All pending messages in the snapshot are immediately dropped.
        def delete_snapshot snapshot
          execute do
            subscriber.delete_snapshot snapshot_path(snapshot),
                                       options: default_options
          end
        end

        ##
        # Adjusts the given subscription to a time or snapshot.
        #
        # When the argument is time-like (see {#a_time?}) it is converted
        # with `Convert.time_to_timestamp` and sent as `time:`. Otherwise it
        # is treated as a snapshot: a `Snapshot` object is replaced by its
        # `name`, and the result is expanded with {#snapshot_path}.
        def seek subscription, time_or_snapshot
          subscription = subscription_path subscription

          execute do
            if a_time? time_or_snapshot
              time = Convert.time_to_timestamp time_or_snapshot
              subscriber.seek subscription, time: time,
                                            options: default_options
            else
              if time_or_snapshot.is_a? Snapshot
                time_or_snapshot = time_or_snapshot.name
              end
              subscriber.seek subscription,
                              snapshot: snapshot_path(time_or_snapshot),
                              options: default_options
            end
          end
        end

        ##
        # Gets the IAM policy of a topic.
        def get_topic_policy topic_name, options = {}
          execute do
            publisher.get_iam_policy topic_path(topic_name, options),
                                     options: default_options
          end
        end

        ##
        # Sets the IAM policy of a topic.
        def set_topic_policy topic_name, new_policy, options = {}
          resource = topic_path topic_name, options

          execute do
            publisher.set_iam_policy resource, new_policy,
                                     options: default_options
          end
        end

        ##
        # Tests the given IAM permissions against a topic.
        def test_topic_permissions topic_name, permissions, options = {}
          resource = topic_path topic_name, options

          execute do
            publisher.test_iam_permissions resource, permissions,
                                           options: default_options
          end
        end

        ##
        # Gets the IAM policy of a subscription.
        def get_subscription_policy subscription_name, options = {}
          resource = subscription_path subscription_name, options

          execute do
            subscriber.get_iam_policy resource, options: default_options
          end
        end

        ##
        # Sets the IAM policy of a subscription.
        def set_subscription_policy subscription_name, new_policy,
                                    options = {}
          resource = subscription_path subscription_name, options

          execute do
            subscriber.set_iam_policy resource, new_policy,
                                      options: default_options
          end
        end

        ##
        # Tests the given IAM permissions against a subscription.
        def test_subscription_permissions subscription_name,
                                          permissions, options = {}
          resource = subscription_path subscription_name, options

          execute do
            subscriber.test_iam_permissions resource, permissions,
                                            options: default_options
          end
        end

        ##
        # Builds "projects/<name>", using `options[:project]` when present
        # and the service's own project otherwise.
        def project_path options = {}
          project_name = options[:project] || project
          "projects/#{project_name}"
        end

        ##
        # Expands a topic name into a full topic path. A name that already
        # contains "/" is returned unchanged.
        def topic_path topic_name, options = {}
          return topic_name if topic_name.to_s.include? "/"
          "#{project_path options}/topics/#{topic_name}"
        end

        ##
        # Expands a subscription name into a full subscription path. A name
        # that already contains "/" is returned unchanged.
        def subscription_path subscription_name, options = {}
          return subscription_name if subscription_name.to_s.include? "/"
          "#{project_path options}/subscriptions/#{subscription_name}"
        end

        ##
        # Expands a snapshot name into a full snapshot path. Nil, or a name
        # that already contains "/", is returned unchanged.
        def snapshot_path snapshot_name, options = {}
          if snapshot_name.nil? || snapshot_name.to_s.include?("/")
            return snapshot_name
          end
          "#{project_path options}/snapshots/#{snapshot_name}"
        end

        ##
        # Short description: the class name followed by the project.
        def inspect
          "#{self.class}(#{@project})"
        end

        protected

        ##
        # Whether the object can be converted to a time: it must respond to
        # `to_time` and that conversion must not return nil.
        def a_time? obj
          return false unless obj.respond_to? :to_time
          # Rails' String#to_time returns nil if the string doesn't parse.
          return false if obj.to_time.nil?
          true
        end

        ##
        # Headers sent with every call. Always built from the service's own
        # @project, even when a call overrides `:project` in its options.
        def default_headers
          { "google-cloud-resource-prefix" => "projects/#{@project}" }
        end

        ##
        # Call options carrying only the default headers.
        def default_options
          Google::Gax::CallOptions.new kwargs: default_headers
        end

        ##
        # Call options for the paginated list calls: the default options when
        # no page token is given, otherwise the default headers plus the
        # token as `page_token`.
        def paged_call_options token
          return default_options unless token
          Google::Gax::CallOptions.new kwargs: default_headers,
                                       page_token: token
        end

        ##
        # Yields to the block and translates any `Google::Gax::GaxError` it
        # raises into a `Google::Cloud::Error`.
        def execute
          yield
        rescue Google::Gax::GaxError => e
          # GaxError wraps BadStatus, but exposes it as #cause
          raise Google::Cloud::Error.from_error(e.cause)
        end
      end
    end

    Pubsub = PubSub unless const_defined? :Pubsub
  end
end