lib/affinity_propagation/calculator.rb in affinity_propagation-0.1.0 vs lib/affinity_propagation/calculator.rb in affinity_propagation-0.2.0

- old
+ new

@@ -1,29 +1,10 @@
 require 'matrix'
+require 'concurrent'
+require 'thread'
 module AffinityPropagation
-  refine Array do
-    def median
-      relevant_elements = if size % 2 == 0
-        # Even number of items in this list => let's get the middle two and return their mean
-        slice(size / 2, 2)
-      else
-        slice(size / 2, 1)
-      end
-
-      relevant_elements.sum / relevant_elements.size
-    end
-  end
-
-  refine Matrix do
-    def to_matlab
-      output = row_vectors.map(&:to_a).map { |row| row.join(', ') }.join('; ')
-
-      "[#{output}]"
-    end
-  end
-
-
   class Calculator
     using AffinityPropagation
     LAMBDA = 0.75
@@ -31,11 +12,11 @@
     def initialize(data, lambda: LAMBDA, &block)
       @data = data
       @lambda = lambda
-      raise 'no block provide to calculate similarities within data!' unless block_given?
+      raise 'no block provided to calculate similarities within data!' unless block_given?
       @similarities = similarity_matrix(&block)
       reset
     end
@@ -68,81 +49,136 @@
       clusters
     end
     def run(iterations: 100, stable_iterations: 10)
-      iterate while @total_iterations < iterations && @stable_cluster_iterations < stable_iterations
+      while @total_iterations < iterations && @stable_cluster_iterations < stable_iterations
+        iterate
+
+        yield(@total_iterations, @stable_cluster_iterations) if block_given?
+      end
     end
     private
+    def median(array)
+      relevant_elements = if array.size % 2 == 0
+        # Even number of items in this list => let's get the middle two and return their mean
+        array.sort.slice(array.size / 2, 2)
+      else
+        array.sort.slice(array.size / 2, 1)
+      end
+
+      relevant_elements.sum / relevant_elements.size
+    end
+
     def similarity_matrix(&block)
       similarity_array = []
-      similarities = Matrix.build(@data.size, @data.size) do |row_idx, col_idx|
+      similarities_future = Matrix.build(@data.size, @data.size) do |row_idx, col_idx|
         exemplar = @data[row_idx]
         datum = @data[col_idx]
-        similarity = block.call(datum, exemplar)
-        similarity_array << similarity
-        similarity
+        similarity_future = Concurrent::Future.execute(executor: :fast) { block.call(datum, exemplar)}
+        similarity_array << similarity_future
+
+        similarity_future
       end
-      median_similarity = similarity_array.median
+      while similarity_array.any?(&:pending?)
+        sleep 0.1
+      end
-      # Matrices can't be modified once created so we have to use this hack
+      similarity_array.map!(&:value)
+      similarities = similarities_future.map(&:value)
+
+      median_similarity = median(similarity_array)
+
       (0...@data.size).each { |idx| similarities.send(:[]=, idx, idx, median_similarity) }
       similarities
     end
     def dampen(new_value, existing_value)
       (1 - @lambda) * new_value + @lambda * existing_value
     end
     def responsibility_matrix
-      Matrix.build(@similarities.row_count, @similarities.column_count) do |row_idx, col_idx|
+      responsibility_futures = []
+
+      responsibilities_future = Matrix.build(@similarities.row_count, @similarities.column_count) do |row_idx, col_idx|
         exemplar_idx = row_idx
         datum_idx = col_idx
+        current_similarity = @similarities[row_idx, col_idx]
+        current_responsibility = @responsibilities[exemplar_idx, datum_idx]
+
         availability_column = @availabilities.column(col_idx).to_a
-        availability_column.slice!(exemplar_idx)
         similarity_column = @similarities.column(col_idx).to_a
-        similarity_column.slice!(exemplar_idx)
-
-        availability_plus_similarity = []
-        availability_column.zip(similarity_column) { |data| availability_plus_similarity << data.sum }
+        responsibility_future = Concurrent::Future.execute(executor: :fast) do
+          availability_column.slice!(exemplar_idx)
+          similarity_column.slice!(exemplar_idx)
-        dampen(@similarities[row_idx, col_idx] - availability_plus_similarity.max, @responsibilities[exemplar_idx, datum_idx])
+          availability_plus_similarity = []
+          availability_column.zip(similarity_column) { |data| availability_plus_similarity << data.sum }
+
+          dampen(current_similarity - availability_plus_similarity.max, current_responsibility)
+        end
+
+        responsibility_futures << responsibility_future
+        responsibility_future
       end
+
+      while responsibility_futures.any?(&:pending?)
+        sleep 0.1
+      end
+
+      responsibilities_future.map(&:value)
     end
     def availability_matrix
-      Matrix.build(@responsibilities.row_count, @responsibilities.column_count) do |row_idx, col_idx|
+      availability_futures = []
+
+      availabilities_future = Matrix.build(@responsibilities.row_count, @responsibilities.column_count) do |row_idx, col_idx|
         exemplar_idx = row_idx
         datum_idx = col_idx
         responsibility_column = @responsibilities.row(exemplar_idx).to_a
-        if exemplar_idx == datum_idx
-          # self-availability
-          responsibility_column.slice!(exemplar_idx)
+        current_availability = @availabilities[exemplar_idx, datum_idx]
+        current_responsibility = @responsibilities[exemplar_idx, exemplar_idx]
-          dampen(responsibility_column.inject(0) { |sum, item| sum += [0, item].max },@availabilities[exemplar_idx, datum_idx])
-        else
-          self_responsibility = @responsibilities[exemplar_idx, exemplar_idx]
-          if datum_idx > exemplar_idx
-            # Slice out the datum index first since in this case it won't affect the exemplar index
-            responsibility_column.slice!(datum_idx)
+        availability_future = Concurrent::Future.execute(executor: :fast) do
+          if exemplar_idx == datum_idx
+            # self-availability
             responsibility_column.slice!(exemplar_idx)
+
+            dampen(responsibility_column.inject(0) { |sum, item| sum += [0, item].max }, current_availability)
           else
-            responsibility_column.slice!(exemplar_idx)
-            responsibility_column.slice!(datum_idx)
-          end
+            self_responsibility = current_responsibility
+            if datum_idx > exemplar_idx
+              # Slice out the datum index first since in this case it won't affect the exemplar index
+              responsibility_column.slice!(datum_idx)
+              responsibility_column.slice!(exemplar_idx)
+            else
+              responsibility_column.slice!(exemplar_idx)
+              responsibility_column.slice!(datum_idx)
+            end
-          responsibility_column_sum = responsibility_column.inject(0) { |sum, item| sum += [0, item].max }
+            responsibility_column_sum = responsibility_column.inject(0) { |sum, item| sum += [0, item].max }
-          dampen([0, self_responsibility + responsibility_column_sum].min, @availabilities[exemplar_idx, datum_idx])
+            dampen([0, self_responsibility + responsibility_column_sum].min, current_availability)
+          end
         end
+
+        availability_futures << availability_future
+        availability_future
       end
+
+      while availability_futures.any?(&:pending?)
+        sleep 0.1
+      end
+
+      availabilities_future.map(&:value)
     end
     def identify_raw_clusters
       clusters = {}