From 835647d290230b2ff6b6aaa982c5fe1c23ed0eed Mon Sep 17 00:00:00 2001 From: asonix Date: Sat, 3 Feb 2024 18:42:34 -0600 Subject: [PATCH] Unite launch-with-store fns --- src/lib.rs | 117 +++++++++++++---------------------------------------- 1 file changed, 28 insertions(+), 89 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 5983eb6..ecfe0d6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -49,6 +49,7 @@ use middleware::{Metrics, Payload}; use repo::ArcRepo; use reqwest_middleware::{ClientBuilder, ClientWithMiddleware}; use reqwest_tracing::TracingMiddleware; +use rustls_channel_resolver::ChannelSender; use rusty_s3::UrlStyle; use state::State; use std::{ @@ -59,6 +60,7 @@ use std::{ time::{Duration, SystemTime}, }; use streem::IntoStreamer; +use sync::DropHandle; use tmp_file::{ArcTmpDir, TmpDir}; use tokio::sync::Semaphore; use tracing::Instrument; @@ -1664,8 +1666,27 @@ where crate::sync::spawn("process-worker", queue::process_images(state, process_map)); } -async fn launch_file_store( - state: State, +fn watch_keys(tls: Tls, sender: ChannelSender) -> DropHandle<()> { + crate::sync::abort_on_drop(crate::sync::spawn("cert-reader", async move { + let mut interval = tokio::time::interval(Duration::from_secs(30)); + interval.tick().await; + + loop { + interval.tick().await; + + match tls.open_keys().await { + Ok(certified_key) => sender.update(certified_key), + Err(e) => tracing::error!("Failed to open keys {}", format!("{e}")), + } + } + })) +} + +async fn launch< + S: Store + Send + 'static, + F: Fn(&mut web::ServiceConfig) + Send + Clone + 'static, +>( + state: State, extra_config: F, ) -> color_eyre::Result<()> { let process_map = ProcessMap::new(); @@ -1698,19 +1719,7 @@ async fn launch_file_store(certified_key); - let handle = crate::sync::abort_on_drop(crate::sync::spawn("cert-reader", async move { - let mut interval = tokio::time::interval(Duration::from_secs(30)); - interval.tick().await; - - loop { - interval.tick().await; - - match tls.open_keys().await { - Ok(certified_key) => tx.update(certified_key), - Err(e) => tracing::error!("Failed to open keys {}", format!("{e}")), - } - } - })); + let handle = watch_keys(tls, tx); let config = rustls_021::ServerConfig::builder() .with_safe_defaults() @@ -1732,70 +1741,6 @@ async fn launch_file_store( - state: State, - extra_config: F, -) -> color_eyre::Result<()> { - let process_map = ProcessMap::new(); - - let address = state.config.server.address; - - let tls = Tls::from_config(&state.config); - - spawn_cleanup(state.clone()); - - let server = HttpServer::new(move || { - let extra_config = extra_config.clone(); - let state = state.clone(); - let process_map = process_map.clone(); - - spawn_workers(state.clone(), process_map.clone()); - - App::new() - .wrap(TracingLogger::default()) - .wrap(Deadline) - .wrap(Metrics) - .wrap(Payload::new()) - .configure(move |sc| { - configure_endpoints(sc, state.clone(), process_map.clone(), extra_config) - }) - }); - - if let Some(tls) = tls { - let certified_key = tls.open_keys().await?; - - let (tx, rx) = rustls_channel_resolver::channel::<32>(certified_key); - - let handle = crate::sync::abort_on_drop(crate::sync::spawn("cert-reader", async move { - let mut interval = tokio::time::interval(Duration::from_secs(30)); - interval.tick().await; - - loop { - interval.tick().await; - - match tls.open_keys().await { - Ok(certified_key) => tx.update(certified_key), - Err(e) => tracing::error!("Failed to open keys {}", format!("{e}")), - } - } - })); - - let config = rustls_021::ServerConfig::builder() - .with_safe_defaults() - .with_no_client_auth() - .with_cert_resolver(rx); - - server.bind_rustls_021(address, config)?.run().await?; - - handle.abort(); - let _ = handle.await; - } else { - server.bind(address)?.run().await?; - } - - Ok(()) -} - #[allow(clippy::too_many_arguments)] async fn migrate_inner( config: Configuration, @@ -2143,13 +2088,10 @@ impl PictRsConfiguration { match repo { Repo::Sled(sled_repo) => { - launch_file_store(state, move |sc| { - sled_extra_config(sc, sled_repo.clone()) - }) - .await?; + launch(state, move |sc| sled_extra_config(sc, sled_repo.clone())).await?; } Repo::Postgres(_) => { - launch_file_store(state, |_| {}).await?; + launch(state, |_| {}).await?; } } } @@ -2209,13 +2151,10 @@ impl PictRsConfiguration { match repo { Repo::Sled(sled_repo) => { - launch_object_store(state, move |sc| { - sled_extra_config(sc, sled_repo.clone()) - }) - .await?; + launch(state, move |sc| sled_extra_config(sc, sled_repo.clone())).await?; } Repo::Postgres(_) => { - launch_object_store(state, |_| {}).await?; + launch(state, |_| {}).await?; } } }