lib/parallel_minion/minion.rb in parallel_minion-1.2.1 vs lib/parallel_minion/minion.rb in parallel_minion-1.3.0

- old
+ new

@@ -20,11 +20,11 @@ attr_reader :duration # Metrics [String] attr_reader :metric, :wait_metric - attr_reader :on_timeout, :log_exception, :start_time + attr_reader :on_timeout, :log_exception, :start_time, :on_exception_level # Give an infinite amount of time to wait for a Minion to complete a task INFINITE = 0 # Sets whether Minions should run in a separate thread. @@ -38,12 +38,12 @@ # Supports rolling back database changes after each test, since all changes are # performed on the same database connection. # - Production: # Batch processing in Rocket Job where throughput is more important than latency. # http://rocketjob.io - def self.enabled=(enabled) - @enabled = enabled + class << self + attr_writer :enabled end # Returns whether minions are enabled to run in their own threads def self.enabled? @enabled @@ -52,12 +52,12 @@ # The list of classes for which the current scope must be copied into the # new Minion (Thread) # # Example: # ... - def self.scoped_classes - @scoped_classes + class << self + attr_reader :scoped_classes end def self.scoped_classes=(scoped_classes) @scoped_classes = scoped_classes.dup end @@ -71,12 +71,12 @@ def self.started_log_level=(level) raise(ArgumentError, "Invalid log level: #{level}") unless SemanticLogger::LEVELS.include?(level) @started_log_level = level end - def self.started_log_level - @started_log_level + class << self + attr_reader :started_log_level end # Change the log level for the Completed log message. # # Default: :info @@ -86,12 +86,12 @@ def self.completed_log_level=(level) raise(ArgumentError, "Invalid log level: #{level}") unless SemanticLogger::LEVELS.include?(level) @completed_log_level = level end - def self.completed_log_level - @completed_log_level + class << self + attr_reader :completed_log_level end self.started_log_level = :info self.completed_log_level = :info self.enabled = true @@ -175,10 +175,14 @@ # Log the exception class and message. The backtrace will not be logged # :off # Any unhandled exception raised in the block will not be logged # Default: :partial # + # :on_exception_level [:trace | :debug | :info | :warn | :error | :fatal] + # Override the log level only when an exception occurs. + # Default: ParallelMinion::Minion.completed_log_level + # # :enabled [Boolean] # Override the global setting: `ParallelMinion::Minion.enabled?` for this minion instance. # # The overhead for moving the task to a Minion (separate thread) vs running it # sequentially is about 0.3 ms if performing other tasks in-between starting @@ -191,29 +195,57 @@ # Note: # On JRuby it is recommended to add the following setting to .jrubyrc # thread.pool.enabled=true # # Example: - # ParallelMinion::Minion.new(10.days.ago, description: 'Doing something else in parallel', timeout: 1000) do |date| + # ParallelMinion::Minion.new( + # 10.days.ago, + # description: 'Doing something else in parallel', + # timeout: 1000 + # ) do |date| # MyTable.where('created_at <= ?', date).count # end - def initialize(*arguments, description: 'Minion', metric: nil, log_exception: nil, enabled: self.class.enabled?, timeout: INFINITE, on_timeout: nil, wait_metric: nil, &block) + # + # Example, when the result is being ignored, log full exception as an error: + # ParallelMinion::Minion.new( + # customer, + # description: "We don't care about the result", + # log_exception: :full, + # on_exception_level: :error + # ) do |customer| + # customer.save! + # end + def initialize(*arguments, + description: 'Minion', + metric: nil, + log_exception: nil, + on_exception_level: self.class.completed_log_level, + enabled: self.class.enabled?, + timeout: INFINITE, + on_timeout: nil, + wait_metric: nil, + &block) raise 'Missing mandatory block that Minion must perform' unless block - @start_time = Time.now - @exception = nil - @arguments = arguments - @timeout = timeout.to_f - @description = description.to_s - @metric = metric - @log_exception = log_exception - @enabled = enabled - @on_timeout = on_timeout + @start_time = Time.now + @exception = nil + @arguments = arguments + @timeout = timeout.to_f + @description = description.to_s + @metric = metric + @log_exception = log_exception + @on_exception_level = on_exception_level + @enabled = enabled + @on_timeout = on_timeout - @wait_metric = (wait_metric || "#{metric}/wait") if @metric + @wait_metric = (wait_metric || "#{metric}/wait") if @metric # When minion is disabled it is obvious in the logs since the name will now be 'Inline' instead of 'Minion' - self.logger = SemanticLogger['Inline'] unless @enabled + unless @enabled + l = self.class.logger.dup + l.name = 'Inline' + self.logger = l + end @enabled ? run(&block) : run_inline(&block) end # Returns the result when the thread completes @@ -223,11 +255,16 @@ # Note: The result of any thread cannot be nil def result # Return nil if Minion is still working and has time left to finish if working? ms = time_left - logger.measure(self.class.completed_log_level, "Waited for Minion to complete: #{description}", min_duration: 0.01, metric: wait_metric) do + logger.measure( + self.class.completed_log_level, + "Waited for Minion to complete: #{description}", + min_duration: 0.01, + metric: wait_metric + ) do if @thread.join(ms.nil? ? nil : ms / 1000).nil? @thread.raise(@on_timeout.new("Minion: #{description} timed out")) if @on_timeout logger.warn("Timed out waiting for: #{description}") return end @@ -255,11 +292,11 @@ # Returns the amount of time left in milli-seconds that this Minion has to finish its task # Returns 0 if no time is left # Returns nil if their is no time limit. I.e. :timeout was set to Minion::INFINITE (infinite time left) def time_left - return nil if (timeout == 0) || (timeout == -1) + return nil if timeout.zero? || (timeout == -1) duration = timeout - (Time.now - start_time) * 1000 duration <= 0 ? 0 : duration end # Returns [Boolean] whether this minion is enabled to run in a separate thread @@ -270,36 +307,44 @@ # Returns the current scopes for each of the models for which scopes will be # copied to the Minions if defined?(ActiveRecord) if ActiveRecord::VERSION::MAJOR >= 4 def self.current_scopes - scoped_classes.collect { |klass| klass.all } + scoped_classes.collect(&:all) end else def self.current_scopes - scoped_classes.collect { |klass| klass.scoped } + scoped_classes.collect(&:scoped) end end end private + # rubocop:disable Lint/RescueException + # Run the supplied block of code in the current thread. # Useful for debugging, testing, and when running in batch environments. def run_inline(&block) - begin - logger.public_send(self.class.started_log_level, "Started #{@description}") - logger.measure(self.class.completed_log_level, "Completed #{@description}", log_exception: @log_exception, metric: metric) do - @result = instance_exec(*arguments, &block) - end - rescue Exception => exc - @exception = exc - ensure - @duration = Time.now - start_time + logger.public_send(self.class.started_log_level, "Started #{description}") + logger.measure( + self.class.completed_log_level, + "Completed #{description}", + log_exception: log_exception, + on_exception_level: on_exception_level, + metric: metric + ) do + @result = instance_exec(*arguments, &block) end + rescue Exception => exc + @exception = exc + ensure + @duration = Time.now - start_time end + # rubocop:enable Lint/RescueException + def run(&block) # Capture tags from current thread tags = SemanticLogger.tags tags = tags.nil? || tags.empty? ? nil : tags.dup @@ -314,21 +359,30 @@ # Copy logging tags from parent thread, if any SemanticLogger.tagged(*tags) do SemanticLogger.named_tagged(named_tags) do logger.public_send(self.class.started_log_level, "Started #{description}") + # rubocop:disable Lint/RescueException begin - proc = Proc.new { run_in_scope(scopes, &block) } - logger.measure(self.class.completed_log_level, "Completed #{description}", log_exception: log_exception, metric: metric, &proc) + proc = proc { run_in_scope(scopes, &block) } + logger.measure( + self.class.completed_log_level, + "Completed #{description}", + log_exception: log_exception, + on_exception_level: on_exception_level, + metric: metric, + &proc + ) rescue Exception => exc @exception = exc nil ensure @duration = Time.now - start_time # Return any database connections used by this thread back to the pool ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base) end + # rubocop:enable Lint/RescueException end end end end @@ -336,14 +390,13 @@ if scopes.nil? || scopes.empty? @result = instance_exec(*@arguments, &block) else # Use the captured scope when running the block. # Each Class to scope requires passing a block to .scoping. - proc = Proc.new { instance_exec(*@arguments, &block) } + proc = proc { instance_exec(*@arguments, &block) } first = scopes.shift - scopes.each { |scope| proc = Proc.new { scope.scoping(&proc) } } + scopes.each { |scope| proc = proc { scope.scoping(&proc) } } @result = first.scoping(&proc) end end - end end