// This file is an adaptation of
// <https://github.com/timothee-haudebourg/json-ld/blob/0.12.1/core/src/loader/reqwest.rs>.
//
// Copyright (C) 2022 Timothée Haudebourg et al.
// Licensed under either the Apache License, Version 2.0; or the MIT License.
// See <https://github.com/timothee-haudebourg/json-ld/tree/0.12.1> for details.
//
// Modified by anna <owo@fef.moe> to use nyanoblog's caching infrastructure
// and work with the `application/activity+json` MIME type.
//
// FIXME: This file is, frankly, just pure chaos and desperately needs a refactor.
// Update 2023-07-29: Things have improved slightly, but it's still bad.
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use hashbrown::HashSet;
use iref::Iri;
use json_ld::{syntax::Value, Loader, Profile, RemoteDocument};
use locspan::{Meta, Span};
use rdf_types::{vocabulary::IriIndex, IriVocabulary, IriVocabularyMut};
use reqwest::{
header::{ACCEPT, CONTENT_TYPE, LINK, LOCATION},
StatusCode,
};
use std::hash::Hash;
use std::ops::ControlFlow;
use crate::core::*;
use crate::headers;
use crate::state::AppState;
use crate::util::http::{
self,
header::{content_type::ContentType, link::Link},
Response,
};
/// JSON-LD document loader that downloads documents with `http::get` using
/// the shared [`AppState`] and hands each response body to a caller-supplied
/// parser callback.
///
/// NOTE(review): the "cached" part is presumably provided by `http::get` /
/// `AppState`; nothing in this type caches on its own — confirm.
///
/// Type parameters:
/// * `I` – identifier type the vocabulary uses for IRIs.
/// * `M` – metadata attached to the parsed document.
/// * `T` – parsed document type produced by the parser.
pub struct CachedLoader<I = IriIndex, M = Span, T = Value<M>> {
/// Application state passed to every `http::get` call.
state: AppState,
/// Callback that turns a downloaded body into a parsed document.
parser: Box<DynParser<I, M, T>>,
}
/// Signature of the boxed parser callback stored in [`CachedLoader`].
///
/// It is invoked with the vocabulary, the identifier of the document's URL
/// and the raw response body, and returns the parsed value wrapped in its
/// metadata (or an error).
type DynParser<I, M, T> =
dyn 'static + Send + Sync + FnMut(&dyn IriVocabulary<Iri = I>, &I, Bytes) -> Result<Meta<T, M>>;
impl<I, M, T> CachedLoader<I, M, T> {
pub fn new_with<F>(state: AppState, parser: F) -> Self
where
F: 'static
+ Send
+ Sync
+ FnMut(&dyn IriVocabulary<Iri = I>, &I, Bytes) -> Result<Meta<T, M>>,
{
Self {
state,
parser: Box::new(parser),
}
}
}
impl<I, M, T> Loader<I, M> for CachedLoader<I, M, T>
where
    I: Clone + Eq + Hash + Send + Sync,
    M: Send,
    T: Clone + Send,
{
    type Output = T;
    type Error = Error;

    /// Loads the document identified by `url`, delegating to
    /// `load_chain` and boxing the resulting future.
    fn load_with<'a>(
        &'a mut self,
        vocabulary: &'a mut (impl Send + Sync + IriVocabularyMut<Iri = I>),
        url: I,
    ) -> BoxFuture<'a, Result<RemoteDocument<I, M, T>>>
    where
        I: 'a,
    {
        let pending = self.load_chain(vocabulary, url);
        pending.boxed()
    }
}
/// Mutable state carried across iterations of the fetch loop in
/// `CachedLoader::load_chain`.
struct LoopState<'a, I, V> {
/// Number of redirections followed so far; incremented both for HTTP
/// redirects and for `Link`-header alternates.
redirect_count: u32,
/// Vocabulary used to resolve identifiers to IRIs and to insert new IRIs.
vocab: &'a mut V,
/// Identifier of the URL that the next request will be sent to.
url: I,
}
impl<I, M, T> CachedLoader<I, M, T>
where
I: Clone + Eq + Hash + Send + Sync,
M: Send,
T: Clone + Send,
{
async fn load_chain(
&mut self,
vocabulary: &mut (impl Send + Sync + IriVocabularyMut<Iri = I>),
url: I,
) -> Result<RemoteDocument<I, M, T>> {
const MAX_REDIRECTS: u32 = 8;
const ACCEPT_ACTIVITY_PUB: &str =
"application/activity+json, application/ld+json; profile=\"https://www.w3.org/ns/activitystreams\", application/json";
let mut state = LoopState {
redirect_count: 0,
vocab: vocabulary,
url,
};
loop {
if state.redirect_count > MAX_REDIRECTS {
return apub_err(format!(
"Refusing to follow more than {MAX_REDIRECTS} redirects"
));
}
let iri = state
.vocab
.iri(&state.url)
.ok_or_else(|| Error::MalformedApub(String::from("Unresolved IRI")))?;
debug!(target: "ap", "downloading {iri}");
let response = http::get(
&self.state,
iri.as_str(),
headers! {
ACCEPT => ACCEPT_ACTIVITY_PUB,
},
)
.await?;
match response.status() {
StatusCode::OK => match self.handle_http_ok(&mut state, response).await {
ControlFlow::Break(result) => break result,
ControlFlow::Continue(_) => continue,
},
code if code.is_redirection() => {
match self.handle_http_redirect(&mut state, response) {
ControlFlow::Break(result) => break result,
ControlFlow::Continue(_) => continue,
}
}
code => {
break Err(Error::MalformedApub(format!(
"HTTP request failed with status code {:?}",
code.as_u16()
)))
}
}
}
}
fn get_context_url<V>(&self, vocab: &mut V, url: &I, response: &Response) -> Result<Option<I>>
where
V: Send + Sync + IriVocabularyMut<Iri = I>,
{
let mut context_url = None;
for link in response.headers().get_all(LINK).into_iter() {
if let Some(link) = Link::from_header(link) {
if link.rel() == Some("alternate")
&& link.typ() == Some("https://www.w3.org/ns/json-ld#context")
{
if context_url.is_some() {
return Err(Error::MalformedApub(String::from(
"Multiple contexts in Link headers",
)));
}
let iri_buf = link.href().resolved(
vocab
.iri(url)
.ok_or_else(|| Error::MalformedApub(String::from("Unresolved IRI")))?,
);
context_url = Some(vocab.insert(iri_buf.as_iri()));
}
}
}
Ok(context_url)
}
async fn handle_http_ok<V>(
&mut self,
state: &mut LoopState<'_, I, V>,
response: Response,
) -> ControlFlow<Result<RemoteDocument<I, M, T>>>
where
V: Send + Sync + IriVocabularyMut<Iri = I>,
{
let mut content_types = response
.headers()
.get_all(CONTENT_TYPE)
.into_iter()
.filter_map(ContentType::from_header);
match content_types.find(ContentType::is_json_ld) {
Some(content_type) => {
let mut context_url = None;
if !content_type.is_proper_json_ld() {
context_url = match self.get_context_url(state.vocab, &state.url, &response) {
Ok(context_url) => context_url,
Err(e) => return ControlFlow::Break(Err(e)),
};
}
let profile: HashSet<Profile<I>> = content_type
.profile()
.into_iter()
.flat_map(|profile| profile.split(' '))
.filter_map(|p| Iri::new(p).ok().map(|iri| Profile::new(iri, state.vocab)))
.collect();
let bytes = match response.bytes().await {
Ok(bytes) => bytes,
Err(e) => return ControlFlow::Break(Err(e)),
};
let document = match (*self.parser)(state.vocab, &state.url, bytes) {
Ok(document) => document,
Err(e) => return ControlFlow::Break(apub_err(e.to_string())),
};
ControlFlow::Break(Ok(RemoteDocument::new_full(
Some(state.url.clone()),
Some(content_type.into_mime()),
context_url,
profile,
document,
)))
}
None => {
debug!(target: "ap", "no valid media type found");
for link in response.headers().get_all(LINK).into_iter() {
if let Some(link) = Link::from_header(link) {
if link.rel() == Some("alternate") && link.is_proper_json_ld() {
debug!(target: "ap", "link found");
let u = link.href().resolved(state.vocab.iri(&state.url).unwrap());
state.url = state.vocab.insert(u.as_iri());
state.redirect_count += 1;
return ControlFlow::Continue(());
}
}
}
ControlFlow::Break(apub_err("Invalid content type"))
}
}
}
fn handle_http_redirect<V>(
&self,
state: &mut LoopState<'_, I, V>,
response: Response,
) -> ControlFlow<Result<RemoteDocument<I, M, T>>>
where
V: Send + Sync + IriVocabularyMut<Iri = I>,
{
state.redirect_count += 1;
if response.status() == StatusCode::SEE_OTHER {
return ControlFlow::Break(Err(Error::NotFound));
}
match response.headers().get(LOCATION) {
Some(location) => {
let u = match Iri::new(location.as_bytes()) {
Ok(u) => u,
Err(e) => {
return ControlFlow::Break(apub_err(format!("Invalid redirect URL: {e}")));
}
};
state.url = state.vocab.insert(u);
ControlFlow::Continue(())
}
None => ControlFlow::Break(Err(Error::MalformedApub(String::from(
"Missing Location header in HTTP redirect",
)))),
}
}
}
fn apub_err<T>(msg: impl Into<String>) -> Result<T> {
Err(Error::MalformedApub(msg.into()))
}