More streme

This commit is contained in:
asonix 2023-09-10 20:08:01 -04:00
parent 1b97ac1c5a
commit b2674f06d0
5 changed files with 92 additions and 174 deletions

View File

@ -107,7 +107,8 @@ where
} }
// Hashes are read in a consistent order // Hashes are read in a consistent order
let mut stream = repo.hashes().into_streamer(); let stream = std::pin::pin!(repo.hashes());
let mut stream = stream.into_streamer();
let state = Rc::new(MigrateState { let state = Rc::new(MigrateState {
repo: repo.clone(), repo: repo.clone(),

View File

@ -138,7 +138,8 @@ async fn alias(repo: &ArcRepo, alias: Alias, token: DeleteToken) -> Result<(), E
#[tracing::instrument(skip_all)] #[tracing::instrument(skip_all)]
async fn all_variants(repo: &ArcRepo) -> Result<(), Error> { async fn all_variants(repo: &ArcRepo) -> Result<(), Error> {
let mut hash_stream = repo.hashes().into_streamer(); let hash_stream = std::pin::pin!(repo.hashes());
let mut hash_stream = hash_stream.into_streamer();
while let Some(res) = hash_stream.next().await { while let Some(res) = hash_stream.next().await {
let hash = res?; let hash = res?;

View File

@ -8,10 +8,10 @@ use crate::{
config, config,
details::Details, details::Details,
error_code::{ErrorCode, OwnedErrorCode}, error_code::{ErrorCode, OwnedErrorCode},
future::LocalBoxFuture,
stream::LocalBoxStream, stream::LocalBoxStream,
}; };
use base64::Engine; use base64::Engine;
use futures_core::Stream;
use std::{fmt::Debug, sync::Arc}; use std::{fmt::Debug, sync::Arc};
use url::Url; use url::Url;
use uuid::Uuid; use uuid::Uuid;
@ -572,81 +572,29 @@ impl HashPage {
} }
} }
type PageFuture = LocalBoxFuture<'static, Result<HashPage, RepoError>>;
pub(crate) struct HashStream {
repo: Option<ArcRepo>,
page_future: Option<PageFuture>,
page: Option<HashPage>,
}
impl futures_core::Stream for HashStream {
type Item = Result<Hash, RepoError>;
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.get_mut();
loop {
let Some(repo) = &this.repo else {
return std::task::Poll::Ready(None);
};
let slug = if let Some(page) = &mut this.page {
// popping last in page is fine - we reversed them
if let Some(hash) = page.hashes.pop() {
return std::task::Poll::Ready(Some(Ok(hash)));
}
let slug = page.next();
this.page.take();
if let Some(slug) = slug {
Some(slug)
} else {
this.repo.take();
return std::task::Poll::Ready(None);
}
} else {
None
};
if let Some(page_future) = &mut this.page_future {
let res = std::task::ready!(page_future.as_mut().poll(cx));
this.page_future.take();
match res {
Ok(mut page) => {
// reverse because we use `pop` to fetch next
page.hashes.reverse();
this.page = Some(page);
}
Err(e) => {
this.repo.take();
return std::task::Poll::Ready(Some(Err(e)));
}
}
} else {
let repo = repo.clone();
this.page_future = Some(Box::pin(async move { repo.hash_page(slug, 100).await }));
}
}
}
}
impl dyn FullRepo { impl dyn FullRepo {
pub(crate) fn hashes(self: &Arc<Self>) -> HashStream { pub(crate) fn hashes(self: &Arc<Self>) -> impl Stream<Item = Result<Hash, RepoError>> {
HashStream { let repo = self.clone();
repo: Some(self.clone()),
page_future: None, streem::try_from_fn(|yielder| async move {
page: None, let mut slug = None;
}
loop {
let page = repo.hash_page(slug, 100).await?;
slug = page.next();
for hash in page.hashes {
yielder.yield_ok(hash).await;
}
if slug.is_none() {
break;
}
}
Ok(())
})
} }
} }

View File

@ -35,7 +35,8 @@ pub(crate) async fn migrate_repo(old_repo: ArcRepo, new_repo: ArcRepo) -> Result
tracing::warn!("Checks complete, migrating repo"); tracing::warn!("Checks complete, migrating repo");
tracing::warn!("{total_size} hashes will be migrated"); tracing::warn!("{total_size} hashes will be migrated");
let mut hash_stream = old_repo.hashes().into_streamer(); let hash_stream = std::pin::pin!(old_repo.hashes());
let mut hash_stream = hash_stream.into_streamer();
let mut index = 0; let mut index = 0;
while let Some(res) = hash_stream.next().await { while let Some(res) = hash_stream.next().await {

View File

@ -21,6 +21,7 @@ use diesel_async::{
}, },
AsyncConnection, AsyncPgConnection, RunQueryDsl, AsyncConnection, AsyncPgConnection, RunQueryDsl,
}; };
use futures_core::Stream;
use tokio::sync::Notify; use tokio::sync::Notify;
use tokio_postgres::{tls::NoTlsStream, AsyncMessage, Connection, NoTls, Notification, Socket}; use tokio_postgres::{tls::NoTlsStream, AsyncMessage, Connection, NoTls, Notification, Socket};
use tracing::Instrument; use tracing::Instrument;
@ -30,7 +31,7 @@ use uuid::Uuid;
use crate::{ use crate::{
details::Details, details::Details,
error_code::{ErrorCode, OwnedErrorCode}, error_code::{ErrorCode, OwnedErrorCode},
future::{LocalBoxFuture, WithMetrics, WithTimeout}, future::{WithMetrics, WithTimeout},
serde_str::Serde, serde_str::Serde,
stream::LocalBoxStream, stream::LocalBoxStream,
}; };
@ -1477,33 +1478,29 @@ impl AliasAccessRepo for PostgresRepo {
&self, &self,
timestamp: time::OffsetDateTime, timestamp: time::OffsetDateTime,
) -> Result<LocalBoxStream<'static, Result<Alias, RepoError>>, RepoError> { ) -> Result<LocalBoxStream<'static, Result<Alias, RepoError>>, RepoError> {
Ok(Box::pin(PageStream { Ok(Box::pin(page_stream(
inner: self.inner.clone(), self.inner.clone(),
future: None, to_primitive(timestamp),
current: Vec::new(), |inner, older_than| async move {
older_than: to_primitive(timestamp), use schema::proxies::dsl::*;
next: Box::new(|inner, older_than| {
Box::pin(async move {
use schema::proxies::dsl::*;
let mut conn = inner.get_connection().await?; let mut conn = inner.get_connection().await?;
let vec = proxies let vec = proxies
.select((accessed, alias)) .select((accessed, alias))
.filter(accessed.lt(older_than)) .filter(accessed.lt(older_than))
.order(accessed.desc()) .order(accessed.desc())
.limit(100) .limit(100)
.get_results(&mut conn) .get_results(&mut conn)
.with_metrics("pict-rs.postgres.alias-access.older-aliases") .with_metrics("pict-rs.postgres.alias-access.older-aliases")
.with_timeout(Duration::from_secs(5)) .with_timeout(Duration::from_secs(5))
.await .await
.map_err(|_| PostgresError::DbTimeout)? .map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?; .map_err(PostgresError::Diesel)?;
Ok(vec) Ok(vec)
}) },
}), )))
}))
} }
async fn remove_alias_access(&self, _: Alias) -> Result<(), RepoError> { async fn remove_alias_access(&self, _: Alias) -> Result<(), RepoError> {
@ -1570,33 +1567,29 @@ impl VariantAccessRepo for PostgresRepo {
&self, &self,
timestamp: time::OffsetDateTime, timestamp: time::OffsetDateTime,
) -> Result<LocalBoxStream<'static, Result<(Hash, String), RepoError>>, RepoError> { ) -> Result<LocalBoxStream<'static, Result<(Hash, String), RepoError>>, RepoError> {
Ok(Box::pin(PageStream { Ok(Box::pin(page_stream(
inner: self.inner.clone(), self.inner.clone(),
future: None, to_primitive(timestamp),
current: Vec::new(), |inner, older_than| async move {
older_than: to_primitive(timestamp), use schema::variants::dsl::*;
next: Box::new(|inner, older_than| {
Box::pin(async move {
use schema::variants::dsl::*;
let mut conn = inner.get_connection().await?; let mut conn = inner.get_connection().await?;
let vec = variants let vec = variants
.select((accessed, (hash, variant))) .select((accessed, (hash, variant)))
.filter(accessed.lt(older_than)) .filter(accessed.lt(older_than))
.order(accessed.desc()) .order(accessed.desc())
.limit(100) .limit(100)
.get_results(&mut conn) .get_results(&mut conn)
.with_metrics("pict-rs.postgres.variant-access.older-variants") .with_metrics("pict-rs.postgres.variant-access.older-variants")
.with_timeout(Duration::from_secs(5)) .with_timeout(Duration::from_secs(5))
.await .await
.map_err(|_| PostgresError::DbTimeout)? .map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?; .map_err(PostgresError::Diesel)?;
Ok(vec) Ok(vec)
}) },
}), )))
}))
} }
async fn remove_variant_access(&self, _: Hash, _: String) -> Result<(), RepoError> { async fn remove_variant_access(&self, _: Hash, _: String) -> Result<(), RepoError> {
@ -1778,62 +1771,36 @@ impl FullRepo for PostgresRepo {
} }
} }
type NextFuture<I> = LocalBoxFuture<'static, Result<Vec<(time::PrimitiveDateTime, I)>, RepoError>>; fn page_stream<I, F, Fut>(
struct PageStream<I> {
inner: Arc<Inner>, inner: Arc<Inner>,
future: Option<NextFuture<I>>, mut older_than: time::PrimitiveDateTime,
current: Vec<I>, next: F,
older_than: time::PrimitiveDateTime, ) -> impl Stream<Item = Result<I, RepoError>>
next: Box<dyn Fn(Arc<Inner>, time::PrimitiveDateTime) -> NextFuture<I>>,
}
impl<I> futures_core::Stream for PageStream<I>
where where
I: Unpin, F: Fn(Arc<Inner>, time::PrimitiveDateTime) -> Fut,
Fut: std::future::Future<Output = Result<Vec<(time::PrimitiveDateTime, I)>, RepoError>>
+ 'static,
I: 'static,
{ {
type Item = Result<I, RepoError>; streem::try_from_fn(|yielder| async move {
fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let this = self.get_mut();
loop { loop {
// Pop because we reversed the list let mut page = (next)(inner.clone(), older_than).await?;
if let Some(alias) = this.current.pop() {
return std::task::Poll::Ready(Some(Ok(alias)));
}
if let Some(future) = this.future.as_mut() { if let Some((last_time, last_item)) = page.pop() {
let res = std::task::ready!(future.as_mut().poll(cx)); for (_, item) in page {
yielder.yield_ok(item).await;
this.future.take();
match res {
Ok(page) if page.is_empty() => {
return std::task::Poll::Ready(None);
}
Ok(page) => {
let (mut timestamps, mut aliases): (Vec<_>, Vec<_>) =
page.into_iter().unzip();
// reverse because we use .pop() to get next
aliases.reverse();
this.current = aliases;
this.older_than = timestamps.pop().expect("Verified nonempty");
}
Err(e) => return std::task::Poll::Ready(Some(Err(e))),
} }
} else {
let inner = this.inner.clone();
let older_than = this.older_than;
this.future = Some((this.next)(inner, older_than)); yielder.yield_ok(last_item).await;
older_than = last_time;
} else {
break;
} }
} }
}
Ok(())
})
} }
impl std::fmt::Debug for PostgresRepo { impl std::fmt::Debug for PostgresRepo {