lib/workers/task_group.rb in workers-0.3.0 vs lib/workers/task_group.rb in workers-0.4.0

- old
+ new

@@ -13,22 +13,22 @@ @internal_lock = Mutex.new @external_lock = Mutex.new @finished_count = 0 @conditional = ConditionVariable.new - return nil + nil end def add(options = {}, &block) state!(:initialized) options[:finished] = method(:finished) options[:perform] ||= block @tasks << Workers::Task.new(options) - return nil + nil end def run state!(:initialized) @@ -43,19 +43,19 @@ end @conditional.wait(@internal_lock) end - return @tasks.all? { |t| t.succeeded? } + @tasks.all? { |t| t.succeeded? } end def successes - return @tasks.select { |t| t.succeeded? } + @tasks.select { |t| t.succeeded? } end def failures - return @tasks.select { |t| t.failed? } + @tasks.select { |t| t.failed? } end def map(inputs, options = {}, &block) inputs.each do |input| add(:input => input, :max_tries => options[:max_tries]) do |i| @@ -68,39 +68,39 @@ if (failure = failures[0]) a = failure.input.inspect m = failure.exception.message b = failure.exception.backtrace.join("\n") - raise "At least one task failed. ARGS=#{a}, TRACE=#{m}\n#{b}\n----------\n" + raise Workers::FailedTaskError, "At least one task failed. ARGS=#{a}, TRACE=#{m}\n#{b}\n----------\n" end - return tasks.map { |t| t.result } + tasks.map { |t| t.result } end # Convenient mutex to be used by a users's task code that needs serializing. # This should NEVER be used by TaskGroup code (use the @internal_lock instead); def synchronize(&block) @external_lock.synchronize { block.call } - return nil + nil end private def state!(*args) unless args.include?(@state) - raise "Invalid state (#{@state})." + raise Workers::InvalidStateError, "Invalid state (#{@state})." end - return nil + nil end def finished(task) @internal_lock.synchronize do @finished_count += 1 @conditional.signal if @finished_count >= @tasks.count end - return nil + nil end end end