Sha256: 4783e37b37af775c8eba42845f7d90d39647f643a3a9e46eac5bd6b88b5f38cf

Contents?: true

Size: 1.18 KB

Versions: 5

Compression:

Stored size: 1.18 KB

Contents

module Tap
  module Support
    module Joins
      
      # SyncMerge passes the collected results of the inputs to the outputs. The
      # results will not be passed until results from all of the inputs are 
      # available; results are passed in one group.  Similarly, a collision 
      # results if a single input completes twice before the group completes as
      # a whole.
      class SyncMerge < Join
        
        def join(inputs, outputs)
          results = Array.new(inputs.length)
          
          inputs.each do |input|
            input.on_complete do |_result|
              index = inputs.index(_result.key)
              
              unless results[index] == nil
                raise "sync_merge collision... already got a result for #{inputs[index]}"
              end
              results[index] = _result
              
              unless results.include?(nil)
                yield(*results) if block_given?
                outputs.each {|output| enq(output, *results) }
                
                # reset the results array
                results.collect! {|i| nil }
              end
            end
          end
        end
        
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
tap-0.12.0 lib/tap/support/joins/sync_merge.rb
tap-0.12.3 lib/tap/support/joins/sync_merge.rb
tap-0.12.1 lib/tap/support/joins/sync_merge.rb
tap-0.12.2 lib/tap/support/joins/sync_merge.rb
tap-0.12.4 lib/tap/support/joins/sync_merge.rb