vendor/umappp/optimize_layout.hpp in umappp-0.1.6 vs vendor/umappp/optimize_layout.hpp in umappp-0.2.0

- removed (present only in umappp-0.1.6)
+ added (present only in umappp-0.2.0)

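In 0.2.0 the optimization code is split into a serial optimize_layout() and a thread-based optimize_layout_parallel(), replacing the old optimize_sample<batch>() / optimize_layout_batched() pair and its OpenMP / UMAPPP_CUSTOM_PARALLEL machinery; the new parallel path can be compiled out by defining UMAPPP_NO_PARALLEL_OPTIMIZATION. For orientation before the hunks, here is a minimal sketch of driving the new serial entry point directly. These are internal functions (normally reached through umappp's higher-level interface), so everything below is illustrative: ToySetup is a hypothetical stand-in exposing only the members that optimize_layout() reads in the new code, and the include path and a/b curve parameters are assumptions rather than values taken from the library.

// ---- illustrative example (not part of the diff) --------------------------
#include <vector>
#include <random>
#include "umappp/optimize_layout.hpp" // include path depends on how the vendored headers are exposed

// Hypothetical stand-in for the epoch bookkeeping object that umappp builds elsewhere.
struct ToySetup {
    std::vector<size_t> head;                          // cumulative edge count per observation
    std::vector<size_t> tail;                          // neighbor index for each edge
    std::vector<double> epochs_per_sample;             // sampling period of each edge
    std::vector<double> epoch_of_next_sample;          // next epoch at which each edge is applied
    std::vector<double> epoch_of_next_negative_sample; // next epoch for negative sampling on each edge
    double negative_sample_rate = 5;
    int current_epoch = 0;
    int total_epochs = 200;
};

int main() {
    // Two observations joined by one edge in each direction.
    ToySetup setup;
    setup.head = {1, 2};
    setup.tail = {1, 0};
    setup.epochs_per_sample = {1.0, 1.0};
    setup.epoch_of_next_sample = setup.epochs_per_sample;
    setup.epoch_of_next_negative_sample = {1.0 / setup.negative_sample_rate,
                                           1.0 / setup.negative_sample_rate};

    std::vector<double> embedding{0.0, 0.0, 1.0, 1.0}; // 2 observations x 2 dimensions
    std::mt19937_64 rng(42);

    // a/b are illustrative curve parameters; gamma and the initial alpha are 1,
    // and an epoch_limit of 0 means "run through to total_epochs".
    umappp::optimize_layout(2, embedding.data(), setup, 1.9, 0.8, 1.0, 1.0, rng, 0);
    return 0;
}
// ----------------------------------------------------------------------------

A sketch of the busy-wait handoff used by the new parallel path follows the diff.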
@@ -3,10 +3,14 @@ #include <vector> #include <limits> #include <algorithm> #include <cmath> +#ifndef UMAPPP_NO_PARALLEL_OPTIMIZATION +#include <thread> +#include <atomic> +#endif #include "NeighborList.hpp" #include "aarand/aarand.hpp" namespace umappp { @@ -83,202 +87,475 @@ constexpr Float min_gradient = -4; constexpr Float max_gradient = 4; return std::min(std::max(input, min_gradient), max_gradient); } -template<bool batch, typename Float, class Setup, class Rng> -void optimize_sample( - size_t i, +/***************************************************** + ***************** Serial code *********************** + *****************************************************/ + +template<typename Float, class Setup, class Rng> +void optimize_layout( int ndim, - Float* embedding, - Float* buffer, + Float* embedding, Setup& setup, - Float a, - Float b, + Float a, + Float b, Float gamma, - Float alpha, + Float initial_alpha, Rng& rng, - Float epoch + int epoch_limit ) { - const auto& head = setup.head; - const auto& tail = setup.tail; - const auto& epochs_per_sample = setup.epochs_per_sample; - auto& epoch_of_next_sample = setup.epoch_of_next_sample; - auto& epoch_of_next_negative_sample = setup.epoch_of_next_negative_sample; - - const size_t num_obs = head.size(); - const Float negative_sample_rate = setup.negative_sample_rate; + auto& n = setup.current_epoch; + auto num_epochs = setup.total_epochs; + auto limit_epochs = num_epochs; + if (epoch_limit> 0) { + limit_epochs = std::min(epoch_limit, num_epochs); + } + + const size_t num_obs = setup.head.size(); + for (; n < limit_epochs; ++n) { + const Float epoch = n; + const Float alpha = initial_alpha * (1.0 - epoch / num_epochs); - size_t start = (i == 0 ? 0 : setup.head[i-1]), end = setup.head[i]; - Float* left = embedding + i * ndim; + for (size_t i = 0; i < num_obs; ++i) { + size_t start = (i == 0 ? 0 : setup.head[i-1]), end = setup.head[i]; + Float* left = embedding + i * ndim; - for (size_t j = start; j < end; ++j) { - if (epoch_of_next_sample[j] > epoch) { - continue; - } + for (size_t j = start; j < end; ++j) { + if (setup.epoch_of_next_sample[j] > epoch) { + continue; + } - Float* right = embedding + tail[j] * ndim; - Float dist2 = quick_squared_distance(left, right, ndim); - const Float pd2b = std::pow(dist2, b); - const Float grad_coef = (-2 * a * b * pd2b) / (dist2 * (a * pd2b + 1.0)); - { - Float* lcopy = left; - Float* rcopy = right; + { + Float* right = embedding + setup.tail[j] * ndim; + Float dist2 = quick_squared_distance(left, right, ndim); + const Float pd2b = std::pow(dist2, b); + const Float grad_coef = (-2 * a * b * pd2b) / (dist2 * (a * pd2b + 1.0)); - for (int d = 0; d < ndim; ++d, ++lcopy, ++rcopy) { - Float gradient = alpha * clamp(grad_coef * (*lcopy - *rcopy)); - if constexpr(!batch) { - *lcopy += gradient; - *rcopy -= gradient; - } else { - // Doubling as we'll assume symmetry from the same - // force applied by the right node. This allows us to - // avoid worrying about accounting for modifications to - // the right node. - buffer[d] += 2 * gradient; + Float* lcopy = left; + for (int d = 0; d < ndim; ++d, ++lcopy, ++right) { + Float gradient = alpha * clamp(grad_coef * (*lcopy - *right)); + *lcopy += gradient; + *right -= gradient; + } } + + // Remember that 'epochs_per_negative_sample' is defined as 'epochs_per_sample[j] / negative_sample_rate'. + // We just use it inline below rather than defining a new variable and suffering floating-point round-off. 
+ const size_t num_neg_samples = (epoch - setup.epoch_of_next_negative_sample[j]) * + setup.negative_sample_rate / setup.epochs_per_sample[j]; // i.e., 1/epochs_per_negative_sample. + + for (size_t p = 0; p < num_neg_samples; ++p) { + size_t sampled = aarand::discrete_uniform(rng, num_obs); + if (sampled == i) { + continue; + } + + const Float* right = embedding + sampled * ndim; + Float dist2 = quick_squared_distance(left, right, ndim); + const Float grad_coef = 2 * gamma * b / ((0.001 + dist2) * (a * std::pow(dist2, b) + 1.0)); + + Float* lcopy = left; + for (int d = 0; d < ndim; ++d, ++lcopy, ++right) { + *lcopy += alpha * clamp(grad_coef * (*lcopy - *right)); + } + } + + setup.epoch_of_next_sample[j] += setup.epochs_per_sample[j]; + + // The update to 'epoch_of_next_negative_sample' involves adding + // 'num_neg_samples * epochs_per_negative_sample', which eventually boils + // down to setting epoch_of_next_negative_sample to 'epoch'. + setup.epoch_of_next_negative_sample[j] = epoch; } } + } - // Here, we divide by epochs_per_negative_sample, defined as epochs_per_sample[j] / negative_sample_rate. - const size_t num_neg_samples = (epoch - epoch_of_next_negative_sample[j]) * negative_sample_rate / epochs_per_sample[j]; + return; +} - for (size_t p = 0; p < num_neg_samples; ++p) { - size_t sampled = aarand::discrete_uniform(rng, num_obs); - if (sampled == i) { +/***************************************************** + **************** Parallel code ********************** + *****************************************************/ + +#ifndef UMAPPP_NO_PARALLEL_OPTIMIZATION +template<class Float, class Setup> +struct BusyWaiterThread { +public: + std::vector<size_t> selections; + std::vector<unsigned char> skips; + size_t observation; + Float alpha; + +private: + int ndim; + Float* embedding; + const Setup* setup; + Float a; + Float b; + Float gamma; + + std::vector<Float> self_modified; + +private: + std::thread pool; + std::atomic<bool> ready = false; + bool finished = false; + bool active = false; + +public: + void run() { + ready.store(true, std::memory_order_release); + } + + void wait() { + while (ready.load(std::memory_order_acquire)) { + ; + } + } + + void migrate_parameters(BusyWaiterThread& src) { + selections.swap(src.selections); + skips.swap(src.skips); + alpha = src.alpha; + observation = src.observation; + } + + void transfer_coordinates() { + std::copy(self_modified.begin(), self_modified.end(), embedding + observation * ndim); + } + +public: + void run_direct() { + auto seIt = selections.begin(); + auto skIt = skips.begin(); + const size_t i = observation; + const size_t start = (i == 0 ? 0 : setup->head[i-1]), end = setup->head[i]; + + // Copying it over into a thread-local buffer to avoid false sharing. + // We don't bother doing this for the neighbors, though, as it's + // tedious to make sure that the modified values are available during negative sampling. + // (This isn't a problem for the self, as the self cannot be its own negative sample.) 
+ { + const Float* left = embedding + i * ndim; + std::copy(left, left + ndim, self_modified.data()); + } + + for (size_t j = start; j < end; ++j) { + if (*(skIt++)) { continue; } - Float* right = embedding + sampled * ndim; - Float dist2 = quick_squared_distance(left, right, ndim); - const Float grad_coef = 2 * gamma * b / ((0.001 + dist2) * (a * std::pow(dist2, b) + 1.0)); { - Float* lcopy = left; - const Float* rcopy = right; - for (int d = 0; d < ndim; ++d, ++lcopy, ++rcopy) { - Float gradient = alpha * clamp(grad_coef * (*lcopy - *rcopy)); - if constexpr(!batch) { - *lcopy += gradient; - } else { - buffer[d] += gradient; - } + Float* left = self_modified.data(); + Float* right = embedding + setup->tail[j] * ndim; + + Float dist2 = quick_squared_distance(left, right, ndim); + const Float pd2b = std::pow(dist2, b); + const Float grad_coef = (-2 * a * b * pd2b) / (dist2 * (a * pd2b + 1.0)); + + for (int d = 0; d < ndim; ++d, ++left, ++right) { + Float gradient = alpha * clamp(grad_coef * (*left - *right)); + *left += gradient; + *right -= gradient; } } - } - epoch_of_next_sample[j] += epochs_per_sample[j]; + while (seIt != selections.end() && *seIt != -1) { + Float* left = self_modified.data(); + const Float* right = embedding + (*seIt) * ndim; - // The update to epoch_of_next_negative_sample involves adding - // num_neg_samples * epochs_per_negative_sample, which eventually boils - // down to setting epoch_of_next_negative_sample to 'n'. - epoch_of_next_negative_sample[j] = epoch; + Float dist2 = quick_squared_distance(left, right, ndim); + const Float grad_coef = 2 * gamma * b / ((0.001 + dist2) * (a * std::pow(dist2, b) + 1.0)); + + for (int d = 0; d < ndim; ++d, ++left, ++right) { + *left += alpha * clamp(grad_coef * (*left - *right)); + } + ++seIt; + } + ++seIt; // get past the -1. 
+ } } -} -template<typename Float, class Setup, class Rng> -void optimize_layout( - int ndim, - Float* embedding, - Setup& setup, - Float a, - Float b, - Float gamma, - Float initial_alpha, - Rng& rng, - int epoch_limit -) { - auto& n = setup.current_epoch; - auto num_epochs = setup.total_epochs; - auto limit_epochs = num_epochs; - if (epoch_limit> 0) { - limit_epochs = std::min(epoch_limit, num_epochs); +private: + void loop() { + while (true) { + while (!ready.load(std::memory_order_acquire)) { + ; + } + if (finished) { + break; + } + run_direct(); + ready.store(false, std::memory_order_release); + } } - for (; n < limit_epochs; ++n) { - const Float epoch = n; - const Float alpha = initial_alpha * (1.0 - epoch / num_epochs); - for (size_t i = 0; i < setup.head.size(); ++i) { - optimize_sample<false>(i, ndim, embedding, static_cast<Float*>(NULL), setup, a, b, gamma, alpha, rng, epoch); +public: + BusyWaiterThread() {} + + BusyWaiterThread(int ndim_, Float* embedding_, Setup& setup_, Float a_, Float b_, Float gamma_) : + ndim(ndim_), + embedding(embedding_), + setup(&setup_), + a(a_), + b(b_), + gamma(gamma_), + self_modified(ndim) + {} + + void start() { + active = true; + pool = std::thread(&BusyWaiterThread::loop, this); + } + +public: + ~BusyWaiterThread() { + if (active) { + finished = true; + ready.store(true, std::memory_order_release); + pool.join(); } } - return; -} + BusyWaiterThread(BusyWaiterThread&&) = default; + BusyWaiterThread& operator=(BusyWaiterThread&&) = default; -template<typename Float, class Setup, class SeedFunction, class EngineFunction> -inline void optimize_layout_batched( + BusyWaiterThread(const BusyWaiterThread& src) : + selections(src.selections), + skips(src.skips), + observation(src.observation), + + ndim(src.ndim), + embedding(src.embedding), + setup(src.setup), + a(src.a), + b(src.b), + gamma(src.gamma), + alpha(src.alpha), + + self_modified(src.self_modified) + {} + + BusyWaiterThread& operator=(const BusyWaiterThread& src) { + selections = src.selections; + skips = src.skips; + observation = src.observation; + + ndim = src.ndim; + embedding = src.embedding; + setup = src.setup; + a = src.a; + b = src.b; + gamma = src.gamma; + alpha = src.alpha; + + self_modified = src.self_modified; + } +}; +#endif + +//#define PRINT false + +template<typename Float, class Setup, class Rng> +void optimize_layout_parallel( int ndim, Float* embedding, Setup& setup, Float a, Float b, Float gamma, Float initial_alpha, - SeedFunction seeder, - EngineFunction creator, + Rng& rng, int epoch_limit, int nthreads ) { +#ifndef UMAPPP_NO_PARALLEL_OPTIMIZATION auto& n = setup.current_epoch; auto num_epochs = setup.total_epochs; auto limit_epochs = num_epochs; - if (epoch_limit > 0) { + if (epoch_limit> 0) { limit_epochs = std::min(epoch_limit, num_epochs); } const size_t num_obs = setup.head.size(); - std::vector<decltype(seeder())> seeds(num_obs); - std::vector<Float> replace_buffer(num_obs * ndim); - Float* replacement = replace_buffer.data(); - bool using_replacement = false; + std::vector<int> last_touched(num_obs); + std::vector<unsigned char> touch_type(num_obs); + // We run some things directly in this main thread to avoid excessive busy-waiting. 
+ BusyWaiterThread<Float, Setup> staging(ndim, embedding, setup, a, b, gamma); + + int nthreadsm1 = nthreads - 1; + std::vector<BusyWaiterThread<Float, Setup> > pool; + pool.reserve(nthreadsm1); + for (int t = 0; t < nthreadsm1; ++t) { + pool.emplace_back(ndim, embedding, setup, a, b, gamma); + pool.back().start(); + } + + std::vector<int> jobs_in_progress; + for (; n < limit_epochs; ++n) { const Float epoch = n; const Float alpha = initial_alpha * (1.0 - epoch / num_epochs); - // Fill the seeds. - for (auto& s : seeds) { - s = seeder(); - } + int base_iteration = 0; + std::fill(last_touched.begin(), last_touched.end(), -1); - // Input and output alternate between epochs, to avoid the need for a - // copy operation on the entire embedding at the end of each epoch. - Float* reference = (using_replacement ? replacement : embedding); - Float* output = (using_replacement ? embedding : replacement); - using_replacement = !using_replacement; + size_t i = 0; + while (i < num_obs) { + bool is_clear = true; +// if (PRINT) { std::cout << "size is " << jobs_in_progress.size() << std::endl; } -#ifndef UMAPPP_CUSTOM_PARALLEL - #pragma omp parallel num_threads(nthreads) - { - std::vector<Float> buffer(ndim); - #pragma omp for - for (size_t i = 0; i < setup.head.size(); ++i) { -#else - UMAPPP_CUSTOM_PARALLEL(setup.head.size(), [&](size_t first, size_t last) -> void { - std::vector<Float> buffer(ndim); - for (size_t i = first; i < last; ++i) { -#endif + for (int t = jobs_in_progress.size(); t < nthreads && i < num_obs; ++t) { + staging.alpha = alpha; + staging.observation = i; - size_t shift = i * ndim; - std::copy(reference + shift, reference + shift + ndim, buffer.data()); - auto rng = creator(seeds[i]); - optimize_sample<true>(i, ndim, reference, buffer.data(), setup, a, b, gamma, alpha, rng, epoch); - std::copy(buffer.begin(), buffer.end(), output + shift); + // Tapping the RNG here in the serial section. + auto& selections = staging.selections; + selections.clear(); + auto& skips = staging.skips; + skips.clear(); -#ifndef UMAPPP_CUSTOM_PARALLEL + const int self_iteration = i; + constexpr unsigned char READONLY = 0; + constexpr unsigned char WRITE = 1; + + { + auto& touched = last_touched[i]; + auto& ttype = touch_type[i]; +// if (PRINT) { std::cout << "SELF: " << i << ": " << touched << " (" << ttype << ")" << std::endl; } + if (touched >= base_iteration) { + is_clear = false; +// if (PRINT) { std::cout << "=== FAILED! ===" << std::endl; } + } + touched = self_iteration; + ttype = WRITE; + } + + const size_t start = (i == 0 ? 0 : setup.head[i-1]), end = setup.head[i]; + for (size_t j = start; j < end; ++j) { + bool skip = setup.epoch_of_next_sample[j] > epoch; + skips.push_back(skip); + if (skip) { + continue; + } + + { + auto neighbor = setup.tail[j]; + auto& touched = last_touched[neighbor]; + auto& ttype = touch_type[neighbor]; +// if (PRINT) { std::cout << "\tNEIGHBOR: " << neighbor << ": " << touched << " (" << ttype << ")" << std::endl; } + if (touched >= base_iteration) { + if (touched != self_iteration) { + is_clear = false; +// if (PRINT) { std::cout << "=== FAILED! 
===" << std::endl; } + } + } + touched = self_iteration; + ttype = WRITE; + } + + const size_t num_neg_samples = (epoch - setup.epoch_of_next_negative_sample[j]) * + setup.negative_sample_rate / setup.epochs_per_sample[j]; + + for (size_t p = 0; p < num_neg_samples; ++p) { + size_t sampled = aarand::discrete_uniform(rng, num_obs); + if (sampled == i) { + continue; + } + selections.push_back(sampled); + + auto& touched = last_touched[sampled]; + auto& ttype = touch_type[sampled]; +// if (PRINT) { std::cout << "\t\tSAMPLED: " << sampled << ": " << touched << " (" << ttype << ")" << std::endl; } + if (touched >= base_iteration) { + if (touched != self_iteration) { + if (ttype == WRITE) { + is_clear = false; +// if (PRINT) { std::cout << "=== FAILED! ===" << std::endl; } + } + } + } else { + // Only updating if it wasn't touched by a previous thread in this + // round of thread iterations. + ttype = READONLY; + touched = self_iteration; + } + } + + selections.push_back(-1); + + setup.epoch_of_next_sample[j] += setup.epochs_per_sample[j]; + setup.epoch_of_next_negative_sample[j] = epoch; + } + + if (!is_clear) { + // As we only updated the access for 'sampled' to READONLY + // if they weren't touched by another thread, we need to go + // through and manually update them now that the next round + // of thread_iterations will use 'self_iteration' as the + // 'base_iteration'. This ensures that the flags are properly + // set for the next round, under the expectation that the + // pending thread becomes the first thread. + for (auto s : selections) { + if (s != -1) { + auto& touched = last_touched[s]; + if (touched != self_iteration) { + touched = self_iteration; + touch_type[s] = READONLY; + } + } + } + break; + } + + // Submitting if it's not the final job, otherwise just running it directly. + // This avoids a busy-wait on the main thread that uses up an extra CPU. + if (t < nthreadsm1) { + const int thread_index = i % nthreadsm1; + pool[thread_index].migrate_parameters(staging); + pool[thread_index].run(); + jobs_in_progress.push_back(thread_index); + } else { + staging.run_direct(); + staging.transfer_coordinates(); + } + + ++i; } - } -#else + + // Waiting for all the jobs that were submitted. + for (auto job : jobs_in_progress) { + pool[job].wait(); + pool[job].transfer_coordinates(); } - }, nthreads); -#endif - } + jobs_in_progress.clear(); - if (using_replacement) { - std::copy(replace_buffer.begin(), replace_buffer.end(), embedding); +// if (PRINT) { std::cout << "###################### OK ##########################" << std::endl; } + + base_iteration = i; + if (!is_clear) { + const int thread_index = i % nthreadsm1; + pool[thread_index].migrate_parameters(staging); + pool[thread_index].run(); + jobs_in_progress.push_back(thread_index); + ++i; + } + } + + for (auto job : jobs_in_progress) { + pool[job].wait(); + pool[job].transfer_coordinates(); + } + jobs_in_progress.clear(); } return; +#else + throw std::runtime_error("umappp was not compiled with support for parallel optimization"); +#endif } } #endif
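The new parallelization scheme hands each observation's updates to a BusyWaiterThread: the main thread fills in selections/skips for that observation, flips an atomic flag with run(), and later spins in wait() until the worker flips it back, after which transfer_coordinates() copies the worker's thread-local buffer into the embedding; observations are only handed out when the last_touched/touch_type bookkeeping shows no conflict with embeddings still being written by in-flight jobs. The following is a self-contained sketch of that acquire/release busy-wait handoff on its own, not umappp's actual class; the names and the toy summation workload are illustrative.

// ---- illustrative example (not part of the diff) --------------------------
#include <atomic>
#include <thread>
#include <vector>
#include <iostream>

struct SpinWorker {
    std::vector<double> payload;        // job input, written by the main thread before run()
    double result = 0;                  // job output, read by the main thread after wait()

    std::atomic<bool> ready{false};     // true while the worker owns the job
    std::atomic<bool> finished{false};  // set once, to shut the worker down
    std::thread runner;

    void start() {
        runner = std::thread([this]() {
            while (true) {
                while (!ready.load(std::memory_order_acquire)) {} // spin until handed a job
                if (finished.load(std::memory_order_relaxed)) break;
                double sum = 0;
                for (double x : payload) sum += x;                // the "work"
                result = sum;
                ready.store(false, std::memory_order_release);    // hand the result back
            }
        });
    }

    void run()  { ready.store(true, std::memory_order_release); }    // submit the prepared job
    void wait() { while (ready.load(std::memory_order_acquire)) {} } // busy-wait for completion

    ~SpinWorker() {
        // Assumes no job is in flight, i.e. wait() has been called since the last run().
        finished.store(true, std::memory_order_relaxed);
        ready.store(true, std::memory_order_release); // wake the spin loop so it can exit
        if (runner.joinable()) runner.join();
    }
};

int main() {
    SpinWorker w;
    w.start();
    for (int round = 0; round < 3; ++round) {
        w.payload.assign(10, round + 1.0); // safe: the worker only reads payload while it owns the job
        w.run();
        w.wait();
        std::cout << "round " << round << ": sum = " << w.result << "\n";
    }
    return 0;
}
// ----------------------------------------------------------------------------

As the comments in the diff note, the last job in each batch is run directly on the main thread via run_direct() rather than being submitted, so the main thread does not burn a core busy-waiting on a pool thread to finish it.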