# typed: true

require_relative '../core'

require_relative 'event'
require_relative 'runtime/metrics'
require_relative 'workers'

require_relative '../../ddtrace/transport/http'

module Datadog
  module Tracing
    # Processor that sends traces and metadata to the agent
    # DEV: Our goal is for {Datadog::Tracing::Workers::TraceWriter} to replace this class in the future
    # @public_api
    class Writer
      attr_reader \
        :transport,
        :worker,
        :events

      def initialize(options = {})
        # writer and transport parameters
        @buff_size = options.fetch(:buffer_size, Workers::AsyncTransport::DEFAULT_BUFFER_MAX_SIZE)
        @flush_interval = options.fetch(:flush_interval, Workers::AsyncTransport::DEFAULT_FLUSH_INTERVAL)
        transport_options = options.fetch(:transport_options, {})

        transport_options[:agent_settings] = options[:agent_settings] if options.key?(:agent_settings)

        # transport and buffers
        @transport = options.fetch(:transport) do
          Transport::HTTP.default(**transport_options)
        end

        # handles the thread creation after an eventual fork
        @mutex_after_fork = Mutex.new
        @pid = nil

        @traces_flushed = 0

        # one worker for traces
        @worker = nil

        # Once stopped, this writer instance cannot be restarted.
        # This allow for graceful shutdown, while preventing
        # the host application from inadvertently start new
        # threads during shutdown.
        @stopped = false

        # Callback handler
        @events = Events.new
      end

      # Explicitly starts the {Writer}'s internal worker.
      #
      # The {Writer} is also automatically started when necessary during calls to {.write}.
      def start
        @mutex_after_fork.synchronize do
          return false if @stopped

          pid = Process.pid
          return if @worker && pid == @pid

          @pid = pid

          start_worker
          true
        end
      end

      # spawns a worker for spans; they share the same transport which is thread-safe
      # @!visibility private
      def start_worker
        @trace_handler = ->(items, transport) { send_spans(items, transport) }
        @worker = Workers::AsyncTransport.new(
          transport: @transport,
          buffer_size: @buff_size,
          on_trace: @trace_handler,
          interval: @flush_interval
        )

        @worker.start
      end

      # Gracefully shuts down this writer.
      #
      # Once stopped methods calls won't fail, but
      # no internal work will be performed.
      #
      # It is not possible to restart a stopped writer instance.
      def stop
        @mutex_after_fork.synchronize { stop_worker }
      end

      def stop_worker
        @stopped = true

        return if @worker.nil?

        @worker.stop
        @worker = nil

        true
      end

      private :start_worker, :stop_worker

      # flush spans to the trace-agent, handles spans only
      # @!visibility private
      def send_spans(traces, transport)
        return true if traces.empty?

        # Send traces and get responses
        responses = transport.send_traces(traces)

        # Tally up successful flushes
        responses.reject { |x| x.internal_error? || x.server_error? }.each do |response|
          @traces_flushed += response.trace_count
        end

        events.after_send.publish(self, responses)

        # Return if server error occurred.
        !responses.find(&:server_error?)
      end

      # enqueue the trace for submission to the API
      def write(trace)
        # In multiprocess environments, the main process initializes the +Writer+ instance and if
        # the process forks (i.e. a web server like Unicorn or Puma with multiple workers) the new
        # processes will share the same +Writer+ until the first write (COW). Because of that,
        # each process owns a different copy of the +@buffer+ after each write and so the
        # +AsyncTransport+ will not send data to the trace agent.
        #
        # This check ensures that if a process doesn't own the current +Writer+, async workers
        # will be initialized again (but only once for each process).
        start if @worker.nil? || @pid != Process.pid

        # TODO: Remove this, and have the tracer pump traces directly to runtime metrics
        #       instead of working through the trace writer.
        # Associate trace with runtime metrics
        Runtime::Metrics.associate_trace(trace)

        worker_local = @worker

        if worker_local
          worker_local.enqueue_trace(trace)
        elsif !@stopped
          Datadog.logger.debug('Writer either failed to start or was stopped before #write could complete')
        end
      end

      # stats returns a dictionary of stats about the writer.
      def stats
        {
          traces_flushed: @traces_flushed,
          transport: @transport.stats
        }
      end

      # Callback behavior
      class Events
        attr_reader \
          :after_send

        def initialize
          @after_send = AfterSend.new
        end

        # Triggered after the writer sends traces through the transport.
        # Provides the Writer instance and transport response list to the callback.
        class AfterSend < Tracing::Event
          def initialize
            super(:after_send)
          end
        end
      end
    end
  end
end