ap: add proper document loader

This still lacks caching, but it is slightly more
advanced than just using the ReqwestLoader that
the json_ld crate provides.
This commit is contained in:
anna 2023-01-18 20:04:14 +01:00
parent 7124590ff3
commit 64739eeead
Signed by: fef
GPG key ID: EC22E476DC2D3D84
10 changed files with 721 additions and 10 deletions

5
Cargo.lock generated
View file

@ -1166,7 +1166,6 @@ version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f103ff1c30bf42b3b7d09c69cbe12869e5ad42497638c5199d83de6fd7d7b13e"
dependencies = [
"bytes",
"contextual",
"derivative",
"futures",
@ -1184,7 +1183,6 @@ dependencies = [
"permutohedron",
"pretty_dtoa",
"rdf-types",
"reqwest",
"ryu-js",
"smallvec",
"static-iref",
@ -1560,14 +1558,17 @@ dependencies = [
"actix-web",
"argon2",
"async-trait",
"bytes",
"chrono",
"dotenvy",
"futures",
"hashbrown 0.13.1",
"iref",
"json-ld",
"jsonwebtoken",
"locspan",
"log",
"mime",
"pretty_env_logger",
"rdf-types",
"reqwest",

View file

@ -8,14 +8,17 @@ actix-rt = "2.7"
actix-web = "4"
argon2 = "0.4"
async-trait = "0.1.59"
bytes = "1.3"
chrono = { version = "0.4", features = [ "alloc", "clock", "serde" ] }
dotenvy = "0.15.6"
futures = "0.3"
hashbrown = "0.13"
iref = "2.2"
json-ld = { version = "0.12", features = [ "reqwest" ] }
json-ld = { version = "0.12" }
jsonwebtoken = { version = "8", default-features = false }
locspan = "0.7"
log = "0.4"
mime = "0.3"
pretty_env_logger = "0.4"
rdf-types = "0.12"
reqwest = { version = "0.11", features = [ "rustls" ] }

View file

@ -0,0 +1,265 @@
// This file is an adaptation of
// <https://github.com/timothee-haudebourg/json-ld/blob/0.12.1/core/src/loader/reqwest/content_type.rs>.
//
// Copyright (C) 2022 Timothée Haudebourg et al.
// Licensed under either the Apache License, Version 2.0; or the MIT License.
// See <https://github.com/timothee-haudebourg/json-ld/tree/0.12.1> for details.
//
// Modified by anna <owo@fef.moe> to accept the `application/activity+json`
// MIME type as proper JSON-LD.
use hashbrown::HashMap;
use mime::Mime;
use reqwest::header::HeaderValue;
use std::str::FromStr;
/// Helper structure for parsing the `Content-Type` header for JSON-LD.
pub struct ContentType {
/// Media type parsed from the part of the header before the first `;`.
mime: Mime,
/// Raw `key=value` parameters that follow the media type, kept as bytes.
params: HashMap<Vec<u8>, Vec<u8>>,
}
impl ContentType {
    /// Parses a `Content-Type` header value into its media type and parameters.
    ///
    /// The header is scanned byte by byte: everything before the first `;` is
    /// the media type, followed by `key=value` parameters separated by `;`.
    /// A parameter value is either bare (it runs until the next `;` or the end
    /// of the header) or enclosed in double quotes, in which case the quotes
    /// are not part of the stored value.
    ///
    /// Returns `None` if the header is structurally invalid (a trailing `;`
    /// without a key, a key without `=`, or any byte other than `;` directly
    /// after a closing quote), or if the media type is not valid UTF-8 or
    /// cannot be parsed as a [`Mime`].
    pub fn from_header(value: &HeaderValue) -> Option<ContentType> {
        enum State {
            Mime,
            NextParam,
            BeginKey,
            Key,
            BeginValue,
            QuotedValue,
            Value,
        }

        let mut state = State::Mime;
        let mut mime = Vec::new();
        let mut current_key = Vec::new();
        let mut current_value = Vec::new();
        let mut params = HashMap::new();
        let mut bytes = value.as_bytes().iter();

        loop {
            match state {
                State::Mime => match bytes.next().copied() {
                    Some(b';') => state = State::BeginKey,
                    Some(b) => mime.push(b),
                    None => break,
                },
                // Directly after a closing quote only `;` or end of input is allowed.
                State::NextParam => match bytes.next().copied() {
                    Some(b';') => state = State::BeginKey,
                    Some(_) => return None,
                    None => break,
                },
                // Skip the optional spaces between `;` and the parameter name.
                State::BeginKey => match bytes.next().copied() {
                    Some(b' ') => {}
                    Some(b) => {
                        current_key.push(b);
                        state = State::Key;
                    }
                    None => return None,
                },
                State::Key => match bytes.next().copied() {
                    Some(b'=') => state = State::BeginValue,
                    Some(b) => current_key.push(b),
                    None => return None,
                },
                State::BeginValue => match bytes.next().copied() {
                    // An opening double quote starts a quoted value; the quote
                    // itself must not end up in the stored parameter.
                    Some(b'"') => state = State::QuotedValue,
                    Some(b) => {
                        state = State::Value;
                        current_value.push(b);
                    }
                    _ => return None,
                },
                State::QuotedValue => match bytes.next().copied() {
                    Some(b'"') => {
                        params.insert(
                            std::mem::take(&mut current_key),
                            std::mem::take(&mut current_value),
                        );
                        state = State::NextParam;
                    }
                    Some(b) => current_value.push(b),
                    // An unterminated quoted value is accepted leniently and
                    // stored with whatever was read so far.
                    None => {
                        params.insert(
                            std::mem::take(&mut current_key),
                            std::mem::take(&mut current_value),
                        );
                        break;
                    }
                },
                State::Value => match bytes.next().copied() {
                    Some(b';') => {
                        params.insert(
                            std::mem::take(&mut current_key),
                            std::mem::take(&mut current_value),
                        );
                        state = State::BeginKey;
                    }
                    Some(b) => current_value.push(b),
                    None => {
                        params.insert(
                            std::mem::take(&mut current_key),
                            std::mem::take(&mut current_value),
                        );
                        break;
                    }
                },
            }
        }

        Mime::from_str(std::str::from_utf8(&mime).ok()?)
            .map(|mime| ContentType { mime, params })
            .ok()
    }

    /// Returns `true` if the media type is one that may carry JSON-LD:
    /// `application/activity+json`, `application/ld+json`, or plain
    /// `application/json`.
    pub fn is_json_ld(&self) -> bool {
        [
            "application/activity+json",
            "application/ld+json",
            "application/json",
        ]
        .iter()
        .any(|mime| *mime == self.mime)
    }

    /// Returns `true` if the media type is `application/activity+json` or
    /// `application/ld+json`, i.e. not plain `application/json`.
    pub fn is_proper_json_ld(&self) -> bool {
        ["application/activity+json", "application/ld+json"]
            .iter()
            .any(|mime| *mime == self.mime)
    }

    /// Borrows the parsed media type.
    pub fn mime(&self) -> &Mime {
        &self.mime
    }

    /// Consumes the value and returns the parsed media type.
    pub fn into_mime(self) -> Mime {
        self.mime
    }

    /// Returns the raw bytes of the `profile` parameter, if one was present.
    pub fn profile(&self) -> Option<&[u8]> {
        self.params.get(b"profile".as_slice()).map(Vec::as_slice)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `header` with [`ContentType::from_header`], panicking if the
    /// header value or the content type is rejected.
    fn parse(header: &str) -> ContentType {
        ContentType::from_header(&HeaderValue::from_str(header).unwrap()).unwrap()
    }

    #[test]
    fn parse_content_type_1() {
        let content_type =
            parse("application/ld+json;profile=http://www.w3.org/ns/json-ld#expanded");
        assert_eq!(*content_type.mime(), "application/ld+json");
        assert_eq!(
            content_type.profile(),
            Some(b"http://www.w3.org/ns/json-ld#expanded".as_slice())
        )
    }

    #[test]
    fn parse_content_type_2() {
        let content_type =
            parse("application/ld+json; profile=http://www.w3.org/ns/json-ld#expanded");
        assert_eq!(*content_type.mime(), "application/ld+json");
        assert_eq!(
            content_type.profile(),
            Some(b"http://www.w3.org/ns/json-ld#expanded".as_slice())
        )
    }

    #[test]
    fn parse_content_type_3() {
        let content_type =
            parse("application/ld+json; profile=http://www.w3.org/ns/json-ld#expanded; q=1");
        assert_eq!(*content_type.mime(), "application/ld+json");
        assert_eq!(
            content_type.profile(),
            Some(b"http://www.w3.org/ns/json-ld#expanded".as_slice())
        )
    }

    #[test]
    fn parse_content_type_4() {
        let content_type =
            parse("application/ld+json; profile=\"http://www.w3.org/ns/json-ld#expanded\"; q=1");
        assert_eq!(*content_type.mime(), "application/ld+json");
        assert_eq!(
            content_type.profile(),
            Some(b"http://www.w3.org/ns/json-ld#expanded".as_slice())
        )
    }

    #[test]
    fn parse_content_type_5() {
        let content_type =
            parse("application/ld+json; profile=\"http://www.w3.org/ns/json-ld#expanded\"");
        assert_eq!(*content_type.mime(), "application/ld+json");
        assert_eq!(
            content_type.profile(),
            Some(b"http://www.w3.org/ns/json-ld#expanded".as_slice())
        )
    }

    #[test]
    fn parse_content_type_6() {
        let content_type =
            parse("application/ld+json;profile=\"http://www.w3.org/ns/json-ld#expanded\"; q=1");
        assert_eq!(*content_type.mime(), "application/ld+json");
        assert_eq!(
            content_type.profile(),
            Some(b"http://www.w3.org/ns/json-ld#expanded".as_slice())
        )
    }

    #[test]
    fn parse_content_type_7() {
        let content_type = parse("application/ld+json; profile=\"http://www.w3.org/ns/json-ld#flattened http://www.w3.org/ns/json-ld#compacted\"; q=1");
        assert_eq!(*content_type.mime(), "application/ld+json");
        assert_eq!(
            content_type.profile(),
            Some(
                b"http://www.w3.org/ns/json-ld#flattened http://www.w3.org/ns/json-ld#compacted"
                    .as_slice()
            )
        )
    }

    #[test]
    fn parse_content_type_8() {
        let content_type = parse("application/ld+json");
        assert_eq!(*content_type.mime(), "application/ld+json");
    }

    #[test]
    fn parse_content_type_activity_json() {
        let content_type = parse("application/activity+json");
        assert_eq!(*content_type.mime(), "application/activity+json");
        assert!(content_type.is_json_ld());
        assert!(content_type.is_proper_json_ld());
        assert_eq!(content_type.profile(), None);
    }
}

145
src/ap/loader/link.rs Normal file
View file

@ -0,0 +1,145 @@
// This file is an adaptation of
// <https://github.com/timothee-haudebourg/json-ld/blob/0.12.1/core/src/loader/reqwest/link.rs>.
//
// Copyright (C) 2022 Timothée Haudebourg et al.
// Licensed under either the Apache License, Version 2.0; or the MIT License.
// See <https://github.com/timothee-haudebourg/json-ld/tree/0.12.1> for details.
use iref::{IriRef, IriRefBuf};
use reqwest::header::HeaderValue;
use std::collections::HashMap;
/// Parsed representation of a single HTTP `Link` header value.
pub struct Link {
/// Target of the link: the IRI reference found between `<` and `>`.
href: IriRefBuf,
/// Quoted `key="value"` parameters following the target, kept as raw bytes.
params: HashMap<Vec<u8>, Vec<u8>>,
}
impl Link {
    /// Parses one `Link` header value of the form
    /// `<target>; key="value"; key="value"`.
    ///
    /// Returns `None` unless the input starts with `<`, the target is closed
    /// with `>`, every parameter value is enclosed in double quotes, the input
    /// ends right after the target or a closing quote, and the target is a
    /// valid IRI reference.
    pub fn from_header(value: &HeaderValue) -> Option<Self> {
        enum Stage {
            ExpectOpen,
            InHref,
            AfterItem,
            ExpectKey,
            InKey,
            ExpectQuote,
            InValue,
        }

        let mut stage = Stage::ExpectOpen;
        let mut target = Vec::new();
        let mut key = Vec::new();
        let mut val = Vec::new();
        let mut attributes = HashMap::new();

        // Each byte drives exactly one transition of the state machine.
        for &byte in value.as_bytes() {
            stage = match (stage, byte) {
                (Stage::ExpectOpen, b'<') => Stage::InHref,
                (Stage::ExpectOpen, _) => return None,

                (Stage::InHref, b'>') => Stage::AfterItem,
                (Stage::InHref, other) => {
                    target.push(other);
                    Stage::InHref
                }

                (Stage::AfterItem, b';') => Stage::ExpectKey,
                (Stage::AfterItem, _) => return None,

                // Spaces between `;` and the parameter name are skipped.
                (Stage::ExpectKey, b' ') => Stage::ExpectKey,
                (Stage::ExpectKey, other) => {
                    key.push(other);
                    Stage::InKey
                }

                (Stage::InKey, b'=') => Stage::ExpectQuote,
                (Stage::InKey, other) => {
                    key.push(other);
                    Stage::InKey
                }

                (Stage::ExpectQuote, b'"') => Stage::InValue,
                (Stage::ExpectQuote, _) => return None,

                (Stage::InValue, b'"') => {
                    attributes.insert(std::mem::take(&mut key), std::mem::take(&mut val));
                    Stage::AfterItem
                }
                (Stage::InValue, other) => {
                    val.push(other);
                    Stage::InValue
                }
            };
        }

        // The input is only complete right after the target or a closing quote.
        match stage {
            Stage::AfterItem => IriRefBuf::from_vec(target).ok().map(|href| Self {
                href,
                params: attributes,
            }),
            _ => None,
        }
    }

    /// Looks up a parameter by name and returns its raw value.
    fn param(&self, name: &[u8]) -> Option<&[u8]> {
        self.params.get(name).map(|raw| raw.as_slice())
    }

    /// Borrows the link target.
    pub fn href(&self) -> IriRef {
        self.href.as_iri_ref()
    }

    /// Returns the raw value of the `rel` parameter, if present.
    pub fn rel(&self) -> Option<&[u8]> {
        self.param(b"rel")
    }

    /// Returns the raw value of the `type` parameter, if present.
    pub fn typ(&self) -> Option<&[u8]> {
        self.param(b"type")
    }

    /// Returns `true` if the `type` parameter is `application/activity+json`
    /// or `application/ld+json`.
    pub fn is_proper_json_ld(&self) -> bool {
        match self.typ() {
            Some(typ) => typ == b"application/activity+json" || typ == b"application/ld+json",
            None => false,
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Parses `raw` as a `Link` header, panicking if it is rejected.
    fn parse(raw: &str) -> Link {
        let header = HeaderValue::from_str(raw).unwrap();
        Link::from_header(&header).unwrap()
    }

    #[test]
    fn parse_link_1() {
        let link = parse(
            "<http://www.example.org/context>; rel=\"context\"; type=\"application/ld+json\"",
        );
        assert_eq!(link.href(), "http://www.example.org/context");
        assert_eq!(link.rel(), Some(b"context".as_slice()));
        assert_eq!(link.typ(), Some(b"application/ld+json".as_slice()))
    }

    #[test]
    fn parse_link_2() {
        let link = parse("<http://www.example.org/context>; rel=\"context\"; type=\"application/ld+json\"; foo=\"bar\"");
        assert_eq!(link.href(), "http://www.example.org/context");
        assert_eq!(link.rel(), Some(b"context".as_slice()));
        assert_eq!(link.typ(), Some(b"application/ld+json".as_slice()))
    }

    #[test]
    fn parse_link_3() {
        let link = parse("<http://www.example.org/context>");
        assert_eq!(link.href(), "http://www.example.org/context")
    }
}

228
src/ap/loader/mod.rs Normal file
View file

@ -0,0 +1,228 @@
// This file is an adaptation of
// <https://github.com/timothee-haudebourg/json-ld/blob/0.12.1/core/src/loader/reqwest.rs>.
//
// Copyright (C) 2022 Timothée Haudebourg et al.
// Licensed under either the Apache License, Version 2.0; or the MIT License.
// See <https://github.com/timothee-haudebourg/json-ld/tree/0.12.1> for details.
//
// Modified by anna <owo@fef.moe> to use nyanoblog's caching infrastructure
// and work with the `application/activity+json` MIME type.
//
// FIXME: This file is, frankly, just pure chaos and desperately needs a refactor.
use crate::state::AppState;
use bytes::Bytes;
use futures::future::{BoxFuture, FutureExt};
use hashbrown::{HashMap, HashSet};
use iref::{Iri, IriBuf};
use json_ld::{syntax::Parse, Loader, LoadingResult, Profile, RemoteDocument, Value};
use locspan::{Meta, Span};
use mime::Mime;
use rdf_types::vocabulary::Index;
use rdf_types::{IriVocabulary, IriVocabularyMut};
use reqwest::header::{HeaderValue, ACCEPT, CONTENT_TYPE, LINK, LOCATION};
use reqwest::StatusCode;
use std::hash::Hash;
use std::str::FromStr;
use tokio::sync::OnceCell;
mod content_type;
use content_type::*;
mod link;
use link::*;
use crate::core::*;
use crate::headers;
use crate::util::http;
/// JSON-LD document loader that downloads documents through the application's
/// HTTP helper and parses the response body with a configurable parser.
///
/// The type parameters default to `IriBuf` identifiers, `Span` metadata and
/// `json_ld::syntax::Value` documents.
pub struct CachedLoader<I = IriBuf, M = Span, T = json_ld::syntax::Value<M>> {
/// Application state handed to the HTTP helper for every request.
state: AppState,
/// Callback that turns a downloaded response body into a parsed document.
parser: Box<DynParser<I, M, T>>,
}
/// Type-erased parser callback stored in a `CachedLoader`: it is given a
/// vocabulary, the identifier of the document, and the raw response body, and
/// returns the parsed document wrapped in `Meta`.
type DynParser<I, M, T> =
dyn 'static + Send + Sync + FnMut(&dyn IriVocabulary<Iri = I>, &I, Bytes) -> Result<Meta<T, M>>;
impl CachedLoader {
pub fn new(state: AppState) -> Self {
CachedLoader::new_with(state, move |vocab, file, bytes| {
let content = String::from_utf8(bytes.to_vec())
.map_err(|e| Error::MalformedApub(format!("Invalid encoding: {e}")))?;
json_ld::syntax::Value::parse_str(&content, |span| span)
.map_err(|e| Error::MalformedApub(format!("Syntax error in JSON document: {e}")))
})
}
}
impl<I, M, T> CachedLoader<I, M, T> {
pub fn new_with(
state: AppState,
parser: impl 'static
+ Send
+ Sync
+ FnMut(&dyn IriVocabulary<Iri = I>, &I, Bytes) -> Result<Meta<T, M>>,
) -> Self {
CachedLoader {
state,
parser: Box::new(parser),
}
}
}
impl<I, M, T> Loader<I, M> for CachedLoader<I, M, T>
where
    I: Clone + Eq + Hash + Send + Sync,
    M: Send,
    T: Clone + Send,
{
    type Output = T;
    type Error = Error;

    /// Loads the document identified by `url` by delegating to `load_chain`
    /// and boxing the resulting future.
    fn load_with<'a>(
        &'a mut self,
        vocabulary: &'a mut (impl Send + Sync + IriVocabularyMut<Iri = I>),
        url: I,
    ) -> BoxFuture<'a, Result<RemoteDocument<I, M, T>>>
    where
        I: 'a,
    {
        let pending = self.load_chain(vocabulary, url);
        pending.boxed()
    }
}
impl<I: Clone + Eq + Hash + Send + Sync, M: Send, T: Clone + Send> CachedLoader<I, M, T> {
async fn load_chain(
&mut self,
vocabulary: &mut (impl Send + Sync + IriVocabularyMut<Iri = I>),
mut url: I,
) -> Result<RemoteDocument<I, M, T>> {
const MAX_REDIRECTS: u32 = 8;
const ACCEPT_ACTIVITY_PUB: &'static str =
"application/activity+json, application/ld+json, application/json";
let mut redirect_count = 0;
'next_url: loop {
if redirect_count > MAX_REDIRECTS {
return Err(Error::NotFound);
}
debug!(target: "ap", "downloading {}", vocabulary.iri(&url).unwrap().as_str());
let response = http::get(
&self.state,
vocabulary.iri(&url).unwrap().as_str(),
headers! {
ACCEPT => ACCEPT_ACTIVITY_PUB,
},
)
.await?;
match response.status() {
StatusCode::OK => {
let mut content_types = response
.headers()
.get_all(CONTENT_TYPE)
.into_iter()
.filter_map(ContentType::from_header);
match content_types.find(ContentType::is_json_ld) {
Some(content_type) => {
let mut context_url = None;
if !content_type.is_proper_json_ld() {
context_url = self.get_context_url(vocabulary, &url, &response)?;
}
let mut profile = HashSet::new();
for p in content_type
.profile()
.into_iter()
.flat_map(|p| p.split(|b| *b == b' '))
{
if let Ok(iri) = Iri::new(p) {
profile.insert(Profile::new(iri, vocabulary));
}
}
let bytes = response.bytes().await.map_err(|e| Error::Reqwest(e))?;
let document = (*self.parser)(vocabulary, &url, bytes)
.map_err(|e| Error::MalformedApub(e.to_string()))?;
break Ok(RemoteDocument::new_full(
Some(url),
Some(content_type.into_mime()),
context_url,
profile,
document,
));
}
None => {
debug!(target: "ap", "no valid media type found");
for link in response.headers().get_all(LINK).into_iter() {
if let Some(link) = Link::from_header(link) {
if link.rel() == Some(b"alternate") && link.is_proper_json_ld()
{
debug!(target: "ap", "link found");
let u = link.href().resolved(vocabulary.iri(&url).unwrap());
url = vocabulary.insert(u.as_iri());
redirect_count += 1;
continue 'next_url;
}
}
}
break Err(Error::MalformedApub(String::from("Invalid content type")));
}
}
}
code if code.is_redirection() => {
if response.status() == StatusCode::SEE_OTHER {
break Err(Error::NotFound);
} else {
match response.headers().get(LOCATION) {
Some(location) => {
let u = Iri::new(location.as_bytes()).map_err(|_| {
Error::MalformedApub(String::from("Invalid redirect URL"))
})?;
url = vocabulary.insert(u);
}
None => {
break Err(Error::MalformedApub(String::from(
"Missing Location leader in HTTP redirect",
)))
}
}
}
}
code => {
break Err(Error::MalformedApub(format!(
"HTTP request failed with status code {:?}",
code.as_u16()
)))
}
}
}
}
fn get_context_url(
&self,
vocabulary: &mut (impl Send + Sync + IriVocabularyMut<Iri = I>),
url: &I,
response: &reqwest::Response,
) -> Result<Option<I>> {
let mut context_url = None;
for link in response.headers().get_all(LINK).into_iter() {
if let Some(link) = Link::from_header(link) {
if link.rel() == Some(b"alternate")
&& link.typ() == Some(b"https://www.w3.org/ns/json-ld#context")
{
if context_url.is_some() {
return Err(Error::MalformedApub(String::from(
"Multiple contexts in Link headers",
)));
}
let u = link.href().resolved(vocabulary.iri(url).unwrap());
context_url = Some(vocabulary.insert(u.as_iri()));
}
}
}
Ok(context_url)
}
}

View file

@ -1,2 +1,3 @@
pub mod context;
pub mod object;
pub mod document;
pub mod loader;
pub mod processor;

47
src/ap/processor.rs Normal file
View file

@ -0,0 +1,47 @@
use async_trait::async_trait;
use iref::{Iri, IriBuf};
use json_ld::{
object::TypeRef,
syntax::{MetaValue, Parse, Value},
Expand, IndexedObject, Loader, RemoteDocument,
};
use locspan::{Meta, Span};
use rdf_types::{vocabulary::Index, BlankIdBuf, IndexVocabulary, IriVocabulary, IriVocabularyMut};
use serde::{Deserialize, Serialize};
use static_iref::iri;
use std::fmt;
use crate::ap::loader::CachedLoader;
use crate::core::*;
use crate::job::Job;
use crate::state::AppState;
/// Main API for handling ActivityPub ingress, called by [`InboxWorker`].
pub async fn process_document(state: &AppState, raw: &String) -> Result<()> {
let document = Value::parse_str(raw, |span| span)
.map_err(|e| Error::MalformedApub(format!("Could not parse document: {e}")))?;
let rd = RemoteDocument::new(
None,
Some("application/activity+json".parse().unwrap()),
document,
);
let mut loader = CachedLoader::new(state.clone());
let rd = rd.expand(&mut loader).await.unwrap();
// this loop will usually only run once (one object per request)
for object in rd.into_value() {
let id = object.id().ok_or(Error::MalformedApub(String::from(
"Document does not have an id",
)))?;
let mut typ = None;
for t in object.types() {
typ = Some(t);
}
let typ = typ.ok_or(Error::MalformedApub(String::from(
"Document does not have a type",
)))?;
debug!("Object id=\"{id}\" type=\"{typ}\"");
}
Ok(())
}

View file

@ -27,6 +27,8 @@ pub enum Error {
BadCredentials,
BadBearcap,
BadRequest,
MalformedApub(String),
Reqwest(reqwest::Error),
}
pub fn utc_now() -> NaiveDateTime {
@ -50,6 +52,7 @@ impl ResponseError for Error {
Error::BadCredentials => StatusCode::UNAUTHORIZED,
Error::BadBearcap => StatusCode::UNPROCESSABLE_ENTITY,
Error::BadRequest => StatusCode::BAD_REQUEST,
Error::MalformedApub(_) => StatusCode::UNPROCESSABLE_ENTITY,
_ => StatusCode::INTERNAL_SERVER_ERROR,
}
}
@ -71,6 +74,8 @@ impl fmt::Display for Error {
Error::BadCredentials => write!(f, "Invalid user name or password"),
Error::BadBearcap => write!(f, "Invalid bearcap URL"),
Error::BadRequest => write!(f, "Bad request"),
Error::MalformedApub(msg) => write!(f, "Malformed ActivityPub: {msg}"),
Error::Reqwest(reqwest_error) => reqwest_error.fmt(f),
}
}
}
@ -108,6 +113,12 @@ impl From<jsonwebtoken::errors::Error> for Error {
}
}
impl From<reqwest::Error> for Error {
fn from(e: reqwest::Error) -> Error {
Error::Reqwest(e)
}
}
impl Serialize for Error {
fn serialize<S>(&self, serializer: S) -> std::result::Result<S::Ok, S::Error>
where

View file

@ -1,3 +1,4 @@
use crate::ap::processor::process_document;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::fmt;
@ -20,7 +21,7 @@ impl InboxWorker {
#[async_trait]
impl Job for InboxWorker {
async fn perform(&self, state: &AppState) -> Result<()> {
todo!()
process_document(state, &self.raw).await
}
fn name(&self) -> &'static str {

View file

@ -1,13 +1,22 @@
use crate::ap::processor::InboxWorker;
use actix_web::{post, web, HttpResponse, HttpRequest};
use actix_web::{post, web, HttpRequest, HttpResponse};
use crate::core::*;
use crate::job::inbox::InboxWorker;
use crate::state::AppState;
#[post("")]
async fn post_inbox(body: String, request: HttpRequest, state: AppState) -> Result<HttpResponse> {
let content_type = request.headers().get("Content-Type").ok_or(Error::BadRequest)?.to_str()?;
if content_type != "application/activity+json" {
const CONTENT_TYPES: &[&'static str] = &[
"application/activity+json",
"application/ld+json",
"application/json",
];
let content_type = request
.headers()
.get("Content-Type")
.ok_or(Error::BadRequest)?
.to_str()?;
if CONTENT_TYPES.iter().all(|typ| *typ != content_type) {
return Ok(HttpResponse::BadRequest().finish());
}