lib/skynet/skynet_partitioners.rb in skynet-0.9.2 vs lib/skynet/skynet_partitioners.rb in skynet-0.9.3

- old
+ new

@@ -49,40 +49,44 @@ class RecombineAndSplit < Partitioners # Tries to be smart about what kind of data its getting, whether array of arrays or array of arrays of arrays. # def self.reduce_partition(post_map_data,new_partitions) return post_map_data unless post_map_data.is_a?(Array) and (not post_map_data.empty?) and post_map_data.first.is_a?(Array) - if not post_map_data.first.first.is_a?(Array) - partitioned_data = post_map_data.flatten - else + ### Why did I do this? It breaks badly. + # if not post_map_data.first.first.is_a?(Array) + # partitioned_data = post_map_data.flatten + # else partitioned_data = post_map_data.inject(Array.new) do |data,part| data += part end - end + # end partitioned_data = Skynet::Partitioners::SimplePartitionData.reduce_partition(partitioned_data, new_partitions) + debug "POST PARTITIONED DATA_SIZE", partitioned_data.size debug "POST PARTITIONED DATA", partitioned_data partitioned_data end end class ArrayDataSplitByFirstEntry < Partitioners # Smarter partitioner for array data, generates simple sum of array[0] # and ensures that all arrays sharing that key go into the same partition. # - def self.reduce_partition(partitioned_data, new_partitions) - partitions = Array.new + def self.reduce_partition(post_map_data, new_partitions) + partitions = [] (0..new_partitions - 1).each { |i| partitions[i] = Array.new } - - partitioned_data.each do |partition| - partition.each do |array| - next unless array.class == Array and array.size == 2 + cnt = 0 + post_map_data.each do |partition| + partition.each do |array| + next unless array.is_a?(Array) and array.size >= 2 if array[0].kind_of?(Fixnum) - key = array[0] - else - key = 0 - array[0].each_byte { |c| key += c } + key = array[0] % new_partitions + elsif array[0].kind_of?(String) + key = array[0].sum % new_partitions + else + cnt += 1 + key = cnt % new_partitions end - partitions[key % new_partitions] << array + partitions[key] << array end end partitions end end