# Copyright 2016 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


require "google/cloud/errors"
require "google/cloud/pubsub/credentials"
require "google/cloud/pubsub/convert"
require "google/cloud/pubsub/version"
require "google/cloud/pubsub/v1"
require "securerandom"

module Google
  module Cloud
    module PubSub
      ##
      # @private Represents the Pub/Sub service API, including IAM mixins.
      #
      # Wraps the generated V1 Publisher, Subscriber, IAMPolicy and
      # SchemaService clients. Each client is built lazily on first use and
      # memoized; a `mocked_*` accessor allows a replacement object to be
      # injected (for example in unit tests), in which case the real client is
      # never constructed.
      class Service
        attr_accessor :project
        attr_accessor :credentials
        attr_accessor :host
        attr_accessor :timeout
        ##
        # The same client_id is used across all streaming pull connections that are created by this client. This is
        # intentional, as it indicates to the server that any guarantees, such as message ordering, made for a stream
        # that is disconnected will be made for the stream that is created to replace it. The attr_accessor allows the
        # value to be replaced for unit testing.
        attr_accessor :client_id

        ##
        # Creates a new Service instance.
        #
        # @param project [String] Project ID used to build resource paths.
        # @param credentials [Object] Credentials handed to each V1 client (skipped when nil).
        # @param host [String, nil] Optional endpoint override for the V1 clients.
        # @param timeout [Numeric, nil] Optional timeout for the V1 clients.
        def initialize project, credentials, host: nil, timeout: nil
          @project = project
          @credentials = credentials
          @host = host
          @timeout = timeout
          @client_id = SecureRandom.uuid.freeze
        end

        ##
        # Returns the mocked subscriber when one is set, otherwise the
        # memoized V1 Subscriber client.
        def subscriber
          return mocked_subscriber if mocked_subscriber
          @subscriber ||= V1::Subscriber::Client.new { |config| configure_client config }
        end
        attr_accessor :mocked_subscriber

        ##
        # Returns the mocked publisher when one is set, otherwise the
        # memoized V1 Publisher client.
        def publisher
          return mocked_publisher if mocked_publisher
          @publisher ||= V1::Publisher::Client.new { |config| configure_client config }
        end
        attr_accessor :mocked_publisher

        ##
        # Returns the mocked IAM client when one is set, otherwise the
        # memoized V1 IAMPolicy client.
        def iam
          return mocked_iam if mocked_iam
          @iam ||= V1::IAMPolicy::Client.new { |config| configure_client config }
        end
        attr_accessor :mocked_iam

        ##
        # Returns the mocked schema client when one is set, otherwise the
        # memoized V1 SchemaService client.
        def schemas
          return mocked_schemas if mocked_schemas
          @schemas ||= V1::SchemaService::Client.new { |config| configure_client config }
        end
        attr_accessor :mocked_schemas

        ##
        # Gets the configuration of a topic.
        # `options[:project]` overrides the default project when `topic_name`
        # is not already a full resource path.
        def get_topic topic_name, options = {}
          publisher.get_topic topic: topic_path(topic_name, options)
        end

        ##
        # Lists matching topics.
        # Reads `:project`, `:max` (page size) and `:token` (page token) from
        # `options` and returns the `response` of the paged enumerable.
        def list_topics options = {}
          paged_enum = publisher.list_topics project:    project_path(options),
                                             page_size:  options[:max],
                                             page_token: options[:token]

          paged_enum.response
        end

        ##
        # Creates the given topic with the given name.
        #
        # @raise [ArgumentError] if only one of `schema_name` and
        #   `message_encoding` is provided.
        def create_topic topic_name,
                         labels: nil,
                         kms_key_name: nil,
                         persistence_regions: nil,
                         schema_name: nil,
                         message_encoding: nil,
                         retention: nil,
                         options: {}
          if persistence_regions
            message_storage_policy = Google::Cloud::PubSub::V1::MessageStoragePolicy.new(
              allowed_persistence_regions: Array(persistence_regions)
            )
          end

          if schema_name || message_encoding
            # Schema settings are all-or-nothing: both values must be present.
            unless schema_name && message_encoding
              raise ArgumentError, "Schema settings must include both schema_name and message_encoding."
            end
            schema_settings = Google::Cloud::PubSub::V1::SchemaSettings.new(
              schema:   schema_path(schema_name),
              encoding: message_encoding.to_s.upcase
            )
          end

          publisher.create_topic \
            name:                       topic_path(topic_name, options),
            labels:                     labels,
            kms_key_name:               kms_key_name,
            message_storage_policy:     message_storage_policy,
            schema_settings:            schema_settings,
            message_retention_duration: Convert.number_to_duration(retention)
        end

        ##
        # Updates the given topic object. `fields` are converted to strings
        # and sent as the update mask paths.
        def update_topic topic_obj, *fields
          mask = Google::Protobuf::FieldMask.new paths: fields.map(&:to_s)
          publisher.update_topic topic: topic_obj, update_mask: mask
        end

        ##
        # Deletes the topic with the given name. All subscriptions to this topic
        # are also deleted. Raises GRPC status code 5 if the topic does not
        # exist. After a topic is deleted, a new topic may be created with the
        # same name.
        def delete_topic topic_name
          publisher.delete_topic topic: topic_path(topic_name)
        end

        ##
        # Adds one or more messages to the topic.
        # Raises GRPC status code 5 if the topic does not exist.
        # The messages parameter is an array of arrays.
        # The first element is the data, second is attributes hash.
        def publish topic, messages
          publisher.publish topic: topic_path(topic), messages: messages
        end

        ##
        # Gets the details of a subscription.
        def get_subscription subscription_name, options = {}
          subscriber.get_subscription subscription: subscription_path(subscription_name, options)
        end

        ##
        # Lists matching subscriptions by project and topic.
        #
        # NOTE(review): unlike the other list_* methods in this class, this
        # returns the publisher call's result directly rather than its
        # `response` - confirm callers rely on that before aligning it.
        def list_topics_subscriptions topic, options = {}
          publisher.list_topic_subscriptions topic:      topic_path(topic, options),
                                             page_size:  options[:max],
                                             page_token: options[:token]
        end

        ##
        # Lists matching subscriptions by project.
        def list_subscriptions options = {}
          paged_enum = subscriber.list_subscriptions project:    project_path(options),
                                                     page_size:  options[:max],
                                                     page_token: options[:token]

          paged_enum.response
        end

        ##
        # Creates a subscription on a given topic for a given subscriber.
        # Subscription settings are read from `options`; the dead letter
        # policy is only built when `options[:dead_letter_topic_name]` is set.
        def create_subscription topic, subscription_name, options = {}
          subscriber.create_subscription \
            name:                       subscription_path(subscription_name, options),
            topic:                      topic_path(topic),
            push_config:                options[:push_config],
            ack_deadline_seconds:       options[:deadline],
            retain_acked_messages:      options[:retain_acked],
            message_retention_duration: Convert.number_to_duration(options[:retention]),
            labels:                     options[:labels],
            enable_message_ordering:    options[:message_ordering],
            filter:                     options[:filter],
            dead_letter_policy:         dead_letter_policy(options),
            retry_policy:               options[:retry_policy]
        end

        ##
        # Updates the given subscription object. `fields` are converted to
        # strings and sent as the update mask paths.
        def update_subscription subscription_obj, *fields
          mask = Google::Protobuf::FieldMask.new paths: fields.map(&:to_s)
          subscriber.update_subscription subscription: subscription_obj, update_mask: mask
        end

        ##
        # Deletes an existing subscription. All pending messages in the subscription are immediately dropped.
        def delete_subscription subscription
          subscriber.delete_subscription subscription: subscription_path(subscription)
        end

        ##
        # Detaches a subscription from its topic. All messages retained in the subscription are dropped. Subsequent
        # `Pull` and `StreamingPull` requests will raise `FAILED_PRECONDITION`. If the subscription is a push
        # subscription, pushes to the endpoint will stop.
        def detach_subscription subscription
          publisher.detach_subscription subscription: subscription_path(subscription)
        end

        ##
        # Pulls messages from the server.
        # Requests up to `options[:max]` messages (default 100).
        # `options[:immediate]` (default true) is normalized to a strict
        # boolean and sent as `return_immediately`.
        def pull subscription, options = {}
          max_messages = options.fetch(:max, 100).to_i
          return_immediately = options.fetch(:immediate, true) ? true : false

          subscriber.pull subscription:       subscription_path(subscription, options),
                          max_messages:       max_messages,
                          return_immediately: return_immediately
        end

        ##
        # Passes the given request enumerable straight to the subscriber's
        # streaming pull call.
        def streaming_pull request_enum
          subscriber.streaming_pull request_enum
        end

        ##
        # Acknowledges receipt of a message.
        # Every argument after `subscription` is treated as one ack ID.
        def acknowledge subscription, *ack_ids
          subscriber.acknowledge subscription: subscription_path(subscription), ack_ids: ack_ids
        end

        ##
        # Modifies the PushConfig for a specified subscription.
        def modify_push_config subscription, endpoint, attributes
          # Convert attributes to strings to match the protobuf definition
          attributes = attributes.map { |key, value| [String(key), String(value)] }.to_h
          push_config = Google::Cloud::PubSub::V1::PushConfig.new(
            push_endpoint: endpoint,
            attributes:    attributes
          )

          subscriber.modify_push_config subscription: subscription_path(subscription),
                                        push_config:  push_config
        end

        ##
        # Modifies the ack deadline for a specific message.
        # `ids` may be a single ack ID or an array of them.
        def modify_ack_deadline subscription, ids, deadline
          subscriber.modify_ack_deadline subscription:         subscription_path(subscription),
                                         ack_ids:              Array(ids),
                                         ack_deadline_seconds: deadline
        end

        ##
        # Lists snapshots by project.
        def list_snapshots options = {}
          paged_enum = subscriber.list_snapshots project:    project_path(options),
                                                 page_size:  options[:max],
                                                 page_token: options[:token]

          paged_enum.response
        end

        ##
        # Creates a snapshot on a given subscription.
        def create_snapshot subscription, snapshot_name, labels: nil
          subscriber.create_snapshot name:         snapshot_path(snapshot_name),
                                     subscription: subscription_path(subscription),
                                     labels:       labels
        end

        ##
        # Updates the given snapshot object. `fields` are converted to
        # strings and sent as the update mask paths.
        def update_snapshot snapshot_obj, *fields
          mask = Google::Protobuf::FieldMask.new paths: fields.map(&:to_s)
          subscriber.update_snapshot snapshot: snapshot_obj, update_mask: mask
        end

        ##
        # Deletes an existing snapshot.
        # All pending messages in the snapshot are immediately dropped.
        def delete_snapshot snapshot
          subscriber.delete_snapshot snapshot: snapshot_path(snapshot)
        end

        ##
        # Adjusts the given subscription to a time or snapshot.
        # The subscription is expanded with `subscription_path` in both
        # branches, so short names and full resource paths are both accepted.
        def seek subscription, time_or_snapshot
          subscription_name = subscription_path subscription
          if a_time? time_or_snapshot
            time = Convert.time_to_timestamp time_or_snapshot
            subscriber.seek subscription: subscription_name, time: time
          else
            # A Snapshot object is reduced to its name before building the path.
            time_or_snapshot = time_or_snapshot.name if time_or_snapshot.is_a? Snapshot
            subscriber.seek subscription: subscription_name,
                            snapshot:     snapshot_path(time_or_snapshot)
          end
        end

        ##
        # Lists schemas in the current (or given) project.
        # @param view [String, Symbol, nil] Possible values:
        #   * `BASIC` - Include the name and type of the schema, but not the definition.
        #   * `FULL` - Include all Schema object fields.
        #   When `nil`, no view is sent with the request.
        #
        def list_schemas view, options = {}
          paged_enum = schemas.list_schemas parent:     project_path(options),
                                            view:       schema_view_for(view),
                                            page_size:  options[:max],
                                            page_token: options[:token]

          paged_enum.response
        end

        ##
        # Creates a schema in the current (or given) project.
        def create_schema schema_id, type, definition, options = {}
          schema = Google::Cloud::PubSub::V1::Schema.new(
            type:       type,
            definition: definition
          )
          schemas.create_schema parent:    project_path(options),
                                schema:    schema,
                                schema_id: schema_id
        end

        ##
        # Gets the details of a schema.
        # @param view [String, Symbol, nil] The set of fields to return in the response. Possible values:
        #   * `BASIC` - Include the name and type of the schema, but not the definition.
        #   * `FULL` - Include all Schema object fields.
        #   When `nil`, no view is sent with the request.
        #
        def get_schema schema_name, view, options = {}
          schemas.get_schema name: schema_path(schema_name, options),
                             view: schema_view_for(view)
        end

        ##
        # Delete a schema.
        def delete_schema schema_name
          schemas.delete_schema name: schema_path(schema_name)
        end

        ##
        # Validate the definition string intended for a schema.
        def validate_schema type, definition, options = {}
          schema = Google::Cloud::PubSub::V1::Schema.new(
            type:       type,
            definition: definition
          )
          schemas.validate_schema parent: project_path(options),
                                  schema: schema
        end

        ##
        # Validates a message against a schema.
        #
        # @param message_data [String] Message to validate against the provided `schema_spec`.
        # @param message_encoding [Google::Cloud::PubSub::V1::Encoding] The encoding expected for messages.
        # @param schema_name [String] Name of the schema against which to validate.
        # @param project [String] Name of the project if not the default project.
        # @param type [String] Ad-hoc schema type against which to validate.
        # @param definition [String] Ad-hoc schema definition against which to validate.
        #
        def validate_message message_data, message_encoding, schema_name: nil, project: nil, type: nil, definition: nil
          # An ad-hoc schema is only built when both of its parts are given;
          # otherwise `schema` stays nil.
          if type && definition
            schema = Google::Cloud::PubSub::V1::Schema.new(
              type:       type,
              definition: definition
            )
          end
          schemas.validate_message parent:   project_path(project: project),
                                   name:     schema_path(schema_name),
                                   schema:   schema,
                                   message:  message_data,
                                   encoding: message_encoding
        end

        # IAM policy methods

        ##
        # Gets the IAM policy for the given topic.
        def get_topic_policy topic_name, options = {}
          iam.get_iam_policy resource: topic_path(topic_name, options)
        end

        ##
        # Sets the IAM policy for the given topic.
        def set_topic_policy topic_name, new_policy, options = {}
          iam.set_iam_policy resource: topic_path(topic_name, options), policy: new_policy
        end

        ##
        # Tests the given permissions against the given topic.
        def test_topic_permissions topic_name, permissions, options = {}
          iam.test_iam_permissions resource: topic_path(topic_name, options), permissions: permissions
        end

        ##
        # Gets the IAM policy for the given subscription.
        def get_subscription_policy subscription_name, options = {}
          iam.get_iam_policy resource: subscription_path(subscription_name, options)
        end

        ##
        # Sets the IAM policy for the given subscription.
        def set_subscription_policy subscription_name, new_policy, options = {}
          iam.set_iam_policy resource: subscription_path(subscription_name, options), policy: new_policy
        end

        ##
        # Tests the given permissions against the given subscription.
        def test_subscription_permissions subscription_name, permissions, options = {}
          iam.test_iam_permissions resource: subscription_path(subscription_name, options), permissions: permissions
        end

        # Resource path helpers

        ##
        # Builds "projects/<id>", using `options[:project]` when present and
        # falling back to this service's project.
        def project_path options = {}
          project_name = options[:project] || project
          "projects/#{project_name}"
        end

        ##
        # Builds the topic resource path. A name that already contains "/"
        # is returned unchanged.
        def topic_path topic_name, options = {}
          return topic_name if topic_name.to_s.include? "/"
          "#{project_path options}/topics/#{topic_name}"
        end

        ##
        # Builds the subscription resource path. A name that already contains
        # "/" is returned unchanged.
        def subscription_path subscription_name, options = {}
          return subscription_name if subscription_name.to_s.include? "/"
          "#{project_path options}/subscriptions/#{subscription_name}"
        end

        ##
        # Builds the snapshot resource path. `nil` and names that already
        # contain "/" are returned unchanged.
        def snapshot_path snapshot_name, options = {}
          return snapshot_name if snapshot_name.nil? || snapshot_name.to_s.include?("/")
          "#{project_path options}/snapshots/#{snapshot_name}"
        end

        ##
        # Builds the schema resource path. `nil` and names that already
        # contain "/" are returned unchanged.
        def schema_path schema_name, options = {}
          return schema_name if schema_name.nil? || schema_name.to_s.include?("/")
          "#{project_path options}/schemas/#{schema_name}"
        end

        ##
        # Short description made of the class name and the project.
        def inspect
          "#<#{self.class.name} (#{@project})>"
        end

        protected

        ##
        # Applies the settings shared by every V1 client built by this
        # service. Credentials, timeout and endpoint are only assigned when
        # set on the service, leaving the client's own values otherwise.
        def configure_client config
          config.credentials = credentials if credentials
          config.timeout = timeout if timeout
          config.endpoint = host if host
          config.lib_name = "gccl"
          config.lib_version = Google::Cloud::PubSub::VERSION
          config.metadata = { "google-cloud-resource-prefix": "projects/#{@project}" }
        end

        ##
        # Looks up the SchemaView constant named by `view`
        # (case-insensitive). Returns nil for a nil view, because an empty
        # name would make `const_get` raise NameError.
        def schema_view_for view
          return nil if view.nil?
          Google::Cloud::PubSub::V1::SchemaView.const_get view.to_s.upcase
        end

        ##
        # True when `obj` responds to `to_time` and the conversion yields a
        # non-nil value.
        def a_time? obj
          return false unless obj.respond_to? :to_time
          # Rails' String#to_time returns nil if the string doesn't parse.
          !obj.to_time.nil?
        end

        ##
        # Builds a DeadLetterPolicy from `options`, or returns nil when no
        # dead letter topic name is given. The max delivery attempts value is
        # only assigned when provided.
        def dead_letter_policy options
          return nil unless options[:dead_letter_topic_name]
          policy = Google::Cloud::PubSub::V1::DeadLetterPolicy.new dead_letter_topic: options[:dead_letter_topic_name]
          if options[:dead_letter_max_delivery_attempts]
            policy.max_delivery_attempts = options[:dead_letter_max_delivery_attempts]
          end
          policy
        end
      end
    end

    Pubsub = PubSub unless const_defined? :Pubsub
  end
end