lib/parallel_minion/minion.rb in parallel_minion-0.4.1 vs lib/parallel_minion/minion.rb in parallel_minion-1.0.0
- old
+ new
@@ -10,10 +10,13 @@
attr_reader :exception
# Returns [Integer] the maximum duration in milli-seconds that the Minion may take to complete the task
attr_reader :timeout
+ # Returns [Array<Object>] list of arguments in the order they were passed into the initializer
+ attr_reader :arguments
+
# Give an infinite amount of time to wait for a Minion to complete a task
INFINITE = -1
# Sets whether minions are enabled to run in their own threads
#
@@ -77,10 +80,12 @@
# *args
# Any number of arguments can be supplied that are passed into the block
# in the order they are listed
# It is recommended to duplicate and/or freeze objects passed as arguments
# so that they are not modified at the same time by multiple threads
+ # These arguments are accessible while and after the minion is running
+ # by calling #arguments
#
# Proc / lambda
# A block of code must be supplied that the Minion will execute
# NOTE: This block will be executed within the scope of the created minion
# instance and _not_ within the scope of where the Proc/lambda was
@@ -104,35 +109,34 @@
# ParallelMinion::Minion.new(10.days.ago, description: 'Doing something else in parallel', timeout: 1000) do |date|
# MyTable.where('created_at <= ?', date).count
# end
def initialize(*args, &block)
raise "Missing mandatory block that Minion must perform" unless block
- @start_time = Time.now
- @exception = nil
-
- options = self.class.extract_options!(args).dup
-
+ @start_time = Time.now
+ @exception = nil
+ @arguments = args.dup
+ options = self.class.extract_options!(@arguments)
@timeout = (options.delete(:timeout) || Minion::INFINITE).to_f
@description = (options.delete(:description) || 'Minion').to_s
@metric = options.delete(:metric)
@log_exception = options.delete(:log_exception)
@enabled = options.delete(:enabled)
@enabled = self.class.enabled? if @enabled.nil?
# Warn about any unknown options.
- options.each_pair do |key,val|
+ options.each_pair do | key, val |
logger.warn "Ignoring unknown option: #{key.inspect} => #{val.inspect}"
warn "ParallelMinion::Minion Ignoring unknown option: #{key.inspect} => #{val.inspect}"
end
# Run the supplied block of code in the current thread for testing or
# debugging purposes
if @enabled == false
begin
logger.info("Started in the current thread: #{@description}")
logger.benchmark_info("Completed in the current thread: #{@description}", log_exception: @log_exception, metric: @metric) do
- @result = instance_exec(*args, &block)
+ @result = instance_exec(*@arguments, &block)
end
rescue Exception => exc
@exception = exc
end
return
@@ -141,11 +145,11 @@
tags = (logger.tags || []).dup
# Copy current scopes for new thread. Only applicable for AR models
scopes = self.class.current_scopes if defined?(ActiveRecord::Base)
- @thread = Thread.new(*args) do
+ @thread = Thread.new(*@arguments) do
# Copy logging tags from parent thread
logger.tagged(*tags) do
# Set the current thread name to the description for this Minion
# so that all log entries in this thread use this thread name
Thread.current.name = "#{@description}-#{Thread.current.object_id}"
@@ -153,13 +157,13 @@
begin
logger.benchmark_info("Completed #{@description}", log_exception: @log_exception, metric: @metric) do
# Use the current scope for the duration of the task execution
if scopes.nil? || (scopes.size == 0)
- @result = instance_exec(*args, &block)
+ @result = instance_exec(*@arguments, &block)
else
# Each Class to scope requires passing a block to .scoping
- proc = Proc.new { instance_exec(*args, &block) }
+ proc = Proc.new { instance_exec(*@arguments, &block) }
first = scopes.shift
scopes.each {|scope| proc = Proc.new { scope.scoping(&proc) } }
@result = first.scoping(&proc)
end
end