implemented

This commit is contained in:
병준 박 2022-08-18 22:35:22 +00:00
parent ea2664577a
commit e7bc00c4c6
7 changed files with 275 additions and 386 deletions

View File

@ -87,30 +87,24 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut sched = tokio_cron_scheduler::JobScheduler::new().await?; let mut sched = tokio_cron_scheduler::JobScheduler::new().await?;
// let member_scheduler = schedulers::member::scheduler::Scheduler::get_instance( let member_scheduler = schedulers::member::scheduler::Scheduler::get_instance(
// pool.clone(),
// sched.clone(),
// api_config.clone(),
// )?;
// member_scheduler.queue().await?;
// let balance_scheduler = schedulers::balance::scheduler::Scheduler::get_instance(
// pool.clone(),
// sched.clone(),
// api_config.clone(),
// )?;
// balance_scheduler.queue().await?;
let vendor_scheduler = schedulers::vendor::scheduler::Scheduler::get_instance(
pool.clone(), pool.clone(),
sched.clone(), sched.clone(),
api_config.clone(), api_config.clone(),
)?; )?;
vendor_scheduler.queue().await?; member_scheduler.queue().await?;
// let game_scheduler = schedulers::game::scheduler::Scheduler::get_instance( let balance_scheduler = schedulers::balance::scheduler::Scheduler::get_instance(
// pool.clone(), pool.clone(),
// sched.clone(), sched.clone(),
// api_config.clone(), api_config.clone(),
// )?; )?;
// game_scheduler.queue().await?; balance_scheduler.queue().await?;
let game_scheduler = schedulers::game::scheduler::Scheduler::get_instance(
pool.clone(),
sched.clone(),
api_config.clone(),
)?;
game_scheduler.queue().await?;
let _h_scheduler = sched.start().await?; let _h_scheduler = sched.start().await?;

View File

