use super::noop::NoopConsumer;
use super::plumbing::{Consumer, Folder, Reducer, UnindexedConsumer};
use super::{IntoParallelIterator, ParallelExtend, ParallelIterator};

use either::Either;
use std::borrow::Cow;
use std::collections::LinkedList;
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
use std::collections::{BinaryHeap, VecDeque};
use std::ffi::{OsStr, OsString};
use std::hash::{BuildHasher, Hash};

/// Performs a generic `par_extend` by collecting to a `LinkedList<Vec<_>>` in
/// parallel, then extending the collection sequentially.
///
/// * `extend!(self, par_iter)` collects `par_iter` with `fast_collect` and
///   forwards the result to the second form.
/// * `extend!(self <- vecs)` extends `self` from an already collected
///   `Either`: a single vector on the left, or every vector of a list on the
///   right, in list order.
macro_rules! extend {
    ($self:ident, $par_iter:ident) => {
        extend!($self <- fast_collect($par_iter))
    };
    ($self:ident <- $vecs:expr) => {
        match $vecs {
            Either::Left(vec) => $self.extend(vec),
            Either::Right(list) => {
                for vec in list {
                    $self.extend(vec);
                }
            }
        }
    };
}

/// Like `extend!`, but calls `$self.reserve(..)` with the total collected
/// length before extending.
///
/// `$len` names the function used to measure the `fast_collect` result; it
/// defaults to `len` when omitted.
macro_rules! extend_reserved {
    ($self:ident, $par_iter:ident, $len:ident) => {
        let vecs = fast_collect($par_iter);
        $self.reserve($len(&vecs));
        extend!($self <- vecs)
    };
    ($self:ident, $par_iter:ident) => {
        extend_reserved!($self, $par_iter, len)
    };
}

/// Computes the total length of a `fast_collect` result.
fn len<T>(vecs: &Either<Vec<T>, LinkedList<Vec<T>>>) -> usize {
    match vecs {
        Either::Left(vec) => vec.len(),
        Either::Right(list) => list.iter().map(Vec::len).sum(),
    }
}

/// Computes the total string length of a `fast_collect` result.
fn string_len<T: AsRef<str>>(vecs: &Either<Vec<T>, LinkedList<Vec<T>>>) -> usize {
    // Both arms yield an iterator over `&T`; `Either` unifies their types.
    let strs = match vecs {
        Either::Left(vec) => Either::Left(vec.iter()),
        Either::Right(list) => Either::Right(list.iter().flatten()),
    };
    strs.map(AsRef::as_ref).map(str::len).sum()
}

