lib/async/container/generic.rb in async-container-0.16.6 vs lib/async/container/generic.rb in async-container-0.16.7

- old
+ new

@@ -28,14 +28,16 @@ require_relative 'keyed' require_relative 'statistics' module Async module Container + # An environment variable key to override {.processor_count}. ASYNC_CONTAINER_PROCESSOR_COUNT = 'ASYNC_CONTAINER_PROCESSOR_COUNT' - # The processor count which may be used for the default number of container threads/processes. You can override the value provided by the system by specifying the ASYNC_CONTAINER_PROCESSOR_COUNT environment variable. - # @return [Integer] the number of hardware processors which can run threads/processes simultaneously. + # The processor count which may be used for the default number of container threads/processes. You can override the value provided by the system by specifying the `ASYNC_CONTAINER_PROCESSOR_COUNT` environment variable. + # @returns [Integer] The number of hardware processors which can run threads/processes simultaneously. + # @raises [RuntimeError] If the process count is invalid. def self.processor_count(env = ENV) count = env.fetch(ASYNC_CONTAINER_PROCESSOR_COUNT) do Etc.nprocessors rescue 1 end.to_i @@ -44,10 +46,11 @@ end return count end + # A base class for implementing containers. class Generic def self.run(*arguments, **options, &block) self.new.run(*arguments, **options, &block) end @@ -63,45 +66,59 @@ @keyed = {} end attr :state + # A human readable representation of the container. + # @returns [String] def to_s "#{self.class} with #{@statistics.spawns} spawns and #{@statistics.failures} failures." end + # Look up a child process by key. + # A key could be a symbol, a file path, or something else which the child instance represents. def [] key @keyed[key]&.value end + # Statistics relating to the behavior of children instances. + # @attribute [Statistics] attr :statistics + # Whether any failures have occurred within the container. + # @returns [Boolean] def failed? @statistics.failed? end - # Whether there are running tasks. + # Whether the container has running children instances. def running? @group.running? end # Sleep until some state change occurs. - # @param duration [Integer] the maximum amount of time to sleep for. + # @parameter duration [Numeric] the maximum amount of time to sleep for. def sleep(duration = nil) @group.sleep(duration) end # Wait until all spawned tasks are completed. def wait @group.wait end + # Returns true if all children instances have the specified status flag set. + # e.g. `:ready`. + # This state is updated by the process readiness protocol mechanism. See {Notify::Client} for more details. + # @returns [Boolean] def status?(flag) # This also returns true if all processes have exited/failed: @state.all?{|_, state| state[flag]} end + # Wait until all the children instances have indicated that they are ready. + # @returns [Boolean] The children all became ready. def wait_until_ready while true Async.logger.debug(self) do |buffer| buffer.puts "Waiting for ready:" @state.each do |child, state| @@ -115,10 +132,12 @@ return true end end end + # Stop the children instances. + # @parameter timeout [Boolean | Numeric] Whether to stop gracefully, or a specific timeout. def stop(timeout = true) @running = false @group.stop(timeout) if @group.running? @@ -126,10 +145,14 @@ end ensure @running = true end + # Spawn a child instance into the container. + # @parameter name [String] The name of the child instance. + # @parameter restart [Boolean] Whether to restart the child instance if it fails. + # @parameter key [Symbol] A key used for reloading child instances. def spawn(name: nil, restart: false, key: nil, &block) name ||= UNNAMED if mark?(key) Async.logger.debug(self) {"Reusing existing child for #{key}: #{name}"} @@ -170,24 +193,28 @@ end.resume return true end - def async(**options, &block) - spawn(**options) do |instance| - Async::Reactor.run(instance, &block) - end - end - + # Run multiple instances of the same block in the container. + # @parameter count [Integer] The number of instances to start. def run(count: Container.processor_count, **options, &block) count.times do spawn(**options, &block) end return self end + # @deprecated Please use {spawn} or {run} instead. + def async(**options, &block) + spawn(**options) do |instance| + Async::Reactor.run(instance, &block) + end + end + + # Reload the container's keyed instances. def reload @keyed.each_value(&:clear!) yield @@ -198,10 +225,11 @@ end return dirty end + # Mark the container's keyed instance which ensures that it won't be discarded. def mark?(key) if key if value = @keyed[key] value.mark! @@ -210,9 +238,10 @@ end return false end + # Whether a child instance exists for the given key. def key?(key) if key @keyed.key?(key) end end