@ -95,71 +95,56 @@ impl Scheduler {
let j_synchronization = Job::new_async("0 0 0/1 * * *", move |_uuid, _l| { let j_synchronization = Job::new_async("0 0 0/1 * * *", move |_uuid, _l| {
Box::pin(async move { Box::pin(async move {
let start_at = (chrono::Utc::now()).timestamp(); let start_at = (chrono::Utc::now()).timestamp();
let conn = self.pool.get().expect("conn");
let req = api::member::models::ListMembersRequest { group_key: None }; if let Err(e) = async {
let res = match self.member_api.list_members(req).await { let conn = self.pool.get().expect("conn");
Ok(r) => Ok(r),
Err(e) => { let req = api::member::models::ListMembersRequest { group_key: None };
let res = self.member_api.list_members(req).await?;
for u in res.users {
let req = api::member_account::models::GetBalanceForUserRequest {
username: u.site_username,
};
let res = self.member_account_api.get_balance_for_user(req).await?;
let modify_member = repositories::member::models::ModifyMemberForBalance {
balance: res.balance,
balance_bota: res.balance_bota,
balance_sum: res.balance_sum,
companies: res.companies,
};
self self
.add_history( .member_repository
repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), .update_balance(&conn, u.id, &modify_member)
start_at, .expect("member update_balance");
e.code,
e.msg.clone(),
)
.await
.expect("add_history");
Err(e)
} }
}
.expect("list_members");
for u in res.users {
let req = api::member_account::models::GetBalanceForUserRequest {
username: u.site_username,
};
let res = match self.member_account_api.get_balance_for_user(req).await {
Ok(r) => Ok(r),
Err(e) => {
self
.add_history(
repositories::synchronization::models::ITEM_BALANCE_USER.to_string(),
start_at,
e.code,
e.msg.clone(),
)
.await
.expect("add_history");
Err(e)
}
}
.expect("get_balance_for_user");
let modify_member = repositories::member::models::ModifyMemberForBalance {
balance: res.balance,
balance_bota: res.balance_bota,
balance_sum: res.balance_sum,
companies: res.companies,
};
self self
.member_repository .add_history(
.update_balance(&conn, u.id, &modify_member) repositories::synchronization::models::ITEM_BALANCE_USER.to_string(),
.expect("member update_balance"); start_at,
} 0,
None,
)
.await
.expect("add_history");
self Ok::<(), api::core::models::Error>(())
.add_history( }
repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), .await
start_at, {
0, self
None, .add_history(
) repositories::synchronization::models::ITEM_BALANCE_USER.to_string(),
.await start_at,
.expect("add_history"); e.code,
e.msg.clone(),
)
.await
.expect("add_history");
}
}) })
})?; })?;
@ -172,68 +157,69 @@ impl Scheduler {
let j_synchronization = Job::new_async("0 0 0/1 * * *", move |_uuid, _l| { let j_synchronization = Job::new_async("0 0 0/1 * * *", move |_uuid, _l| {
Box::pin(async move { Box::pin(async move {
let start_at = (chrono::Utc::now()).timestamp(); let start_at = (chrono::Utc::now()).timestamp();
let conn = self.pool.get().expect("conn");
let req = api::member_account::models::GetBalanceForPartnerRequest {}; if let Err(e) = async {
let res = match self.member_account_api.get_balance_for_partner(req).await { let conn = self.pool.get().expect("conn");
Ok(r) => Ok(r),
Err(e) => {
self
.add_history(
repositories::synchronization::models::ITEM_BALANCE_PARTNER.to_string(),
start_at,
e.code,
e.msg.clone(),
)
.await
.expect("add_history");
Err(e) let req = api::member_account::models::GetBalanceForPartnerRequest {};
let res = self.member_account_api.get_balance_for_partner(req).await?;
match self
.balance_repository
.select(&conn)
.expect("balance select")
{
Some(b) => {
self
.balance_repository
.update(
&conn,
b.id,
&repositories::balance::models::ModifyBalance {
balance: res.balance,
balance_bota: res.balance_bota,
},
)
.expect("balance update");
}
None => {
self
.balance_repository
.insert(
&conn,
&repositories::balance::models::NewBalance {
balance: res.balance,
balance_bota: res.balance_bota,
},
)
.expect("balance insert");
}
} }
}
.expect("list_members");
match self self
.balance_repository .add_history(
.select(&conn) repositories::synchronization::models::ITEM_BALANCE_USER.to_string(),
.expect("balance select") start_at,
0,
None,
)
.await
.expect("add_history");
Ok::<(), api::core::models::Error>(())
}
.await
{ {
Some(b) => { self
self .add_history(
.balance_repository repositories::synchronization::models::ITEM_BALANCE_USER.to_string(),
.update( start_at,
&conn, e.code,
b.id, e.msg.clone(),
&repositories::balance::models::ModifyBalance { )
balance: res.balance, .await
balance_bota: res.balance_bota, .expect("add_history");
},
)
.expect("balance update");
}
None => {
self
.balance_repository
.insert(
&conn,
&repositories::balance::models::NewBalance {
balance: res.balance,
balance_bota: res.balance_bota,
},
)
.expect("balance insert");
}
} }
self
.add_history(
repositories::synchronization::models::ITEM_BALANCE_PARTNER.to_string(),
start_at,
0,
None,
)
.await
.expect("add_history");
}) })
})?; })?;

View File

@ -18,6 +18,7 @@ pub struct Scheduler {
api_config: core::config::ApiConfig, api_config: core::config::ApiConfig,
synchronization_history_repository: repositories::synchronization_history::repository::Repository, synchronization_history_repository: repositories::synchronization_history::repository::Repository,
vendor_repository: repositories::vendor::repository::Repository, vendor_repository: repositories::vendor::repository::Repository,
vendor_api: api::vendor::api::Api,
game_repository: repositories::game::repository::Repository, game_repository: repositories::game::repository::Repository,
game_api: api::game::api::Api, game_api: api::game::api::Api,
} }
@ -44,6 +45,7 @@ impl Scheduler {
synchronization_history_repository: synchronization_history_repository:
repositories::synchronization_history::repository::Repository::new(), repositories::synchronization_history::repository::Repository::new(),
vendor_repository: repositories::vendor::repository::Repository::new(), vendor_repository: repositories::vendor::repository::Repository::new(),
vendor_api: api::vendor::api::Api::new(api_config.clone()),
game_repository: repositories::game::repository::Repository::new(), game_repository: repositories::game::repository::Repository::new(),
game_api: api::game::api::Api::new(api_config.clone()), game_api: api::game::api::Api::new(api_config.clone()),
}; };
@ -92,72 +94,61 @@ impl Scheduler {
let j_synchronization = Job::new_async("0 0 0/1 * * *", move |_uuid, _l| { let j_synchronization = Job::new_async("0 0 0/1 * * *", move |_uuid, _l| {
Box::pin(async move { Box::pin(async move {
let start_at = (chrono::Utc::now()).timestamp(); let start_at = (chrono::Utc::now()).timestamp();
let conn = self.pool.get().expect("conn");
let vendors = self if let Err(e) = async {
.vendor_repository let conn = self.pool.get().expect("conn");
.select_all(
&conn,
&repositories::vendor::models::FindAll {
pagination: None,
sorts: None,
search: None,
},
)
.expect("vendor select_all");
let mut upsert_games: Vec<repositories::game::models::UpsertGame> = vec![]; let req = api::vendor::models::ListVendorsRequest {};
let res = self.vendor_api.list_vendors(req).await?;
for v in vendors { let upsert_vendors: Vec<repositories::vendor::models::UpsertVendor> = res
let req = api::game::models::ListGamesRequest { .vendors
vendor_key: v.key.clone(), .iter()
}; .map(|d| repositories::vendor::models::UpsertVendor {
let res = match self.game_api.list_games(req).await { id: d.id,
Ok(r) => Ok(r), company_id: d.company_id,
Err(e) => { vendor_id: d.vendor_id,
self key: d.key.clone(),
.add_history( name: d.name.clone(),
repositories::synchronization::models::ITEM_VENDORS.to_string(), category: d.category.clone(),
start_at, max_bet_casino: d.max_bet_casino,
e.code, max_bet_slot: d.max_bet_slot,
e.msg.clone(), is_enable: d.is_enable.clone(),
) bet_count: d.bet_count,
.await })
.expect("add_history"); .collect();
Err(e) let _affected = self
} .vendor_repository
} .upserts(&conn, upsert_vendors)
.expect("list_games"); .expect("vendor upsert");
for g in res.games { self
upsert_games.push(repositories::game::models::UpsertGame { .add_history(
id: g.id, repositories::synchronization::models::ITEM_VENDORS.to_string(),
vendor_id: v.id, start_at,
key: g.key.clone(), 0,
names: serde_json::to_string(&g.names).expect("names"), None,
platform: g.platform.clone(), )
category: g.category.clone(), .await
game_type: g.game_type.clone(), .expect("add_history");
image: g.image,
}); self.list_games().await;
}
Ok::<(), api::core::models::Error>(())
}
.await
{
self
.add_history(
repositories::synchronization::models::ITEM_VENDORS.to_string(),
start_at,
e.code,
e.msg.clone(),
)
.await
.expect("add_history");
} }
let _affected = self
.game_repository
.upserts(&conn, upsert_games)
.expect("game upsert");
self
.add_history(
repositories::synchronization::models::ITEM_GAMES.to_string(),
start_at,
0,
None,
)
.await
.expect("add_history");
}) })
})?; })?;
@ -165,4 +156,75 @@ impl Scheduler {
Ok(()) Ok(())
} }
async fn list_games(&'static self) {
let start_at = (chrono::Utc::now()).timestamp();
if let Err(e) = async {
let conn = self.pool.get().expect("conn");
let vendors = self
.vendor_repository
.select_all(
&conn,
&repositories::vendor::models::FindAll {
pagination: None,
sorts: None,
search: None,
},
)
.expect("vendor select_all");
let mut upsert_games: Vec<repositories::game::models::UpsertGame> = vec![];
for v in vendors {
let req = api::game::models::ListGamesRequest {
vendor_key: v.key.clone(),
};
let res = self.game_api.list_games(req).await?;
for g in res.games {
upsert_games.push(repositories::game::models::UpsertGame {
id: g.id,
vendor_id: v.id,
key: g.key.clone(),
names: serde_json::to_string(&g.names).expect("names"),
platform: g.platform.clone(),
category: g.category.clone(),
game_type: g.game_type.clone(),
image: g.image,
});
}
}
let _affected = self
.game_repository
.upserts(&conn, upsert_games)
.expect("game upsert");
self
.add_history(
repositories::synchronization::models::ITEM_GAMES.to_string(),
start_at,
0,
None,
)
.await
.expect("add_history");
Ok::<(), api::core::models::Error>(())
}
.await
{
self
.add_history(
repositories::synchronization::models::ITEM_GAMES.to_string(),
start_at,
e.code,
e.msg.clone(),
)
.await
.expect("add_history");
}
}
} }

