lib/workers/task_group.rb in workers-0.3.0 vs lib/workers/task_group.rb in workers-0.4.0
- old
+ new
@@ -13,22 +13,22 @@
@internal_lock = Mutex.new
@external_lock = Mutex.new
@finished_count = 0
@conditional = ConditionVariable.new
- return nil
+ nil
end
def add(options = {}, &block)
state!(:initialized)
options[:finished] = method(:finished)
options[:perform] ||= block
@tasks << Workers::Task.new(options)
- return nil
+ nil
end
def run
state!(:initialized)
@@ -43,19 +43,19 @@
end
@conditional.wait(@internal_lock)
end
- return @tasks.all? { |t| t.succeeded? }
+ @tasks.all? { |t| t.succeeded? }
end
def successes
- return @tasks.select { |t| t.succeeded? }
+ @tasks.select { |t| t.succeeded? }
end
def failures
- return @tasks.select { |t| t.failed? }
+ @tasks.select { |t| t.failed? }
end
def map(inputs, options = {}, &block)
inputs.each do |input|
add(:input => input, :max_tries => options[:max_tries]) do |i|
@@ -68,39 +68,39 @@
if (failure = failures[0])
a = failure.input.inspect
m = failure.exception.message
b = failure.exception.backtrace.join("\n")
- raise "At least one task failed. ARGS=#{a}, TRACE=#{m}\n#{b}\n----------\n"
+ raise Workers::FailedTaskError, "At least one task failed. ARGS=#{a}, TRACE=#{m}\n#{b}\n----------\n"
end
- return tasks.map { |t| t.result }
+ tasks.map { |t| t.result }
end
# Convenient mutex to be used by a users's task code that needs serializing.
# This should NEVER be used by TaskGroup code (use the @internal_lock instead);
def synchronize(&block)
@external_lock.synchronize { block.call }
- return nil
+ nil
end
private
def state!(*args)
unless args.include?(@state)
- raise "Invalid state (#{@state})."
+ raise Workers::InvalidStateError, "Invalid state (#{@state})."
end
- return nil
+ nil
end
def finished(task)
@internal_lock.synchronize do
@finished_count += 1
@conditional.signal if @finished_count >= @tasks.count
end
- return nil
+ nil
end
end
end