lib/tresse.rb in tresse-1.1.2 vs lib/tresse.rb in tresse-1.1.3

- old
+ new

@@ -2,11 +2,11 @@ require 'thread' module Tresse - VERSION = '1.1.2' + VERSION = '1.1.3' class << self def init @@ -91,19 +91,21 @@ class Batch attr_reader :group attr_reader :map_index + attr_reader :completed attr_accessor :value def initialize(group, block_or_group) @group = group @bog = block_or_group @map_index = -1 @value = nil + @completed = false end def process @map_index += 1 @@ -119,27 +121,31 @@ r = Tresse.call_block(block, [ @value, self ]) @value = r if type == :map end + + def complete + + @completed = true + end end class Group attr_accessor :name - #attr_reader :batches def initialize(name=nil) @name = name @batches = [] @launched = false @maps = [ nil ] @reduce = nil - @reduce_batches = [] + @reduce_mutex = Mutex.new @reduction_queue = Queue.new end # # sourcing methods @@ -240,18 +246,21 @@ end end def queue_for_reduction(batch) - @reduce_batches << batch + @reduce_mutex.synchronize do - return if @reduce_batches.size < @batches.size - return unless @reduce + batch.complete - es = @batches.collect(&:value) - target, block = @reduce + return unless @reduce + return if @batches.find { |b| ! b.completed } - @reduction_queue << es.inject(target, &block) + es = @batches.collect(&:value) + target, block = @reduce + + @reduction_queue << es.inject(target, &block) + end end end end