job: retry on failure, general refactor

This commit is contained in:
anna 2022-12-25 04:13:06 +01:00
parent 055a97a8e1
commit fb8bf0f316
Signed by: fef
GPG key ID: EC22E476DC2D3D84
2 changed files with 70 additions and 30 deletions

View file

@ -1,47 +1,88 @@
use actix_rt::Arbiter; use actix_rt::Arbiter;
use actix_web::web;
use async_trait::async_trait; use async_trait::async_trait;
use sqlx::Execute; use std::sync::atomic::{AtomicUsize, Ordering};
use std::{cell::RefCell, fmt, sync::Mutex}; use std::sync::Weak;
use tokio::sync::mpsc::{channel, Receiver, Sender}; use std::time::Duration;
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
use crate::core::*; use crate::core::*;
use crate::state::AppState; use crate::state::{AppState, State};
const MAX_RETRIES: u32 = 3;
const ERROR_DELAY: Duration = Duration::from_secs(5 * 60);
/// A background job to be executed asynchronously.
///
/// Jobs run in a separate execution context, detached from the one that
/// enqueued it (which is usually a request handler). This mechanism allows
/// for fast response times to requests and automatic retries on failures
/// such as network errors when a remote instance is currently offline.
#[async_trait] #[async_trait]
pub trait Job: Send + Sync + fmt::Debug { pub trait Job: Send + Sync + Serialize + Deserialize<'static> {
async fn perform(&self) -> Result<()>; /// Execute the errand.
///
/// This will be called by the scheduler in a background thread.
/// If the result is an error, the scheduler may retry it again later.
async fn perform(&self, state: &AppState) -> Result<()>;
/// Return this job's name.
///
/// Currently only used for logging.
fn name(&self) -> &'static str; fn name(&self) -> &'static str;
} }
pub struct Scheduler { pub struct Scheduler {
arbiter: Arbiter, arbiter: Arbiter,
sender: Sender<Box<dyn Job>>, counter: AtomicUsize,
state: Weak<State>,
} }
impl Scheduler { impl Scheduler {
pub fn new() -> Scheduler { pub fn new(state: Weak<State>) -> Scheduler {
let (tx, rx) = channel(32); Scheduler {
let sched = Scheduler {
arbiter: Arbiter::new(), arbiter: Arbiter::new(),
sender: tx, counter: AtomicUsize::new(1),
}; state,
if !sched.arbiter.spawn(worker(rx)) {
panic!("Job scheduler initialization failed");
} }
sched
} }
pub async fn schedule(&self, job: Box<dyn Job>) { pub fn schedule<T>(&self, job: T)
self.sender.send(job).await.expect("Job send failed"); where
} T: 'static + Job,
} {
let id = self.counter.fetch_add(1, Ordering::Relaxed);
async fn worker(mut receiver: Receiver<Box<dyn Job>>) { let state: AppState = web::Data::from(
while let Some(job) = receiver.recv().await { self.state
debug!(target: "job", "Starting job {}", job.name()); .upgrade()
if let Err(e) = job.perform().await { .expect("Zombie scheduler (this is supposed to be impossible)"),
error!(target: "job", "Service {} failed: {}", job.name(), e); );
if !self.arbiter.spawn(worker(job, id, state)) {
panic!("Arbiter has died");
}
}
}
async fn worker<T>(job: T, id: usize, state: AppState)
where
T: Job,
{
let name = job.name();
for n in 1..=MAX_RETRIES {
debug!(target: "job", "Starting job {name}#{id}");
if let Err(e) = job.perform(&state).await {
warn!(target: "job", "Job {name}#{id} failed (try {n}/{MAX_RETRIES}): {e}");
if n == MAX_RETRIES {
error!(target: "job", "Giving up on job {name}#{id} (retry limit reached)");
} else {
sleep(ERROR_DELAY).await;
}
} else {
debug!(target: "job", "Job {name}#{id} succeeded");
break;
} }
} }
} }

View file

@ -12,13 +12,12 @@ pub struct State {
pub sched: Scheduler, pub sched: Scheduler,
} }
pub type AppStateRaw = Arc<State>; pub type AppState = web::Data<State>;
pub type AppState = web::Data<AppStateRaw>;
pub fn new(config: Config, db_pool: PgPool) -> AppState { pub fn new(config: Config, db_pool: PgPool) -> AppState {
web::Data::new(Arc::new(State { web::Data::from(Arc::new_cyclic(|ptr| State {
config, config,
repo: AppRepo::new(db_pool), repo: AppRepo::new(db_pool),
sched: Scheduler::new(), sched: Scheduler::new(ptr.clone()),
})) }))
} }