/// Computes the total OS-string length of a `fast_collect` result.
fn osstring_len>(vecs: &Either, LinkedList>>) -> usize { let osstrs = match vecs { Either::Left(vec) => Either::Left(vec.iter()), Either::Right(list) => Either::Right(list.iter().flatten()), }; osstrs.map(AsRef::as_ref).map(OsStr::len).sum() } pub(super) fn fast_collect(pi: I) -> Either, LinkedList>> where I: IntoParallelIterator, T: Send, { let par_iter = pi.into_par_iter(); match par_iter.opt_len() { Some(len) => { // Pseudo-specialization. See impl of ParallelExtend for Vec for more details. let mut vec = Vec::new(); super::collect::special_extend(par_iter, len, &mut vec); Either::Left(vec) } None => Either::Right(par_iter.drive_unindexed(ListVecConsumer)), } } struct ListVecConsumer; struct ListVecFolder { vec: Vec, } impl Consumer for ListVecConsumer { type Folder = ListVecFolder; type Reducer = ListReducer; type Result = LinkedList>; fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) { (Self, Self, ListReducer) } fn into_folder(self) -> Self::Folder { ListVecFolder { vec: Vec::new() } } fn full(&self) -> bool { false } } impl UnindexedConsumer for ListVecConsumer { fn split_off_left(&self) -> Self { Self } fn to_reducer(&self) -> Self::Reducer { ListReducer } } impl Folder for ListVecFolder { type Result = LinkedList>; fn consume(mut self, item: T) -> Self { self.vec.push(item); self } fn consume_iter(mut self, iter: I) -> Self where I: IntoIterator, { self.vec.extend(iter); self } fn complete(self) -> Self::Result { let mut list = LinkedList::new(); if !self.vec.is_empty() { list.push_back(self.vec); } list } fn full(&self) -> bool { false } } /// Extends a binary heap with items from a parallel iterator. impl ParallelExtend for BinaryHeap where T: Ord + Send, { fn par_extend(&mut self, par_iter: I) where I: IntoParallelIterator, { extend_reserved!(self, par_iter); } } /// Extends a binary heap with copied items from a parallel iterator. 
impl<'a, T> ParallelExtend<&'a T> for BinaryHeap<T>
where
    T: 'a + Copy + Ord + Send + Sync,
{
    fn par_extend<I>(&mut self, par_iter: I)
    where
        I: IntoParallelIterator<Item = &'a T>,
    {
        extend_reserved!(self, par_iter);
    }
}

/// Extends a B-tree map with items from a parallel iterator.
impl<K, V> ParallelExtend<(K, V)> for BTreeMap<K, V>
where
    K: Ord + Send,
    V: Send,
{
    fn par_extend<I>(&mut self, par_iter: I)
    where
        I: IntoParallelIterator<Item = (K, V)>,
    {
        // Uses `extend!` rather than `extend_reserved!`: no `reserve` call is made here.
        extend!(self, par_iter);
    }
}

/// Extends a B-tree map with copied items from a parallel iterator.
impl<'a, K: 'a, V: 'a> ParallelExtend<(&'a K, &'a V)> for BTreeMap<K, V>
where
    K: Copy + Ord + Send + Sync,
    V: Copy + Send + Sync,
{
    fn par_extend<I>(&mut self, par_iter: I)
    where
        I: IntoParallelIterator<Item = (&'a K, &'a V)>,
    {
        extend!(self, par_iter);
    }
}

/// Extends a B-tree set with items from a parallel iterator.
impl<T> ParallelExtend<T> for BTreeSet<T>
where
    T: Ord + Send,
{
    fn par_extend<I>(&mut self, par_iter: I)
    where
        I: IntoParallelIterator<Item = T>,
    {
        extend!(self, par_iter);
    }
}

/// Extends a B-tree set with copied items from a parallel iterator.
impl<'a, T> ParallelExtend<&'a T> for BTreeSet<T>
where
    T: 'a + Copy + Ord + Send + Sync,
{
    fn par_extend<I>(&mut self, par_iter: I)
    where
        I: IntoParallelIterator<Item = &'a T>,
    {
        extend!(self, par_iter);
    }
}

/// Extends a hash map with items from a parallel iterator.
impl<K, V, S> ParallelExtend<(K, V)> for HashMap<K, V, S>
where
    K: Eq + Hash + Send,
    V: Send,
    S: BuildHasher + Send,
{
    fn par_extend<I>(&mut self, par_iter: I)
    where
        I: IntoParallelIterator<Item = (K, V)>,
    {
        // See the map_collect benchmarks in rayon-demo for different strategies.
        extend_reserved!(self, par_iter);
    }
}

/// Extends a hash map with copied items from a parallel iterator.
impl<'a, K: 'a, V: 'a, S> ParallelExtend<(&'a K, &'a V)> for HashMap<K, V, S>
where
    K: Copy + Eq + Hash + Send + Sync,
    V: Copy + Send + Sync,
    S: BuildHasher + Send,
{
    fn par_extend<I>(&mut self, par_iter: I)
    where
        I: IntoParallelIterator<Item = (&'a K, &'a V)>,
    {
        extend_reserved!(self, par_iter);
    }
}

/// Extends a hash set with items from a parallel iterator.
impl ParallelExtend for HashSet where T: Eq + Hash + Send, S: BuildHasher + Send, { fn par_extend(&mut self, par_iter: I) where I: IntoParallelIterator, { extend_reserved!(self, par_iter); } } /// Extends a hash set with copied items from a parallel iterator. impl<'a, T, S> ParallelExtend<&'a T> for HashSet where T: 'a + Copy + Eq + Hash + Send + Sync, S: BuildHasher + Send, { fn par_extend(&mut self, par_iter: I) where I: IntoParallelIterator, { extend_reserved!(self, par_iter); } } /// Extends a linked list with items from a parallel iterator. impl ParallelExtend for LinkedList where T: Send, { fn par_extend(&mut self, par_iter: I) where I: IntoParallelIterator, { let mut list = par_iter.into_par_iter().drive_unindexed(ListConsumer); self.append(&mut list); } } /// Extends a linked list with copied items from a parallel iterator. impl<'a, T> ParallelExtend<&'a T> for LinkedList where T: 'a + Copy + Send + Sync, { fn par_extend(&mut self, par_iter: I) where I: IntoParallelIterator, { self.par_extend(par_iter.into_par_iter().copied()) } } struct ListConsumer; struct ListFolder { list: LinkedList, } struct ListReducer; impl Consumer for ListConsumer { type Folder = ListFolder; type Reducer = ListReducer; type Result = LinkedList; fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) { (Self, Self, ListReducer) } fn into_folder(self) -> Self::Folder { ListFolder { list: LinkedList::new(), } } fn full(&self) -> bool { false } } impl UnindexedConsumer for ListConsumer { fn split_off_left(&self) -> Self { Self } fn to_reducer(&self) -> Self::Reducer { ListReducer } } impl Folder for ListFolder { type Result = LinkedList; fn consume(mut self, item: T) -> Self { self.list.push_back(item); self } fn consume_iter(mut self, iter: I) -> Self where I: IntoIterator, { self.list.extend(iter); self } fn complete(self) -> Self::Result { self.list } fn full(&self) -> bool { false } } impl Reducer> for ListReducer { fn reduce(self, mut left: LinkedList, mut right: 
LinkedList) -> LinkedList { left.append(&mut right); left } } /// Extends an OS-string with string slices from a parallel iterator. impl<'a> ParallelExtend<&'a OsStr> for OsString { fn par_extend(&mut self, par_iter: I) where I: IntoParallelIterator, { extend_reserved!(self, par_iter, osstring_len); } } /// Extends an OS-string with strings from a parallel iterator. impl ParallelExtend for OsString { fn par_extend(&mut self, par_iter: I) where I: IntoParallelIterator, { extend_reserved!(self, par_iter, osstring_len); } } /// Extends an OS-string with string slices from a parallel iterator. impl<'a> ParallelExtend> for OsString { fn par_extend(&mut self, par_iter: I) where I: IntoParallelIterator>, { extend_reserved!(self, par_iter, osstring_len); } } /// Extends a string with characters from a parallel iterator. impl ParallelExtend for String { fn par_extend(&mut self, par_iter: I) where I: IntoParallelIterator, { // This is like `extend`, but `Vec` is less efficient to deal // with than `String`, so instead collect to `LinkedList`. let list = par_iter.into_par_iter().drive_unindexed(ListStringConsumer); self.reserve(list.iter().map(String::len).sum()); self.extend(list); } } /// Extends a string with copied characters from a parallel iterator. 
impl<'a> ParallelExtend<&'a char> for String {
    fn par_extend<I>(&mut self, par_iter: I)
    where
        I: IntoParallelIterator<Item = &'a char>,
    {
        // Delegates to the by-value `char` impl after copying each item.
        self.par_extend(par_iter.into_par_iter().copied())
    }
}

/// Consumer of `char` items whose result is a `LinkedList<String>`.
struct ListStringConsumer;

/// Folder that pushes consumed characters onto a single `String`.
struct ListStringFolder {
    string: String,
}

impl Consumer<char> for ListStringConsumer {
    type Folder = ListStringFolder;
    type Reducer = ListReducer;
    type Result = LinkedList<String>;

    fn split_at(self, _index: usize) -> (Self, Self, Self::Reducer) {
        // The consumer is stateless, so the split index is irrelevant.
        (Self, Self, ListReducer)
    }

    fn into_folder(self) -> Self::Folder {
        ListStringFolder {
            string: String::new(),
        }
    }

    fn full(&self) -> bool {
        false
    }
}

impl UnindexedConsumer<char> for ListStringConsumer {
    fn split_off_left(&self) -> Self {
        Self
    }

    fn to_reducer(&self) -> Self::Reducer {
        ListReducer
    }
}

impl Folder<char> for ListStringFolder {
    type Result = LinkedList<String>;

    fn consume(mut self, item: char) -> Self {
        self.string.push(item);
        self
    }

    fn consume_iter<I>(mut self, iter: I) -> Self
    where
        I: IntoIterator<Item = char>,
    {
        self.string.extend(iter);
        self
    }

    fn complete(self) -> Self::Result {
        // An empty string is dropped rather than added as a list node.
        let mut list = LinkedList::new();
        if !self.string.is_empty() {
            list.push_back(self.string);
        }
        list
    }

    fn full(&self) -> bool {
        false
    }
}

/// Extends a string with string slices from a parallel iterator.
impl<'a> ParallelExtend<&'a str> for String {
    fn par_extend<I>(&mut self, par_iter: I)
    where
        I: IntoParallelIterator<Item = &'a str>,
    {
        extend_reserved!(self, par_iter, string_len);
    }
}

/// Extends a string with strings from a parallel iterator.
impl ParallelExtend<String> for String {
    fn par_extend<I>(&mut self, par_iter: I)
    where
        I: IntoParallelIterator<Item = String>,
    {
        extend_reserved!(self, par_iter, string_len);
    }
}

/// Extends a string with boxed strings from a parallel iterator.
impl ParallelExtend<Box<str>> for String {
    fn par_extend<I>(&mut self, par_iter: I)
    where
        I: IntoParallelIterator<Item = Box<str>>,
    {
        extend_reserved!(self, par_iter, string_len);
    }
}

/// Extends a string with string slices from a parallel iterator.
impl<'a> ParallelExtend> for String { fn par_extend(&mut self, par_iter: I) where I: IntoParallelIterator>, { extend_reserved!(self, par_iter, string_len); } } /// Extends a deque with items from a parallel iterator. impl ParallelExtend for VecDeque where T: Send, { fn par_extend(&mut self, par_iter: I) where I: IntoParallelIterator, { extend_reserved!(self, par_iter); } } /// Extends a deque with copied items from a parallel iterator. impl<'a, T> ParallelExtend<&'a T> for VecDeque where T: 'a + Copy + Send + Sync, { fn par_extend(&mut self, par_iter: I) where I: IntoParallelIterator, { extend_reserved!(self, par_iter); } } /// Extends a vector with items from a parallel iterator. impl ParallelExtend for Vec where T: Send, { fn par_extend(&mut self, par_iter: I) where I: IntoParallelIterator, { // See the vec_collect benchmarks in rayon-demo for different strategies. let par_iter = par_iter.into_par_iter(); match par_iter.opt_len() { Some(len) => { // When Rust gets specialization, we can get here for indexed iterators // without relying on `opt_len`. Until then, `special_extend()` fakes // an unindexed mode on the promise that `opt_len()` is accurate. super::collect::special_extend(par_iter, len, self); } None => { // This works like `extend`, but `Vec::append` is more efficient. let list = par_iter.drive_unindexed(ListVecConsumer); self.reserve(list.iter().map(Vec::len).sum()); for mut other in list { self.append(&mut other); } } } } } /// Extends a vector with copied items from a parallel iterator. impl<'a, T> ParallelExtend<&'a T> for Vec where T: 'a + Copy + Send + Sync, { fn par_extend(&mut self, par_iter: I) where I: IntoParallelIterator, { self.par_extend(par_iter.into_par_iter().copied()) } } /// Collapses all unit items from a parallel iterator into one. impl ParallelExtend<()> for () { fn par_extend(&mut self, par_iter: I) where I: IntoParallelIterator, { par_iter.into_par_iter().drive_unindexed(NoopConsumer) } }