2022-08-18 23:35:24 +00:00

127 lines
3.8 KiB
Rust

use crate::api;
use crate::core;
use crate::repositories;
use diesel::{
r2d2::{ConnectionManager, Pool},
PgConnection,
};
use once_cell::sync::OnceCell;
use std::sync::Arc;
use tokio_cron_scheduler::{Job, JobScheduler};
static G_INSTANCE: OnceCell<Arc<Scheduler>> = OnceCell::new();
///
pub struct Scheduler {
pool: Pool<ConnectionManager<PgConnection>>,
sched: JobScheduler,
api_config: core::config::ApiConfig,
synchronization_history_repository: repositories::synchronization_history::repository::Repository,
member_repository: repositories::member::repository::Repository,
member_api: api::member::api::Api,
}
impl std::fmt::Debug for Scheduler {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("Scheduler of api.kgon.identity").finish()
}
}
impl Scheduler {
///
pub fn get_instance(
pool: Pool<ConnectionManager<PgConnection>>,
sched: JobScheduler,
api_config: core::config::ApiConfig,
) -> Result<&'static Arc<Scheduler>, Box<dyn std::error::Error>> {
let instance = G_INSTANCE
.get_or_try_init(|| -> Result<Arc<Scheduler>, Box<dyn std::error::Error>> {
let s = Scheduler {
pool,
sched,
api_config: api_config.clone(),
synchronization_history_repository:
repositories::synchronization_history::repository::Repository::new(),
member_repository: repositories::member::repository::Repository::new(),
member_api: api::member::api::Api::new(api_config.clone()),
};
Ok(Arc::new(s))
})
.expect("");
Ok(instance)
}
pub async fn queue(&'static self) -> Result<(), std::boxed::Box<dyn std::error::Error>> {
self.list_members().await?;
Ok(())
}
async fn list_members(&'static self) -> Result<(), Box<dyn std::error::Error>> {
let j_synchronization = Job::new_async("0 0 0/1 * * *", move |_uuid, _l| {
Box::pin(async move {
let start_at = (chrono::Utc::now()).timestamp();
if let Err(e) = async {
let conn = self.pool.get().expect("conn");
let req = api::member::models::ListMembersRequest { group_key: None };
let res = self.member_api.list_members(req).await?;
for u in res.users {
let modify_member = repositories::member::models::ModifyMember {
balance: u.cash,
balance_bota: u.cash_bota,
oriental_play: u.oriental_play,
};
self
.member_repository
.update(&conn, u.id, &modify_member)
.expect("member update");
}
self
.synchronization_history_repository
.insert(
&conn,
&repositories::synchronization_history::models::NewSynchronizationHistory {
item: repositories::synchronization::models::ITEM_MEMBERS.to_string(),
start_at,
complete_at: (chrono::Utc::now()).timestamp(),
code: 0,
message: None,
},
)
.expect("synchronization_history insert");
Ok::<(), api::core::models::Error>(())
}
.await
{
let conn = self.pool.get().expect("conn");
self
.synchronization_history_repository
.insert(
&conn,
&repositories::synchronization_history::models::NewSynchronizationHistory {
item: repositories::synchronization::models::ITEM_MEMBERS.to_string(),
start_at,
complete_at: (chrono::Utc::now()).timestamp(),
code: e.code,
message: e.msg,
},
)
.expect("synchronization_history insert");
}
})
})?;
self.sched.add(j_synchronization).await?;
Ok(())
}
}