diff --git a/src/job/mod.rs b/src/job/mod.rs index 11b7212..c702f2f 100644 --- a/src/job/mod.rs +++ b/src/job/mod.rs @@ -1,47 +1,88 @@ use actix_rt::Arbiter; +use actix_web::web; use async_trait::async_trait; -use sqlx::Execute; -use std::{cell::RefCell, fmt, sync::Mutex}; -use tokio::sync::mpsc::{channel, Receiver, Sender}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Weak; +use std::time::Duration; +use serde::{Deserialize, Serialize}; +use tokio::time::sleep; use crate::core::*; -use crate::state::AppState; +use crate::state::{AppState, State}; +const MAX_RETRIES: u32 = 3; +const ERROR_DELAY: Duration = Duration::from_secs(5 * 60); + +/// A background job to be executed asynchronously. +/// +/// Jobs run in a separate execution context, detached from the one that +/// enqueued it (which is usually a request handler). This mechanism allows +/// for fast response times to requests and automatic retries on failures +/// such as network errors when a remote instance is currently offline. #[async_trait] -pub trait Job: Send + Sync + fmt::Debug { - async fn perform(&self) -> Result<()>; +pub trait Job: Send + Sync + Serialize + Deserialize<'static> { + /// Execute the errand. + /// + /// This will be called by the scheduler in a background thread. + /// If the result is an error, the scheduler may retry it again later. + async fn perform(&self, state: &AppState) -> Result<()>; + + /// Return this job's name. + /// + /// Currently only used for logging. fn name(&self) -> &'static str; } pub struct Scheduler { arbiter: Arbiter, - sender: Sender>, + counter: AtomicUsize, + state: Weak, } impl Scheduler { - pub fn new() -> Scheduler { - let (tx, rx) = channel(32); - let sched = Scheduler { + pub fn new(state: Weak) -> Scheduler { + Scheduler { arbiter: Arbiter::new(), - sender: tx, - }; - - if !sched.arbiter.spawn(worker(rx)) { - panic!("Job scheduler initialization failed"); + counter: AtomicUsize::new(1), + state, } - sched } - pub async fn schedule(&self, job: Box) { - self.sender.send(job).await.expect("Job send failed"); - } -} - -async fn worker(mut receiver: Receiver>) { - while let Some(job) = receiver.recv().await { - debug!(target: "job", "Starting job {}", job.name()); - if let Err(e) = job.perform().await { - error!(target: "job", "Service {} failed: {}", job.name(), e); + pub fn schedule(&self, job: T) + where + T: 'static + Job, + { + let id = self.counter.fetch_add(1, Ordering::Relaxed); + let state: AppState = web::Data::from( + self.state + .upgrade() + .expect("Zombie scheduler (this is supposed to be impossible)"), + ); + if !self.arbiter.spawn(worker(job, id, state)) { + panic!("Arbiter has died"); + } + } +} + +async fn worker(job: T, id: usize, state: AppState) +where + T: Job, +{ + let name = job.name(); + for n in 1..=MAX_RETRIES { + debug!(target: "job", "Starting job {name}#{id}"); + + if let Err(e) = job.perform(&state).await { + warn!(target: "job", "Job {name}#{id} failed (try {n}/{MAX_RETRIES}): {e}"); + + if n == MAX_RETRIES { + error!(target: "job", "Giving up on job {name}#{id} (retry limit reached)"); + } else { + sleep(ERROR_DELAY).await; + } + } else { + debug!(target: "job", "Job {name}#{id} succeeded"); + break; } } } diff --git a/src/state.rs b/src/state.rs index 72d3f8a..788e392 100644 --- a/src/state.rs +++ b/src/state.rs @@ -12,13 +12,12 @@ pub struct State { pub sched: Scheduler, } -pub type AppStateRaw = Arc; -pub type AppState = web::Data; +pub type AppState = web::Data; pub fn new(config: Config, db_pool: PgPool) -> AppState { - web::Data::new(Arc::new(State { + web::Data::from(Arc::new_cyclic(|ptr| State { config, repo: AppRepo::new(db_pool), - sched: Scheduler::new(), + sched: Scheduler::new(ptr.clone()), })) }