mirror of
https://git.asonix.dog/asonix/pict-rs
synced 2025-01-03 08:21:24 +00:00
Use actix-web's Range header
This commit is contained in:
parent
9392152dd5
commit
21b6e1dc08
3 changed files with 59 additions and 180 deletions
|
@ -57,9 +57,6 @@ pub(crate) enum UploadError {
|
||||||
#[error("Error parsing string, {0}")]
|
#[error("Error parsing string, {0}")]
|
||||||
ParseString(#[from] std::string::FromUtf8Error),
|
ParseString(#[from] std::string::FromUtf8Error),
|
||||||
|
|
||||||
#[error("Error parsing request, {0}")]
|
|
||||||
ParseReq(String),
|
|
||||||
|
|
||||||
#[error("Error interacting with filesystem, {0}")]
|
#[error("Error interacting with filesystem, {0}")]
|
||||||
Io(#[from] std::io::Error),
|
Io(#[from] std::io::Error),
|
||||||
|
|
||||||
|
@ -155,8 +152,7 @@ impl ResponseError for Error {
|
||||||
UploadError::DuplicateAlias
|
UploadError::DuplicateAlias
|
||||||
| UploadError::Limit(_)
|
| UploadError::Limit(_)
|
||||||
| UploadError::NoFiles
|
| UploadError::NoFiles
|
||||||
| UploadError::Upload(_)
|
| UploadError::Upload(_) => StatusCode::BAD_REQUEST,
|
||||||
| UploadError::ParseReq(_) => StatusCode::BAD_REQUEST,
|
|
||||||
UploadError::MissingAlias | UploadError::MissingFilename => StatusCode::NOT_FOUND,
|
UploadError::MissingAlias | UploadError::MissingFilename => StatusCode::NOT_FOUND,
|
||||||
UploadError::InvalidToken => StatusCode::FORBIDDEN,
|
UploadError::InvalidToken => StatusCode::FORBIDDEN,
|
||||||
UploadError::Range => StatusCode::RANGE_NOT_SATISFIABLE,
|
UploadError::Range => StatusCode::RANGE_NOT_SATISFIABLE,
|
||||||
|
|
27
src/main.rs
27
src/main.rs
|
@ -1,7 +1,7 @@
|
||||||
use actix_form_data::{Field, Form, Value};
|
use actix_form_data::{Field, Form, Value};
|
||||||
use actix_web::{
|
use actix_web::{
|
||||||
guard,
|
guard,
|
||||||
http::header::{CacheControl, CacheDirective, LastModified, ACCEPT_RANGES},
|
http::header::{CacheControl, CacheDirective, LastModified, Range, ACCEPT_RANGES},
|
||||||
web, App, HttpResponse, HttpResponseBuilder, HttpServer,
|
web, App, HttpResponse, HttpResponseBuilder, HttpServer,
|
||||||
};
|
};
|
||||||
use awc::Client;
|
use awc::Client;
|
||||||
|
@ -341,7 +341,7 @@ where
|
||||||
/// Process files
|
/// Process files
|
||||||
#[instrument(name = "Serving processed image", skip(manager, filters))]
|
#[instrument(name = "Serving processed image", skip(manager, filters))]
|
||||||
async fn process<S: Store + 'static>(
|
async fn process<S: Store + 'static>(
|
||||||
range: Option<range::RangeHeader>,
|
range: Option<web::Header<Range>>,
|
||||||
query: web::Query<ProcessQuery>,
|
query: web::Query<ProcessQuery>,
|
||||||
ext: web::Path<String>,
|
ext: web::Path<String>,
|
||||||
manager: web::Data<UploadManager>,
|
manager: web::Data<UploadManager>,
|
||||||
|
@ -440,12 +440,14 @@ where
|
||||||
|
|
||||||
let (details, bytes) = CancelSafeProcessor::new(thumbnail_path.clone(), process_fut).await?;
|
let (details, bytes) = CancelSafeProcessor::new(thumbnail_path.clone(), process_fut).await?;
|
||||||
|
|
||||||
let (builder, stream) = if let Some(range_header) = range {
|
let (builder, stream) = if let Some(web::Header(range_header)) = range {
|
||||||
if let Some(range) = range_header.single_bytes_range() {
|
if let Some(range) = range::single_bytes_range(&range_header) {
|
||||||
if let Some(content_range) = range.to_content_range(bytes.len() as u64) {
|
let len = bytes.len() as u64;
|
||||||
|
|
||||||
|
if let Some(content_range) = range::to_content_range(range, len) {
|
||||||
let mut builder = HttpResponse::PartialContent();
|
let mut builder = HttpResponse::PartialContent();
|
||||||
builder.insert_header(content_range);
|
builder.insert_header(content_range);
|
||||||
let stream = range.chop_bytes(bytes);
|
let stream = range::chop_bytes(range, bytes, len)?;
|
||||||
|
|
||||||
(builder, Either::left(Either::left(stream)))
|
(builder, Either::left(Either::left(stream)))
|
||||||
} else {
|
} else {
|
||||||
|
@ -502,7 +504,7 @@ where
|
||||||
/// Serve files
|
/// Serve files
|
||||||
#[instrument(name = "Serving file", skip(manager))]
|
#[instrument(name = "Serving file", skip(manager))]
|
||||||
async fn serve<S: Store>(
|
async fn serve<S: Store>(
|
||||||
range: Option<range::RangeHeader>,
|
range: Option<web::Header<Range>>,
|
||||||
alias: web::Path<String>,
|
alias: web::Path<String>,
|
||||||
manager: web::Data<UploadManager>,
|
manager: web::Data<UploadManager>,
|
||||||
store: web::Data<S>,
|
store: web::Data<S>,
|
||||||
|
@ -532,25 +534,24 @@ where
|
||||||
async fn ranged_file_resp<S: Store>(
|
async fn ranged_file_resp<S: Store>(
|
||||||
store: &S,
|
store: &S,
|
||||||
identifier: S::Identifier,
|
identifier: S::Identifier,
|
||||||
range: Option<range::RangeHeader>,
|
range: Option<web::Header<Range>>,
|
||||||
details: Details,
|
details: Details,
|
||||||
) -> Result<HttpResponse, Error>
|
) -> Result<HttpResponse, Error>
|
||||||
where
|
where
|
||||||
Error: From<S::Error>,
|
Error: From<S::Error>,
|
||||||
{
|
{
|
||||||
let (builder, stream) = if let Some(range_header) = range {
|
let (builder, stream) = if let Some(web::Header(range_header)) = range {
|
||||||
//Range header exists - return as ranged
|
//Range header exists - return as ranged
|
||||||
if let Some(range) = range_header.single_bytes_range() {
|
if let Some(range) = range::single_bytes_range(&range_header) {
|
||||||
let len = store.len(&identifier).await?;
|
let len = store.len(&identifier).await?;
|
||||||
|
|
||||||
if let Some(content_range) = range.to_content_range(len) {
|
if let Some(content_range) = range::to_content_range(range, len) {
|
||||||
let mut builder = HttpResponse::PartialContent();
|
let mut builder = HttpResponse::PartialContent();
|
||||||
builder.insert_header(content_range);
|
builder.insert_header(content_range);
|
||||||
|
|
||||||
(
|
(
|
||||||
builder,
|
builder,
|
||||||
Either::left(Either::left(map_error::map_crate_error(
|
Either::left(Either::left(map_error::map_crate_error(
|
||||||
range.chop_store(store, identifier).await?,
|
range::chop_store(range, store, &identifier, len).await?,
|
||||||
))),
|
))),
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
|
|
206
src/range.rs
206
src/range.rs
|
@ -3,180 +3,62 @@ use crate::{
|
||||||
store::Store,
|
store::Store,
|
||||||
};
|
};
|
||||||
use actix_web::{
|
use actix_web::{
|
||||||
dev::Payload,
|
http::header::{ByteRangeSpec, ContentRange, ContentRangeSpec, Range},
|
||||||
http::header::{ContentRange, ContentRangeSpec, HeaderValue},
|
|
||||||
web::Bytes,
|
web::Bytes,
|
||||||
FromRequest, HttpRequest,
|
|
||||||
};
|
};
|
||||||
use futures_util::stream::{once, Stream};
|
use futures_util::stream::{once, Stream};
|
||||||
use std::future::ready;
|
use std::future::ready;
|
||||||
|
|
||||||
#[derive(Debug)]
|
pub(crate) fn chop_bytes(
|
||||||
pub(crate) enum Range {
|
byte_range: &ByteRangeSpec,
|
||||||
Start(u64),
|
bytes: Bytes,
|
||||||
SuffixLength(u64),
|
length: u64,
|
||||||
Segment(u64, u64),
|
) -> Result<impl Stream<Item = Result<Bytes, Error>>, Error> {
|
||||||
|
if let Some((start, end)) = byte_range.to_satisfiable_range(length) {
|
||||||
|
return Ok(once(ready(Ok(bytes.slice(start as usize..end as usize)))));
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(UploadError::Range.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
pub(crate) async fn chop_store<S: Store>(
|
||||||
pub(crate) struct RangeHeader {
|
byte_range: &ByteRangeSpec,
|
||||||
unit: String,
|
store: &S,
|
||||||
ranges: Vec<Range>,
|
identifier: &S::Identifier,
|
||||||
|
length: u64,
|
||||||
|
) -> Result<impl Stream<Item = std::io::Result<Bytes>>, Error>
|
||||||
|
where
|
||||||
|
Error: From<S::Error>,
|
||||||
|
{
|
||||||
|
if let Some((start, end)) = byte_range.to_satisfiable_range(length) {
|
||||||
|
return Ok(store
|
||||||
|
.to_stream(identifier, Some(start), Some(end.saturating_sub(start)))
|
||||||
|
.await?);
|
||||||
|
}
|
||||||
|
|
||||||
|
Err(UploadError::Range.into())
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Range {
|
pub(crate) fn single_bytes_range(range: &Range) -> Option<&ByteRangeSpec> {
|
||||||
pub(crate) fn to_content_range(&self, instance_length: u64) -> Option<ContentRange> {
|
if let Range::Bytes(ranges) = range {
|
||||||
match self {
|
if ranges.len() == 1 {
|
||||||
Range::Start(start) => {
|
return ranges.get(0);
|
||||||
if *start >= instance_length {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
Some(ContentRange(ContentRangeSpec::Bytes {
|
|
||||||
range: Some((*start, instance_length - *start)),
|
|
||||||
instance_length: Some(instance_length),
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
Range::SuffixLength(from_start) => {
|
|
||||||
if *from_start > instance_length {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
Some(ContentRange(ContentRangeSpec::Bytes {
|
|
||||||
range: Some((0, *from_start)),
|
|
||||||
instance_length: Some(instance_length),
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
Range::Segment(start, end) => {
|
|
||||||
if *start >= instance_length || *end > instance_length {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
Some(ContentRange(ContentRangeSpec::Bytes {
|
|
||||||
range: Some((*start, *end)),
|
|
||||||
instance_length: Some(instance_length),
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn chop_bytes(&self, bytes: Bytes) -> impl Stream<Item = Result<Bytes, Error>> {
|
None
|
||||||
match self {
|
|
||||||
Range::Start(start) => once(ready(Ok(bytes.slice(*start as usize..)))),
|
|
||||||
Range::SuffixLength(from_start) => once(ready(Ok(bytes.slice(..*from_start as usize)))),
|
|
||||||
Range::Segment(start, end) => {
|
|
||||||
once(ready(Ok(bytes.slice(*start as usize..*end as usize))))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) async fn chop_store<S: Store>(
|
|
||||||
&self,
|
|
||||||
store: &S,
|
|
||||||
identifier: S::Identifier,
|
|
||||||
) -> Result<impl Stream<Item = std::io::Result<Bytes>>, Error>
|
|
||||||
where
|
|
||||||
Error: From<S::Error>,
|
|
||||||
{
|
|
||||||
match self {
|
|
||||||
Range::Start(start) => Ok(store.to_stream(&identifier, Some(*start), None).await?),
|
|
||||||
Range::SuffixLength(from_start) => Ok(store
|
|
||||||
.to_stream(&identifier, None, Some(*from_start))
|
|
||||||
.await?),
|
|
||||||
Range::Segment(start, end) => Ok(store
|
|
||||||
.to_stream(&identifier, Some(*start), Some(end.saturating_sub(*start)))
|
|
||||||
.await?),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RangeHeader {
|
pub(crate) fn to_content_range(
|
||||||
pub(crate) fn single_bytes_range(&self) -> Option<&'_ Range> {
|
byte_range: &ByteRangeSpec,
|
||||||
if self.ranges.len() == 1 && self.unit == "bytes" {
|
instance_length: u64,
|
||||||
self.ranges.get(0)
|
) -> Option<ContentRange> {
|
||||||
} else {
|
byte_range
|
||||||
None
|
.to_satisfiable_range(instance_length)
|
||||||
}
|
.map(|range| {
|
||||||
}
|
ContentRange(ContentRangeSpec::Bytes {
|
||||||
}
|
range: Some(range),
|
||||||
|
instance_length: Some(instance_length),
|
||||||
impl FromRequest for RangeHeader {
|
})
|
||||||
type Error = Error;
|
})
|
||||||
type Future = std::future::Ready<Result<Self, Self::Error>>;
|
|
||||||
|
|
||||||
fn from_request(req: &HttpRequest, _: &mut Payload) -> Self::Future {
|
|
||||||
if let Some(range_head) = req.headers().get("Range") {
|
|
||||||
ready(parse_range_header(range_head).map_err(|e| {
|
|
||||||
tracing::warn!("Failed to parse range header: {}", e);
|
|
||||||
e
|
|
||||||
}))
|
|
||||||
} else {
|
|
||||||
ready(Err(UploadError::ParseReq(
|
|
||||||
"Range header missing".to_string(),
|
|
||||||
)
|
|
||||||
.into()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn parse_range_header(range_head: &HeaderValue) -> Result<RangeHeader, Error> {
|
|
||||||
let range_head_str = range_head.to_str().map_err(|_| {
|
|
||||||
UploadError::ParseReq("Range header contains non-utf8 characters".to_string())
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let eq_pos = range_head_str
|
|
||||||
.find('=')
|
|
||||||
.ok_or_else(|| UploadError::ParseReq("Malformed Range Header".to_string()))?;
|
|
||||||
|
|
||||||
let (unit, ranges) = range_head_str.split_at(eq_pos);
|
|
||||||
let ranges = ranges.trim_start_matches('=');
|
|
||||||
|
|
||||||
let ranges = ranges
|
|
||||||
.split(',')
|
|
||||||
.map(parse_range)
|
|
||||||
.collect::<Result<Vec<Range>, Error>>()?;
|
|
||||||
|
|
||||||
Ok(RangeHeader {
|
|
||||||
unit: unit.to_owned(),
|
|
||||||
ranges,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn parse_range(s: &str) -> Result<Range, Error> {
|
|
||||||
let dash_pos = s
|
|
||||||
.find('-')
|
|
||||||
.ok_or_else(|| UploadError::ParseReq("Mailformed Range Bound".to_string()))?;
|
|
||||||
|
|
||||||
let (start, end) = s.split_at(dash_pos);
|
|
||||||
let start = start.trim();
|
|
||||||
let end = end.trim_start_matches('-').trim();
|
|
||||||
|
|
||||||
if start.is_empty() && end.is_empty() {
|
|
||||||
Err(UploadError::ParseReq("Malformed content range".to_string()).into())
|
|
||||||
} else if start.is_empty() {
|
|
||||||
let suffix_length = end.parse().map_err(|_| {
|
|
||||||
UploadError::ParseReq("Cannot parse suffix length for range header".to_string())
|
|
||||||
})?;
|
|
||||||
|
|
||||||
Ok(Range::SuffixLength(suffix_length))
|
|
||||||
} else if end.is_empty() {
|
|
||||||
let range_start = start.parse().map_err(|_| {
|
|
||||||
UploadError::ParseReq("Cannot parse range start for range header".to_string())
|
|
||||||
})?;
|
|
||||||
|
|
||||||
Ok(Range::Start(range_start))
|
|
||||||
} else {
|
|
||||||
let range_start = start.parse().map_err(|_| {
|
|
||||||
UploadError::ParseReq("Cannot parse range start for range header".to_string())
|
|
||||||
})?;
|
|
||||||
let range_end = end.parse().map_err(|_| {
|
|
||||||
UploadError::ParseReq("Cannot parse range end for range header".to_string())
|
|
||||||
})?;
|
|
||||||
|
|
||||||
if range_start > range_end {
|
|
||||||
return Err(UploadError::Range.into());
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(Range::Segment(range_start, range_end))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue