# Copyright 2015 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # https://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. require "google/cloud/errors" require "google/cloud/pubsub/service" require "google/cloud/pubsub/credentials" require "google/cloud/pubsub/topic" require "google/cloud/pubsub/batch_publisher" require "google/cloud/pubsub/snapshot" module Google module Cloud module Pubsub ## # # Project # # Represents the project that pubsub messages are pushed to and pulled # from. {Topic} is a named resource to which messages are sent by # publishers. {Subscription} is a named resource representing the stream # of messages from a single, specific topic, to be delivered to the # subscribing application. {Message} is a combination of data and # attributes that a publisher sends to a topic and is eventually delivered # to subscribers. # # See {Google::Cloud#pubsub} # # @example # require "google/cloud/pubsub" # # pubsub = Google::Cloud::Pubsub.new # # topic = pubsub.topic "my-topic" # topic.publish "task completed" # class Project ## # @private The Service object. attr_accessor :service ## # @private Creates a new Pub/Sub Project instance. def initialize service @service = service end # The Pub/Sub project connected to. # # @example # require "google/cloud/pubsub" # # pubsub = Google::Cloud::Pubsub.new( # project_id: "my-project", # credentials: "/path/to/keyfile.json" # ) # # pubsub.project_id #=> "my-project" # def project_id service.project end alias project project_id ## # Retrieves topic by name. # # @param [String] topic_name Name of a topic. # @param [String] project If the topic belongs to a project other than # the one currently connected to, the alternate project ID can be # specified here. Optional. # @param [Boolean] skip_lookup Optionally create a {Topic} object # without verifying the topic resource exists on the Pub/Sub service. # Calls made on this object will raise errors if the topic resource # does not exist. Default is `false`. Optional. # @param [Hash] async A hash of values to configure the topic's # {AsyncPublisher} that is created when {Topic#publish_async} # is called. Optional. # # Hash keys and values may include the following: # # * `:max_bytes` (Integer) The maximum size of messages to be # collected before the batch is published. Default is 10,000,000 # (10MB). # * `:max_messages` (Integer) The maximum number of messages to be # collected before the batch is published. Default is 1,000. # * `:interval` (Numeric) The number of seconds to collect messages # before the batch is published. Default is 0.25. # * `:threads` (Hash) The number of threads to create to handle # concurrent calls by the publisher: # * `:publish` (Integer) The number of threads used to publish # messages. Default is 4. # * `:callback` (Integer) The number of threads to handle the # published messages' callbacks. Default is 8. # # @return [Google::Cloud::Pubsub::Topic, nil] Returns `nil` if topic # does not exist. # # @example # require "google/cloud/pubsub" # # pubsub = Google::Cloud::Pubsub.new # topic = pubsub.topic "existing-topic" # # @example By default `nil` will be returned if topic does not exist. # require "google/cloud/pubsub" # # pubsub = Google::Cloud::Pubsub.new # topic = pubsub.topic "non-existing-topic" # nil # # @example Create topic in a different project with the `project` flag. # require "google/cloud/pubsub" # # pubsub = Google::Cloud::Pubsub.new # topic = pubsub.topic "another-topic", project: "another-project" # # @example Skip the lookup against the service with `skip_lookup`: # require "google/cloud/pubsub" # # pubsub = Google::Cloud::Pubsub.new # topic = pubsub.topic "another-topic", skip_lookup: true # # @example Configuring AsyncPublisher to increase concurrent callbacks: # require "google/cloud/pubsub" # # pubsub = Google::Cloud::Pubsub.new # topic = pubsub.topic "my-topic", # async: { threads: { callback: 16 } } # # topic.publish_async "task completed" do |result| # if result.succeeded? # log_publish_success result.data # else # log_publish_failure result.data, result.error # end # end # # topic.async_publisher.stop.wait! # def topic topic_name, project: nil, skip_lookup: nil, async: nil ensure_service! options = { project: project } return Topic.new_lazy(topic_name, service, options) if skip_lookup grpc = service.get_topic topic_name Topic.from_grpc grpc, service, async: async rescue Google::Cloud::NotFoundError nil end alias get_topic topic alias find_topic topic ## # Creates a new topic. # # @param [String] topic_name Name of a topic. # @param [Hash] async A hash of values to configure the topic's # {AsyncPublisher} that is created when {Topic#publish_async} # is called. Optional. # # Hash keys and values may include the following: # # * `:max_bytes` (Integer) The maximum size of messages to be # collected before the batch is published. Default is 10,000,000 # (10MB). # * `:max_messages` (Integer) The maximum number of messages to be # collected before the batch is published. Default is 1,000. # * `:interval` (Numeric) The number of seconds to collect messages # before the batch is published. Default is 0.25. # * `:threads` (Hash) The number of threads to create to handle # concurrent calls by the publisher: # * `:publish` (Integer) The number of threads used to publish # messages. Default is 4. # * `:callback` (Integer) The number of threads to handle the # published messages' callbacks. Default is 8. # # @return [Google::Cloud::Pubsub::Topic] # # @example # require "google/cloud/pubsub" # # pubsub = Google::Cloud::Pubsub.new # topic = pubsub.create_topic "my-topic" # def create_topic topic_name, async: nil ensure_service! grpc = service.create_topic topic_name Topic.from_grpc grpc, service, async: async end alias new_topic create_topic ## # Retrieves a list of topics for the given project. # # @param [String] token The `token` value returned by the last call to # `topics`; indicates that this is a continuation of a call, and that # the system should return the next page of data. # @param [Integer] max Maximum number of topics to return. # # @return [Array<Google::Cloud::Pubsub::Topic>] (See # {Google::Cloud::Pubsub::Topic::List}) # # @example # require "google/cloud/pubsub" # # pubsub = Google::Cloud::Pubsub.new # # topics = pubsub.topics # topics.each do |topic| # puts topic.name # end # # @example Retrieve all topics: (See {Topic::List#all}) # require "google/cloud/pubsub" # # pubsub = Google::Cloud::Pubsub.new # # topics = pubsub.topics # topics.all do |topic| # puts topic.name # end # def topics token: nil, max: nil ensure_service! options = { token: token, max: max } grpc = service.list_topics options Topic::List.from_grpc grpc, service, max end alias find_topics topics alias list_topics topics ## # Retrieves subscription by name. # # @param [String] subscription_name Name of a subscription. # @param [String] project If the subscription belongs to a project other # than the one currently connected to, the alternate project ID can be # specified here. # @param [Boolean] skip_lookup Optionally create a {Subscription} object # without verifying the subscription resource exists on the Pub/Sub # service. Calls made on this object will raise errors if the service # resource does not exist. Default is `false`. # # @return [Google::Cloud::Pubsub::Subscription, nil] Returns `nil` if # the subscription does not exist # # @example # require "google/cloud/pubsub" # # pubsub = Google::Cloud::Pubsub.new # # sub = pubsub.subscription "my-sub" # sub.name #=> "projects/my-project/subscriptions/my-sub" # # @example Skip the lookup against the service with `skip_lookup`: # require "google/cloud/pubsub" # # pubsub = Google::Cloud::Pubsub.new # # # No API call is made to retrieve the subscription information. # sub = pubsub.subscription "my-sub", skip_lookup: true # sub.name #=> "projects/my-project/subscriptions/my-sub" # def subscription subscription_name, project: nil, skip_lookup: nil ensure_service! options = { project: project } if skip_lookup return Subscription.new_lazy subscription_name, service, options end grpc = service.get_subscription subscription_name Subscription.from_grpc grpc, service rescue Google::Cloud::NotFoundError nil end alias get_subscription subscription alias find_subscription subscription ## # Retrieves a list of subscriptions for the given project. # # @param [String] token A previously-returned page token representing # part of the larger set of results to view. # @param [Integer] max Maximum number of subscriptions to return. # # @return [Array<Google::Cloud::Pubsub::Subscription>] (See # {Google::Cloud::Pubsub::Subscription::List}) # # @example # require "google/cloud/pubsub" # # pubsub = Google::Cloud::Pubsub.new # # subs = pubsub.subscriptions # subs.each do |sub| # puts sub.name # end # # @example Retrieve all subscriptions: (See {Subscription::List#all}) # require "google/cloud/pubsub" # # pubsub = Google::Cloud::Pubsub.new # # subs = pubsub.subscriptions # subs.all do |sub| # puts sub.name # end # def subscriptions token: nil, max: nil ensure_service! options = { token: token, max: max } grpc = service.list_subscriptions options Subscription::List.from_grpc grpc, service, max end alias find_subscriptions subscriptions alias list_subscriptions subscriptions ## # Retrieves a list of snapshots for the given project. # # @param [String] token A previously-returned page token representing # part of the larger set of results to view. # @param [Integer] max Maximum number of snapshots to return. # # @return [Array<Google::Cloud::Pubsub::Snapshot>] (See # {Google::Cloud::Pubsub::Snapshot::List}) # # @example # require "google/cloud/pubsub" # # pubsub = Google::Cloud::Pubsub.new # # snapshots = pubsub.snapshots # snapshots.each do |snapshot| # puts snapshot.name # end # # @example Retrieve all snapshots: (See {Snapshot::List#all}) # require "google/cloud/pubsub" # # pubsub = Google::Cloud::Pubsub.new # # snapshots = pubsub.snapshots # snapshots.all do |snapshot| # puts snapshot.name # end # def snapshots token: nil, max: nil ensure_service! options = { token: token, max: max } grpc = service.list_snapshots options Snapshot::List.from_grpc grpc, service, max end alias find_snapshots snapshots alias list_snapshots snapshots protected ## # @private Raise an error unless an active connection to the service is # available. def ensure_service! raise "Must have active connection to service" unless service end ## # Call the publish API with arrays of data data and attrs. def publish_batch_messages topic_name, batch grpc = service.publish topic_name, batch.messages batch.to_gcloud_messages Array(grpc.message_ids) end end end end end