lib/dynflow/dispatcher/client_dispatcher.rb in dynflow-1.0.5 vs lib/dynflow/dispatcher/client_dispatcher.rb in dynflow-1.1.0
- old
+ new
@@ -24,20 +24,100 @@
finished.success(resolve_to)
self
end
end
- def initialize(world)
+ # Class used for reducing the number of sent Pings among worlds.
+ # World's coordinator record include the time when was the world
+ # seen for the last time. This class can be used to query this
+ # information and determine whether the record is "fresh enough"
+ # or whether the Ping really needs to be sent.
+ class PingCache
+ # Format string used for formating and parsing times
+ TIME_FORMAT = '%Y-%m-%d %H:%M:%S.%L'.freeze
+ DEFAULT_MAX_AGE = 60
+
+ # Formats time into a string
+ #
+ # @param time [Time] the time to format
+ # @return [String] the formatted time
+ def self.format_time(time = Time.now)
+ time.strftime(TIME_FORMAT)
+ end
+
+ # Parses time from a string
+ #
+ # @param time [String] the time string to parse
+ # @return [Time] the parsed time
+ def self.load_time(time)
+ Time.strptime(time, TIME_FORMAT)
+ end
+
+ # @param world [World] the world to which the PingCache belongs
+ def initialize(world, max_age = DEFAULT_MAX_AGE)
+ @world = world
+ @max_age = max_age
+ @executor = {}
+ end
+
+ # Records when was the world seen into the world's coordinator record
+ #
+ # @param id [String] Id of the world to be added to the cache
+ # @param time [Time] Time when was the world last seen
+ def add_record(id, time = Time.now)
+ record = find_world id
+ @executor[id] ||= record.data[:class] == 'Dynflow::Coordinator::ExecutorWorld'
+ record.data[:meta].update(:last_seen => self.class.format_time(time))
+ @world.coordinator.update_record(record)
+ end
+
+ # Looks into the cache whether the world has an executor
+ #
+ # @param id [String] Id of the world
+ # @return [TrueClass] if the world has an executor
+ # @return [FalseClass] if the world is a client world
+ # @return [NilClass] if unknown
+ def executor?(id)
+ @executor[id]
+ end
+
+ # Loads the coordinator record from the database and checks whether the world
+ # was last seen within the time limit
+ #
+ # @param id [String] Id of the world to be checked
+ # @return [TrueClass] if the world was last seen within the limit
+ # @return [FalseClass] if the world was last seen after the limit passed
+ def fresh_record?(id)
+ record = find_world(id)
+ return false if record.nil?
+ @executor[id] = record.data[:class] == 'Dynflow::Coordinator::ExecutorWorld'
+ time = self.class.load_time(record.data[:meta][:last_seen])
+ time >= Time.now - @max_age
+ end
+
+ private
+
+ def find_world(id)
+ @world.coordinator.find_records(:id => id,
+ :class => ['Dynflow::Coordinator::ExecutorWorld', 'Dynflow::Coordinator::ClientWorld']).first
+ end
+ end
+
+ attr_reader :ping_cache
+ def initialize(world, ping_cache_age)
@world = Type! world, World
@last_id = 0
@tracked_requests = {}
@terminated = nil
+ @ping_cache = PingCache.new world, ping_cache_age
end
def publish_request(future, request, timeout)
- track_request(future, request, timeout) do |tracked_request|
- dispatch_request(request, @world.id, tracked_request.id)
+ with_ping_request_caching(request, future) do
+ track_request(future, request, timeout) do |tracked_request|
+ dispatch_request(request, @world.id, tracked_request.id)
+ end
end
end
def timeout(request_id)
resolve_tracked_request(request_id, Dynflow::Error.new("Request timeout"))
@@ -56,11 +136,11 @@
AnyExecutor
end),
(on ~Event do |event|
find_executor(event.execution_plan_id)
end),
- (on Ping.(~any) | Status.(~any, ~any) do |receiver_id, _|
+ (on Ping.(~any, ~any) | Status.(~any, ~any) do |receiver_id, _|
receiver_id
end)
envelope = Envelope[request_id, client_world_id, executor_id, request]
if Dispatcher::UnknownWorld === envelope.receiver_id
raise Dynflow::Error, "Could not find an executor for #{envelope}"
@@ -78,18 +158,31 @@
@tracked_requests[envelope.request_id].accept!
end),
(on ~Failed do |msg|
resolve_tracked_request(envelope.request_id, Dynflow::Error.new(msg.error))
end),
- (on Done | Pong do
+ (on Done do
resolve_tracked_request(envelope.request_id)
end),
+ (on Pong do
+ add_ping_cache_record(envelope.sender_id)
+ resolve_tracked_request(envelope.request_id)
+ end),
(on ExecutionStatus.(~any) do |steps|
@tracked_requests.delete(envelope.request_id).success! steps
end)
end
+ # Records when was the world with provided id last seen using a PingCache
+ #
+ # @param id [String] Id of the world
+ # @see PingCache#add_record
+ def add_ping_cache_record(id)
+ log Logger::DEBUG, "adding ping cache record for #{id}"
+ @ping_cache.add_record id
+ end
+
private
def find_executor(execution_plan_id)
execution_lock = @world.coordinator.find_locks(class: Coordinator::ExecutionLock.name,
id: "execution-plan:#{execution_plan_id}").first
@@ -139,8 +232,29 @@
end)
@tracked_requests.delete(id).success! resolve_to
end
end
+ # Tries to reduce the number of sent Ping requests by first looking into a cache. If the
+ # destination world is an executor world, the result is resolved solely from the cache.
+ # For client worlds the Ping might be sent if the cache record is stale.
+ #
+ # @param request [Dynflow::Dispatcher::Request] the request to send
+ # @param future [Concurrent::Future] the future to fulfill if the world was seen recently
+ # @return [Concurrent::Future] the future tracking the request
+ def with_ping_request_caching(request, future)
+ return yield unless request.is_a?(Dynflow::Dispatcher::Ping)
+ return yield unless request.use_cache
+
+ if @ping_cache.fresh_record?(request.receiver_id)
+ future.success(true)
+ else
+ if @ping_cache.executor?(request.receiver_id)
+ future.fail
+ else
+ yield
+ end
+ end
+ end
end
end
end