# frozen_string_literal: true

module BusinessFlow
  # Mixin for business flow to acquire and retain a cluster lock.
  module ClusterLock
    # Extends the including class with the DSL in ClassMethods.
    def self.included(klass)
      klass.extend(ClassMethods)
    end

    # Turns locking off: with_lock skips acquisition and
    # assert_cluster_lock! becomes a no-op.
    def self.disable!
      @disabled = true
    end

    # Turns locking back on.
    def self.enable!
      @disabled = false
    end

    # True only after disable! has been called (locking is on by default).
    def self.disabled?
      !!@disabled
    end

    # Sets the module-wide default server source. A String is wrapped in a
    # proc returning it, any other truthy value is wrapped in a Callable, and
    # nil/false clears the default.
    def self.default_servers=(servers)
      @default_servers =
        if servers.is_a?(String)
          proc { servers }
        elsif servers
          Callable.new(servers)
        end
    end

    # The configured default server source, or a proc returning nil when
    # nothing has been configured.
    def self.default_servers
      @default_servers ||= proc { nil }
    end

    # Calls assert! on the lock stored on this flow unless locking is
    # disabled. A ZK error is recorded on errors under :cluster_lock and then
    # re-raised.
    def assert_cluster_lock!
      @_business_flow_cluster_lock.assert! unless BusinessFlow::ClusterLock.disabled?
    rescue ZK::Exceptions::ZKError => exc
      errors.add(:cluster_lock, :assert_failed, message: exc.message)
      raise
    end

    # Reports the lock name as the value of the :cluster_lock attribute and
    # defers to super for every other key.
    def read_attribute_for_validation(key)
      return ClassMethods.lock_name(self) if key == :cluster_lock

      super(key)
    end

    # DSL Methods
    module ClassMethods
      # Error raised when there is an internal issue with acquiring a lock.
      class LockFailure < StandardError
        attr_reader :error_type

        def initialize(error_type, message)
          @error_type = error_type
          super(message)
        end

        # Records this failure on the flow's errors under :cluster_lock,
        # unless an error is already present for that key. Returns the flow.
        def add_to(flow)
          errors = flow.errors
          errors.add(:cluster_lock, error_type, message: message) unless errors.key?(:cluster_lock)
          flow
        end
      end

      # Holder for information about the lock
      class LockInfo
        attr_reader :lock_name, :zookeeper_servers

        def initialize(lock_name, zookeeper_servers)
          @lock_name = lock_name
          @zookeeper_servers = zookeeper_servers
        end
      end

      # Declares how the lock name is obtained, or (with no arguments) returns
      # the current declaration, memoizing one built from default_lock_name.
      # A String is used as a fixed name; any other value or a block is
      # wrapped in a Callable.
      def with_cluster_lock(lock_name = nil, opts = {}, &blk)
        if lock_name.is_a?(String)
          @lock_name = Step.new(Callable.new(proc { lock_name }), {})
        elsif lock_name || blk
          step_opts = { default_output: :lock_name }.merge(opts)
          @lock_name = Step.new(Callable.new(lock_name || blk), step_opts)
        else
          @lock_name ||= Step.new(Callable.new(default_lock_name), opts)
        end
      end

      # Declares how the zookeeper server list is obtained, or (with no
      # arguments) returns the current declaration. When nothing has been
      # declared, a step is built from the module-wide default on every call
      # and is NOT stored, so the default is looked up again each time.
      def with_zookeeper_servers(servers = nil, opts = {}, &blk)
        if servers.is_a?(String)
          @zookeeper_servers = Step.new(Callable.new(proc { servers }), {})
        elsif servers || blk
          step_opts = { default_output: :zookeeper_servers }.merge(opts)
          @zookeeper_servers = Step.new(Callable.new(servers || blk), step_opts)
        else
          @zookeeper_servers || Step.new(BusinessFlow::ClusterLock.default_servers, opts)
        end
      end

      # Fallback lock-name source used when with_cluster_lock was never given
      # a name.
      def default_lock_name
        proc { self.class.name }
      end

      # Makes sure the result class can carry lock info before delegating to
      # super.
      def build(parameter_object)
        add_cluster_luck_info_to_result_class
        super(parameter_object)
      end

      # Resolves the lock name and servers, runs the flow via super inside
      # with_lock, and attaches the lock info to the result. A LockFailure is
      # turned into an error on the flow and a result built by result_from.
      def execute(flow)
        lock_info = LockInfo.new(
          ClassMethods.lock_name(flow),
          ClassMethods.zookeeper_server_list(flow)
        )
        ClassMethods.with_lock(flow, lock_info) do
          super(flow)._business_flow_cluster_lock_finalize(lock_info)
        end
      rescue LockFailure => exc
        # lock_info is still nil here when resolving the name or the servers
        # is what failed.
        result_from(exc.add_to(flow))._business_flow_cluster_lock_finalize(lock_info)
      end

      # Body of Result#_business_flow_cluster_lock_finalize: stores the lock
      # info on the result and returns the result itself.
      RESULT_FINALIZE = proc do |cluster_lock_info|
        @cluster_lock_info = cluster_lock_info
        self
      end

      # Adds the cluster_lock_info field and the finalize method to the
      # Result class, once per class. The "luck" spelling is kept because the
      # method is publicly reachable.
      def add_cluster_luck_info_to_result_class
        return if @cluster_lock_info_added

        result_class = const_get(:Result)
        DSL::PublicField.new(:cluster_lock_info).add_to(result_class)
        result_class.send(:define_method, :_business_flow_cluster_lock_finalize, RESULT_FINALIZE)
        @cluster_lock_info_added = true
      end

      # Evaluates the flow class's server declaration against the flow.
      # Raises LockFailure(:no_servers) when the result is nil or empty.
      # :reek:NilCheck
      def self.zookeeper_server_list(flow)
        servers = catch(:halt_step) do
          flow.class.with_zookeeper_servers.call(flow)&.merge_into(flow)&.to_s
        end
        if servers.nil? || servers.empty?
          raise LockFailure.new(:no_servers, 'no zookeeper servers provided')
        end

        servers
      end

      # Evaluates the flow class's lock-name declaration against the flow.
      # Raises LockFailure(:no_lock_name) when the result is nil or empty.
      # :reek:NilCheck
      def self.lock_name(flow)
        lock_name = catch(:halt_step) do
          flow.class.with_cluster_lock.call(flow)&.merge_into(flow)&.to_s
        end
        if lock_name.nil? || lock_name.empty?
          raise LockFailure.new(:no_lock_name, 'no lock name provided')
        end

        lock_name
      end

      # Wraps acquire_lock in the flow class's :cluster_lock_setup
      # instrumentation, recording the lock name when a payload is supplied.
      def self.instrumented_acquire_lock(flow, lock_info)
        flow.class.instrument(:cluster_lock_setup, flow) do |payload|
          payload[:lock_name] = lock_info.lock_name if payload
          acquire_lock(flow, lock_info, payload)
        end
      end

      # Opens a connection, builds an exclusive locker, stores it on the flow
      # (for assert_cluster_lock!), and tries to take the lock.
      def self.acquire_lock(flow, lock_info, payload)
        zk_connection = ZK::Client::Threaded.new(lock_info.zookeeper_servers)
        lock = flow.instance_variable_set(
          :@_business_flow_cluster_lock,
          ZK::Locker::ExclusiveLocker.new(zk_connection, lock_info.lock_name)
        )
        inner_acquire_lock(zk_connection, lock, payload)
      end

      # Tries to take the lock without waiting. Returns [zk_connection, lock]
      # on success and raises LockFailure(:lock_unavailable) otherwise.
      def self.inner_acquire_lock(zk_connection, lock, payload)
        lock_held = lock.lock(wait: false)
        payload[:lock_acquired] = lock_held if payload
        raise LockFailure.new(:lock_unavailable, 'the lock was not available') unless lock_held

        [zk_connection, lock]
      rescue StandardError
        # The caller never receives the connection when this method raises,
        # so it has to be closed here -- both for "lock unavailable" and for
        # an error raised by the lock attempt itself.
        zk_connection.close!
        raise
      end

      # Releases the lock and closes the connection; either may be nil.
      def self.cleanup(lock, zk_connection)
        lock.unlock if lock
      ensure
        # Close even if unlock raised, so the connection is never leaked.
        zk_connection.close! if zk_connection
      end

      # Acquires the lock (unless locking is disabled), yields lock_info, and
      # always cleans up afterwards.
      def self.with_lock(flow, lock_info, &blk)
        zk_connection, lock =
          (instrumented_acquire_lock(flow, lock_info) unless BusinessFlow::ClusterLock.disabled?)
        yield lock_info
      rescue ZK::Exceptions::LockAssertionFailedError => exc
        # This would occur if we asserted a cluster lock while executing the flow.
        # This will have set an error on the flow, so we can carry on.
        raise LockFailure.new(:assert_failed, exc.message)
      rescue ZK::Exceptions::OperationTimeOut
        # Sometimes this happens. Just let the ensure block take care of everything
      ensure
        cleanup(lock, zk_connection)
      end
    end
  end
end