# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


require "google/cloud/errors"
require "google/cloud/core/environment"
require "google/cloud/pubsub/service"
require "google/cloud/pubsub/credentials"
require "google/cloud/pubsub/topic"

module Google
  module Cloud
    module Pubsub
      ##
      # # Project
      #
      # Represents the project that pubsub messages are pushed to and pulled
      # from. {Topic} is a named resource to which messages are sent by
      # publishers. {Subscription} is a named resource representing the stream
      # of messages from a single, specific topic, to be delivered to the
      # subscribing application. {Message} is a combination of data and
      # attributes that a publisher sends to a topic and is eventually delivered
      # to subscribers.
      #
      # See {Google::Cloud#pubsub}
      #
      # @example
      #   require "google/cloud/pubsub"
      #
      #   pubsub = Google::Cloud::Pubsub.new
      #
      #   topic = pubsub.topic "my-topic"
      #   topic.publish "task completed"
      #
      class Project
        ##
        # @private The Service object.
        attr_accessor :service

        ##
        # @private Creates a new Pub/Sub Project instance.
        #
        # @param [Object] service The Service object every API call is
        #   delegated to.
        def initialize service
          @service = service
        end

        ##
        # The Pub/Sub project connected to.
        #
        # @return [String] The project ID reported by the service.
        #
        # @example
        #   require "google/cloud/pubsub"
        #
        #   pubsub = Google::Cloud::Pubsub.new(
        #     project: "my-todo-project",
        #     keyfile: "/path/to/keyfile.json"
        #   )
        #
        #   pubsub.project #=> "my-todo-project"
        #
        def project
          service.project
        end

        ##
        # @private Default project.
        #
        # Checks the `PUBSUB_PROJECT`, `GOOGLE_CLOUD_PROJECT` and
        # `GCLOUD_PROJECT` environment variables, in that order, and falls back
        # to `Google::Cloud::Core::Environment.project_id`.
        def self.default_project
          ENV["PUBSUB_PROJECT"] ||
            ENV["GOOGLE_CLOUD_PROJECT"] ||
            ENV["GCLOUD_PROJECT"] ||
            Google::Cloud::Core::Environment.project_id
        end

        ##
        # Retrieves topic by name.
        #
        # The topic will be created if the topic does not exist and the
        # `autocreate` option is set to true.
        #
        # @param [String] topic_name Name of a topic.
        # @param [Boolean] autocreate Flag to control whether the requested
        #   topic will be created if it does not exist. Ignored if `skip_lookup`
        #   is `true`. The default value is `false`. Auto-creation goes through
        #   {#create_topic}, which is not given the `project` option.
        # @param [String] project If the topic belongs to a project other than
        #   the one currently connected to, the alternate project ID can be
        #   specified here.
        # @param [Boolean] skip_lookup Optionally create a {Topic} object
        #   without verifying the topic resource exists on the Pub/Sub service.
        #   Calls made on this object will raise errors if the topic resource
        #   does not exist. Default is `false`.
        #
        # @return [Google::Cloud::Pubsub::Topic, nil] Returns `nil` if topic
        #   does not exist. Will return a newly created
        #   {Google::Cloud::Pubsub::Topic} if the topic does not exist and
        #   `autocreate` is set to `true`.
        #
        # @example
        #   require "google/cloud/pubsub"
        #
        #   pubsub = Google::Cloud::Pubsub.new
        #   topic = pubsub.topic "existing-topic"
        #
        # @example By default `nil` will be returned if topic does not exist.
        #   require "google/cloud/pubsub"
        #
        #   pubsub = Google::Cloud::Pubsub.new
        #   topic = pubsub.topic "non-existing-topic" #=> nil
        #
        # @example With the `autocreate` option set to `true`.
        #   require "google/cloud/pubsub"
        #
        #   pubsub = Google::Cloud::Pubsub.new
        #   topic = pubsub.topic "non-existing-topic", autocreate: true
        #
        # @example Retrieve a topic in a different project with `project`.
        #   require "google/cloud/pubsub"
        #
        #   pubsub = Google::Cloud::Pubsub.new
        #   topic = pubsub.topic "another-topic", project: "another-project"
        #
        # @example Skip the lookup against the service with `skip_lookup`:
        #   require "google/cloud/pubsub"
        #
        #   pubsub = Google::Cloud::Pubsub.new
        #   topic = pubsub.topic "another-topic", skip_lookup: true
        #
        def topic topic_name, autocreate: nil, project: nil, skip_lookup: nil
          ensure_service!
          options = { project: project }
          return Topic.new_lazy(topic_name, service, options) if skip_lookup
          # Forward the options so an alternate `project` is honored by the
          # lookup as well, not only by the lazy (skip_lookup) path.
          # NOTE(review): assumes Service#get_topic accepts a trailing options
          # hash like Service#list_topics does -- confirm against Service.
          grpc = service.get_topic topic_name, options
          Topic.from_grpc grpc, service
        rescue Google::Cloud::NotFoundError
          return create_topic(topic_name) if autocreate
          nil
        end
        alias_method :get_topic, :topic
        alias_method :find_topic, :topic

        ##
        # Creates a new topic.
        #
        # @param [String] topic_name Name of a topic.
        #
        # @return [Google::Cloud::Pubsub::Topic]
        #
        # @example
        #   require "google/cloud/pubsub"
        #
        #   pubsub = Google::Cloud::Pubsub.new
        #   topic = pubsub.create_topic "my-topic"
        #
        def create_topic topic_name
          ensure_service!
          grpc = service.create_topic topic_name
          Topic.from_grpc grpc, service
        end
        alias_method :new_topic, :create_topic

        ##
        # Retrieves a list of topics for the given project.
        #
        # @param [String] token The `token` value returned by the last call to
        #   `topics`; indicates that this is a continuation of a call, and that
        #   the system should return the next page of data.
        # @param [Integer] max Maximum number of topics to return.
        #
        # @return [Array<Google::Cloud::Pubsub::Topic>] (See
        #   {Google::Cloud::Pubsub::Topic::List})
        #
        # @example
        #   require "google/cloud/pubsub"
        #
        #   pubsub = Google::Cloud::Pubsub.new
        #
        #   topics = pubsub.topics
        #   topics.each do |topic|
        #     puts topic.name
        #   end
        #
        # @example Retrieve all topics: (See {Topic::List#all})
        #   require "google/cloud/pubsub"
        #
        #   pubsub = Google::Cloud::Pubsub.new
        #
        #   topics = pubsub.topics
        #   topics.all do |topic|
        #     puts topic.name
        #   end
        #
        def topics token: nil, max: nil
          ensure_service!
          options = { token: token, max: max }
          grpc = service.list_topics options
          Topic::List.from_grpc grpc, service, max
        end
        alias_method :find_topics, :topics
        alias_method :list_topics, :topics

        ##
        # Publishes one or more messages to the given topic. The topic will be
        # created if the topic does not previously exist and the `autocreate`
        # option is provided.
        #
        # A note about auto-creating the topic: Any message published to a topic
        # without a subscription will be lost.
        #
        # @param [String] topic_name Name of a topic.
        # @param [String, File] data The message data.
        # @param [Hash] attributes Optional attributes for the message. The
        #   hash passed in is not modified.
        # @option attributes [Boolean] :autocreate Flag to control whether the
        #   provided topic will be created if it does not exist.
        # @yield [publisher] a block for publishing multiple messages in one
        #   request
        # @yieldparam [Topic::Publisher] publisher the topic publisher object
        #
        # @return [Message, Array<Message>, nil] Returns the published message
        #   when called without a block, or an array of messages when called
        #   with a block. Returns `nil` without calling the service when there
        #   are no messages to publish.
        #
        # @example
        #   require "google/cloud/pubsub"
        #
        #   pubsub = Google::Cloud::Pubsub.new
        #
        #   msg = pubsub.publish "my-topic", "new-message"
        #
        # @example A message can be published using a File object:
        #   require "google/cloud/pubsub"
        #
        #   pubsub = Google::Cloud::Pubsub.new
        #
        #   msg = pubsub.publish "my-topic", File.open("message.txt")
        #
        # @example Additionally, a message can be published with attributes:
        #   require "google/cloud/pubsub"
        #
        #   pubsub = Google::Cloud::Pubsub.new
        #
        #   msg = pubsub.publish "my-topic", "new-message", foo: :bar,
        #                                                   this: :that
        #
        # @example Multiple messages can be sent at the same time using a block:
        #   require "google/cloud/pubsub"
        #
        #   pubsub = Google::Cloud::Pubsub.new
        #
        #   msgs = pubsub.publish "my-topic" do |p|
        #     p.publish "new-message-1", foo: :bar
        #     p.publish "new-message-2", foo: :baz
        #     p.publish "new-message-3", foo: :bif
        #   end
        #
        # @example With `autocreate`:
        #   require "google/cloud/pubsub"
        #
        #   pubsub = Google::Cloud::Pubsub.new
        #
        #   msg = pubsub.publish "new-topic", "new-message", autocreate: true
        #
        def publish topic_name, data = nil, attributes = {}
          # When only a hash follows the topic name it lands in `data`; treat
          # it as the attributes instead.
          if data.is_a?(::Hash) && attributes.empty?
            attributes = data
            data = nil
          end
          # Copy before extracting :autocreate so the caller's hash keeps all
          # of its keys.
          attributes = attributes.dup
          autocreate = attributes.delete :autocreate
          ensure_service!
          publisher = Topic::Publisher.new data, attributes
          yield publisher if block_given?
          return nil if publisher.messages.count.zero?
          publish_batch_messages topic_name, publisher, autocreate
        end

        ##
        # Creates a new {Subscription} object for the provided topic. The topic
        # will be created if the topic does not previously exist and the
        # `autocreate` option is provided.
        #
        # @param [String] topic_name Name of a topic.
        # @param [String] subscription_name Name of the new subscription.
        #   Required. Must start with a letter, and contain only letters
        #   ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods
        #   (.), tildes (~), plus (+) or percent signs (%). It must be between
        #   3 and 255 characters in length, and it must not start with "goog".
        # @param [Integer] deadline The maximum number of seconds after a
        #   subscriber receives a message before the subscriber should
        #   acknowledge the message.
        # @param [String] endpoint A URL locating the endpoint to which messages
        #   should be pushed.
        # @param [Boolean] autocreate Flag to control whether the topic will be
        #   created if it does not exist.
        #
        # @return [Google::Cloud::Pubsub::Subscription]
        #
        # @raise [Google::Cloud::NotFoundError] if the service reports the
        #   resource as not found and `autocreate` is not set.
        #
        # @example
        #   require "google/cloud/pubsub"
        #
        #   pubsub = Google::Cloud::Pubsub.new
        #
        #   sub = pubsub.subscribe "my-topic", "my-topic-sub"
        #   puts sub.name # => "my-topic-sub"
        #
        # @example Wait 2 minutes for acknowledgement and push all to endpoint:
        #   require "google/cloud/pubsub"
        #
        #   pubsub = Google::Cloud::Pubsub.new
        #
        #   sub = pubsub.subscribe "my-topic", "my-topic-sub",
        #                          deadline: 120,
        #                          endpoint: "https://example.com/push"
        #
        # @example With `autocreate`:
        #   require "google/cloud/pubsub"
        #
        #   pubsub = Google::Cloud::Pubsub.new
        #
        #   sub = pubsub.subscribe "new-topic", "new-topic-sub",
        #                          autocreate: true
        #
        def subscribe topic_name, subscription_name, deadline: nil,
                      endpoint: nil, autocreate: nil
          ensure_service!
          options = { deadline: deadline, endpoint: endpoint }
          grpc = service.create_subscription topic_name,
                                             subscription_name, options
          Subscription.from_grpc grpc, service
        rescue Google::Cloud::NotFoundError
          raise unless autocreate
          create_topic topic_name
          # Retry exactly once; autocreate is switched off so a second
          # NotFoundError propagates instead of looping.
          subscribe topic_name, subscription_name,
                    deadline: deadline, endpoint: endpoint, autocreate: false
        end
        alias_method :create_subscription, :subscribe
        alias_method :new_subscription, :subscribe

        ##
        # Retrieves subscription by name.
        #
        # @param [String] subscription_name Name of a subscription.
        # @param [String] project If the subscription belongs to a project other
        #   than the one currently connected to, the alternate project ID can be
        #   specified here.
        # @param [Boolean] skip_lookup Optionally create a {Subscription} object
        #   without verifying the subscription resource exists on the Pub/Sub
        #   service. Calls made on this object will raise errors if the service
        #   resource does not exist. Default is `false`.
        #
        # @return [Google::Cloud::Pubsub::Subscription, nil] Returns `nil` if
        #   the subscription does not exist
        #
        # @example
        #   require "google/cloud/pubsub"
        #
        #   pubsub = Google::Cloud::Pubsub.new
        #
        #   subscription = pubsub.subscription "my-sub"
        #   puts subscription.name
        #
        # @example Skip the lookup against the service with `skip_lookup`:
        #   require "google/cloud/pubsub"
        #
        #   pubsub = Google::Cloud::Pubsub.new
        #
        #   # No API call is made to retrieve the subscription information.
        #   subscription = pubsub.subscription "my-sub", skip_lookup: true
        #   puts subscription.name
        #
        def subscription subscription_name, project: nil, skip_lookup: nil
          ensure_service!
          options = { project: project }
          if skip_lookup
            return Subscription.new_lazy subscription_name, service, options
          end
          # Forward the options so an alternate `project` is honored by the
          # lookup as well, not only by the lazy (skip_lookup) path.
          # NOTE(review): assumes Service#get_subscription accepts a trailing
          # options hash like Service#list_subscriptions does -- confirm.
          grpc = service.get_subscription subscription_name, options
          Subscription.from_grpc grpc, service
        rescue Google::Cloud::NotFoundError
          nil
        end
        alias_method :get_subscription, :subscription
        alias_method :find_subscription, :subscription

        ##
        # Retrieves a list of subscriptions for the given project.
        #
        # @param [String] token A previously-returned page token representing
        #   part of the larger set of results to view.
        # @param [Integer] max Maximum number of subscriptions to return.
        #
        # @return [Array<Google::Cloud::Pubsub::Subscription>] (See
        #   {Google::Cloud::Pubsub::Subscription::List})
        #
        # @example
        #   require "google/cloud/pubsub"
        #
        #   pubsub = Google::Cloud::Pubsub.new
        #
        #   subscriptions = pubsub.subscriptions
        #   subscriptions.each do |subscription|
        #     puts subscription.name
        #   end
        #
        # @example Retrieve all subscriptions: (See {Subscription::List#all})
        #   require "google/cloud/pubsub"
        #
        #   pubsub = Google::Cloud::Pubsub.new
        #
        #   subscriptions = pubsub.subscriptions
        #   subscriptions.all do |subscription|
        #     puts subscription.name
        #   end
        #
        def subscriptions token: nil, max: nil
          ensure_service!
          options = { token: token, max: max }
          grpc = service.list_subscriptions options
          Subscription::List.from_grpc grpc, service, max
        end
        alias_method :find_subscriptions, :subscriptions
        alias_method :list_subscriptions, :subscriptions

        protected

        ##
        # @private Raise an error unless an active connection to the service is
        # available.
        def ensure_service!
          raise "Must have active connection to service" unless service
        end

        ##
        # Call the publish API with the messages collected in the batch.
        #
        # When the service reports the topic as not found and `autocreate` is
        # truthy, the topic is created and the publish is retried exactly once;
        # otherwise the NotFoundError is re-raised.
        def publish_batch_messages topic_name, batch, autocreate = false
          grpc = service.publish topic_name, batch.messages
          batch.to_gcloud_messages Array(grpc.message_ids)
        rescue Google::Cloud::NotFoundError
          raise unless autocreate
          create_topic topic_name
          publish_batch_messages topic_name, batch, false
        end
      end
    end
  end
end