lib/async/container/generic.rb in async-container-0.16.6 vs lib/async/container/generic.rb in async-container-0.16.7
- old
+ new
@@ -28,14 +28,16 @@
require_relative 'keyed'
require_relative 'statistics'
module Async
module Container
+ # An environment variable key to override {.processor_count}.
ASYNC_CONTAINER_PROCESSOR_COUNT = 'ASYNC_CONTAINER_PROCESSOR_COUNT'
- # The processor count which may be used for the default number of container threads/processes. You can override the value provided by the system by specifying the ASYNC_CONTAINER_PROCESSOR_COUNT environment variable.
- # @return [Integer] the number of hardware processors which can run threads/processes simultaneously.
+ # The processor count which may be used for the default number of container threads/processes. You can override the value provided by the system by specifying the `ASYNC_CONTAINER_PROCESSOR_COUNT` environment variable.
+ # @returns [Integer] The number of hardware processors which can run threads/processes simultaneously.
+ # @raises [RuntimeError] If the process count is invalid.
def self.processor_count(env = ENV)
count = env.fetch(ASYNC_CONTAINER_PROCESSOR_COUNT) do
Etc.nprocessors rescue 1
end.to_i
@@ -44,10 +46,11 @@
end
return count
end
+ # A base class for implementing containers.
class Generic
def self.run(*arguments, **options, &block)
self.new.run(*arguments, **options, &block)
end
@@ -63,45 +66,59 @@
@keyed = {}
end
attr :state
+ # A human readable representation of the container.
+ # @returns [String]
def to_s
"#{self.class} with #{@statistics.spawns} spawns and #{@statistics.failures} failures."
end
+ # Look up a child process by key.
+ # A key could be a symbol, a file path, or something else which the child instance represents.
def [] key
@keyed[key]&.value
end
+ # Statistics relating to the behavior of children instances.
+ # @attribute [Statistics]
attr :statistics
+ # Whether any failures have occurred within the container.
+ # @returns [Boolean]
def failed?
@statistics.failed?
end
- # Whether there are running tasks.
+ # Whether the container has running children instances.
def running?
@group.running?
end
# Sleep until some state change occurs.
- # @param duration [Integer] the maximum amount of time to sleep for.
+ # @parameter duration [Numeric] the maximum amount of time to sleep for.
def sleep(duration = nil)
@group.sleep(duration)
end
# Wait until all spawned tasks are completed.
def wait
@group.wait
end
+ # Returns true if all children instances have the specified status flag set.
+ # e.g. `:ready`.
+ # This state is updated by the process readiness protocol mechanism. See {Notify::Client} for more details.
+ # @returns [Boolean]
def status?(flag)
# This also returns true if all processes have exited/failed:
@state.all?{|_, state| state[flag]}
end
+ # Wait until all the children instances have indicated that they are ready.
+ # @returns [Boolean] The children all became ready.
def wait_until_ready
while true
Async.logger.debug(self) do |buffer|
buffer.puts "Waiting for ready:"
@state.each do |child, state|
@@ -115,10 +132,12 @@
return true
end
end
end
+ # Stop the children instances.
+ # @parameter timeout [Boolean | Numeric] Whether to stop gracefully, or a specific timeout.
def stop(timeout = true)
@running = false
@group.stop(timeout)
if @group.running?
@@ -126,10 +145,14 @@
end
ensure
@running = true
end
+ # Spawn a child instance into the container.
+ # @parameter name [String] The name of the child instance.
+ # @parameter restart [Boolean] Whether to restart the child instance if it fails.
+ # @parameter key [Symbol] A key used for reloading child instances.
def spawn(name: nil, restart: false, key: nil, &block)
name ||= UNNAMED
if mark?(key)
Async.logger.debug(self) {"Reusing existing child for #{key}: #{name}"}
@@ -170,24 +193,28 @@
end.resume
return true
end
- def async(**options, &block)
- spawn(**options) do |instance|
- Async::Reactor.run(instance, &block)
- end
- end
-
+ # Run multiple instances of the same block in the container.
+ # @parameter count [Integer] The number of instances to start.
def run(count: Container.processor_count, **options, &block)
count.times do
spawn(**options, &block)
end
return self
end
+ # @deprecated Please use {spawn} or {run} instead.
+ def async(**options, &block)
+ spawn(**options) do |instance|
+ Async::Reactor.run(instance, &block)
+ end
+ end
+
+ # Reload the container's keyed instances.
def reload
@keyed.each_value(&:clear!)
yield
@@ -198,10 +225,11 @@
end
return dirty
end
+ # Mark the container's keyed instance which ensures that it won't be discarded.
def mark?(key)
if key
if value = @keyed[key]
value.mark!
@@ -210,9 +238,10 @@
end
return false
end
+ # Whether a child instance exists for the given key.
def key?(key)
if key
@keyed.key?(key)
end
end