require 'monitor' require 'flydata-core/logger' module Flydata module Helper # Take care of action ownership management # - Receive a request from both scheduler and worker. # - Give an action with ownership to workers class ActionOwnershipChannel include MonitorMixin include FlydataCore::Logger def initialize @action_ownership_map = ActionOwnership.action_ownership_map @queue = {} super end # Called from scheduler and worker # action_name must be a symbol def request_action(action_name, action_info = {}) action_ownership = @action_ownership_map[action_name] if action_ownership.nil? # unknown action. probably a new action log_warn "unsupported action `#{action_name}'. Skip." return false end self.synchronize do if @queue.has_key?(action_name) false else @queue[action_name] = { action_ownership: action_ownership, action_info: action_info } true end end end # Called from worker only def take_action_ownership(new_owner) self.synchronize do return nil if @queue.empty? # Wait until the action is no longer being processed action = @queue.values.first action_ownership = action[:action_ownership] return nil if action_ownership.processing? # Check resource change flag if action_ownership.resource_change && @action_ownership_map.any? { |name, act_own| (act_own.resource_change && act_own.processing?) } return nil end @queue.shift # remove the first (oldest) action from the queue action_ownership.owner = new_owner action end end # Called from worker only def return_action_ownership(action_ownership) self.synchronize do action_ownership.owner = nil action_ownership.last_processed_time = Time.now.to_f end end # For debug def queue @queue.dup end def map @action_ownership_map.dup end def dump $logger.debug "\n" + <