You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

236 lines
8.7 KiB
Rust

// This file is an adaptation of
// <https://github.com/timothee-haudebourg/json-ld/blob/0.12.1/core/src/loader/reqwest.rs>.
//
// Copyright (C) 2022 Timothée Haudebourg et al.
// Licensed under either the Apache License, Version 2.0; or the MIT License.
// See <https://github.com/timothee-haudebourg/json-ld/tree/0.12.1> for details.
//
// Modified by anna <owo@fef.moe> to use nyanoblog's caching infrastructure
// and work with the `application/activity+json` MIME type.
//
// FIXME: This file is, frankly, just pure chaos and desperately needs a refactor.
use crate::state::AppState;
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use hashbrown::HashSet;
use iref::Iri;
use json_ld::{
syntax::{Parse, Value},
Loader, Profile, RemoteDocument,
};
use locspan::{Meta, Span};
use rdf_types::vocabulary::IriIndex;
use rdf_types::{IriVocabulary, IriVocabularyMut};
use reqwest::{
header::{ACCEPT, CONTENT_TYPE, LINK, LOCATION},
StatusCode,
};
use std::hash::Hash;
use crate::core::*;
use crate::headers;
use crate::util::http::{
self,
header::{content_type::ContentType, link::Link},
Response,
};
/// JSON-LD document loader that downloads remote documents with
/// `http::get`, using the shared application state.
///
/// Type parameters (all defaulted):
/// - `I`: IRI identifier type handed out by the vocabulary,
/// - `M`: metadata attached to the parsed document,
/// - `T`: parsed document type produced by `parser`.
///
/// NOTE(review): the file header says requests go through nyanoblog's
/// caching infrastructure; that presumably lives behind `http::get` and is
/// not visible in this file.
pub struct CachedLoader<I = IriIndex, M = Span, T = Value<M>> {
// Application state passed to `http::get` for every download.
state: AppState,
// Callback that turns a downloaded response body into a parsed document.
parser: Box<DynParser<I, M, T>>,
}
/// Type-erased parser callback stored inside `CachedLoader`.
///
/// It is called with the vocabulary, the identifier of the document that was
/// downloaded and the raw response body, and returns the parsed document
/// together with its metadata (or an error).
type DynParser<I, M, T> =
dyn 'static + Send + Sync + FnMut(&dyn IriVocabulary<Iri = I>, &I, Bytes) -> Result<Meta<T, M>>;
impl CachedLoader {
pub fn new(state: AppState) -> Self {
CachedLoader::new_with(state, move |vocab, file, bytes| {
let content = String::from_utf8(bytes.to_vec())
.map_err(|e| Error::MalformedApub(format!("Invalid encoding: {e}")))?;
json_ld::syntax::Value::parse_str(&content, |span| span)
.map_err(|e| Error::MalformedApub(format!("Syntax error in JSON document: {e}")))
})
}
}
impl<I, M, T> CachedLoader<I, M, T> {
pub fn new_with(
state: AppState,
parser: impl 'static
+ Send
+ Sync
+ FnMut(&dyn IriVocabulary<Iri = I>, &I, Bytes) -> Result<Meta<T, M>>,
) -> Self {
CachedLoader {
state,
parser: Box::new(parser),
}
}
}
/// `json_ld::Loader` implementation: loading a document delegates to
/// `CachedLoader::load_chain` and boxes the resulting future.
impl<I, M, T> Loader<I, M> for CachedLoader<I, M, T>
where
    I: Clone + Eq + Hash + Send + Sync,
    M: Send,
    T: Clone + Send,
{
    /// Parsed document type, as produced by the configured parser.
    type Output = T;
    type Error = Error;

    /// Loads the document identified by `url`, resolving the identifier
    /// through `vocabulary`.
    fn load_with<'a>(
        &'a mut self,
        vocabulary: &'a mut (impl Send + Sync + IriVocabularyMut<Iri = I>),
        url: I,
    ) -> BoxFuture<'a, Result<RemoteDocument<I, M, T>>>
    where
        I: 'a,
    {
        self.load_chain(vocabulary, url).boxed()
    }
}
impl<I: Clone + Eq + Hash + Send + Sync, M: Send, T: Clone + Send> CachedLoader<I, M, T> {
async fn load_chain(
&mut self,
vocabulary: &mut (impl Send + Sync + IriVocabularyMut<Iri = I>),
mut url: I,
) -> Result<RemoteDocument<I, M, T>> {
const MAX_REDIRECTS: u32 = 8;
const ACCEPT_ACTIVITY_PUB: &str =
"application/activity+json, application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\", application/json";
let mut redirect_count = 0;
'next_url: loop {
if redirect_count > MAX_REDIRECTS {
return Err(Error::MalformedApub(format!(
"Refusing to follow more than {MAX_REDIRECTS} redirects"
)));
}
let iri = vocabulary
.iri(&url)
.ok_or_else(|| Error::MalformedApub(String::from("Unresolved IRI")))?;
debug!(target: "ap", "downloading {iri}");
let response = http::get(
&self.state,
iri.as_str(),
headers! {
ACCEPT => ACCEPT_ACTIVITY_PUB,
},
)
.await?;
match response.status() {
StatusCode::OK => {
let mut content_types = response
.headers()
.get_all(CONTENT_TYPE)
.into_iter()
.filter_map(ContentType::from_header);
match content_types.find(ContentType::is_json_ld) {
Some(content_type) => {
let mut context_url = None;
if !content_type.is_proper_json_ld() {
context_url = self.get_context_url(vocabulary, &url, &response)?;
}
let profile: HashSet<Profile<I>> = content_type
.profile()
.into_iter()
.flat_map(|profile| profile.split(' '))
.filter_map(|p| {
Iri::new(p).ok().map(|iri| Profile::new(iri, vocabulary))
})
.collect();
let bytes = response.bytes().await?;
let document = (*self.parser)(vocabulary, &url, bytes)
.map_err(|e| Error::MalformedApub(e.to_string()))?;
break Ok(RemoteDocument::new_full(
Some(url),
Some(content_type.into_mime()),
context_url,
profile,
document,
));
}
None => {
debug!(target: "ap", "no valid media type found");
for link in response.headers().get_all(LINK).into_iter() {
if let Some(link) = Link::from_header(link) {
if link.rel() == Some("alternate") && link.is_proper_json_ld() {
debug!(target: "ap", "link found");
let u = link.href().resolved(vocabulary.iri(&url).unwrap());
url = vocabulary.insert(u.as_iri());
redirect_count += 1;
continue 'next_url;
}
}
}
break Err(Error::MalformedApub(String::from("Invalid content type")));
}
}
}
code if code.is_redirection() => {
if response.status() == StatusCode::SEE_OTHER {
break Err(Error::NotFound);
} else {
match response.headers().get(LOCATION) {
Some(location) => {
let u = Iri::new(location.as_bytes()).map_err(|_| {
Error::MalformedApub(String::from("Invalid redirect URL"))
})?;
url = vocabulary.insert(u);
}
None => {
break Err(Error::MalformedApub(String::from(
"Missing Location header in HTTP redirect",
)))
}
}
}
}
code => {
break Err(Error::MalformedApub(format!(
"HTTP request failed with status code {:?}",
code.as_u16()
)))
}
}
}
}
fn get_context_url(
&self,
vocabulary: &mut (impl Send + Sync + IriVocabularyMut<Iri = I>),
url: &I,
response: &Response,
) -> Result<Option<I>> {
let mut context_url = None;
for link in response.headers().get_all(LINK).into_iter() {
if let Some(link) = Link::from_header(link) {
if link.rel() == Some("alternate")
&& link.typ() == Some("https://www.w3.org/ns/json-ld#context")
{
if context_url.is_some() {
return Err(Error::MalformedApub(String::from(
"Multiple contexts in Link headers",
)));
}
let iri_buf = link.href().resolved(
vocabulary
.iri(url)
.ok_or_else(|| Error::MalformedApub(String::from("Unresolved IRI")))?,
);
context_url = Some(vocabulary.insert(iri_buf.as_iri()));
}
}
}
Ok(context_url)
}
}