lib/parallel_minion/minion.rb in parallel_minion-1.1.0 vs lib/parallel_minion/minion.rb in parallel_minion-1.2.0
- old
+ new
@@ -17,72 +17,135 @@
# Returns [Float] the number of milli-seconds the minion took to complete
# Returns nil if the minion is still running
attr_reader :duration
+ # Metrics [String]
+ attr_reader :metric, :wait_metric
+
+ attr_reader :on_timeout, :log_exception, :start_time
+
# Give an infinite amount of time to wait for a Minion to complete a task
INFINITE = 0
- # Sets whether minions are enabled to run in their own threads
+ # Sets whether Minions should run in a separate thread.
#
- # By Setting _enabled_ to false all Minions that have not yet been started
- # will run in the thread from which it is created and not on its own thread
- #
- # This is useful:
- # - to run tests under the Capybara gem
- # - when debugging code so that all code is run sequentially in the current thread
- #
- # Note: Not recommended to set this setting to false in Production
+ # By setting _enabled_ to false, all Minions that have not yet been created
+ # will run in the thread in which they are created.
+ # - Development:
+ # Use a debugger, since the code will run in the current thread.
+ # - Test:
+ # Keep test execution in the current thread.
+ # Supports rolling back database changes after each test, since all changes are
+ # performed on the same database connection.
+ # - Production:
+ # Batch processing in Rocket Job where throughput is more important than latency.
+ # http://rocketjob.io
def self.enabled=(enabled)
- @@enabled = enabled
+ @enabled = enabled
end
# Returns whether minions are enabled to run in their own threads
def self.enabled?
- @@enabled
+ @enabled
end
# The list of classes for which the current scope must be copied into the
# new Minion (Thread)
#
# Example:
# ...
def self.scoped_classes
- @@scoped_classes
+ @scoped_classes
end
- # Create a new thread and
- # Log the time for the thread to complete processing
- # The exception without stack trace is logged whenever an exception is
- # thrown in the thread
- # Re-raises any unhandled exception in the calling thread when it call #result
- # Copy the logging tags and specified ActiveRecord scopes to the new thread
+ def self.scoped_classes=(scoped_classes)
+ @scoped_classes = scoped_classes.dup
+ end
+
+ # Change the log level for the Started log message.
#
+ # Default: :info
+ #
+ # Valid levels:
+ # :trace, :debug, :info, :warn, :error, :fatal
+ def self.started_log_level=(level)
+ raise(ArgumentError, "Invalid log level: #{level}") unless SemanticLogger::LEVELS.include?(level)
+ @started_log_level = level
+ end
+
+ def self.started_log_level
+ @started_log_level
+ end
+
+ # Change the log level for the Completed log message.
+ #
+ # Default: :info
+ #
+ # Valid levels:
+ # :trace, :debug, :info, :warn, :error, :fatal
+ def self.completed_log_level=(level)
+ raise(ArgumentError, "Invalid log level: #{level}") unless SemanticLogger::LEVELS.include?(level)
+ @completed_log_level = level
+ end
+
+ def self.completed_log_level
+ @completed_log_level
+ end
+
+ self.started_log_level = :info
+ self.completed_log_level = :info
+ self.enabled = true
+ self.scoped_classes = []
+ logger.name = 'Minion'
+
+ # Create a new Minion
+ #
+ # Creates a new thread and logs the time for the supplied block to complete processing.
+ # The exception without stack trace is logged whenever an exception is thrown in the thread.
+ #
+ # Re-raises any unhandled exception in the calling thread when `#result` is called.
+ # Copies the logging tags and specified ActiveRecord scopes to the new thread.
+ #
# Parameters
+ # *arguments
+ # Any number of arguments can be supplied that are passed into the block
+ # in the order they are listed.
+ #
+ # Note:
+ # All arguments must be supplied by copy and not by reference.
+ # For example, use `#dup` to create copies of passed data.
+ # Pass by copy is critical to prevent concurrency issues when multiple threads
+ # attempt to update the same object at the same time.
+ #
+ # Proc / lambda
+ # A block of code must be supplied that the Minion will execute.
+ #
+ # Note:
+ # This block will be executed within the scope of the created minion instance
+ # and _not_ within the scope of where the Proc/lambda was originally created.
+ # This is done to force all parameters to be passed in explicitly
+ # and should be read-only or duplicates of the original data.
+ #
# :description [String]
- # Description for this task that the Minion is performing
- # Put in the log file along with the time take to complete the task
+ # Description for this task that the Minion is performing.
+ # Written to the log file along with the time taken to complete the task.
#
# :timeout [Integer]
# Maximum amount of time in milli-seconds that the task may take to complete
- # before #result times out
- # Set to 0 to give the thread an infinite amount of time to complete
+ # before #result times out.
+ # Set to 0 to give the thread an infinite amount of time to complete.
# Default: 0 ( Wait forever )
#
# Notes:
# - :timeout does not affect what happens to the Minion running
# the task, it only affects how long #result will take to return.
# - The Minion will continue to run even after the timeout has been exceeded
# - If :enabled is false, or ParallelMinion::Minion.enabled is false,
# then :timeout is ignored and assumed to be Minion::INFINITE
# since the code is run in the calling thread when the Minion is created
#
- # :enabled [Boolean]
- # Whether the minion should run in a separate thread
- # Not recommended in Production, but is useful for debugging purposes
- # Default: ParallelMinion::Minion.enabled?
- #
# :on_timeout [Exception]
# The class to raise on the minion when the minion times out.
# By raising the exception on the running thread it ensures that the thread
# ends due to the exception, rather than continuing to execute.
# The exception is only raised on the running minion when #result is called.
@@ -90,114 +153,69 @@
# calls to #result will raise the supplied exception on the current thread
# since the thread will have terminated with that exception.
#
# Note: :on_timeout has no effect if not #enabled?
#
- # *args
- # Any number of arguments can be supplied that are passed into the block
- # in the order they are listed
- # It is recommended to duplicate and/or freeze objects passed as arguments
- # so that they are not modified at the same time by multiple threads
- # These arguments are accessible while and after the minion is running
- # by calling #arguments
+ # :metric [String]
+ # Name of the metric to forward to Semantic Logger when measuring the minion execution time.
+ # Example: inquiry/address_cleansing
#
- # Proc / lambda
- # A block of code must be supplied that the Minion will execute
- # NOTE: This block will be executed within the scope of the created minion
- # instance and _not_ within the scope of where the Proc/lambda was
- # originally created.
- # This is done to force all parameters to be passed in explicitly
- # and should be read-only or duplicates of the original data
+ # When a metric is supplied the following metrics will also be generated:
+ # - wait
+ # Duration waiting for a minion to complete.
#
+ # The additional metrics are added to the supplied metric name. For example:
+ # - inquiry/address_cleansing/wait
+ #
+ # :log_exception [Symbol]
+ # Control whether or how an exception thrown in the block is
+ # reported by Semantic Logger. Values:
+ # :full
+ # Log the exception class, message, and backtrace
+ # :partial
+ # Log the exception class and message. The backtrace will not be logged
+ # :off
+ # Any unhandled exception raised in the block will not be logged
+ # Default: :partial
+ #
+ # :enabled [Boolean]
+ # Override the global setting: `ParallelMinion::Minion.enabled?` for this minion instance.
+ #
# The overhead for moving the task to a Minion (separate thread) vs running it
# sequentially is about 0.3 ms if performing other tasks in-between starting
# the task and requesting its result.
#
# The following call adds 0.5 ms to total processing time vs running the
# code in-line:
# ParallelMinion::Minion.new(description: 'Count', timeout: 5) { 1 }.result
#
- # NOTE:
- # On JRuby it is very important to add the following setting to .jrubyrc
- # thread.pool.enabled=true
+ # Note:
+ # On JRuby it is recommended to add the following setting to .jrubyrc
+ # thread.pool.enabled=true
#
# Example:
# ParallelMinion::Minion.new(10.days.ago, description: 'Doing something else in parallel', timeout: 1000) do |date|
# MyTable.where('created_at <= ?', date).count
# end
- def initialize(*args, &block)
- raise "Missing mandatory block that Minion must perform" unless block
+ def initialize(*arguments, description: 'Minion', metric: nil, log_exception: nil, enabled: self.class.enabled?, timeout: INFINITE, on_timeout: nil, wait_metric: nil, &block)
+ raise 'Missing mandatory block that Minion must perform' unless block
@start_time = Time.now
@exception = nil
- @arguments = args.dup
- options = self.class.extract_options!(@arguments)
- @timeout = options.delete(:timeout).to_f
- @description = (options.delete(:description) || 'Minion').to_s
- @metric = options.delete(:metric)
- @log_exception = options.delete(:log_exception)
- @enabled = options.delete(:enabled)
- @enabled = self.class.enabled? if @enabled.nil?
- @on_timeout = options.delete(:on_timeout)
+ @arguments = arguments
+ @timeout = timeout.to_f
+ @description = description.to_s
+ @metric = metric
+ @log_exception = log_exception
+ @enabled = enabled
+ @on_timeout = on_timeout
- # Warn about any unknown options.
- options.each_pair do | key, val |
- logger.warn "Ignoring unknown option: #{key.inspect} => #{val.inspect}"
- warn "ParallelMinion::Minion Ignoring unknown option: #{key.inspect} => #{val.inspect}"
- end
+ @wait_metric = (wait_metric || "#{metric}/wait") if @metric
- # Run the supplied block of code in the current thread for testing or
- # debugging purposes
- if @enabled == false
- begin
- logger.info("Started in the current thread: #{@description}")
- logger.benchmark_info("Completed in the current thread: #{@description}", log_exception: @log_exception, metric: @metric) do
- @result = instance_exec(*@arguments, &block)
- end
- rescue Exception => exc
- @exception = exc
- ensure
- @duration = Time.now - @start_time
- end
- return
- end
+ # When minion is disabled it is obvious in the logs since the name will now be 'Inline' instead of 'Minion'
+ self.logger = SemanticLogger['Inline'] unless @enabled
- tags = (logger.tags || []).dup
-
- # Copy current scopes for new thread. Only applicable for AR models
- scopes = self.class.current_scopes if defined?(ActiveRecord::Base)
-
- @thread = Thread.new(*@arguments) do
- # Copy logging tags from parent thread
- logger.tagged(*tags) do
- # Set the current thread name to the description for this Minion
- # so that all log entries in this thread use this thread name
- Thread.current.name = "#{@description}-#{Thread.current.object_id}"
- logger.info("Started #{@description}")
-
- begin
- logger.benchmark_info("Completed #{@description}", log_exception: @log_exception, metric: @metric) do
- # Use the current scope for the duration of the task execution
- if scopes.nil? || (scopes.size == 0)
- @result = instance_exec(*@arguments, &block)
- else
- # Each Class to scope requires passing a block to .scoping
- proc = Proc.new { instance_exec(*@arguments, &block) }
- first = scopes.shift
- scopes.each {|scope| proc = Proc.new { scope.scoping(&proc) } }
- @result = first.scoping(&proc)
- end
- end
- rescue Exception => exc
- @exception = exc
- nil
- ensure
- # Return any database connections used by this thread back to the pool
- ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base)
- @duration = Time.now - @start_time
- end
- end
- end
+ @enabled ? run(&block) : run_inline(&block)
end
# Returns the result when the thread completes
# Returns nil if the thread has not yet completed
# Raises any unhandled exception in the thread, if any
@@ -205,14 +223,14 @@
# Note: The result of any thread cannot be nil
def result
# Return nil if Minion is still working and has time left to finish
if working?
ms = time_left
- logger.benchmark_info("Waited for Minion to complete: #{@description}", min_duration: 0.01) do
- if @thread.join(ms.nil? ? nil: ms / 1000).nil?
- @thread.raise(@on_timeout.new("Minion: #{@description} timed out")) if @on_timeout
- logger.warn("Timed out waiting for result from Minion: #{@description}")
+ logger.measure(self.class.completed_log_level, "Waited for Minion to complete: #{description}", min_duration: 0.01, metric: wait_metric) do
+ if @thread.join(ms.nil? ? nil : ms / 1000).nil?
+ @thread.raise(@on_timeout.new("Minion: #{description} timed out")) if @on_timeout
+ logger.warn("Timed out waiting for: #{description}")
return
end
end
end
@@ -237,12 +255,12 @@
# Returns the amount of time left in milli-seconds that this Minion has to finish its task
# Returns 0 if no time is left
# Returns nil if there is no time limit. I.e. :timeout was set to Minion::INFINITE (infinite time left)
def time_left
- return nil if (@timeout == 0) || (@timeout == -1)
- duration = @timeout - (Time.now - @start_time) * 1000
+ return nil if (timeout == 0) || (timeout == -1)
+ duration = timeout - (Time.now - start_time) * 1000
duration <= 0 ? 0 : duration
end
# Returns [Boolean] whether this minion is enabled to run in a separate thread
def enabled?
@@ -250,28 +268,81 @@
end
# Returns the current scopes for each of the models for which scopes will be
# copied to the Minions
if defined?(ActiveRecord)
- if ActiveRecord::VERSION::MAJOR >= 4
+ if ActiveRecord::VERSION::MAJOR >= 4
def self.current_scopes
- @@scoped_classes.collect {|klass| klass.all}
+ scoped_classes.collect { |klass| klass.all }
end
else
def self.current_scopes
- @@scoped_classes.collect {|klass| klass.scoped}
+ scoped_classes.collect { |klass| klass.scoped }
end
end
end
- protected
+ private
- @@enabled = true
- @@scoped_classes = []
+ # Run the supplied block of code in the current thread.
+ # Useful for debugging, testing, and when running in batch environments.
+ def run_inline(&block)
+ begin
+ logger.public_send(self.class.started_log_level, "Started #{@description}")
+ logger.measure(self.class.completed_log_level, "Completed #{@description}", log_exception: @log_exception, metric: metric) do
+ @result = instance_exec(*arguments, &block)
+ end
+ rescue Exception => exc
+ @exception = exc
+ ensure
+ @duration = Time.now - start_time
+ end
+ end
- # Extract options from a hash.
- def self.extract_options!(args)
- args.last.is_a?(Hash) ? args.pop : {}
+ def run(&block)
+ # Capture tags from current thread
+ tags = SemanticLogger.tags
+ tags = tags.nil? || tags.empty? ? nil : tags.dup
+
+ named_tags = SemanticLogger.named_tags
+ named_tags = named_tags.nil? || named_tags.empty? ? nil : named_tags.dup
+
+ # Captures scopes from current thread. Only applicable for AR models
+ scopes = self.class.current_scopes if defined?(ActiveRecord::Base)
+
+ @thread = Thread.new(*arguments) do
+ Thread.current.name = "#{description}-#{Thread.current.object_id}"
+
+ # Copy logging tags from parent thread, if any
+ proc = Proc.new { run_in_scope(scopes, &block) }
+ proc2 = tags ? Proc.new { SemanticLogger.tagged(*tags, &proc) } : proc
+ proc3 = named_tags ? Proc.new { SemanticLogger.named_tagged(named_tags, &proc2) } : proc2
+
+ logger.public_send(self.class.started_log_level, "Started #{description}")
+ begin
+ logger.measure(self.class.completed_log_level, "Completed #{description}", log_exception: log_exception, metric: metric, &proc3)
+ rescue Exception => exc
+ @exception = exc
+ nil
+ ensure
+ @duration = Time.now - start_time
+ # Return any database connections used by this thread back to the pool
+ ActiveRecord::Base.clear_active_connections! if defined?(ActiveRecord::Base)
+ end
+ end
+ end
+
+ def run_in_scope(scopes, &block)
+ if scopes.nil? || scopes.empty?
+ @result = instance_exec(*@arguments, &block)
+ else
+ # Use the captured scope when running the block.
+ # Each Class to scope requires passing a block to .scoping.
+ proc = Proc.new { instance_exec(*@arguments, &block) }
+ first = scopes.shift
+ scopes.each { |scope| proc = Proc.new { scope.scoping(&proc) } }
+ @result = first.scoping(&proc)
+ end
end
end
end