lib/parallel_minion/minion.rb in parallel_minion-1.2.1 vs lib/parallel_minion/minion.rb in parallel_minion-1.3.0
- old
+ new
@@ -20,11 +20,11 @@
attr_reader :duration
# Metrics [String]
attr_reader :metric, :wait_metric
- attr_reader :on_timeout, :log_exception, :start_time
+ attr_reader :on_timeout, :log_exception, :start_time, :on_exception_level
# Give an infinite amount of time to wait for a Minion to complete a task
INFINITE = 0
# Sets whether Minions should run in a separate thread.
@@ -38,12 +38,12 @@
# Supports rolling back database changes after each test, since all changes are
# performed on the same database connection.
# - Production:
# Batch processing in Rocket Job where throughput is more important than latency.
# http://rocketjob.io
- def self.enabled=(enabled)
- @enabled = enabled
+ class << self
+ attr_writer :enabled
end
# Returns whether minions are enabled to run in their own threads
def self.enabled?
@enabled
@@ -52,12 +52,12 @@
# The list of classes for which the current scope must be copied into the
# new Minion (Thread)
#
# Example:
# ...
- def self.scoped_classes
- @scoped_classes
+ class << self
+ attr_reader :scoped_classes
end
def self.scoped_classes=(scoped_classes)
@scoped_classes = scoped_classes.dup
end
@@ -71,12 +71,12 @@
def self.started_log_level=(level)
raise(ArgumentError, "Invalid log level: #{level}") unless SemanticLogger::LEVELS.include?(level)
@started_log_level = level
end
- def self.started_log_level
- @started_log_level
+ class << self
+ attr_reader :started_log_level
end
# Change the log level for the Completed log message.
#
# Default: :info
@@ -86,12 +86,12 @@
def self.completed_log_level=(level)
raise(ArgumentError, "Invalid log level: #{level}") unless SemanticLogger::LEVELS.include?(level)
@completed_log_level = level
end
- def self.completed_log_level
- @completed_log_level
+ class << self
+ attr_reader :completed_log_level
end
self.started_log_level = :info
self.completed_log_level = :info
self.enabled = true
@@ -175,10 +175,14 @@
# Log the exception class and message. The backtrace will not be logged
# :off
# Any unhandled exception raised in the block will not be logged
# Default: :partial
#
+ # :on_exception_level [:trace | :debug | :info | :warn | :error | :fatal]
+ # Override the log level only when an exception occurs.
+ # Default: ParallelMinion::Minion.completed_log_level
+ #
# :enabled [Boolean]
# Override the global setting: `ParallelMinion::Minion.enabled?` for this minion instance.
#
# The overhead for moving the task to a Minion (separate thread) vs running it
# sequentially is about 0.3 ms if performing other tasks in-between starting
@@ -191,29 +195,57 @@
# Note:
# On JRuby it is recommended to add the following setting to .jrubyrc
# thread.pool.enabled=true
#
# Example:
- # ParallelMinion::Minion.new(10.days.ago, description: 'Doing something else in parallel', timeout: 1000) do |date|
+ # ParallelMinion::Minion.new(
+ # 10.days.ago,
+ # description: 'Doing something else in parallel',
+ # timeout: 1000
+ # ) do |date|
# MyTable.where('created_at <= ?', date).count
# end
- def initialize(*arguments, description: 'Minion', metric: nil, log_exception: nil, enabled: self.class.enabled?, timeout: INFINITE, on_timeout: nil, wait_metric: nil, &block)
+ #
+ # Example, when the result is being ignored, log full exception as an error:
+ # ParallelMinion::Minion.new(
+ # customer,
+ # description: "We don't care about the result",
+ # log_exception: :full,
+ # on_exception_level: :error
+ # ) do |customer|
+ # customer.save!
+ # end
+ def initialize(*arguments,
+ description: 'Minion',
+ metric: nil,
+ log_exception: nil,
+ on_exception_level: self.class.completed_log_level,
+ enabled: self.class.enabled?,
+ timeout: INFINITE,
+ on_timeout: nil,
+ wait_metric: nil,
+ &block)
raise 'Missing mandatory block that Minion must perform' unless block
- @start_time = Time.now
- @exception = nil
- @arguments = arguments
- @timeout = timeout.to_f
- @description = description.to_s
- @metric = metric
- @log_exception = log_exception
- @enabled = enabled
- @on_timeout = on_timeout
+ @start_time = Time.now
+ @exception = nil
+ @arguments = arguments
+ @timeout = timeout.to_f
+ @description = description.to_s
+ @metric = metric
+ @log_exception = log_exception
+ @on_exception_level = on_exception_level
+ @enabled = enabled
+ @on_timeout = on_timeout
- @wait_metric = (wait_metric || "#{metric}/wait") if @metric
+ @wait_metric = (wait_metric || "#{metric}/wait") if @metric
# When minion is disabled it is obvious in the logs since the name will now be 'Inline' instead of 'Minion'
- self.logger = SemanticLogger['Inline'] unless @enabled
+ unless @enabled
+ l = self.class.logger.dup
+ l.name = 'Inline'
+ self.logger = l
+ end
@enabled ? run(&block) : run_inline(&block)
end
# Returns the result when the thread completes
@@ -223,11 +255,16 @@
# Note: The result of any thread cannot be nil
def result
# Return nil if Minion is still working and has time left to finish
if working?
ms = time_left
- logger.measure(self.class.completed_log_level, "Waited for Minion to complete: #{description}", min_duration: 0.01, metric: wait_metric) do
+ logger.measure(
+ self.class.completed_log_level,
+ "Waited for Minion to complete: #{description}",
+ min_duration: 0.01,
+ metric: wait_metric
+ ) do
if @thread.join(ms.nil? ? nil : ms / 1000).nil?
@thread.raise(@on_timeout.new("Minion: #{description} timed out")) if @on_timeout
logger.warn("Timed out waiting for: #{description}")
return
end
@@ -255,11 +292,11 @@
# Returns the amount of time left in milli-seconds that this Minion has to finish its task
# Returns 0 if no time is left
# Returns nil if their is no time limit. I.e. :timeout was set to Minion::INFINITE (infinite time left)
def time_left
- return nil if (timeout == 0) || (timeout == -1)
+ return nil if timeout.zero? || (timeout == -1)
duration = timeout - (Time.now - start_time) * 1000
duration <= 0 ? 0 : duration
end
# Returns [Boolean] whether this minion is enabled to run in a separate thread
@@ -270,36 +307,44 @@
# Returns the current scopes for each of the models for which scopes will be
# copied to the Minions
if defined?(ActiveRecord)
if ActiveRecord::VERSION::MAJOR >= 4
def self.current_scopes
- scoped_classes.collect { |klass| klass.all }
+ scoped_classes.collect(&:all)
end
else
def self.current_scopes
- scoped_classes.collect { |klass| klass.scoped }
+ scoped_classes.collect(&:scoped)
end
end
end
private
+ # rubocop:disable Lint/RescueException
+
# Run the supplied block of code in the current thread.
# Useful for debugging, testing, and when running in batch environments.
def run_inline(&block)
- begin
- logger.public_send(self.class.started_log_level, "Started #{@description}")
- logger.measure(self.class.completed_log_level, "Completed #{@description}", log_exception: @log_exception, metric: metric) do
- @result = instance_exec(*arguments, &block)
- end
- rescue Exception => exc
- @exception = exc
- ensure
- @duration = Time.now - start_time
+ logger.public_send(self.class.started_log_level, "Started #{description}")
+ logger.measure(
+ self.class.completed_log_level,
+ "Completed #{description}",
+ log_exception: log_exception,
+ on_exception_level: on_exception_level,
+ metric: metric
+ ) do
+ @result = instance_exec(*arguments, &block)
end
+ rescue Exception => exc
+ @exception = exc
+ ensure
+ @duration = Time.now - start_time
end
+ # rubocop:enable Lint/RescueException
+
def run(&block)
# Capture tags from current thread
tags = SemanticLogger.tags
tags = tags.nil? || tags.empty? ? nil : tags.dup
@@ -314,21 +359,30 @@
# Copy logging tags from parent thread, if any
SemanticLogger.tagged(*tags) do
SemanticLogger.named_tagged(named_tags) do
logger.public_send(self.class.started_log_level, "Started #{description}")
+ # rubocop:disable Lint/RescueException
begin
- proc = Proc.new { run_in_scope(scopes, &block) }
- logger.measure(self.class.completed_log_level, "Completed #{description}", log_exception: log_exception, metric: metric, &proc)
+ proc = proc { run_in_scope(scopes, &block) }
+ logger.measure(
+ self.class.completed_log_level,
+ "Completed #{description}",
+ log_exception: log_exception,
+ on_exception_level: on_exception_level,
+ metric: metric,
+ &proc
+ )
rescue Exception => exc
@exception = exc
nil
ensure
@duration = Time.now - start_time
# Return any database connections used by this thread back to the pool
ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base)
end
+ # rubocop:enable Lint/RescueException
end
end
end
end
@@ -336,14 +390,13 @@
if scopes.nil? || scopes.empty?
@result = instance_exec(*@arguments, &block)
else
# Use the captured scope when running the block.
# Each Class to scope requires passing a block to .scoping.
- proc = Proc.new { instance_exec(*@arguments, &block) }
+ proc = proc { instance_exec(*@arguments, &block) }
first = scopes.shift
- scopes.each { |scope| proc = Proc.new { scope.scoping(&proc) } }
+ scopes.each { |scope| proc = proc { scope.scoping(&proc) } }
@result = first.scoping(&proc)
end
end
-
end
end