lib/tresse.rb in tresse-1.1.3 vs lib/tresse.rb in tresse-1.2.0

- old
+ new

@@ -2,41 +2,27 @@ require 'thread' module Tresse - VERSION = '1.1.3' + VERSION = '1.2.0' class << self def init @work_queue = Queue.new @work_threads = 8.times.collect { |i| make_work_thread } - - @on_error = - lambda do |where, err| - puts "-" * 80 - p where - p err - puts err.backtrace - puts "-" * 80 - end end def enqueue(batch) @work_queue << batch batch.group end - def on_error(&block) - - @on_error = block - end - def max_work_thread_count @work_threads.size end @@ -71,11 +57,11 @@ batch.process rescue => err - @on_error.call(:in_worker_thread, err) + batch.error = err end end end end end @@ -93,10 +79,11 @@ attr_reader :group attr_reader :map_index attr_reader :completed attr_accessor :value + attr_reader :error def initialize(group, block_or_group) @group = group @bog = block_or_group @@ -126,10 +113,16 @@ def complete @completed = true end + + def error=(err) + + @error = err + @group.send(:receive, self) + end end class Group attr_accessor :name @@ -220,11 +213,15 @@ @reduce = [ target, block ] launch - @reduction_queue.pop + r = @reduction_queue.pop + + raise r.error if r.is_a?(Tresse::Batch) + + r end def launch return if @launched == true @@ -233,10 +230,12 @@ @batches.each { |b| Tresse.enqueue(b) } end def receive(batch) - if batch.map_index == 0 + if batch.error + @reduction_queue << batch + elsif batch.map_index == 0 batch.source Tresse.enqueue(batch) elsif m = @maps[batch.map_index] batch.map(*m) Tresse.enqueue(batch)