2
0
Fork 0
mirror of https://git.asonix.dog/asonix/pict-rs synced 2024-12-21 19:01:25 +00:00

Merge branch 'main' into asonix/update-diesel-async-05

This commit is contained in:
asonix 2024-11-12 13:24:41 -06:00
commit bb6ec7c817
11 changed files with 958 additions and 668 deletions

1344
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,7 +1,7 @@
[package]
name = "pict-rs"
description = "A simple image hosting service"
version = "0.5.17-pre.3"
version = "0.5.17-pre.6"
authors = ["asonix <asonix@asonix.dog>"]
license = "AGPL-3.0"
readme = "README.md"
@ -29,7 +29,7 @@ blurhash-update = "0.1.0"
clap = { version = "4.5.9", features = ["derive"] }
color-eyre = "0.6.3"
config = { version = "0.14.0", default-features = false, features = ["json", "ron", "toml", "yaml"] }
console-subscriber = "0.3.0"
console-subscriber = "0.4.0"
dashmap = "6.0.1"
diesel = { version = "2.2.1", features = ["postgres_backend", "serde_json", "time", "uuid"] }
diesel-async = { version = "0.5.0", features = ["bb8", "postgres"] }
@ -37,18 +37,18 @@ diesel-derive-enum = { version = "2.1.0", features = ["postgres"] }
futures-core = "0.3.30"
hex = "0.4.3"
md-5 = "0.10.6"
metrics = "0.23.0"
metrics-exporter-prometheus = { version = "0.15.1", default-features = false, features = ["http-listener"] }
metrics = "0.24.0"
metrics-exporter-prometheus = { version = "0.16.0", default-features = false, features = ["http-listener"] }
mime = "0.3.17"
nanorand = { version = "0.7.0", optional = true }
object_store = { version = "0.10.1", features = ["aws"] }
opentelemetry_sdk = { version = "0.23.0", features = ["rt-tokio"] }
opentelemetry = "0.23.0"
opentelemetry-otlp = "0.16.0"
object_store = { version = "0.11.0", features = ["aws"] }
opentelemetry_sdk = { version = "0.26.0", features = ["rt-tokio"] }
opentelemetry = "0.26.0"
opentelemetry-otlp = "0.26.0"
pin-project-lite = "0.2.14"
refinery = { version = "0.8.14", features = ["tokio-postgres", "postgres"] }
reqwest = { version = "0.12.5", default-features = false, features = ["json", "rustls-tls-no-provider", "stream"] }
reqwest-middleware = "0.3.1"
reqwest-middleware = "0.4.0"
reqwest-tracing = "0.5.0"
# pinned to tokio-postgres-generic-rustls
# pinned to actix-web
@ -79,7 +79,7 @@ toml = "0.8.14"
tracing = "0.1.40"
tracing-error = "0.2.0"
tracing-log = "0.2.0"
tracing-opentelemetry = "0.24.0"
tracing-opentelemetry = "0.27.0"
tracing-subscriber = { version = "0.3.18", features = [
"ansi",
"env-filter",
@ -94,6 +94,6 @@ uuid = { version = "1.10.0", features = ["serde", "std", "v4", "v7"] }
webpki-roots = "0.26.3"
[dependencies.tracing-actix-web]
version = "0.7.11"
version = "0.7.14"
default-features = false
features = ["opentelemetry_0_23"]
features = ["opentelemetry_0_26"]

View file

@ -70,6 +70,7 @@ feature-depth = 1
# A list of advisory IDs to ignore. Note that ignored advisories will still
# output a note when they are encountered.
ignore = [
{ id = "RUSTSEC-2024-0384", reason = "Sled 0.34.7 dependency" },
#"RUSTSEC-0000-0000",
#{ id = "RUSTSEC-0000-0000", reason = "you can specify a reason the advisory is ignored" },
#"a-crate-that-is-yanked@0.1.1", # you can also ignore yanked crate versions if you wish
@ -97,6 +98,7 @@ allow = [
"ISC",
"MIT",
"MPL-2.0",
"Unicode-3.0",
"Unicode-DFS-2016",
"Zlib",
]
@ -214,33 +216,20 @@ skip = [
"hashbrown",
"heck",
"http",
"http-body",
"hyper",
"indexmap",
"matchit",
"miniz_oxide",
"parking_lot",
"parking_lot_core",
"regex-automata",
"regex-syntax",
"siphasher",
"syn",
"sync_wrapper",
"tower",
# Ignore duplicates for systems we don't target
"redox_syscall",
"windows-sys",
"windows-targets",
"windows_aarch64_gnullvm",
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_gnu",
"windows_x86_64_gnu",
"windows_x86_64_gnu",
"windows_x86_64_gnu",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc",
#"ansi_term@0.11.0",
#{ crate = "ansi_term@0.11.0", reason = "you can specify a reason why it can't be updated/removed" },
]

View file

@ -11,7 +11,7 @@
rustPlatform.buildRustPackage {
pname = "pict-rs";
version = "0.5.17-pre.3";
version = "0.5.17-pre.5";
src = ./.;
cargoLock = {

14
releases/0.5.17-pre.4.md Normal file
View file

@ -0,0 +1,14 @@
# pict-rs 0.5.17-pre.4
pict-rs is a simple image hosting microservice, designed to handle storing and retrieving images,
animations, and videos, as well as providing basic image processing functionality.
## Overview
pict-rs 0.5.17-pre.4 adds handling for rare cases where variant records exist and refer to missing
files. These records should now be properly cleaned upon request.
## Upgrade Notes
There are no significant changes from 0.5.17-pre.3. Upgrading should be as simple as pulling a new
version of pict-rs.

14
releases/0.5.17-pre.5.md Normal file
View file

@ -0,0 +1,14 @@
# pict-rs 0.5.17-pre.5
pict-rs is a simple image hosting microservice, designed to handle storing and retrieving images,
animations, and videos, as well as providing basic image processing functionality.
## Overview
pict-rs 0.5.17-pre.5 fixes a bug in missing variant detection introduced in -pre.4 that made the
feature essentially do nothing.
## Upgrade Notes
There are no significant changes from 0.5.17-pre.4. Upgrading should be as simple as pulling a new
version of pict-rs.

14
releases/0.5.17-pre.6.md Normal file
View file

@ -0,0 +1,14 @@
# pict-rs 0.5.17-pre.6
pict-rs is a simple image hosting microservice, designed to handle storing and retrieving images,
animations, and videos, as well as providing basic image processing functionality.
## Overview
pict-rs 0.5.17-pre.6 updates dependencies and updates postgres queries to avoid causing errors in
postgres.
## Upgrade Notes
There are no significant changes from 0.5.17-pre.5. Upgrading should be as simple as pulling a new
version of pict-rs.

View file

@ -1,7 +1,7 @@
use crate::config::{LogFormat, OpenTelemetry, Tracing};
use color_eyre::config::Theme;
use console_subscriber::ConsoleLayer;
use opentelemetry::KeyValue;
use opentelemetry::{trace::TracerProvider, KeyValue};
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{propagation::TraceContextPropagator, Resource};
use tracing::subscriber::set_global_default;
@ -78,11 +78,12 @@ where
if let Some(url) = otel.url.as_ref() {
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_trace_config(
opentelemetry_sdk::trace::config().with_resource(Resource::new(vec![
KeyValue::new("service.name", otel.service_name.clone()),
])),
)
.with_trace_config(opentelemetry_sdk::trace::Config::default().with_resource(
Resource::new(vec![KeyValue::new(
"service.name",
otel.service_name.clone(),
)]),
))
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
@ -91,7 +92,7 @@ where
.install_batch(opentelemetry_sdk::runtime::Tokio)?;
let otel_layer = tracing_opentelemetry::layer()
.with_tracer(tracer)
.with_tracer(tracer.tracer("pict-rs-tracer"))
.with_filter(otel.targets.as_ref().targets.clone());
let subscriber = subscriber.with(otel_layer);

View file

@ -842,6 +842,56 @@ async fn not_found_hash(repo: &ArcRepo) -> Result<Option<(Alias, Hash)>, Error>
Ok(Some((alias, hash)))
}
async fn exists<S: Store>(store: &S, identifier: &Arc<str>) -> Result<bool, Error> {
if let Err(e) = store.len(identifier).await {
if e.is_not_found() {
return Ok(false);
}
return Err(e.into());
}
Ok(true)
}
async fn existing_variant_identifier<S: Store>(
state: &State<S>,
hash: Hash,
variant: String,
) -> Result<Option<Arc<str>>, Error> {
let identifier_opt = state
.repo
.variant_identifier(hash.clone(), variant.clone())
.await?;
if let Some(identifier) = identifier_opt {
if !exists(&state.store, &identifier).await? {
let clean =
if let Some(original_identifier) = state.repo.identifier(hash.clone()).await? {
exists(&state.store, &original_identifier).await?
} else {
true
};
if clean {
if state.config.server.read_only {
tracing::warn!("Stored variant {variant} for hash {hash:?} doesn't exist");
return Err(UploadError::ReadOnly.into());
}
tracing::warn!("Stored variant {variant} for hash {hash:?} doesn't exist, spawning cleanup job");
queue::cleanup_variants(&state.repo, hash, Some(variant)).await?;
}
Ok(None)
} else {
Ok(Some(identifier))
}
} else {
Ok(None)
}
}
/// Process files
#[tracing::instrument(name = "Serving processed image", skip(state))]
async fn process<S: Store + 'static>(
@ -871,10 +921,7 @@ async fn process<S: Store + 'static>(
.await?;
}
let identifier_opt = state
.repo
.variant_identifier(hash.clone(), variant.clone())
.await?;
let identifier_opt = existing_variant_identifier(&state, hash.clone(), variant.clone()).await?;
let (details, identifier) = if let Some(identifier) = identifier_opt {
let details = ensure_details_identifier(&state, &identifier).await?;

View file

@ -97,7 +97,7 @@ pub(crate) async fn cleanup_identifier(repo: &ArcRepo, identifier: &Arc<str>) ->
Ok(())
}
async fn cleanup_variants(
pub(super) async fn cleanup_variants(
repo: &ArcRepo,
hash: Hash,
variant: Option<String>,

View file

@ -428,20 +428,19 @@ impl PostgresRepo {
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
let res = diesel::insert_into(keyed_notifications)
let inserted = diesel::insert_into(keyed_notifications)
.values(key.eq(input_key))
.on_conflict_do_nothing()
.execute(&mut conn)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?;
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
match res {
Ok(_) => Ok(Ok(())),
Err(diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
_,
)) => Ok(Err(AlreadyInserted)),
Err(e) => Err(PostgresError::Diesel(e)),
if inserted == 1 {
Ok(Ok(()))
} else {
Ok(Err(AlreadyInserted))
}
}
@ -947,25 +946,24 @@ impl HashRepo for PostgresRepo {
let timestamp = to_primitive(timestamp);
let res = diesel::insert_into(hashes)
let inserted = diesel::insert_into(hashes)
.values((
hash.eq(&input_hash),
identifier.eq(input_identifier.as_ref()),
created_at.eq(&timestamp),
))
.on_conflict_do_nothing()
.execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_HASHES_CREATE_HASH)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?;
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
match res {
Ok(_) => Ok(Ok(())),
Err(diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
_,
)) => Ok(Err(HashAlreadyExists)),
Err(e) => Err(PostgresError::Diesel(e).into()),
if inserted == 1 {
Ok(Ok(()))
} else {
Ok(Err(HashAlreadyExists))
}
}
@ -1195,25 +1193,24 @@ impl VariantRepo for PostgresRepo {
let mut conn = self.get_connection().await?;
let res = diesel::insert_into(variants)
let inserted = diesel::insert_into(variants)
.values((
hash.eq(&input_hash),
variant.eq(&input_variant),
identifier.eq(input_identifier.to_string()),
))
.on_conflict_do_nothing()
.execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_VARIANTS_RELATE_VARIANT_IDENTIFIER)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?;
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
match res {
Ok(_) => Ok(Ok(())),
Err(diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
_,
)) => Ok(Err(VariantAlreadyExists)),
Err(e) => Err(PostgresError::Diesel(e).into()),
if inserted == 1 {
Ok(Ok(()))
} else {
Ok(Err(VariantAlreadyExists))
}
}
@ -1302,25 +1299,24 @@ impl AliasRepo for PostgresRepo {
let mut conn = self.get_connection().await?;
let res = diesel::insert_into(aliases)
let inserted = diesel::insert_into(aliases)
.values((
alias.eq(input_alias),
hash.eq(&input_hash),
token.eq(delete_token),
))
.on_conflict_do_nothing()
.execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_ALIASES_CREATE)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?;
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
match res {
Ok(_) => Ok(Ok(())),
Err(diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
_,
)) => Ok(Err(AliasAlreadyExists)),
Err(e) => Err(PostgresError::Diesel(e).into()),
if inserted == 1 {
Ok(Ok(()))
} else {
Ok(Err(AliasAlreadyExists))
}
}
@ -1467,22 +1463,17 @@ impl DetailsRepo for PostgresRepo {
let value =
serde_json::to_value(&input_details.inner).map_err(PostgresError::SerializeDetails)?;
let res = diesel::insert_into(details)
diesel::insert_into(details)
.values((identifier.eq(input_identifier.as_ref()), json.eq(&value)))
.on_conflict_do_nothing()
.execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_DETAILS_RELATE)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?;
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
match res {
Ok(_)
| Err(diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
_,
)) => Ok(()),
Err(e) => Err(PostgresError::Diesel(e).into()),
}
Ok(())
}
#[tracing::instrument(level = "debug", skip(self))]
@ -1560,33 +1551,28 @@ impl QueueRepo for PostgresRepo {
let mut conn = self.get_connection().await?;
let res = diesel::insert_into(job_queue)
let job_id = diesel::insert_into(job_queue)
.values((
queue.eq(queue_name),
job.eq(job_json),
unique_key.eq(in_unique_key),
))
.returning(id)
.on_conflict_do_nothing()
.get_result::<Uuid>(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_QUEUE_PUSH)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?
.map(JobId)
.map(Some);
.optional()
.map_err(PostgresError::Diesel)?
.map(JobId);
match res {
Ok(job_id) => {
guard.disarm();
Ok(job_id)
}
Err(diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
_,
)) => Ok(None),
Err(e) => Err(RepoError::from(PostgresError::Diesel(e))),
if job_id.is_some() {
guard.disarm();
}
Ok(job_id)
}
#[tracing::instrument(level = "debug", skip_all, fields(job_id))]
@ -1893,21 +1879,20 @@ impl ProxyRepo for PostgresRepo {
let mut conn = self.get_connection().await?;
let res = diesel::insert_into(proxies)
let inserted = diesel::insert_into(proxies)
.values((url.eq(input_url.as_str()), alias.eq(&input_alias)))
.on_conflict_do_nothing()
.execute(&mut conn)
.with_metrics(crate::init_metrics::POSTGRES_PROXY_RELATE_URL)
.with_timeout(Duration::from_secs(5))
.await
.map_err(|_| PostgresError::DbTimeout)?;
.map_err(|_| PostgresError::DbTimeout)?
.map_err(PostgresError::Diesel)?;
match res {
Ok(_) => Ok(Ok(())),
Err(diesel::result::Error::DatabaseError(
diesel::result::DatabaseErrorKind::UniqueViolation,
_,
)) => Ok(Err(ProxyAlreadyExists)),
Err(e) => Err(PostgresError::Diesel(e).into()),
if inserted == 1 {
Ok(Ok(()))
} else {
Ok(Err(ProxyAlreadyExists))
}
}