# frozen_string_literal: true

module BusinessFlow
  # Mixin for business flow to acquire and retain a cluster lock.
  #
  # Including this module extends the including class with ClassMethods,
  # which wraps `build` and `execute` so that the flow body runs while an
  # exclusive ZooKeeper lock is held (unless locking has been disabled through
  # ClusterLock.disable!).
  module ClusterLock
    # Hook run on `include`; adds the DSL and execution wrappers to the class.
    def self.included(klass)
      klass.extend(ClassMethods)
    end

    # Turns lock acquisition and lock assertion off for every flow.
    def self.disable!
      @disabled = true
    end

    # Turns lock acquisition and lock assertion back on.
    def self.enable!
      @disabled = false
    end

    # @return [Boolean] true once disable! has been called and not undone.
    def self.disabled?
      !!@disabled
    end

    # Sets the server list used by flows that do not declare their own.
    #
    # A String is wrapped in a proc returning it, any other truthy value is
    # wrapped in a Callable, and nil/false clears the setting.
    def self.default_servers=(servers)
      @default_servers =
        if servers.is_a?(String)
          proc { servers }
        elsif servers
          Callable.new(servers)
        end
    end

    # @return the configured default server callable; when nothing has been
    #   configured this is a proc that returns nil.
    def self.default_servers
      @default_servers ||= proc {}
    end

    # Asserts that the lock stored on this flow is still held. Does nothing
    # when cluster locking is disabled.
    #
    # @raise [ZK::Exceptions::ZKError] re-raised after an :assert_failed error
    #   has been recorded under :cluster_lock on the flow's errors.
    def assert_cluster_lock!
      @_business_flow_cluster_lock.assert! unless BusinessFlow::ClusterLock.disabled?
    rescue ZK::Exceptions::ZKError => e
      BusinessFlow.add_error(errors, :cluster_lock, :assert_failed, e.message)
      raise
    end

    # Reports the resolved lock name as the value of the :cluster_lock
    # attribute; every other key is delegated to super.
    def read_attribute_for_validation(key)
      if key == :cluster_lock
        ClassMethods.lock_name(self)
      else
        super(key)
      end
    end

    # DSL Methods
    module ClassMethods # rubocop:disable Metrics/ModuleLength
      # Error raised when there is an internal issue with acquiring a lock.
      class LockFailure < StandardError
        attr_reader :error_type

        # @param error_type [Symbol] error key recorded on the flow
        # @param message [String] human readable description
        def initialize(error_type, message)
          @error_type = error_type
          super(message)
        end

        # Records this failure under :cluster_lock on the flow's errors unless
        # an error is already present for that key.
        #
        # @return the flow that was passed in
        def add_to(flow)
          errors = flow.errors
          unless errors.key?(:cluster_lock)
            BusinessFlow.add_error(errors, :cluster_lock, error_type, message)
          end
          flow
        end
      end

      # Holder for information about the lock
      class LockInfo
        attr_reader :lock_name, :zookeeper_servers

        def initialize(lock_name, zookeeper_servers)
          @lock_name = lock_name
          @zookeeper_servers = zookeeper_servers
        end
      end

      # Declares how the lock name is computed, or (with no arguments) returns
      # the step that computes it.
      #
      # A String is used verbatim; any other value or a block is wrapped in a
      # Callable. With no arguments the step is memoized, defaulting to
      # default_lock_name.
      def with_cluster_lock(lock_name = nil, opts = {}, &blk)
        if lock_name.is_a?(String)
          @lock_name = Step.new(Callable.new(proc { lock_name }), {})
        elsif lock_name || blk
          @lock_name = Step.new(Callable.new(lock_name || blk),
                                { default_output: :lock_name }.merge(opts))
        else
          @lock_name ||= Step.new(Callable.new(default_lock_name), opts)
        end
      end

      # Declares how the ZooKeeper server list is computed, or (with no
      # arguments) returns the step that computes it.
      #
      # Unlike with_cluster_lock, the fallback step is not stored, so
      # BusinessFlow::ClusterLock.default_servers is consulted on every
      # argument-less call made while no servers have been declared.
      def with_zookeeper_servers(servers = nil, opts = {}, &blk)
        if servers.is_a?(String)
          @zookeeper_servers = Step.new(Callable.new(proc { servers }), {})
        elsif servers || blk
          @zookeeper_servers = Step.new(Callable.new(servers || blk),
                                        { default_output: :zookeeper_servers }.merge(opts))
        else
          @zookeeper_servers || Step.new(BusinessFlow::ClusterLock.default_servers, opts)
        end
      end

      # @return [Proc] proc yielding the name of the class of whatever object
      #   is `self` when the proc is evaluated.
      def default_lock_name
        proc { self.class.name }
      end

      # Makes sure the result class exposes the lock info, then defers to super.
      def build(parameter_object)
        add_cluster_luck_info_to_result_class
        super(parameter_object)
      end

      # Runs the flow while holding the cluster lock and attaches the lock
      # info to the result.
      #
      # A LockFailure is turned into an error on the flow rather than being
      # raised. When the failure happens while the lock name or server list is
      # being resolved, `lock_info` is still nil at that point and nil is what
      # gets attached to the result.
      def execute(flow)
        lock_info = LockInfo.new(
          ClassMethods.lock_name(flow),
          ClassMethods.zookeeper_server_list(flow)
        )
        ClassMethods.with_lock(flow, lock_info) do
          super(flow)._business_flow_cluster_lock_finalize(lock_info)
        end
      rescue LockFailure => e
        result_from(e.add_to(flow))._business_flow_cluster_lock_finalize(lock_info)
      end

      # Body of Result#_business_flow_cluster_lock_finalize: stores the lock
      # info on the result and returns the result itself.
      RESULT_FINALIZE = proc do |cluster_lock_info|
        @cluster_lock_info = cluster_lock_info
        self
      end

      # Adds the cluster_lock_info field and the finalize method to this
      # class's Result constant; only does the work once per class.
      # (The "luck" spelling is kept because the name is callable from outside.)
      def add_cluster_luck_info_to_result_class
        return if @cluster_lock_info_added

        result_class = const_get(:Result)
        DSL::PublicField.new(:cluster_lock_info).add_to(result_class)
        result_class.send(:define_method, :_business_flow_cluster_lock_finalize,
                          RESULT_FINALIZE)
        @cluster_lock_info_added = true
      end

      # Resolves the ZooKeeper server list for a flow.
      #
      # @raise [LockFailure] (:no_servers) when the result is nil or empty
      # :reek:NilCheck
      def self.zookeeper_server_list(flow)
        servers = catch(:halt_step) do
          flow.class.with_zookeeper_servers.call(flow)&.merge_into(flow)&.to_s
        end
        if servers.nil? || servers.empty?
          raise LockFailure.new(:no_servers, 'no zookeeper servers provided')
        end

        servers
      end

      # Resolves the lock name for a flow.
      #
      # @raise [LockFailure] (:no_lock_name) when the result is nil or empty
      # :reek:NilCheck
      def self.lock_name(flow)
        lock_name = catch(:halt_step) do
          flow.class.with_cluster_lock.call(flow)&.merge_into(flow)&.to_s
        end
        if lock_name.nil? || lock_name.empty?
          raise LockFailure.new(:no_lock_name, 'no lock name provided')
        end

        lock_name
      end

      # Acquires the lock inside a :cluster_lock_setup instrumentation block,
      # recording the lock name on the payload when one is supplied.
      def self.instrumented_acquire_lock(flow, lock_info)
        flow.class.instrument(:cluster_lock_setup, flow) do |payload|
          payload[:lock_name] = lock_info.lock_name if payload
          acquire_lock(flow, lock_info, payload)
        end
      end

      # Opens a ZooKeeper connection, stores a new exclusive locker on the
      # flow (for assert_cluster_lock!) and tries to take the lock.
      #
      # @return [Array] `[zk_connection, lock]`
      def self.acquire_lock(flow, lock_info, payload)
        zk_connection = ZK::Client::Threaded.new(lock_info.zookeeper_servers)
        lock = flow.instance_variable_set(
          :@_business_flow_cluster_lock,
          ZK::Locker::ExclusiveLocker.new(zk_connection, lock_info.lock_name)
        )
        inner_acquire_lock(zk_connection, lock, payload)
      end

      # Attempts a non-blocking lock and records the outcome on the payload.
      #
      # The connection is closed on every unsuccessful exit: both when the lock
      # is simply unavailable and when the lock attempt itself raises. In either
      # case the caller never receives the connection and so could not close it.
      #
      # @return [Array] `[zk_connection, lock]` when the lock was taken
      # @raise [LockFailure] (:lock_unavailable) when the lock is held elsewhere
      def self.inner_acquire_lock(zk_connection, lock, payload)
        handed_over = false
        lock_held = lock.lock(wait: false)
        payload[:lock_acquired] = lock_held if payload
        raise LockFailure.new(:lock_unavailable, 'the lock was not available') unless lock_held

        handed_over = true
        [zk_connection, lock]
      ensure
        zk_connection.close! unless handed_over
      end

      # Releases the lock and closes the connection; both arguments may be nil.
      #
      # The connection gets exactly one close attempt even when unlocking
      # raises something other than a timeout; that exception still propagates.
      #
      # @return whatever `zk_connection&.close!` returns
      def self.cleanup(lock, zk_connection)
        close_attempted = false
        begin
          lock&.unlock
        rescue ZK::Exceptions::OperationTimeOut
          # Just let the connection close handle this.
        end
        close_attempted = true
        zk_connection&.close!
      ensure
        zk_connection&.close! unless close_attempted
      end

      # :reek:ControlParameter
      # I'm using a case statement instead of a hash in a constant to ensure that this
      # doesn't throw exceptions if this file is required before zookeeper is.
      def self.exception_to_error_type(exc)
        case exc
        when ZK::Exceptions::LockAssertionFailedError
          :assert_failed
        when ZK::Exceptions::OperationTimeOut
          :zookeeper_timeout
        else
          :unknown_failure
        end
      end

      # Yields `lock_info` with the cluster lock held (or without any lock when
      # locking is disabled) and always cleans up afterwards.
      #
      # @raise [LockFailure] when a lock assertion failure or a ZooKeeper
      #   timeout escapes lock acquisition or the block
      def self.with_lock(flow, lock_info, &_blk)
        unless BusinessFlow::ClusterLock.disabled?
          zk_connection, lock = instrumented_acquire_lock(flow, lock_info)
        end
        yield lock_info
      rescue ZK::Exceptions::LockAssertionFailedError, ZK::Exceptions::OperationTimeOut => e
        # This would occur if we asserted a cluster lock while executing the flow.
        # This will have set an error on the flow, so we can carry on.
        raise LockFailure.new(exception_to_error_type(e), e.message)
      ensure
        cleanup(lock, zk_connection)
      end
    end
  end
end