lib/dynflow/dispatcher/client_dispatcher.rb in dynflow-1.0.5 vs lib/dynflow/dispatcher/client_dispatcher.rb in dynflow-1.1.0

- old
+ new

@@ -24,20 +24,100 @@ finished.success(resolve_to) self end end - def initialize(world) + # Class used for reducing the number of sent Pings among worlds. + # World's coordinator record include the time when was the world + # seen for the last time. This class can be used to query this + # information and determine whether the record is "fresh enough" + # or whether the Ping really needs to be sent. + class PingCache + # Format string used for formating and parsing times + TIME_FORMAT = '%Y-%m-%d %H:%M:%S.%L'.freeze + DEFAULT_MAX_AGE = 60 + + # Formats time into a string + # + # @param time [Time] the time to format + # @return [String] the formatted time + def self.format_time(time = Time.now) + time.strftime(TIME_FORMAT) + end + + # Parses time from a string + # + # @param time [String] the time string to parse + # @return [Time] the parsed time + def self.load_time(time) + Time.strptime(time, TIME_FORMAT) + end + + # @param world [World] the world to which the PingCache belongs + def initialize(world, max_age = DEFAULT_MAX_AGE) + @world = world + @max_age = max_age + @executor = {} + end + + # Records when was the world seen into the world's coordinator record + # + # @param id [String] Id of the world to be added to the cache + # @param time [Time] Time when was the world last seen + def add_record(id, time = Time.now) + record = find_world id + @executor[id] ||= record.data[:class] == 'Dynflow::Coordinator::ExecutorWorld' + record.data[:meta].update(:last_seen => self.class.format_time(time)) + @world.coordinator.update_record(record) + end + + # Looks into the cache whether the world has an executor + # + # @param id [String] Id of the world + # @return [TrueClass] if the world has an executor + # @return [FalseClass] if the world is a client world + # @return [NilClass] if unknown + def executor?(id) + @executor[id] + end + + # Loads the coordinator record from the database and checks whether the world + # was last seen within the time limit + # + # @param id [String] Id of the world to be checked + # @return [TrueClass] if the world was last seen within the limit + # @return [FalseClass] if the world was last seen after the limit passed + def fresh_record?(id) + record = find_world(id) + return false if record.nil? + @executor[id] = record.data[:class] == 'Dynflow::Coordinator::ExecutorWorld' + time = self.class.load_time(record.data[:meta][:last_seen]) + time >= Time.now - @max_age + end + + private + + def find_world(id) + @world.coordinator.find_records(:id => id, + :class => ['Dynflow::Coordinator::ExecutorWorld', 'Dynflow::Coordinator::ClientWorld']).first + end + end + + attr_reader :ping_cache + def initialize(world, ping_cache_age) @world = Type! world, World @last_id = 0 @tracked_requests = {} @terminated = nil + @ping_cache = PingCache.new world, ping_cache_age end def publish_request(future, request, timeout) - track_request(future, request, timeout) do |tracked_request| - dispatch_request(request, @world.id, tracked_request.id) + with_ping_request_caching(request, future) do + track_request(future, request, timeout) do |tracked_request| + dispatch_request(request, @world.id, tracked_request.id) + end end end def timeout(request_id) resolve_tracked_request(request_id, Dynflow::Error.new("Request timeout")) @@ -56,11 +136,11 @@ AnyExecutor end), (on ~Event do |event| find_executor(event.execution_plan_id) end), - (on Ping.(~any) | Status.(~any, ~any) do |receiver_id, _| + (on Ping.(~any, ~any) | Status.(~any, ~any) do |receiver_id, _| receiver_id end) envelope = Envelope[request_id, client_world_id, executor_id, request] if Dispatcher::UnknownWorld === envelope.receiver_id raise Dynflow::Error, "Could not find an executor for #{envelope}" @@ -78,18 +158,31 @@ @tracked_requests[envelope.request_id].accept! end), (on ~Failed do |msg| resolve_tracked_request(envelope.request_id, Dynflow::Error.new(msg.error)) end), - (on Done | Pong do + (on Done do resolve_tracked_request(envelope.request_id) end), + (on Pong do + add_ping_cache_record(envelope.sender_id) + resolve_tracked_request(envelope.request_id) + end), (on ExecutionStatus.(~any) do |steps| @tracked_requests.delete(envelope.request_id).success! steps end) end + # Records when was the world with provided id last seen using a PingCache + # + # @param id [String] Id of the world + # @see PingCache#add_record + def add_ping_cache_record(id) + log Logger::DEBUG, "adding ping cache record for #{id}" + @ping_cache.add_record id + end + private def find_executor(execution_plan_id) execution_lock = @world.coordinator.find_locks(class: Coordinator::ExecutionLock.name, id: "execution-plan:#{execution_plan_id}").first @@ -139,8 +232,29 @@ end) @tracked_requests.delete(id).success! resolve_to end end + # Tries to reduce the number of sent Ping requests by first looking into a cache. If the + # destination world is an executor world, the result is resolved solely from the cache. + # For client worlds the Ping might be sent if the cache record is stale. + # + # @param request [Dynflow::Dispatcher::Request] the request to send + # @param future [Concurrent::Future] the future to fulfill if the world was seen recently + # @return [Concurrent::Future] the future tracking the request + def with_ping_request_caching(request, future) + return yield unless request.is_a?(Dynflow::Dispatcher::Ping) + return yield unless request.use_cache + + if @ping_cache.fresh_record?(request.receiver_id) + future.success(true) + else + if @ping_cache.executor?(request.receiver_id) + future.fail + else + yield + end + end + end end end end