# Copyright 2016-2021 The NATS Authors
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

module NATS

  # A Subscription represents interest in a given subject.
  # 
  # @example Create NATS subscription with callback.
  #   require 'nats/client'
  # 
  #   nc = NATS.connect("demo.nats.io")
  #   sub = nc.subscribe("foo") do |msg|
  #     puts "Received [#{msg.subject}]: #{}"
  #   end
  # 
  class Subscription
    include MonitorMixin

    attr_accessor :subject, :queue, :future, :callback, :response, :received, :max, :pending, :sid
    attr_accessor :pending_queue, :pending_size, :wait_for_msgs_cond, :concurrency_semaphore
    attr_accessor :pending_msgs_limit, :pending_bytes_limit
    attr_accessor :nc
    attr_accessor :jsi
    attr_accessor :closed

    def initialize(**opts)
      super() # required to initialize monitor
      @subject  = ''
      @queue    = nil
      @future   = nil
      @callback = nil
      @response = nil
      @received = 0
      @max      = nil
      @pending  = nil
      @sid      = nil
      @nc       = nil
      @closed   = nil

      # State from async subscriber messages delivery
      @pending_queue       = nil
      @pending_size        = 0
      @pending_msgs_limit  = nil
      @pending_bytes_limit = nil

      # Sync subscriber
      @wait_for_msgs_cond = nil

      # To limit number of concurrent messages being processed (1 to only allow sequential processing)
      @processing_concurrency = opts.fetch(:processing_concurrency, NATS::IO::DEFAULT_SINGLE_SUB_CONCURRENCY)
    end

    # Concurrency of message processing for a single subscription.
    # 1 means sequential processing
    # 2+ allow processed concurrently and possibly out of order.
    def processing_concurrency=(value)
      raise ArgumentError, "nats: subscription processing concurrency must be positive integer" unless value.positive?
      return if @processing_concurrency == value

      @processing_concurrency = value
      @concurrency_semaphore = Concurrent::Semaphore.new(value)
    end

    def concurrency_semaphore
      @concurrency_semaphore ||= Concurrent::Semaphore.new(@processing_concurrency)
    end

    # Auto unsubscribes the server by sending UNSUB command and throws away
    # subscription in case already present and has received enough messages.
    def unsubscribe(opt_max=nil)
      @nc.send(:unsubscribe, self, opt_max)
    end

    # next_msg blocks and waiting for the next message to be received.
    def next_msg(opts={})
      timeout = opts[:timeout] ||= 0.5
      synchronize do
        return @pending_queue.pop if not @pending_queue.empty?

        # Wait for a bit until getting a signal.
        MonotonicTime::with_nats_timeout(timeout) do
          wait_for_msgs_cond.wait(timeout)
        end

        if not @pending_queue.empty?
          return @pending_queue.pop
        else
          raise NATS::Timeout
        end
      end
    end

    def inspect
      "#<NATS::Subscription(subject: \"#{@subject}\", queue: \"#{@queue}\", sid: #{@sid})>"
    end

    def dispatch(msg)
      pending_queue << msg
      synchronize { self.pending_size += msg.data.size }

      # For async subscribers, send message for processing to the thread pool.
      enqueue_processing(@nc.subscription_executor) if callback

      # For sync subscribers, signal that there is a new message.
      wait_for_msgs_cond&.signal
    end

    def process(msg)
      return unless callback

      # Decrease pending size since consumed already
      synchronize { self.pending_size -= msg.data.size }

      nc.reloader.call do
        # Note: Keep some of the alternative arity versions to slightly
        # improve backwards compatibility.  Eventually fine to deprecate
        # since recommended version would be arity of 1 to get a NATS::Msg.
        case callback.arity
        when 0 then callback.call
        when 1 then callback.call(msg)
        when 2 then callback.call(msg.data, msg.reply)
        when 3 then callback.call(msg.data, msg.reply, msg.subject)
        else callback.call(msg.data, msg.reply, msg.subject, msg.header)
        end
      rescue => e
        synchronize { nc.send(:err_cb_call, nc, e, self) }
      end
    end

    # Send a message for its processing to a separate thread
    def enqueue_processing(executor)
      concurrency_semaphore.try_acquire || return # Previous message is being executed, let it finish and enqueue next one.
      executor.post do
        msg = pending_queue.pop(true)
        process(msg)
      rescue ThreadError # queue is empty
        concurrency_semaphore.release
      ensure
        concurrency_semaphore.release
        [concurrency_semaphore.available_permits, pending_queue.size].min.times do
          enqueue_processing(executor)
        end
      end
    end
  end
end