use super::task::Task; use super::FuturesUnordered; use core::marker::PhantomData; use core::pin::Pin; use core::ptr; use core::sync::atomic::Ordering::Relaxed; /// Mutable iterator over all futures in the unordered set. #[derive(Debug)] pub struct IterPinMut<'a, Fut> { pub(super) task: *const Task, pub(super) len: usize, pub(super) _marker: PhantomData<&'a mut FuturesUnordered>, } /// Mutable iterator over all futures in the unordered set. #[derive(Debug)] pub struct IterMut<'a, Fut: Unpin>(pub(super) IterPinMut<'a, Fut>); /// Immutable iterator over all futures in the unordered set. #[derive(Debug)] pub struct IterPinRef<'a, Fut> { pub(super) task: *const Task, pub(super) len: usize, pub(super) pending_next_all: *mut Task, pub(super) _marker: PhantomData<&'a FuturesUnordered>, } /// Immutable iterator over all the futures in the unordered set. #[derive(Debug)] pub struct Iter<'a, Fut: Unpin>(pub(super) IterPinRef<'a, Fut>); /// Owned iterator over all futures in the unordered set. #[derive(Debug)] pub struct IntoIter { pub(super) len: usize, pub(super) inner: FuturesUnordered, } impl Iterator for IntoIter { type Item = Fut; fn next(&mut self) -> Option { // `head_all` can be accessed directly and we don't need to spin on // `Task::next_all` since we have exclusive access to the set. let task = self.inner.head_all.get_mut(); if (*task).is_null() { return None; } unsafe { // Moving out of the future is safe because it is `Unpin` let future = (*(**task).future.get()).take().unwrap(); // Mutable access to a previously shared `FuturesUnordered` implies // that the other threads already released the object before the // current thread acquired it, so relaxed ordering can be used and // valid `next_all` checks can be skipped. let next = (**task).next_all.load(Relaxed); *task = next; if !task.is_null() { *(**task).prev_all.get() = ptr::null_mut(); } self.len -= 1; Some(future) } } fn size_hint(&self) -> (usize, Option) { (self.len, Some(self.len)) } } impl ExactSizeIterator for IntoIter {} impl<'a, Fut> Iterator for IterPinMut<'a, Fut> { type Item = Pin<&'a mut Fut>; fn next(&mut self) -> Option { if self.task.is_null() { return None; } unsafe { let future = (*(*self.task).future.get()).as_mut().unwrap(); // Mutable access to a previously shared `FuturesUnordered` implies // that the other threads already released the object before the // current thread acquired it, so relaxed ordering can be used and // valid `next_all` checks can be skipped. let next = (*self.task).next_all.load(Relaxed); self.task = next; self.len -= 1; Some(Pin::new_unchecked(future)) } } fn size_hint(&self) -> (usize, Option) { (self.len, Some(self.len)) } } impl ExactSizeIterator for IterPinMut<'_, Fut> {} impl<'a, Fut: Unpin> Iterator for IterMut<'a, Fut> { type Item = &'a mut Fut; fn next(&mut self) -> Option { self.0.next().map(Pin::get_mut) } fn size_hint(&self) -> (usize, Option) { self.0.size_hint() } } impl ExactSizeIterator for IterMut<'_, Fut> {} impl<'a, Fut> Iterator for IterPinRef<'a, Fut> { type Item = Pin<&'a Fut>; fn next(&mut self) -> Option { if self.task.is_null() { return None; } unsafe { let future = (*(*self.task).future.get()).as_ref().unwrap(); // Relaxed ordering can be used since acquire ordering when // `head_all` was initially read for this iterator implies acquire // ordering for all previously inserted nodes (and we don't need to // read `len_all` again for any other nodes). let next = (*self.task).spin_next_all(self.pending_next_all, Relaxed); self.task = next; self.len -= 1; Some(Pin::new_unchecked(future)) } } fn size_hint(&self) -> (usize, Option) { (self.len, Some(self.len)) } } impl ExactSizeIterator for IterPinRef<'_, Fut> {} impl<'a, Fut: Unpin> Iterator for Iter<'a, Fut> { type Item = &'a Fut; fn next(&mut self) -> Option { self.0.next().map(Pin::get_ref) } fn size_hint(&self) -> (usize, Option) { self.0.size_hint() } } impl ExactSizeIterator for Iter<'_, Fut> {} // SAFETY: we do nothing thread-local and there is no interior mutability, // so the usual structural `Send`/`Sync` apply. unsafe impl Send for IterPinRef<'_, Fut> {} unsafe impl Sync for IterPinRef<'_, Fut> {} unsafe impl Send for IterPinMut<'_, Fut> {} unsafe impl Sync for IterPinMut<'_, Fut> {} unsafe impl Send for IntoIter {} unsafe impl Sync for IntoIter {}