diff --git a/src/main.rs b/src/main.rs index 6d333fc..9d791db 100644 --- a/src/main.rs +++ b/src/main.rs @@ -18,6 +18,7 @@ mod events; mod repositories; mod schedulers; mod services; +mod synchronizations; diesel_migrations::embed_migrations!(); @@ -68,28 +69,17 @@ async fn main() -> Result<(), Box> { let server_broker_opts = nats::asynk::Options::new(); let connection_server_broker = server_broker_opts.connect(url_server_broker).await?; - let mut sched = tokio_cron_scheduler::JobScheduler::new().await?; - - let member_scheduler = schedulers::member::scheduler::Scheduler::get_instance( - pool.clone(), - sched.clone(), - api_config.clone(), - )?; - member_scheduler.queue().await?; - let balance_scheduler = schedulers::balance::scheduler::Scheduler::get_instance( - pool.clone(), - sched.clone(), - api_config.clone(), - )?; - balance_scheduler.queue().await?; - let game_scheduler = schedulers::game::scheduler::Scheduler::get_instance( - pool.clone(), - sched.clone(), - api_config.clone(), - )?; - game_scheduler.queue().await?; - - let _h_scheduler = sched.start().await?; + let vendor_synchronizer = + synchronizations::vendor::synchronizer::Synchronizer::new(pool.clone(), api_config.clone()); + let game_synchronizer = + synchronizations::game::synchronizer::Synchronizer::new(pool.clone(), api_config.clone()); + let member_synchronizer = + synchronizations::member::synchronizer::Synchronizer::new(pool.clone(), api_config.clone()); + let member_account_synchronizer = + synchronizations::member_account::synchronizer::Synchronizer::new( + pool.clone(), + api_config.clone(), + ); let vendor_service = services::vendor::service::Service::new( connection_server_broker.clone(), @@ -121,6 +111,29 @@ async fn main() -> Result<(), Box> { api_config.clone(), ); + let mut sched = tokio_cron_scheduler::JobScheduler::new().await?; + + let game_scheduler = schedulers::game::scheduler::Scheduler::get_instance( + sched.clone(), + vendor_synchronizer.clone(), + game_synchronizer.clone(), + )?; + game_scheduler.queue().await?; + + let member_scheduler = schedulers::member::scheduler::Scheduler::get_instance( + sched.clone(), + member_synchronizer.clone(), + )?; + member_scheduler.queue().await?; + + let balance_scheduler = schedulers::balance::scheduler::Scheduler::get_instance( + sched.clone(), + member_account_synchronizer.clone(), + )?; + balance_scheduler.queue().await?; + + let _h_scheduler = sched.start().await?; + println!("Server service [beteran-api-kgon-server-service] is started"); futures::try_join!( diff --git a/src/schedulers/balance/scheduler.rs b/src/schedulers/balance/scheduler.rs index 88394b0..cdbef2a 100644 --- a/src/schedulers/balance/scheduler.rs +++ b/src/schedulers/balance/scheduler.rs @@ -1,10 +1,4 @@ -use crate::api; -use crate::core; -use crate::repositories; -use diesel::{ - r2d2::{ConnectionManager, Pool}, - PgConnection, -}; +use crate::synchronizations; use once_cell::sync::OnceCell; use std::sync::Arc; use tokio_cron_scheduler::{Job, JobScheduler}; @@ -13,14 +7,8 @@ static G_INSTANCE: OnceCell> = OnceCell::new(); /// pub struct Scheduler { - pool: Pool>, sched: JobScheduler, - api_config: core::config::ApiConfig, - synchronization_history_repository: repositories::synchronization_history::repository::Repository, - balance_repository: repositories::balance::repository::Repository, - member_repository: repositories::member::repository::Repository, - member_api: api::member::api::Api, - member_account_api: api::member_account::api::Api, + member_account_synchronizer: synchronizations::member_account::synchronizer::Synchronizer, } impl std::fmt::Debug for Scheduler { @@ -32,22 +20,14 @@ impl std::fmt::Debug for Scheduler { impl Scheduler { /// pub fn get_instance( - pool: Pool>, sched: JobScheduler, - api_config: core::config::ApiConfig, + member_account_synchronizer: synchronizations::member_account::synchronizer::Synchronizer, ) -> Result<&'static Arc, Box> { let instance = G_INSTANCE .get_or_try_init(|| -> Result, Box> { let s = Scheduler { - pool, sched, - api_config: api_config.clone(), - synchronization_history_repository: - repositories::synchronization_history::repository::Repository::new(), - balance_repository: repositories::balance::repository::Repository::new(), - member_repository: repositories::member::repository::Repository::new(), - member_api: api::member::api::Api::new(api_config.clone()), - member_account_api: api::member_account::api::Api::new(api_config.clone()), + member_account_synchronizer, }; Ok(Arc::new(s)) @@ -67,67 +47,11 @@ impl Scheduler { async fn balance_for_user(&'static self) -> Result<(), Box> { let j_synchronization = Job::new_async("0 0/3 * * * ?", move |_uuid, _l| { Box::pin(async move { - let start_at = (chrono::Utc::now()).timestamp(); - - if let Err(e) = async { - let conn = self.pool.get().expect("conn"); - - let req = api::member::models::ListMembersRequest { group_key: None }; - let res = self.member_api.list_members(req).await?; - - for u in res.users { - let req = api::member_account::models::GetBalanceForUserRequest { - username: u.site_username, - }; - let res = self.member_account_api.get_balance_for_user(req).await?; - - let modify_member = repositories::member::models::ModifyMemberForBalance { - balance: res.balance, - balance_bota: res.balance_bota, - balance_sum: res.balance_sum, - companies: res.companies, - }; - - self - .member_repository - .update_balance(&conn, u.id, &modify_member) - .expect("member update_balance"); - } - - self - .synchronization_history_repository - .insert( - &conn, - &repositories::synchronization_history::models::NewSynchronizationHistory { - item: repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), - start_at, - complete_at: (chrono::Utc::now()).timestamp(), - code: 0, - message: None, - }, - ) - .expect("synchronization_history insert"); - - Ok::<(), api::core::models::Error>(()) - } - .await - { - let conn = self.pool.get().expect("conn"); - - self - .synchronization_history_repository - .insert( - &conn, - &repositories::synchronization_history::models::NewSynchronizationHistory { - item: repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), - start_at, - complete_at: (chrono::Utc::now()).timestamp(), - code: e.code, - message: e.msg, - }, - ) - .expect("synchronization_history insert"); - } + self + .member_account_synchronizer + .balance_for_user() + .await + .expect("member_account_synchronizer.balance_for_user"); }) })?; @@ -139,79 +63,11 @@ impl Scheduler { async fn balance_for_partner(&'static self) -> Result<(), Box> { let j_synchronization = Job::new_async("0 0/3 * * * ?", move |_uuid, _l| { Box::pin(async move { - let start_at = (chrono::Utc::now()).timestamp(); - - if let Err(e) = async { - let conn = self.pool.get().expect("conn"); - - let req = api::member_account::models::GetBalanceForPartnerRequest {}; - let res = self.member_account_api.get_balance_for_partner(req).await?; - - match self - .balance_repository - .select(&conn) - .expect("balance select") - { - Some(b) => { - self - .balance_repository - .update( - &conn, - b.id, - &repositories::balance::models::ModifyBalance { - balance: res.balance, - balance_bota: res.balance_bota, - }, - ) - .expect("balance update"); - } - None => { - self - .balance_repository - .insert( - &conn, - &repositories::balance::models::NewBalance { - balance: res.balance, - balance_bota: res.balance_bota, - }, - ) - .expect("balance insert"); - } - } - - self - .synchronization_history_repository - .insert( - &conn, - &repositories::synchronization_history::models::NewSynchronizationHistory { - item: repositories::synchronization::models::ITEM_BALANCE_PARTNER.to_string(), - start_at, - complete_at: (chrono::Utc::now()).timestamp(), - code: 0, - message: None, - }, - ) - .expect("synchronization_history insert"); - - Ok::<(), api::core::models::Error>(()) - } - .await - { - let conn = self.pool.get().expect("conn"); - self - .synchronization_history_repository - .insert( - &conn, - &repositories::synchronization_history::models::NewSynchronizationHistory { - item: repositories::synchronization::models::ITEM_BALANCE_PARTNER.to_string(), - start_at, - complete_at: (chrono::Utc::now()).timestamp(), - code: e.code, - message: e.msg, - }, - ) - .expect("synchronization_history insert"); - } + self + .member_account_synchronizer + .balance_for_partner() + .await + .expect("member_account_synchronizer.balance_for_partner"); }) })?; diff --git a/src/schedulers/game/scheduler.rs b/src/schedulers/game/scheduler.rs index da9d774..c51e19e 100644 --- a/src/schedulers/game/scheduler.rs +++ b/src/schedulers/game/scheduler.rs @@ -1,10 +1,4 @@ -use crate::api; -use crate::core; -use crate::repositories; -use diesel::{ - r2d2::{ConnectionManager, Pool}, - PgConnection, -}; +use crate::synchronizations; use once_cell::sync::OnceCell; use std::sync::Arc; use tokio_cron_scheduler::{Job, JobScheduler}; @@ -13,14 +7,9 @@ static G_INSTANCE: OnceCell> = OnceCell::new(); /// pub struct Scheduler { - pool: Pool>, sched: JobScheduler, - api_config: core::config::ApiConfig, - synchronization_history_repository: repositories::synchronization_history::repository::Repository, - vendor_repository: repositories::vendor::repository::Repository, - vendor_api: api::vendor::api::Api, - game_repository: repositories::game::repository::Repository, - game_api: api::game::api::Api, + vendor_synchronizer: synchronizations::vendor::synchronizer::Synchronizer, + game_synchronizer: synchronizations::game::synchronizer::Synchronizer, } impl std::fmt::Debug for Scheduler { @@ -30,24 +19,17 @@ impl std::fmt::Debug for Scheduler { } impl Scheduler { - /// pub fn get_instance( - pool: Pool>, sched: JobScheduler, - api_config: core::config::ApiConfig, + vendor_synchronizer: synchronizations::vendor::synchronizer::Synchronizer, + game_synchronizer: synchronizations::game::synchronizer::Synchronizer, ) -> Result<&'static Arc, Box> { let instance = G_INSTANCE .get_or_try_init(|| -> Result, Box> { let s = Scheduler { - pool, sched, - api_config: api_config.clone(), - synchronization_history_repository: - repositories::synchronization_history::repository::Repository::new(), - vendor_repository: repositories::vendor::repository::Repository::new(), - vendor_api: api::vendor::api::Api::new(api_config.clone()), - game_repository: repositories::game::repository::Repository::new(), - game_api: api::game::api::Api::new(api_config.clone()), + vendor_synchronizer, + game_synchronizer, }; Ok(Arc::new(s)) @@ -58,79 +40,25 @@ impl Scheduler { } pub async fn queue(&'static self) -> Result<(), std::boxed::Box> { - self.list_vendors().await?; + self.vendors_and_games().await?; Ok(()) } - async fn list_vendors(&'static self) -> Result<(), Box> { + async fn vendors_and_games(&'static self) -> Result<(), Box> { let j_synchronization = Job::new_async("0 0/3 * * * ?", move |_uuid, _l| { Box::pin(async move { - let start_at = (chrono::Utc::now()).timestamp(); + self + .vendor_synchronizer + .vendors() + .await + .expect("vendor_synchronizer.vendors"); - if let Err(e) = async { - let conn = self.pool.get().expect("conn"); - - let req = api::vendor::models::ListVendorsRequest {}; - let res = self.vendor_api.list_vendors(req).await?; - - let upsert_vendors: Vec = res - .vendors - .iter() - .map(|d| repositories::vendor::models::UpsertVendor { - id: d.id, - company_id: d.company_id, - vendor_id: d.vendor_id, - key: d.key.clone(), - name: d.name.clone(), - category: d.category.clone(), - max_bet_casino: d.max_bet_casino, - max_bet_slot: d.max_bet_slot, - is_enable: d.is_enable.clone(), - bet_count: d.bet_count, - }) - .collect(); - - let _affected = self - .vendor_repository - .upserts(&conn, upsert_vendors) - .expect("vendor upsert"); - - self - .synchronization_history_repository - .insert( - &conn, - &repositories::synchronization_history::models::NewSynchronizationHistory { - item: repositories::synchronization::models::ITEM_VENDORS.to_string(), - start_at, - complete_at: (chrono::Utc::now()).timestamp(), - code: 0, - message: None, - }, - ) - .expect("synchronization_history insert"); - - self.list_games().await; - - Ok::<(), api::core::models::Error>(()) - } - .await - { - let conn = self.pool.get().expect("conn"); - self - .synchronization_history_repository - .insert( - &conn, - &repositories::synchronization_history::models::NewSynchronizationHistory { - item: repositories::synchronization::models::ITEM_VENDORS.to_string(), - start_at, - complete_at: (chrono::Utc::now()).timestamp(), - code: e.code, - message: e.msg, - }, - ) - .expect("synchronization_history insert"); - } + self + .game_synchronizer + .games() + .await + .expect("game_synchronizer.games"); }) })?; @@ -138,84 +66,4 @@ impl Scheduler { Ok(()) } - - async fn list_games(&'static self) { - let start_at = (chrono::Utc::now()).timestamp(); - - if let Err(e) = async { - let conn = self.pool.get().expect("conn"); - - let vendors = self - .vendor_repository - .select_all( - &conn, - &repositories::vendor::models::FindAll { - pagination: None, - sorts: None, - search: None, - }, - ) - .expect("vendor select_all"); - - let mut upsert_games: Vec = vec![]; - - for v in vendors { - let req = api::game::models::ListGamesRequest { - vendor_key: v.key.clone(), - }; - let res = self.game_api.list_games(req).await?; - - for g in res.games { - upsert_games.push(repositories::game::models::UpsertGame { - id: g.id, - vendor_id: v.id, - key: g.key.clone(), - names: serde_json::to_string(&g.names).expect("names"), - platform: g.platform.clone(), - category: g.category.clone(), - game_type: g.game_type.clone(), - image: g.image, - }); - } - } - - let _affected = self - .game_repository - .upserts(&conn, upsert_games) - .expect("game upsert"); - - self - .synchronization_history_repository - .insert( - &conn, - &repositories::synchronization_history::models::NewSynchronizationHistory { - item: repositories::synchronization::models::ITEM_GAMES.to_string(), - start_at, - complete_at: (chrono::Utc::now()).timestamp(), - code: 0, - message: None, - }, - ) - .expect("synchronization_history insert"); - - Ok::<(), api::core::models::Error>(()) - } - .await - { - let conn = self.pool.get().expect("conn"); - self - .synchronization_history_repository - .insert( - &conn, - &repositories::synchronization_history::models::NewSynchronizationHistory { - item: repositories::synchronization::models::ITEM_GAMES.to_string(), - start_at, - complete_at: (chrono::Utc::now()).timestamp(), - code: e.code, - message: e.msg, - }, - ) - .expect("synchronization_history insert"); - } - } } diff --git a/src/schedulers/member/scheduler.rs b/src/schedulers/member/scheduler.rs index 20c4f90..96ecb79 100644 --- a/src/schedulers/member/scheduler.rs +++ b/src/schedulers/member/scheduler.rs @@ -1,10 +1,4 @@ -use crate::api; -use crate::core; -use crate::repositories; -use diesel::{ - r2d2::{ConnectionManager, Pool}, - PgConnection, -}; +use crate::synchronizations; use once_cell::sync::OnceCell; use std::sync::Arc; use tokio_cron_scheduler::{Job, JobScheduler}; @@ -13,12 +7,8 @@ static G_INSTANCE: OnceCell> = OnceCell::new(); /// pub struct Scheduler { - pool: Pool>, sched: JobScheduler, - api_config: core::config::ApiConfig, - synchronization_history_repository: repositories::synchronization_history::repository::Repository, - member_repository: repositories::member::repository::Repository, - member_api: api::member::api::Api, + member_synchronizer: synchronizations::member::synchronizer::Synchronizer, } impl std::fmt::Debug for Scheduler { @@ -30,20 +20,14 @@ impl std::fmt::Debug for Scheduler { impl Scheduler { /// pub fn get_instance( - pool: Pool>, sched: JobScheduler, - api_config: core::config::ApiConfig, + member_synchronizer: synchronizations::member::synchronizer::Synchronizer, ) -> Result<&'static Arc, Box> { let instance = G_INSTANCE .get_or_try_init(|| -> Result, Box> { let s = Scheduler { - pool, sched, - api_config: api_config.clone(), - synchronization_history_repository: - repositories::synchronization_history::repository::Repository::new(), - member_repository: repositories::member::repository::Repository::new(), - member_api: api::member::api::Api::new(api_config.clone()), + member_synchronizer, }; Ok(Arc::new(s)) @@ -54,68 +38,19 @@ impl Scheduler { } pub async fn queue(&'static self) -> Result<(), std::boxed::Box> { - self.list_members().await?; + self.members().await?; Ok(()) } - async fn list_members(&'static self) -> Result<(), Box> { + async fn members(&'static self) -> Result<(), Box> { let j_synchronization = Job::new_async("0 0/3 * * * ?", move |_uuid, _l| { Box::pin(async move { - let start_at = (chrono::Utc::now()).timestamp(); - - if let Err(e) = async { - let conn = self.pool.get().expect("conn"); - - let req = api::member::models::ListMembersRequest { group_key: None }; - let res = self.member_api.list_members(req).await?; - - for u in res.users { - let modify_member = repositories::member::models::ModifyMember { - balance: u.cash, - balance_bota: u.cash_bota, - oriental_play: u.oriental_play, - }; - - self - .member_repository - .update(&conn, u.id, &modify_member) - .expect("member update"); - } - - self - .synchronization_history_repository - .insert( - &conn, - &repositories::synchronization_history::models::NewSynchronizationHistory { - item: repositories::synchronization::models::ITEM_MEMBERS.to_string(), - start_at, - complete_at: (chrono::Utc::now()).timestamp(), - code: 0, - message: None, - }, - ) - .expect("synchronization_history insert"); - - Ok::<(), api::core::models::Error>(()) - } - .await - { - let conn = self.pool.get().expect("conn"); - self - .synchronization_history_repository - .insert( - &conn, - &repositories::synchronization_history::models::NewSynchronizationHistory { - item: repositories::synchronization::models::ITEM_MEMBERS.to_string(), - start_at, - complete_at: (chrono::Utc::now()).timestamp(), - code: e.code, - message: e.msg, - }, - ) - .expect("synchronization_history insert"); - } + self + .member_synchronizer + .members() + .await + .expect("member_synchronizer.members"); }) })?; diff --git a/src/services/member_account/service.rs b/src/services/member_account/service.rs index 9e37433..580326a 100644 --- a/src/services/member_account/service.rs +++ b/src/services/member_account/service.rs @@ -23,8 +23,9 @@ impl Service { futures::try_join!( self.create_deposit(), self.create_withdraw(), - self.list_partner_balnace(), - self.list_user_balnace(), + self.get_partner_balnace(), + self.list_user_balnaces(), + self.get_user_balnace(), ) .map(|_| ()) } @@ -32,14 +33,29 @@ impl Service { async fn create_deposit(&self) -> Result<(), Box> { Ok(()) } + async fn list_deposits(&self) -> Result<(), Box> { + Ok(()) + } + async fn get_deposit(&self) -> Result<(), Box> { + Ok(()) + } async fn create_withdraw(&self) -> Result<(), Box> { Ok(()) } - - async fn list_partner_balnace(&self) -> Result<(), Box> { + async fn list_withdraws(&self) -> Result<(), Box> { Ok(()) } - async fn list_user_balnace(&self) -> Result<(), Box> { + async fn get_withdraw(&self) -> Result<(), Box> { + Ok(()) + } + + async fn get_partner_balnace(&self) -> Result<(), Box> { + Ok(()) + } + async fn list_user_balnaces(&self) -> Result<(), Box> { + Ok(()) + } + async fn get_user_balnace(&self) -> Result<(), Box> { Ok(()) } } diff --git a/src/services/vendor/service.rs b/src/services/vendor/service.rs index 4816011..c236843 100644 --- a/src/services/vendor/service.rs +++ b/src/services/vendor/service.rs @@ -116,12 +116,12 @@ impl Service { pagination: request .pagination .as_ref() - .map(|d| bcr::models::pagination::Pagination::from(d)), + .map(bcr::models::pagination::Pagination::from), sorts: Some( request .sorts .iter() - .map(|d| beteran_common_rust::models::pagination::Sort::from(d)) + .map(beteran_common_rust::models::pagination::Sort::from) .collect(), ), }; @@ -155,7 +155,7 @@ impl Service { result: Some(bpr::ss::api::vendor::list_vendors_response::Result { vendors: list .iter() - .map(|d| bpr::models::api::vendor::Vendor::from(d)) + .map(bpr::models::api::vendor::Vendor::from) .collect(), }), } @@ -188,11 +188,4 @@ impl Service { Ok(()) } - async fn list_games(&self) -> Result<(), Box> { - Ok(()) - } - - async fn execute_game(&self) -> Result<(), Box> { - Ok(()) - } } diff --git a/src/synchronizations/game/mod.rs b/src/synchronizations/game/mod.rs new file mode 100644 index 0000000..38b1e01 --- /dev/null +++ b/src/synchronizations/game/mod.rs @@ -0,0 +1 @@ +pub mod synchronizer; diff --git a/src/synchronizations/game/synchronizer.rs b/src/synchronizations/game/synchronizer.rs new file mode 100644 index 0000000..0ae9bc1 --- /dev/null +++ b/src/synchronizations/game/synchronizer.rs @@ -0,0 +1,137 @@ +use crate::api; +use crate::core; +use crate::repositories; +use diesel::{ + r2d2::{ConnectionManager, Pool}, + PgConnection, +}; + +/// +pub struct Synchronizer { + pool: Pool>, + api_config: core::config::ApiConfig, + synchronization_history_repository: repositories::synchronization_history::repository::Repository, + vendor_repository: repositories::vendor::repository::Repository, + game_repository: repositories::game::repository::Repository, + game_api: api::game::api::Api, +} + +impl std::fmt::Debug for Synchronizer { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("Synchronizer of api.kgon.identity").finish() + } +} + +impl std::clone::Clone for Synchronizer { + fn clone(&self) -> Self { + Self { + pool: self.pool.clone(), + api_config: self.api_config.clone(), + synchronization_history_repository: + repositories::synchronization_history::repository::Repository::new(), + vendor_repository: repositories::vendor::repository::Repository::new(), + game_repository: repositories::game::repository::Repository::new(), + game_api: api::game::api::Api::new(self.api_config.clone()), + } + } +} + +impl Synchronizer { + /// + pub fn new( + pool: Pool>, + api_config: core::config::ApiConfig, + ) -> Synchronizer { + Synchronizer { + pool, + api_config: api_config.clone(), + synchronization_history_repository: + repositories::synchronization_history::repository::Repository::new(), + vendor_repository: repositories::vendor::repository::Repository::new(), + game_repository: repositories::game::repository::Repository::new(), + game_api: api::game::api::Api::new(api_config), + } + } + + pub async fn games(&self) -> Result<(), Box> { + let start_at = (chrono::Utc::now()).timestamp(); + + if let Err(e) = async { + let conn = self.pool.get().expect("conn"); + + let vendors = self + .vendor_repository + .select_all( + &conn, + &repositories::vendor::models::FindAll { + pagination: None, + sorts: None, + search: None, + }, + ) + .expect("vendor select_all"); + + let mut upsert_games: Vec = vec![]; + + for v in vendors { + let req = api::game::models::ListGamesRequest { + vendor_key: v.key.clone(), + }; + let res = self.game_api.list_games(req).await?; + + for g in res.games { + upsert_games.push(repositories::game::models::UpsertGame { + id: g.id, + vendor_id: v.id, + key: g.key.clone(), + names: serde_json::to_string(&g.names).expect("names"), + platform: g.platform.clone(), + category: g.category.clone(), + game_type: g.game_type.clone(), + image: g.image, + }); + } + } + + let _affected = self + .game_repository + .upserts(&conn, upsert_games) + .expect("game upsert"); + + self + .synchronization_history_repository + .insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_GAMES.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: 0, + message: None, + }, + ) + .expect("synchronization_history insert"); + + Ok::<(), api::core::models::Error>(()) + } + .await + { + let conn = self.pool.get().expect("conn"); + self + .synchronization_history_repository + .insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_GAMES.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: e.code, + message: e.msg, + }, + ) + .expect("synchronization_history insert"); + } + + Ok(()) + } +} diff --git a/src/synchronizations/member/mod.rs b/src/synchronizations/member/mod.rs new file mode 100644 index 0000000..38b1e01 --- /dev/null +++ b/src/synchronizations/member/mod.rs @@ -0,0 +1 @@ +pub mod synchronizer; diff --git a/src/synchronizations/member/synchronizer.rs b/src/synchronizations/member/synchronizer.rs new file mode 100644 index 0000000..20ed2ce --- /dev/null +++ b/src/synchronizations/member/synchronizer.rs @@ -0,0 +1,111 @@ +use crate::api; +use crate::core; +use crate::repositories; +use diesel::{ + r2d2::{ConnectionManager, Pool}, + PgConnection, +}; + +/// +pub struct Synchronizer { + pool: Pool>, + api_config: core::config::ApiConfig, + synchronization_history_repository: repositories::synchronization_history::repository::Repository, + member_repository: repositories::member::repository::Repository, + member_api: api::member::api::Api, +} + +impl std::fmt::Debug for Synchronizer { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("Synchronizer of api.kgon.identity").finish() + } +} + +impl std::clone::Clone for Synchronizer { + fn clone(&self) -> Self { + Self { + pool: self.pool.clone(), + api_config: self.api_config.clone(), + synchronization_history_repository: + repositories::synchronization_history::repository::Repository::new(), + member_repository: repositories::member::repository::Repository::new(), + member_api: api::member::api::Api::new(self.api_config.clone()), + } + } +} + +impl Synchronizer { + /// + pub fn new( + pool: Pool>, + api_config: core::config::ApiConfig, + ) -> Synchronizer { + Synchronizer { + pool, + api_config: api_config.clone(), + synchronization_history_repository: + repositories::synchronization_history::repository::Repository::new(), + member_repository: repositories::member::repository::Repository::new(), + member_api: api::member::api::Api::new(api_config), + } + } + + pub async fn members(&self) -> Result<(), Box> { + let start_at = (chrono::Utc::now()).timestamp(); + + if let Err(e) = async { + let conn = self.pool.get().expect("conn"); + + let req = api::member::models::ListMembersRequest { group_key: None }; + let res = self.member_api.list_members(req).await?; + + for u in res.users { + let modify_member = repositories::member::models::ModifyMember { + balance: u.cash, + balance_bota: u.cash_bota, + oriental_play: u.oriental_play, + }; + + self + .member_repository + .update(&conn, u.id, &modify_member) + .expect("member update"); + } + + self + .synchronization_history_repository + .insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_MEMBERS.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: 0, + message: None, + }, + ) + .expect("synchronization_history insert"); + + Ok::<(), api::core::models::Error>(()) + } + .await + { + let conn = self.pool.get().expect("conn"); + self + .synchronization_history_repository + .insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_MEMBERS.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: e.code, + message: e.msg, + }, + ) + .expect("synchronization_history insert"); + } + + Ok(()) + } +} diff --git a/src/synchronizations/member_account/mod.rs b/src/synchronizations/member_account/mod.rs new file mode 100644 index 0000000..38b1e01 --- /dev/null +++ b/src/synchronizations/member_account/mod.rs @@ -0,0 +1 @@ +pub mod synchronizer; diff --git a/src/synchronizations/member_account/synchronizer.rs b/src/synchronizations/member_account/synchronizer.rs new file mode 100644 index 0000000..0c68b2d --- /dev/null +++ b/src/synchronizations/member_account/synchronizer.rs @@ -0,0 +1,224 @@ +use crate::api; +use crate::core; +use crate::repositories; +use diesel::{ + r2d2::{ConnectionManager, Pool}, + PgConnection, +}; + +/// +pub struct Synchronizer { + pool: Pool>, + api_config: core::config::ApiConfig, + synchronization_history_repository: repositories::synchronization_history::repository::Repository, + balance_repository: repositories::balance::repository::Repository, + member_repository: repositories::member::repository::Repository, + member_api: api::member::api::Api, + member_account_api: api::member_account::api::Api, +} + +impl std::fmt::Debug for Synchronizer { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("Synchronizer of api.kgon.identity").finish() + } +} + +impl std::clone::Clone for Synchronizer { + fn clone(&self) -> Self { + Self { + pool: self.pool.clone(), + api_config: self.api_config.clone(), + synchronization_history_repository: + repositories::synchronization_history::repository::Repository::new(), + balance_repository: repositories::balance::repository::Repository::new(), + member_repository: repositories::member::repository::Repository::new(), + member_api: api::member::api::Api::new(self.api_config.clone()), + member_account_api: api::member_account::api::Api::new(self.api_config.clone()), + } + } +} + +impl Synchronizer { + /// + pub fn new( + pool: Pool>, + api_config: core::config::ApiConfig, + ) -> Synchronizer { + Synchronizer { + pool, + api_config: api_config.clone(), + synchronization_history_repository: + repositories::synchronization_history::repository::Repository::new(), + balance_repository: repositories::balance::repository::Repository::new(), + member_repository: repositories::member::repository::Repository::new(), + member_api: api::member::api::Api::new(api_config.clone()), + member_account_api: api::member_account::api::Api::new(api_config), + } + } + + pub async fn balance_for_user(&self) -> Result<(), Box> { + let start_at = (chrono::Utc::now()).timestamp(); + + if let Err(e) = async { + let conn = self.pool.get().expect("conn"); + + let req = api::member::models::ListMembersRequest { group_key: None }; + let res = self.member_api.list_members(req).await?; + + let mut messages: Vec = vec![]; + + for u in res.users { + let req = api::member_account::models::GetBalanceForUserRequest { + username: u.site_username, + }; + let res = match self.member_account_api.get_balance_for_user(req).await { + Ok(r) => r, + Err(e) => { + messages.push(format!( + "api.get_balance_for_user code: {}, msg: {:?}", + e.code, e.msg + )); + continue; + } + }; + + let modify_member = repositories::member::models::ModifyMemberForBalance { + balance: res.balance, + balance_bota: res.balance_bota, + balance_sum: res.balance_sum, + companies: res.companies, + }; + + if let Err(e) = self + .member_repository + .update_balance(&conn, u.id, &modify_member) + { + messages.push(format!("repository.update_balance error: {}", e)); + }; + } + + if !messages.is_empty() { + return Err(api::core::models::Error { + code: -1, + msg: Some(serde_json::to_string(&messages).expect("json")), + }); + } + + self + .synchronization_history_repository + .insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: 0, + message: None, + }, + ) + .expect("synchronization_history insert"); + + Ok::<(), api::core::models::Error>(()) + } + .await + { + let conn = self.pool.get().expect("conn"); + + self + .synchronization_history_repository + .insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: e.code, + message: e.msg, + }, + ) + .expect("synchronization_history insert"); + } + + Ok(()) + } + + pub async fn balance_for_partner(&self) -> Result<(), Box> { + let start_at = (chrono::Utc::now()).timestamp(); + + if let Err(e) = async { + let conn = self.pool.get().expect("conn"); + + let req = api::member_account::models::GetBalanceForPartnerRequest {}; + let res = self.member_account_api.get_balance_for_partner(req).await?; + + match self + .balance_repository + .select(&conn) + .expect("balance select") + { + Some(b) => { + self + .balance_repository + .update( + &conn, + b.id, + &repositories::balance::models::ModifyBalance { + balance: res.balance, + balance_bota: res.balance_bota, + }, + ) + .expect("balance update"); + } + None => { + self + .balance_repository + .insert( + &conn, + &repositories::balance::models::NewBalance { + balance: res.balance, + balance_bota: res.balance_bota, + }, + ) + .expect("balance insert"); + } + } + + self + .synchronization_history_repository + .insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_BALANCE_PARTNER.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: 0, + message: None, + }, + ) + .expect("synchronization_history insert"); + + Ok::<(), api::core::models::Error>(()) + } + .await + { + let conn = self.pool.get().expect("conn"); + self + .synchronization_history_repository + .insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_BALANCE_PARTNER.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: e.code, + message: e.msg, + }, + ) + .expect("synchronization_history insert"); + } + + Ok(()) + } + + +} diff --git a/src/synchronizations/mod.rs b/src/synchronizations/mod.rs new file mode 100644 index 0000000..8c5d506 --- /dev/null +++ b/src/synchronizations/mod.rs @@ -0,0 +1,4 @@ +pub mod game; +pub mod member; +pub mod member_account; +pub mod vendor; diff --git a/src/synchronizations/vendor/mod.rs b/src/synchronizations/vendor/mod.rs new file mode 100644 index 0000000..38b1e01 --- /dev/null +++ b/src/synchronizations/vendor/mod.rs @@ -0,0 +1 @@ +pub mod synchronizer; diff --git a/src/synchronizations/vendor/synchronizer.rs b/src/synchronizations/vendor/synchronizer.rs new file mode 100644 index 0000000..464dd02 --- /dev/null +++ b/src/synchronizations/vendor/synchronizer.rs @@ -0,0 +1,120 @@ +use crate::api; +use crate::core; +use crate::repositories; +use diesel::{ + r2d2::{ConnectionManager, Pool}, + PgConnection, +}; + +/// +pub struct Synchronizer { + pool: Pool>, + api_config: core::config::ApiConfig, + synchronization_history_repository: repositories::synchronization_history::repository::Repository, + vendor_repository: repositories::vendor::repository::Repository, + vendor_api: api::vendor::api::Api, +} + +impl std::fmt::Debug for Synchronizer { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("Synchronizer of api.kgon.identity").finish() + } +} + +impl std::clone::Clone for Synchronizer { + fn clone(&self) -> Self { + Self { + pool: self.pool.clone(), + api_config: self.api_config.clone(), + synchronization_history_repository: + repositories::synchronization_history::repository::Repository::new(), + vendor_repository: repositories::vendor::repository::Repository::new(), + vendor_api: api::vendor::api::Api::new(self.api_config.clone()), + } + } +} + +impl Synchronizer { + /// + pub fn new( + pool: Pool>, + api_config: core::config::ApiConfig, + ) -> Synchronizer { + Synchronizer { + pool, + api_config: api_config.clone(), + synchronization_history_repository: + repositories::synchronization_history::repository::Repository::new(), + vendor_repository: repositories::vendor::repository::Repository::new(), + vendor_api: api::vendor::api::Api::new(api_config), + } + } + + pub async fn vendors(&self) -> Result<(), Box> { + let start_at = (chrono::Utc::now()).timestamp(); + + if let Err(e) = async { + let conn = self.pool.get().expect("conn"); + + let req = api::vendor::models::ListVendorsRequest {}; + let res = self.vendor_api.list_vendors(req).await?; + + let upsert_vendors: Vec = res + .vendors + .iter() + .map(|d| repositories::vendor::models::UpsertVendor { + id: d.id, + company_id: d.company_id, + vendor_id: d.vendor_id, + key: d.key.clone(), + name: d.name.clone(), + category: d.category.clone(), + max_bet_casino: d.max_bet_casino, + max_bet_slot: d.max_bet_slot, + is_enable: d.is_enable.clone(), + bet_count: d.bet_count, + }) + .collect(); + + let _affected = self + .vendor_repository + .upserts(&conn, upsert_vendors) + .expect("vendor upsert"); + + self + .synchronization_history_repository + .insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_VENDORS.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: 0, + message: None, + }, + ) + .expect("synchronization_history insert"); + + Ok::<(), api::core::models::Error>(()) + } + .await + { + let conn = self.pool.get().expect("conn"); + self + .synchronization_history_repository + .insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_VENDORS.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: e.code, + message: e.msg, + }, + ) + .expect("synchronization_history insert"); + } + + Ok(()) + } +}