lib/tresse.rb in tresse-1.1.2 vs lib/tresse.rb in tresse-1.1.3
- old
+ new
@@ -2,11 +2,11 @@
require 'thread'
module Tresse
- VERSION = '1.1.2'
+ VERSION = '1.1.3'
class << self
def init
@@ -91,19 +91,21 @@
class Batch
attr_reader :group
attr_reader :map_index
+ attr_reader :completed
attr_accessor :value
def initialize(group, block_or_group)
@group = group
@bog = block_or_group
@map_index = -1
@value = nil
+ @completed = false
end
def process
@map_index += 1
@@ -119,27 +121,31 @@
r = Tresse.call_block(block, [ @value, self ])
@value = r if type == :map
end
+
+ def complete
+
+ @completed = true
+ end
end
class Group
attr_accessor :name
- #attr_reader :batches
def initialize(name=nil)
@name = name
@batches = []
@launched = false
@maps = [ nil ]
@reduce = nil
- @reduce_batches = []
+ @reduce_mutex = Mutex.new
@reduction_queue = Queue.new
end
#
# sourcing methods
@@ -240,18 +246,21 @@
end
end
def queue_for_reduction(batch)
- @reduce_batches << batch
+ @reduce_mutex.synchronize do
- return if @reduce_batches.size < @batches.size
- return unless @reduce
+ batch.complete
- es = @batches.collect(&:value)
- target, block = @reduce
+ return unless @reduce
+ return if @batches.find { |b| ! b.completed }
- @reduction_queue << es.inject(target, &block)
+ es = @batches.collect(&:value)
+ target, block = @reduce
+
+ @reduction_queue << es.inject(target, &block)
+ end
end
end
end