mirror of
https://git.asonix.dog/asonix/pict-rs
synced 2024-12-22 19:31:35 +00:00
Implement UploadRepo
This commit is contained in:
parent
132e395e5c
commit
77a400c7ca
2 changed files with 57 additions and 9 deletions
|
@ -431,11 +431,6 @@ impl UploadId {
|
||||||
pub(crate) fn as_bytes(&self) -> &[u8] {
|
pub(crate) fn as_bytes(&self) -> &[u8] {
|
||||||
&self.id.as_bytes()[..]
|
&self.id.as_bytes()[..]
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn from_bytes(&self, bytes: &[u8]) -> Option<Self> {
|
|
||||||
let id = Uuid::from_slice(bytes).ok()?;
|
|
||||||
Some(Self { id })
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<Uuid> for UploadId {
|
impl From<Uuid> for UploadId {
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
use crate::{
|
use crate::{
|
||||||
error::Error,
|
error::{Error, UploadError},
|
||||||
repo::{
|
repo::{
|
||||||
Alias, AliasRepo, AlreadyExists, BaseRepo, DeleteToken, Details, FullRepo, HashRepo,
|
Alias, AliasRepo, AlreadyExists, BaseRepo, DeleteToken, Details, FullRepo, HashRepo,
|
||||||
Identifier, IdentifierRepo, QueueRepo, SettingsRepo, UploadId, UploadRepo, UploadResult,
|
Identifier, IdentifierRepo, QueueRepo, SettingsRepo, UploadId, UploadRepo, UploadResult,
|
||||||
},
|
},
|
||||||
|
serde_str::Serde,
|
||||||
stream::from_iterator,
|
stream::from_iterator,
|
||||||
};
|
};
|
||||||
use futures_util::Stream;
|
use futures_util::Stream;
|
||||||
|
@ -55,6 +56,7 @@ pub(crate) struct SledRepo {
|
||||||
queue: Tree,
|
queue: Tree,
|
||||||
in_progress_queue: Tree,
|
in_progress_queue: Tree,
|
||||||
queue_notifier: Arc<RwLock<HashMap<&'static str, Arc<Notify>>>>,
|
queue_notifier: Arc<RwLock<HashMap<&'static str, Arc<Notify>>>>,
|
||||||
|
uploads: Tree,
|
||||||
db: Db,
|
db: Db,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,6 +76,7 @@ impl SledRepo {
|
||||||
queue: db.open_tree("pict-rs-queue-tree")?,
|
queue: db.open_tree("pict-rs-queue-tree")?,
|
||||||
in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?,
|
in_progress_queue: db.open_tree("pict-rs-in-progress-queue-tree")?,
|
||||||
queue_notifier: Arc::new(RwLock::new(HashMap::new())),
|
queue_notifier: Arc::new(RwLock::new(HashMap::new())),
|
||||||
|
uploads: db.open_tree("pict-rs-uploads-tree")?,
|
||||||
db,
|
db,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -85,18 +88,68 @@ impl BaseRepo for SledRepo {
|
||||||
|
|
||||||
impl FullRepo for SledRepo {}
|
impl FullRepo for SledRepo {}
|
||||||
|
|
||||||
|
#[derive(serde::Deserialize, serde::Serialize)]
|
||||||
|
enum InnerUploadResult {
|
||||||
|
Success {
|
||||||
|
alias: Serde<Alias>,
|
||||||
|
token: Serde<DeleteToken>,
|
||||||
|
},
|
||||||
|
Failure {
|
||||||
|
message: String,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<UploadResult> for InnerUploadResult {
|
||||||
|
fn from(u: UploadResult) -> Self {
|
||||||
|
match u {
|
||||||
|
UploadResult::Success { alias, token } => InnerUploadResult::Success {
|
||||||
|
alias: Serde::new(alias),
|
||||||
|
token: Serde::new(token),
|
||||||
|
},
|
||||||
|
UploadResult::Failure { message } => InnerUploadResult::Failure { message },
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<InnerUploadResult> for UploadResult {
|
||||||
|
fn from(i: InnerUploadResult) -> Self {
|
||||||
|
match i {
|
||||||
|
InnerUploadResult::Success { alias, token } => UploadResult::Success {
|
||||||
|
alias: Serde::into_inner(alias),
|
||||||
|
token: Serde::into_inner(token),
|
||||||
|
},
|
||||||
|
InnerUploadResult::Failure { message } => UploadResult::Failure { message },
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[async_trait::async_trait(?Send)]
|
#[async_trait::async_trait(?Send)]
|
||||||
impl UploadRepo for SledRepo {
|
impl UploadRepo for SledRepo {
|
||||||
async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, Error> {
|
async fn wait(&self, upload_id: UploadId) -> Result<UploadResult, Error> {
|
||||||
unimplemented!("DO THIS")
|
let mut subscriber = self.uploads.watch_prefix(upload_id.as_bytes());
|
||||||
|
|
||||||
|
while let Some(event) = (&mut subscriber).await {
|
||||||
|
if let sled::Event::Insert { value, .. } = event {
|
||||||
|
let result: InnerUploadResult = serde_json::from_slice(&value)?;
|
||||||
|
return Ok(result.into());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(UploadError::Canceled.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn claim(&self, upload_id: UploadId) -> Result<(), Error> {
|
async fn claim(&self, upload_id: UploadId) -> Result<(), Error> {
|
||||||
unimplemented!("DO THIS")
|
b!(self.uploads, uploads.remove(upload_id.as_bytes()));
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), Error> {
|
async fn complete(&self, upload_id: UploadId, result: UploadResult) -> Result<(), Error> {
|
||||||
unimplemented!("DO THIS")
|
let result: InnerUploadResult = result.into();
|
||||||
|
let result = serde_json::to_vec(&result)?;
|
||||||
|
|
||||||
|
b!(self.uploads, uploads.insert(upload_id.as_bytes(), result));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue