use super::plumbing::*; use super::*; struct BlocksCallback { sizes: S, consumer: C, len: usize, } impl ProducerCallback for BlocksCallback where C: UnindexedConsumer, S: Iterator, { type Output = C::Result; fn callback>(mut self, mut producer: P) -> Self::Output { let mut remaining_len = self.len; let mut consumer = self.consumer; // we need a local variable for the accumulated results // we call the reducer's identity by splitting at 0 let (left_consumer, right_consumer, _) = consumer.split_at(0); let mut leftmost_res = left_consumer.into_folder().complete(); consumer = right_consumer; // now we loop on each block size while remaining_len > 0 && !consumer.full() { // we compute the next block's size let size = self.sizes.next().unwrap_or(std::usize::MAX); let capped_size = remaining_len.min(size); remaining_len -= capped_size; // split the producer let (left_producer, right_producer) = producer.split_at(capped_size); producer = right_producer; // split the consumer let (left_consumer, right_consumer, _) = consumer.split_at(capped_size); consumer = right_consumer; leftmost_res = consumer.to_reducer().reduce( leftmost_res, bridge_producer_consumer(capped_size, left_producer, left_consumer), ); } leftmost_res } } /// `ExponentialBlocks` is a parallel iterator that consumes itself as a sequence /// of parallel blocks of increasing sizes (exponentially). /// /// This struct is created by the [`by_exponential_blocks()`] method on [`IndexedParallelIterator`] /// /// [`by_exponential_blocks()`]: trait.IndexedParallelIterator.html#method.by_exponential_blocks /// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html #[must_use = "iterator adaptors are lazy and do nothing unless consumed"] #[derive(Debug, Clone)] pub struct ExponentialBlocks { base: I, } impl ExponentialBlocks { pub(super) fn new(base: I) -> Self { Self { base } } } impl ParallelIterator for ExponentialBlocks where I: IndexedParallelIterator, { type Item = I::Item; fn drive_unindexed(self, consumer: C) -> C::Result where C: UnindexedConsumer, { let first = crate::current_num_threads(); let callback = BlocksCallback { consumer, sizes: std::iter::successors(Some(first), exponential_size), len: self.base.len(), }; self.base.with_producer(callback) } } fn exponential_size(size: &usize) -> Option { Some(size.saturating_mul(2)) } /// `UniformBlocks` is a parallel iterator that consumes itself as a sequence /// of parallel blocks of constant sizes. /// /// This struct is created by the [`by_uniform_blocks()`] method on [`IndexedParallelIterator`] /// /// [`by_uniform_blocks()`]: trait.IndexedParallelIterator.html#method.by_uniform_blocks /// [`IndexedParallelIterator`]: trait.IndexedParallelIterator.html #[must_use = "iterator adaptors are lazy and do nothing unless consumed"] #[derive(Debug, Clone)] pub struct UniformBlocks { base: I, block_size: usize, } impl UniformBlocks { pub(super) fn new(base: I, block_size: usize) -> Self { Self { base, block_size } } } impl ParallelIterator for UniformBlocks where I: IndexedParallelIterator, { type Item = I::Item; fn drive_unindexed(self, consumer: C) -> C::Result where C: UnindexedConsumer, { let callback = BlocksCallback { consumer, sizes: std::iter::repeat(self.block_size), len: self.base.len(), }; self.base.with_producer(callback) } }