lib/legion/extensions.rb in legionio-0.2.0 vs lib/legion/extensions.rb in legionio-0.3.0
- old
+ new
@@ -1,7 +1,5 @@
-# frozen_string_literal: true
-
require 'legion/extensions/core'
require 'legion/runner'
module Legion
module Extensions
@@ -21,15 +19,17 @@
find_extensions
load_extensions
end
def shutdown
+ return nil if @loaded_extensions.nil?
+
@subscription_tasks.each do |task|
task[:threadpool].shutdown
- task[:threadpool].wait_for_termination(2)
- task[:threadpool].kill
+ task[:threadpool].kill unless task[:threadpool].wait_for_termination(5)
end
+
@loop_tasks.each { |task| task[:running_class].cancel if task[:running_class].respond_to?(:cancel) }
@once_tasks.each { |task| task[:running_class].cancel if task[:running_class].respond_to?(:cancel) }
@timer_tasks.each { |task| task[:running_class].cancel if task[:running_class].respond_to?(:cancel) }
@poll_tasks.each { |task| task[:running_class].cancel if task[:running_class].respond_to?(:cancel) }
@@ -77,24 +77,32 @@
extension.actors.each do |_key, actor|
extension.log.debug("hooking literal actor: #{actor}") if has_logger
hook_actor(**actor)
end
- extension.log.info 'Loaded'
+ extension.log.info "Loaded v#{extension::VERSION}"
rescue StandardError => e
Legion::Logging.error e.message
Legion::Logging.error e.backtrace
false
end
def hook_actor(extension:, extension_name:, actor_class:, size: 1, **opts)
- size = 1 unless size.is_a? Integer
+ size = if Legion::Settings[:extensions].key?(extension_name.to_sym) && Legion::Settings[:extensions][extension_name.to_sym].key?(:workers)
+ Legion::Settings[:extensions][extension_name.to_sym][:workers]
+ elsif size.is_a? Integer
+ size
+ else
+ 1
+ end
+
extension_hash = {
- extension: extension,
- extension_name: extension_name,
- actor_class: actor_class,
- size: size,
+ extension: extension,
+ extension_name: extension_name,
+ actor_class: actor_class,
+ size: size,
+ fallback_policy: :abort,
**opts
}
extension_hash[:running_class] = if actor_class.ancestors.include? Legion::Extensions::Actors::Subscription
actor_class
else
@@ -108,13 +116,18 @@
elsif actor_class.ancestors.include? Legion::Extensions::Actors::Loop
@loop_tasks.push(extension_hash)
elsif actor_class.ancestors.include? Legion::Extensions::Actors::Poll
@poll_tasks.push(extension_hash)
elsif actor_class.ancestors.include? Legion::Extensions::Actors::Subscription
- extension_hash[:threadpool] = Concurrent::FixedThreadPool.new(100)
+ extension_hash[:threadpool] = Concurrent::FixedThreadPool.new(size)
size.times do
extension_hash[:threadpool].post do
- actor_class.new.async.subscribe
+ klass = actor_class.new
+ if klass.respond_to?(:async)
+ klass.async.subscribe
+ else
+ klass.subscribe
+ end
end
end
@subscription_tasks.push(extension_hash)
else
Legion::Logging.fatal 'did not match any actor classes'