View File

@ -90,49 +90,50 @@ impl Scheduler {
let j_synchronization = Job::new_async("0 0 0/1 * * *", move |_uuid, _l| { let j_synchronization = Job::new_async("0 0 0/1 * * *", move |_uuid, _l| {
Box::pin(async move { Box::pin(async move {
let start_at = (chrono::Utc::now()).timestamp(); let start_at = (chrono::Utc::now()).timestamp();
let conn = self.pool.get().expect("conn");
let req = api::member::models::ListMembersRequest { group_key: None }; if let Err(e) = async {
let res = match self.member_api.list_members(req).await { let conn = self.pool.get().expect("conn");
Ok(r) => Ok(r),
Err(e) => { let req = api::member::models::ListMembersRequest { group_key: None };
let res = self.member_api.list_members(req).await?;
for u in res.users {
let modify_member = repositories::member::models::ModifyMember {
balance: u.cash,
balance_bota: u.cash_bota,
oriental_play: u.oriental_play,
};
self self
.add_history( .member_repository
repositories::synchronization::models::ITEM_MEMBERS.to_string(), .update(&conn, u.id, &modify_member)
start_at, .expect("member update");
e.code,
e.msg.clone(),
)
.await
.expect("add_history");
Err(e)
} }
}
.expect("list_members");
for u in res.users {
let modify_member = repositories::member::models::ModifyMember {
balance: u.cash,
balance_bota: u.cash_bota,
oriental_play: u.oriental_play,
};
self self
.member_repository .add_history(
.update(&conn, u.id, &modify_member) repositories::synchronization::models::ITEM_MEMBERS.to_string(),
.expect("member update"); start_at,
} 0,
None,
)
.await
.expect("add_history");
self Ok::<(), api::core::models::Error>(())
.add_history( }
repositories::synchronization::models::ITEM_MEMBERS.to_string(), .await
start_at, {
0, self
None, .add_history(
) repositories::synchronization::models::ITEM_MEMBERS.to_string(),
.await start_at,
.expect("add_history"); e.code,
e.msg.clone(),
)
.await
.expect("add_history");
}
}) })
})?; })?;

View File

@ -1,4 +1,3 @@
pub mod balance; pub mod balance;
pub mod game; pub mod game;
pub mod member; pub mod member;
pub mod vendor;

View File

@ -1 +0,0 @@
pub mod scheduler;

View File

@ -1,152 +0,0 @@
use crate::api;
use crate::core;
use crate::repositories;
use diesel::{
r2d2::{ConnectionManager, Pool},
PgConnection,
};
use once_cell::sync::OnceCell;
use std::sync::Arc;
use tokio_cron_scheduler::{Job, JobScheduler};
static G_INSTANCE: OnceCell<Arc<Scheduler>> = OnceCell::new();
///
pub struct Scheduler {
pool: Pool<ConnectionManager<PgConnection>>,
sched: JobScheduler,
api_config: core::config::ApiConfig,
synchronization_history_repository: repositories::synchronization_history::repository::Repository,
vendor_repository: repositories::vendor::repository::Repository,
vendor_api: api::vendor::api::Api,
}
impl std::fmt::Debug for Scheduler {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("Scheduler of api.kgon.identity").finish()
}
}
impl Scheduler {
///
pub fn get_instance(
pool: Pool<ConnectionManager<PgConnection>>,
sched: JobScheduler,
api_config: core::config::ApiConfig,
) -> Result<&'static Arc<Scheduler>, Box<dyn std::error::Error>> {
let instance = G_INSTANCE
.get_or_try_init(|| -> Result<Arc<Scheduler>, Box<dyn std::error::Error>> {
let s = Scheduler {
pool,
sched,
api_config: api_config.clone(),
synchronization_history_repository:
repositories::synchronization_history::repository::Repository::new(),
vendor_repository: repositories::vendor::repository::Repository::new(),
vendor_api: api::vendor::api::Api::new(api_config.clone()),
};
Ok(Arc::new(s))
})
.expect("");
Ok(instance)
}
pub async fn queue(&'static self) -> Result<(), std::boxed::Box<dyn std::error::Error>> {
self.list_vendors().await?;
Ok(())
}
async fn add_history(
&'static self,
item: String,
start_at: i64,
code: i64,
message: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
let complete_at = (chrono::Utc::now()).timestamp();
let conn = self.pool.get().expect("conn");
self
.synchronization_history_repository
.insert(
&conn,
&repositories::synchronization_history::models::NewSynchronizationHistory {
item,
start_at,
complete_at,
code,
message,
},
)
.expect("synchronization_history insert");
Ok(())
}
async fn list_vendors(&'static self) -> Result<(), Box<dyn std::error::Error>> {
let j_synchronization = Job::new_async("0 0 0/1 * * *", move |_uuid, _l| {
Box::pin(async move {
let start_at = (chrono::Utc::now()).timestamp();
let conn = self.pool.get().expect("conn");
let req = api::vendor::models::ListVendorsRequest {};
let res = match self.vendor_api.list_vendors(req).await {
Ok(r) => Ok(r),
Err(e) => {
self
.add_history(
repositories::synchronization::models::ITEM_VENDORS.to_string(),
start_at,
e.code,
e.msg.clone(),
)
.await
.expect("add_history");
Err(e)
}
}
.expect("list_vendors");
let upsert_vendors: Vec<repositories::vendor::models::UpsertVendor> = res
.vendors
.iter()
.map(|d| repositories::vendor::models::UpsertVendor {
id: d.id,
company_id: d.company_id,
vendor_id: d.vendor_id,
key: d.key.clone(),
name: d.name.clone(),
category: d.category.clone(),
max_bet_casino: d.max_bet_casino,
max_bet_slot: d.max_bet_slot,
is_enable: d.is_enable.clone(),
bet_count: d.bet_count,
})
.collect();
let _affected = self
.vendor_repository
.upserts(&conn, upsert_vendors)
.expect("vendor upsert");
self
.add_history(
repositories::synchronization::models::ITEM_VENDORS.to_string(),
start_at,
0,
None,
)
.await
.expect("add_history");
})
})?;
self.sched.add(j_synchronization).await?;
Ok(())
}
}