lib/legion/extensions.rb in legionio-0.2.0 vs lib/legion/extensions.rb in legionio-0.3.0

- old
+ new

@@ -1,7 +1,5 @@ -# frozen_string_literal: true - require 'legion/extensions/core' require 'legion/runner' module Legion module Extensions @@ -21,15 +19,17 @@ find_extensions load_extensions end def shutdown + return nil if @loaded_extensions.nil? + @subscription_tasks.each do |task| task[:threadpool].shutdown - task[:threadpool].wait_for_termination(2) - task[:threadpool].kill + task[:threadpool].kill unless task[:threadpool].wait_for_termination(5) end + @loop_tasks.each { |task| task[:running_class].cancel if task[:running_class].respond_to?(:cancel) } @once_tasks.each { |task| task[:running_class].cancel if task[:running_class].respond_to?(:cancel) } @timer_tasks.each { |task| task[:running_class].cancel if task[:running_class].respond_to?(:cancel) } @poll_tasks.each { |task| task[:running_class].cancel if task[:running_class].respond_to?(:cancel) } @@ -77,24 +77,32 @@ extension.actors.each do |_key, actor| extension.log.debug("hooking literal actor: #{actor}") if has_logger hook_actor(**actor) end - extension.log.info 'Loaded' + extension.log.info "Loaded v#{extension::VERSION}" rescue StandardError => e Legion::Logging.error e.message Legion::Logging.error e.backtrace false end def hook_actor(extension:, extension_name:, actor_class:, size: 1, **opts) - size = 1 unless size.is_a? Integer + size = if Legion::Settings[:extensions].key?(extension_name.to_sym) && Legion::Settings[:extensions][extension_name.to_sym].key?(:workers) + Legion::Settings[:extensions][extension_name.to_sym][:workers] + elsif size.is_a? Integer + size + else + 1 + end + extension_hash = { - extension: extension, - extension_name: extension_name, - actor_class: actor_class, - size: size, + extension: extension, + extension_name: extension_name, + actor_class: actor_class, + size: size, + fallback_policy: :abort, **opts } extension_hash[:running_class] = if actor_class.ancestors.include? Legion::Extensions::Actors::Subscription actor_class else @@ -108,13 +116,18 @@ elsif actor_class.ancestors.include? Legion::Extensions::Actors::Loop @loop_tasks.push(extension_hash) elsif actor_class.ancestors.include? Legion::Extensions::Actors::Poll @poll_tasks.push(extension_hash) elsif actor_class.ancestors.include? Legion::Extensions::Actors::Subscription - extension_hash[:threadpool] = Concurrent::FixedThreadPool.new(100) + extension_hash[:threadpool] = Concurrent::FixedThreadPool.new(size) size.times do extension_hash[:threadpool].post do - actor_class.new.async.subscribe + klass = actor_class.new + if klass.respond_to?(:async) + klass.async.subscribe + else + klass.subscribe + end end end @subscription_tasks.push(extension_hash) else Legion::Logging.fatal 'did not match any actor classes'