lib/tresse.rb in tresse-1.1.3 vs lib/tresse.rb in tresse-1.2.0
- old
+ new
@@ -2,41 +2,27 @@
require 'thread'
module Tresse
- VERSION = '1.1.3'
+ VERSION = '1.2.0'
class << self
def init
@work_queue = Queue.new
@work_threads = 8.times.collect { |i| make_work_thread }
-
- @on_error =
- lambda do |where, err|
- puts "-" * 80
- p where
- p err
- puts err.backtrace
- puts "-" * 80
- end
end
def enqueue(batch)
@work_queue << batch
batch.group
end
- def on_error(&block)
-
- @on_error = block
- end
-
def max_work_thread_count
@work_threads.size
end
@@ -71,11 +57,11 @@
batch.process
rescue => err
- @on_error.call(:in_worker_thread, err)
+ batch.error = err
end
end
end
end
end
@@ -93,10 +79,11 @@
attr_reader :group
attr_reader :map_index
attr_reader :completed
attr_accessor :value
+ attr_reader :error
def initialize(group, block_or_group)
@group = group
@bog = block_or_group
@@ -126,10 +113,16 @@
def complete
@completed = true
end
+
+ def error=(err)
+
+ @error = err
+ @group.send(:receive, self)
+ end
end
class Group
attr_accessor :name
@@ -220,11 +213,15 @@
@reduce = [ target, block ]
launch
- @reduction_queue.pop
+ r = @reduction_queue.pop
+
+ raise r.error if r.is_a?(Tresse::Batch)
+
+ r
end
def launch
return if @launched == true
@@ -233,10 +230,12 @@
@batches.each { |b| Tresse.enqueue(b) }
end
def receive(batch)
- if batch.map_index == 0
+ if batch.error
+ @reduction_queue << batch
+ elsif batch.map_index == 0
batch.source
Tresse.enqueue(batch)
elsif m = @maps[batch.map_index]
batch.map(*m)
Tresse.enqueue(batch)