# frozen_string_literal: true

module Karafka
  module Patches
    module Rdkafka
      # Binding patches that slightly change how rdkafka operates in certain places
      module Bindings
        include ::Rdkafka::Bindings

        # Alias internally
        RB = ::Rdkafka::Bindings

        class << self
          # Handle assignments on cooperative rebalance
          #
          # @param client_ptr [FFI::Pointer]
          # @param code [Integer]
          # @param partitions_ptr [FFI::Pointer]
          def on_cooperative_rebalance(client_ptr, code, partitions_ptr)
            # Assignment: hand the received partitions to the incremental assign binding
            if code == RB::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
              return RB.rd_kafka_incremental_assign(client_ptr, partitions_ptr)
            end

            # Anything that is neither an assignment nor a revocation resets the assignment
            # by assigning a NULL partition list
            unless code == RB::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS
              return RB.rd_kafka_assign(client_ptr, FFI::Pointer::NULL)
            end

            # Revocation: commit offsets first, so less work has to be redone by whoever gets
            # these partitions next, and only then give the partitions up.
            # NOTE(review): the `false` argument is presumably librdkafka's `async` flag
            # (i.e. a blocking commit) - confirm against the rd_kafka_commit binding
            RB.rd_kafka_commit(client_ptr, nil, false)
            RB.rd_kafka_incremental_unassign(client_ptr, partitions_ptr)
          end

          # Handle assignments on an eager rebalance
          #
          # @param client_ptr [FFI::Pointer]
          # @param code [Integer]
          # @param partitions_ptr [FFI::Pointer]
          def on_eager_rebalance(client_ptr, code, partitions_ptr)
            # Assignment: take over exactly the partition list we were given
            if code == RB::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
              return RB.rd_kafka_assign(client_ptr, partitions_ptr)
            end

            # Revocation additionally commits offsets before the assignment is dropped, so
            # less work has to be redone by whoever gets these partitions next
            revoked = code == RB::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS
            RB.rd_kafka_commit(client_ptr, nil, false) if revoked

            # Both revocation and any other code end with assigning a NULL partition list
            RB.rd_kafka_assign(client_ptr, FFI::Pointer::NULL)
          end

          # Trigger Karafka callbacks
          #
          # @param code [Integer]
          # @param opaque [Rdkafka::Opaque]
          # @param consumer [Rdkafka::Consumer]
          # @param tpl [Rdkafka::Consumer::TopicPartitionList]
          def trigger_callbacks(code, opaque, consumer, tpl)
            # Only assignment and revocation have a matching opaque callback; every other
            # code is a no-op here and yields nil
            if code == RB::RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS
              opaque.call_on_partitions_assigned(consumer, tpl)
            elsif code == RB::RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS
              opaque.call_on_partitions_revoked(consumer, tpl)
            end
          rescue StandardError => error
            # Callback failures are published via the Karafka monitor instead of being raised
            failure = {
              caller: self,
              error: error,
              type: 'connection.client.rebalance_callback.error'
            }

            Karafka.monitor.instrument('error.occurred', **failure)
          end
        end

        # This patch changes a few things:
        # - it commits offsets (if any) upon partition revocation, so fewer jobs need to be
        #   reprocessed if they are assigned to a different process
        # - reports callback errors into the errors instrumentation instead of the logger
        # - catches only StandardError instead of Exception as we fully control the directly
        #   executed callbacks
        #
        # @see https://docs.confluent.io/2.0.0/clients/librdkafka/classRdKafka_1_1RebalanceCb.html
        RebalanceCallback = FFI::Function.new(
          :void, %i[pointer int pointer pointer]
        ) do |client_ptr, code, partitions_ptr, opaque_ptr|
          # Patch reference
          pr = ::Karafka::Patches::Rdkafka::Bindings

          # First apply the assignment change itself, using the flow that matches the
          # rebalance protocol reported for this client
          if RB.rd_kafka_rebalance_protocol(client_ptr) == 'COOPERATIVE'
            pr.on_cooperative_rebalance(client_ptr, code, partitions_ptr)
          else
            pr.on_eager_rebalance(client_ptr, code, partitions_ptr)
          end

          # Opaques are looked up by the address of the opaque pointer
          opaque = ::Rdkafka::Config.opaques[opaque_ptr.to_i]
          # Without an opaque there are no callbacks to trigger. `next` is used on purpose:
          # this block runs long after the module body that defined it has finished, so a
          # `return` here would raise LocalJumpError instead of leaving the callback
          next unless opaque

          tpl = ::Rdkafka::Consumer::TopicPartitionList.from_native_tpl(partitions_ptr).freeze
          consumer = ::Rdkafka::Consumer.new(client_ptr)

          pr.trigger_callbacks(code, opaque, consumer, tpl)
        end
      end
    end
  end
end

# rdkafka-ruby currently offers no API for swapping the rebalance callback, so the constant
# it exposes is dropped and then re-created pointing at the patched implementation.
::Rdkafka::Bindings.send(:remove_const, 'RebalanceCallback')

patched_callback = Karafka::Patches::Rdkafka::Bindings::RebalanceCallback
::Rdkafka::Bindings.const_set('RebalanceCallback', patched_callback)

# Extra librdkafka functions used by the patched rebalance callback.

# const char *rd_kafka_rebalance_protocol(rd_kafka_t *rk)
# The result is compared against 'COOPERATIVE' in the rebalance callback.
::Rdkafka::Bindings.attach_function(
  :rd_kafka_rebalance_protocol,
  %i[pointer],
  :string
)

# rd_kafka_error_t *rd_kafka_incremental_assign(rd_kafka_t *rk, const partitions *)
# The return value is a pointer to an error object (NULL on success), not a C string, so
# it has to be declared as :pointer. With :string FFI would try to read the error struct
# as a NUL-terminated string whenever the call fails.
::Rdkafka::Bindings.attach_function(
  :rd_kafka_incremental_assign,
  %i[pointer pointer],
  :pointer
)

# rd_kafka_error_t *rd_kafka_incremental_unassign(rd_kafka_t *rk, const partitions *)
# Same return contract as rd_kafka_incremental_assign.
::Rdkafka::Bindings.attach_function(
  :rd_kafka_incremental_unassign,
  %i[pointer pointer],
  :pointer
)