require 'monitor' module Flydata module Helper # Take care of action ownership management # - Receive a request from both scheduler and worker. # - Give a action with ownership to workers class ActionOwnershipChannel include MonitorMixin def initialize @action_ownership_map = ActionOwnership.action_ownership_map @queue = {} super end # Called from schdeduler and worker # action_name must be symbol def request_action(action_name, action_info = {}) action_ownership = @action_ownership_map[action_name] if action_ownership.nil? raise "Received invalid action request:#{action_name}" end self.synchronize do if @queue.has_key?(action_name) false else @queue[action_name] = { action_ownership: action_ownership, action_info: action_info } true end end end # Called from worker only def take_action_ownership(new_owner) self.synchronize do return nil if @queue.empty? # Wait until action will be processed action = @queue.values.first action_ownership = action[:action_ownership] return nil if action_ownership.processing? # Check resource change flag if action_ownership.resource_change && @action_ownership_map.any? { |name, act_own| (act_own.resource_change && act_own.processing?) } return nil end @queue.shift # delete last acion from queue action_ownership.owner = new_owner action end end # Called from worker only def return_action_ownership(action_ownership) self.synchronize do action_ownership.owner = nil action_ownership.last_processed_time = Time.now.to_f end end # For debug def queue @queue.dup end def map @action_ownership_map.dup end def dump $logger.debug "\n" + <<EOT ======== action_ownership_channel dump queue: #{@queue} action_ownership_map: #{@action_ownership_map} ======== EOT end end end end