153 lines
4.2 KiB
Rust

use crate::api;
use crate::core;
use crate::repositories;
use diesel::{
r2d2::{ConnectionManager, Pool},
PgConnection,
};
use once_cell::sync::OnceCell;
use std::sync::Arc;
use tokio_cron_scheduler::{Job, JobScheduler};
static G_INSTANCE: OnceCell<Arc<Scheduler>> = OnceCell::new();
///
pub struct Scheduler {
pool: Pool<ConnectionManager<PgConnection>>,
sched: JobScheduler,
api_config: core::config::ApiConfig,
synchronization_history_repository: repositories::synchronization_history::repository::Repository,
vendor_repository: repositories::vendor::repository::Repository,
vendor_api: api::vendor::api::Api,
}
impl std::fmt::Debug for Scheduler {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("Scheduler of api.kgon.identity").finish()
}
}
impl Scheduler {
///
pub fn get_instance(
pool: Pool<ConnectionManager<PgConnection>>,
sched: JobScheduler,
api_config: core::config::ApiConfig,
) -> Result<&'static Arc<Scheduler>, Box<dyn std::error::Error>> {
let instance = G_INSTANCE
.get_or_try_init(|| -> Result<Arc<Scheduler>, Box<dyn std::error::Error>> {
let s = Scheduler {
pool,
sched,
api_config: api_config.clone(),
synchronization_history_repository:
repositories::synchronization_history::repository::Repository::new(),
vendor_repository: repositories::vendor::repository::Repository::new(),
vendor_api: api::vendor::api::Api::new(api_config.clone()),
};
Ok(Arc::new(s))
})
.expect("");
Ok(instance)
}
pub async fn queue(&'static self) -> Result<(), std::boxed::Box<dyn std::error::Error>> {
self.list_vendors().await?;
Ok(())
}
async fn add_history(
&'static self,
item: String,
start_at: i64,
code: i64,
message: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
let complete_at = (chrono::Utc::now()).timestamp();
let conn = self.pool.get().expect("conn");
self
.synchronization_history_repository
.insert(
&conn,
&repositories::synchronization_history::models::NewSynchronizationHistory {
item,
start_at,
complete_at,
code,
message,
},
)
.expect("synchronization_history insert");
Ok(())
}
async fn list_vendors(&'static self) -> Result<(), Box<dyn std::error::Error>> {
let j_synchronization = Job::new_async("0 0 0/1 * * *", move |_uuid, _l| {
Box::pin(async move {
let start_at = (chrono::Utc::now()).timestamp();
let conn = self.pool.get().expect("conn");
let req = api::vendor::models::ListVendorsRequest {};
let res = match self.vendor_api.list_vendors(req).await {
Ok(r) => Ok(r),
Err(e) => {
self
.add_history(
repositories::synchronization::models::ITEM_VENDORS.to_string(),
start_at,
e.code,
e.msg.clone(),
)
.await
.expect("add_history");
Err(e)
}
}
.expect("list_vendors");
let upsert_vendors: Vec<repositories::vendor::models::UpsertVendor> = res
.vendors
.iter()
.map(|d| repositories::vendor::models::UpsertVendor {
id: d.id,
company_id: d.company_id,
vendor_id: d.vendor_id,
key: d.key.clone(),
name: d.name.clone(),
category: d.category.clone(),
max_bet_casino: d.max_bet_casino,
max_bet_slot: d.max_bet_slot,
is_enable: d.is_enable.clone(),
bet_count: d.bet_count,
})
.collect();
let _affected = self
.vendor_repository
.upserts(&conn, upsert_vendors)
.expect("vendor upsert");
self
.add_history(
repositories::synchronization::models::ITEM_VENDORS.to_string(),
start_at,
0,
None,
)
.await
.expect("add_history");
})
})?;
self.sched.add(j_synchronization).await?;
Ok(())
}
}