diff --git a/src/main.rs b/src/main.rs index 2f9bde1..ffe152b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -87,30 +87,24 @@ async fn main() -> Result<(), Box> { let mut sched = tokio_cron_scheduler::JobScheduler::new().await?; - // let member_scheduler = schedulers::member::scheduler::Scheduler::get_instance( - // pool.clone(), - // sched.clone(), - // api_config.clone(), - // )?; - // member_scheduler.queue().await?; - // let balance_scheduler = schedulers::balance::scheduler::Scheduler::get_instance( - // pool.clone(), - // sched.clone(), - // api_config.clone(), - // )?; - // balance_scheduler.queue().await?; - let vendor_scheduler = schedulers::vendor::scheduler::Scheduler::get_instance( + let member_scheduler = schedulers::member::scheduler::Scheduler::get_instance( pool.clone(), sched.clone(), api_config.clone(), )?; - vendor_scheduler.queue().await?; - // let game_scheduler = schedulers::game::scheduler::Scheduler::get_instance( - // pool.clone(), - // sched.clone(), - // api_config.clone(), - // )?; - // game_scheduler.queue().await?; + member_scheduler.queue().await?; + let balance_scheduler = schedulers::balance::scheduler::Scheduler::get_instance( + pool.clone(), + sched.clone(), + api_config.clone(), + )?; + balance_scheduler.queue().await?; + let game_scheduler = schedulers::game::scheduler::Scheduler::get_instance( + pool.clone(), + sched.clone(), + api_config.clone(), + )?; + game_scheduler.queue().await?; let _h_scheduler = sched.start().await?; diff --git a/src/schedulers/balance/scheduler.rs b/src/schedulers/balance/scheduler.rs index 9d78432..89653ae 100644 --- a/src/schedulers/balance/scheduler.rs +++ b/src/schedulers/balance/scheduler.rs @@ -95,71 +95,56 @@ impl Scheduler { let j_synchronization = Job::new_async("0 0 0/1 * * *", move |_uuid, _l| { Box::pin(async move { let start_at = (chrono::Utc::now()).timestamp(); - let conn = self.pool.get().expect("conn"); - let req = api::member::models::ListMembersRequest { group_key: None }; - let res = match self.member_api.list_members(req).await { - Ok(r) => Ok(r), - Err(e) => { + if let Err(e) = async { + let conn = self.pool.get().expect("conn"); + + let req = api::member::models::ListMembersRequest { group_key: None }; + let res = self.member_api.list_members(req).await?; + + for u in res.users { + let req = api::member_account::models::GetBalanceForUserRequest { + username: u.site_username, + }; + let res = self.member_account_api.get_balance_for_user(req).await?; + + let modify_member = repositories::member::models::ModifyMemberForBalance { + balance: res.balance, + balance_bota: res.balance_bota, + balance_sum: res.balance_sum, + companies: res.companies, + }; + self - .add_history( - repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), - start_at, - e.code, - e.msg.clone(), - ) - .await - .expect("add_history"); - - Err(e) + .member_repository + .update_balance(&conn, u.id, &modify_member) + .expect("member update_balance"); } - } - .expect("list_members"); - - for u in res.users { - let req = api::member_account::models::GetBalanceForUserRequest { - username: u.site_username, - }; - let res = match self.member_account_api.get_balance_for_user(req).await { - Ok(r) => Ok(r), - Err(e) => { - self - .add_history( - repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), - start_at, - e.code, - e.msg.clone(), - ) - .await - .expect("add_history"); - - Err(e) - } - } - .expect("get_balance_for_user"); - - let modify_member = repositories::member::models::ModifyMemberForBalance { - balance: res.balance, - balance_bota: res.balance_bota, - balance_sum: res.balance_sum, - companies: res.companies, - }; self - .member_repository - .update_balance(&conn, u.id, &modify_member) - .expect("member update_balance"); - } + .add_history( + repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), + start_at, + 0, + None, + ) + .await + .expect("add_history"); - self - .add_history( - repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), - start_at, - 0, - None, - ) - .await - .expect("add_history"); + Ok::<(), api::core::models::Error>(()) + } + .await + { + self + .add_history( + repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), + start_at, + e.code, + e.msg.clone(), + ) + .await + .expect("add_history"); + } }) })?; @@ -172,68 +157,69 @@ impl Scheduler { let j_synchronization = Job::new_async("0 0 0/1 * * *", move |_uuid, _l| { Box::pin(async move { let start_at = (chrono::Utc::now()).timestamp(); - let conn = self.pool.get().expect("conn"); - let req = api::member_account::models::GetBalanceForPartnerRequest {}; - let res = match self.member_account_api.get_balance_for_partner(req).await { - Ok(r) => Ok(r), - Err(e) => { - self - .add_history( - repositories::synchronization::models::ITEM_BALANCE_PARTNER.to_string(), - start_at, - e.code, - e.msg.clone(), - ) - .await - .expect("add_history"); + if let Err(e) = async { + let conn = self.pool.get().expect("conn"); - Err(e) + let req = api::member_account::models::GetBalanceForPartnerRequest {}; + let res = self.member_account_api.get_balance_for_partner(req).await?; + + match self + .balance_repository + .select(&conn) + .expect("balance select") + { + Some(b) => { + self + .balance_repository + .update( + &conn, + b.id, + &repositories::balance::models::ModifyBalance { + balance: res.balance, + balance_bota: res.balance_bota, + }, + ) + .expect("balance update"); + } + None => { + self + .balance_repository + .insert( + &conn, + &repositories::balance::models::NewBalance { + balance: res.balance, + balance_bota: res.balance_bota, + }, + ) + .expect("balance insert"); + } } - } - .expect("list_members"); - match self - .balance_repository - .select(&conn) - .expect("balance select") + self + .add_history( + repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), + start_at, + 0, + None, + ) + .await + .expect("add_history"); + + Ok::<(), api::core::models::Error>(()) + } + .await { - Some(b) => { - self - .balance_repository - .update( - &conn, - b.id, - &repositories::balance::models::ModifyBalance { - balance: res.balance, - balance_bota: res.balance_bota, - }, - ) - .expect("balance update"); - } - None => { - self - .balance_repository - .insert( - &conn, - &repositories::balance::models::NewBalance { - balance: res.balance, - balance_bota: res.balance_bota, - }, - ) - .expect("balance insert"); - } + self + .add_history( + repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), + start_at, + e.code, + e.msg.clone(), + ) + .await + .expect("add_history"); } - - self - .add_history( - repositories::synchronization::models::ITEM_BALANCE_PARTNER.to_string(), - start_at, - 0, - None, - ) - .await - .expect("add_history"); }) })?; diff --git a/src/schedulers/game/scheduler.rs b/src/schedulers/game/scheduler.rs index 44445d8..bd56bde 100644 --- a/src/schedulers/game/scheduler.rs +++ b/src/schedulers/game/scheduler.rs @@ -18,6 +18,7 @@ pub struct Scheduler { api_config: core::config::ApiConfig, synchronization_history_repository: repositories::synchronization_history::repository::Repository, vendor_repository: repositories::vendor::repository::Repository, + vendor_api: api::vendor::api::Api, game_repository: repositories::game::repository::Repository, game_api: api::game::api::Api, } @@ -44,6 +45,7 @@ impl Scheduler { synchronization_history_repository: repositories::synchronization_history::repository::Repository::new(), vendor_repository: repositories::vendor::repository::Repository::new(), + vendor_api: api::vendor::api::Api::new(api_config.clone()), game_repository: repositories::game::repository::Repository::new(), game_api: api::game::api::Api::new(api_config.clone()), }; @@ -92,72 +94,61 @@ impl Scheduler { let j_synchronization = Job::new_async("0 0 0/1 * * *", move |_uuid, _l| { Box::pin(async move { let start_at = (chrono::Utc::now()).timestamp(); - let conn = self.pool.get().expect("conn"); - let vendors = self - .vendor_repository - .select_all( - &conn, - &repositories::vendor::models::FindAll { - pagination: None, - sorts: None, - search: None, - }, - ) - .expect("vendor select_all"); + if let Err(e) = async { + let conn = self.pool.get().expect("conn"); - let mut upsert_games: Vec = vec![]; + let req = api::vendor::models::ListVendorsRequest {}; + let res = self.vendor_api.list_vendors(req).await?; - for v in vendors { - let req = api::game::models::ListGamesRequest { - vendor_key: v.key.clone(), - }; - let res = match self.game_api.list_games(req).await { - Ok(r) => Ok(r), - Err(e) => { - self - .add_history( - repositories::synchronization::models::ITEM_VENDORS.to_string(), - start_at, - e.code, - e.msg.clone(), - ) - .await - .expect("add_history"); + let upsert_vendors: Vec = res + .vendors + .iter() + .map(|d| repositories::vendor::models::UpsertVendor { + id: d.id, + company_id: d.company_id, + vendor_id: d.vendor_id, + key: d.key.clone(), + name: d.name.clone(), + category: d.category.clone(), + max_bet_casino: d.max_bet_casino, + max_bet_slot: d.max_bet_slot, + is_enable: d.is_enable.clone(), + bet_count: d.bet_count, + }) + .collect(); - Err(e) - } - } - .expect("list_games"); + let _affected = self + .vendor_repository + .upserts(&conn, upsert_vendors) + .expect("vendor upsert"); - for g in res.games { - upsert_games.push(repositories::game::models::UpsertGame { - id: g.id, - vendor_id: v.id, - key: g.key.clone(), - names: serde_json::to_string(&g.names).expect("names"), - platform: g.platform.clone(), - category: g.category.clone(), - game_type: g.game_type.clone(), - image: g.image, - }); - } + self + .add_history( + repositories::synchronization::models::ITEM_VENDORS.to_string(), + start_at, + 0, + None, + ) + .await + .expect("add_history"); + + self.list_games().await; + + Ok::<(), api::core::models::Error>(()) + } + .await + { + self + .add_history( + repositories::synchronization::models::ITEM_VENDORS.to_string(), + start_at, + e.code, + e.msg.clone(), + ) + .await + .expect("add_history"); } - - let _affected = self - .game_repository - .upserts(&conn, upsert_games) - .expect("game upsert"); - - self - .add_history( - repositories::synchronization::models::ITEM_GAMES.to_string(), - start_at, - 0, - None, - ) - .await - .expect("add_history"); }) })?; @@ -165,4 +156,75 @@ impl Scheduler { Ok(()) } + + async fn list_games(&'static self) { + let start_at = (chrono::Utc::now()).timestamp(); + + if let Err(e) = async { + let conn = self.pool.get().expect("conn"); + + let vendors = self + .vendor_repository + .select_all( + &conn, + &repositories::vendor::models::FindAll { + pagination: None, + sorts: None, + search: None, + }, + ) + .expect("vendor select_all"); + + let mut upsert_games: Vec = vec![]; + + for v in vendors { + let req = api::game::models::ListGamesRequest { + vendor_key: v.key.clone(), + }; + let res = self.game_api.list_games(req).await?; + + for g in res.games { + upsert_games.push(repositories::game::models::UpsertGame { + id: g.id, + vendor_id: v.id, + key: g.key.clone(), + names: serde_json::to_string(&g.names).expect("names"), + platform: g.platform.clone(), + category: g.category.clone(), + game_type: g.game_type.clone(), + image: g.image, + }); + } + } + + let _affected = self + .game_repository + .upserts(&conn, upsert_games) + .expect("game upsert"); + + self + .add_history( + repositories::synchronization::models::ITEM_GAMES.to_string(), + start_at, + 0, + None, + ) + .await + .expect("add_history"); + + Ok::<(), api::core::models::Error>(()) + } + .await + { + self + .add_history( + repositories::synchronization::models::ITEM_GAMES.to_string(), + start_at, + e.code, + e.msg.clone(), + ) + .await + .expect("add_history"); + } + } } diff --git a/src/schedulers/member/scheduler.rs b/src/schedulers/member/scheduler.rs index 02e2f37..9ed8e1c 100644 --- a/src/schedulers/member/scheduler.rs +++ b/src/schedulers/member/scheduler.rs @@ -90,49 +90,50 @@ impl Scheduler { let j_synchronization = Job::new_async("0 0 0/1 * * *", move |_uuid, _l| { Box::pin(async move { let start_at = (chrono::Utc::now()).timestamp(); - let conn = self.pool.get().expect("conn"); - let req = api::member::models::ListMembersRequest { group_key: None }; - let res = match self.member_api.list_members(req).await { - Ok(r) => Ok(r), - Err(e) => { + if let Err(e) = async { + let conn = self.pool.get().expect("conn"); + + let req = api::member::models::ListMembersRequest { group_key: None }; + let res = self.member_api.list_members(req).await?; + + for u in res.users { + let modify_member = repositories::member::models::ModifyMember { + balance: u.cash, + balance_bota: u.cash_bota, + oriental_play: u.oriental_play, + }; + self - .add_history( - repositories::synchronization::models::ITEM_MEMBERS.to_string(), - start_at, - e.code, - e.msg.clone(), - ) - .await - .expect("add_history"); - - Err(e) + .member_repository + .update(&conn, u.id, &modify_member) + .expect("member update"); } - } - .expect("list_members"); - - for u in res.users { - let modify_member = repositories::member::models::ModifyMember { - balance: u.cash, - balance_bota: u.cash_bota, - oriental_play: u.oriental_play, - }; self - .member_repository - .update(&conn, u.id, &modify_member) - .expect("member update"); - } + .add_history( + repositories::synchronization::models::ITEM_MEMBERS.to_string(), + start_at, + 0, + None, + ) + .await + .expect("add_history"); - self - .add_history( - repositories::synchronization::models::ITEM_MEMBERS.to_string(), - start_at, - 0, - None, - ) - .await - .expect("add_history"); + Ok::<(), api::core::models::Error>(()) + } + .await + { + self + .add_history( + repositories::synchronization::models::ITEM_MEMBERS.to_string(), + start_at, + e.code, + e.msg.clone(), + ) + .await + .expect("add_history"); + } }) })?; diff --git a/src/schedulers/mod.rs b/src/schedulers/mod.rs index d2a9a94..8c305bc 100644 --- a/src/schedulers/mod.rs +++ b/src/schedulers/mod.rs @@ -1,4 +1,3 @@ pub mod balance; pub mod game; pub mod member; -pub mod vendor; diff --git a/src/schedulers/vendor/mod.rs b/src/schedulers/vendor/mod.rs deleted file mode 100644 index 81b3546..0000000 --- a/src/schedulers/vendor/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod scheduler; diff --git a/src/schedulers/vendor/scheduler.rs b/src/schedulers/vendor/scheduler.rs deleted file mode 100644 index 13cda79..0000000 --- a/src/schedulers/vendor/scheduler.rs +++ /dev/null @@ -1,152 +0,0 @@ -use crate::api; -use crate::core; -use crate::repositories; -use diesel::{ - r2d2::{ConnectionManager, Pool}, - PgConnection, -}; -use once_cell::sync::OnceCell; -use std::sync::Arc; -use tokio_cron_scheduler::{Job, JobScheduler}; - -static G_INSTANCE: OnceCell> = OnceCell::new(); - -/// -pub struct Scheduler { - pool: Pool>, - sched: JobScheduler, - api_config: core::config::ApiConfig, - synchronization_history_repository: repositories::synchronization_history::repository::Repository, - vendor_repository: repositories::vendor::repository::Repository, - vendor_api: api::vendor::api::Api, -} - -impl std::fmt::Debug for Scheduler { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - f.debug_struct("Scheduler of api.kgon.identity").finish() - } -} - -impl Scheduler { - /// - pub fn get_instance( - pool: Pool>, - sched: JobScheduler, - api_config: core::config::ApiConfig, - ) -> Result<&'static Arc, Box> { - let instance = G_INSTANCE - .get_or_try_init(|| -> Result, Box> { - let s = Scheduler { - pool, - sched, - api_config: api_config.clone(), - synchronization_history_repository: - repositories::synchronization_history::repository::Repository::new(), - vendor_repository: repositories::vendor::repository::Repository::new(), - vendor_api: api::vendor::api::Api::new(api_config.clone()), - }; - - Ok(Arc::new(s)) - }) - .expect(""); - - Ok(instance) - } - - pub async fn queue(&'static self) -> Result<(), std::boxed::Box> { - self.list_vendors().await?; - - Ok(()) - } - - async fn add_history( - &'static self, - item: String, - start_at: i64, - code: i64, - message: Option, - ) -> Result<(), Box> { - let complete_at = (chrono::Utc::now()).timestamp(); - let conn = self.pool.get().expect("conn"); - - self - .synchronization_history_repository - .insert( - &conn, - &repositories::synchronization_history::models::NewSynchronizationHistory { - item, - start_at, - complete_at, - code, - message, - }, - ) - .expect("synchronization_history insert"); - - Ok(()) - } - - async fn list_vendors(&'static self) -> Result<(), Box> { - let j_synchronization = Job::new_async("0 0 0/1 * * *", move |_uuid, _l| { - Box::pin(async move { - let start_at = (chrono::Utc::now()).timestamp(); - let conn = self.pool.get().expect("conn"); - - let req = api::vendor::models::ListVendorsRequest {}; - let res = match self.vendor_api.list_vendors(req).await { - Ok(r) => Ok(r), - Err(e) => { - self - .add_history( - repositories::synchronization::models::ITEM_VENDORS.to_string(), - start_at, - e.code, - e.msg.clone(), - ) - .await - .expect("add_history"); - - Err(e) - } - } - .expect("list_vendors"); - - let upsert_vendors: Vec = res - .vendors - .iter() - .map(|d| repositories::vendor::models::UpsertVendor { - id: d.id, - company_id: d.company_id, - vendor_id: d.vendor_id, - key: d.key.clone(), - name: d.name.clone(), - category: d.category.clone(), - max_bet_casino: d.max_bet_casino, - max_bet_slot: d.max_bet_slot, - is_enable: d.is_enable.clone(), - bet_count: d.bet_count, - }) - .collect(); - - let _affected = self - .vendor_repository - .upserts(&conn, upsert_vendors) - .expect("vendor upsert"); - - self - .add_history( - repositories::synchronization::models::ITEM_VENDORS.to_string(), - start_at, - 0, - None, - ) - .await - .expect("add_history"); - }) - })?; - - self.sched.add(j_synchronization).await?; - - Ok(()) - } -}