# frozen_string_literal: true

module BusinessFlow
  # Mixin for business flow to acquire and retain a cluster lock.
  module ClusterLock
    def self.included(klass)
      klass.extend(ClassMethods)
    end

    def self.disable!
      @disabled = true
    end

    def self.enable!
      @disabled = false
    end

    def self.disabled?
      !!@disabled
    end

    def self.default_servers=(servers)
      if servers.is_a?(String)
        @default_servers = proc { servers }
      elsif servers
        @default_servers = Callable.new(servers)
      else
        @default_servers = nil
      end
    end

    def self.default_servers
      @default_servers ||= proc { nil }
    end

    def assert_cluster_lock!
      @_business_flow_cluster_lock.assert! if !BusinessFlow::ClusterLock.disabled?
    rescue ZK::Exceptions::ZKError => exc
      errors.add(:cluster_lock, :assert_failed, message: exc.message)
      raise
    end

    # DSL Methods
    module ClassMethods
      # Error raised when there is an internal issue with acquiring a lock.
      class LockFailure < StandardError
        attr_reader :error_type
        def initialize(error_type, message)
          @error_type = error_type
          super(message)
        end

        def add_to(flow)
          errors = flow.errors
          errors.add(:cluster_lock, error_type, message: message) if !errors.key?(:cluster_lock)
          flow
        end
      end

      # Holder for information about the lock
      class LockInfo
        attr_reader :lock_name, :zookeeper_servers

        def initialize(lock_name, zookeeper_servers)
          @lock_name = lock_name
          @zookeeper_servers = zookeeper_servers
        end
      end

      def with_cluster_lock(lock_name = nil, opts = {}, &blk)
        if lock_name.is_a?(String)
          @lock_name = Step.new(Callable.new(proc { lock_name }), {})
        elsif lock_name || blk
          @lock_name = Step.new(Callable.new(lock_name || blk),
                                { default_output: :lock_name }.merge(opts))
        else
          @lock_name ||= Step.new(Callable.new(default_lock_name), opts)
        end
      end

      def with_zookeeper_servers(servers = nil, opts = {}, &blk)
        if servers.is_a?(String)
          @zookeeper_servers = Step.new(Callable.new(proc { servers }), {})
        elsif servers || blk
          @zookeeper_servers = Step.new(Callable.new(servers || blk),
                                        { default_output: :zookeeper_servers }.merge(opts))
        else
          @zookeeper_servers || Step.new(BusinessFlow::ClusterLock.default_servers, opts)
        end
      end

      def default_lock_name
        proc { self.class.name }
      end

      def build(parameter_object)
        add_cluster_luck_info_to_result_class
        super(parameter_object)
      end

      def execute(flow)
        lock_info = LockInfo.new(
          ClassMethods.lock_name(flow),
          ClassMethods.zookeeper_server_list(flow)
        )
        ClassMethods.with_lock(flow, lock_info) do
          super(flow)._business_flow_cluster_lock_finalize(lock_info)
        end
      rescue LockFailure => exc
        return result_from(exc.add_to(flow))._business_flow_cluster_lock_finalize(lock_info)
      end

      RESULT_FINALIZE = proc do |cluster_lock_info|
        @cluster_lock_info = cluster_lock_info
        self
      end

      def add_cluster_luck_info_to_result_class
        return if @cluster_lock_info_added
        result_class = const_get(:Result)
        DSL::PublicField.new(:cluster_lock_info).add_to(result_class)
        result_class.send(:define_method, :_business_flow_cluster_lock_finalize,
                          RESULT_FINALIZE)
        @cluster_lock_info_added = true
      end

      # :reek:NilCheck
      def self.zookeeper_server_list(flow)
        servers = catch(:halt_step) { flow.class.with_zookeeper_servers.call(flow)&.merge_into(flow)&.to_s }
        if servers.nil? || servers.length == 0
          raise LockFailure.new(:no_servers, 'no zookeeper servers provided')
        end
        servers
      end

      # :reek:NilCheck
      def self.lock_name(flow)
        lock_name = catch(:halt_step) { flow.class.with_cluster_lock.call(flow)&.merge_into(flow)&.to_s }
        if lock_name.nil? || lock_name.length == 0
          raise LockFailure.new(:no_lock_name, 'no lock name provided')
        end
        lock_name
      end

      def self.instrumented_acquire_lock(flow, lock_info)
        flow.class.instrument(:cluster_lock_setup, flow) do |payload|
          payload[:lock_name] = lock_info.lock_name if payload
          acquire_lock(flow, lock_info, payload)
        end
      end

      def self.acquire_lock(flow, lock_info, payload)
        zk_connection = ZK::Client::Threaded.new(lock_info.zookeeper_servers)
        lock = flow.instance_variable_set(
          :@_business_flow_cluster_lock,
          ZK::Locker::ExclusiveLocker.new(zk_connection, lock_info.lock_name)
        )
        inner_acquire_lock(zk_connection, lock, payload)
      end

      def self.inner_acquire_lock(zk_connection, lock, payload)
        lock_held = lock.lock(wait: false)
        payload[:lock_acquired] = lock_held if payload
        if !lock_held
          zk_connection.close!
          raise LockFailure.new(:lock_unavailable, 'the lock was not available')
        end
        [zk_connection, lock]
      end

      def self.cleanup(lock, zk_connection)
        lock.unlock if lock
        zk_connection.close! if zk_connection
      end

      def self.with_lock(flow, lock_info, &blk)
        zk_connection, lock =
          if !BusinessFlow::ClusterLock.disabled?
            instrumented_acquire_lock(flow, lock_info)
          end
        yield lock_info
      rescue ZK::Exceptions::LockAssertionFailedError => exc
        # This would occur if we asserted a cluster lock while executing the flow.
        # This will have set an error on the flow, so we can carry on.
        raise LockFailure.new(:assert_failed, exc.message)
      rescue ZK::Exceptions::OperationTimeOut
        # Sometimes this happens. Just let the ensure block take care of everything
      ensure
        cleanup(lock, zk_connection)
      end
    end
  end
end