From 6edcc2d6152677c273723fd9a40d6281059c7d10 Mon Sep 17 00:00:00 2001 From: PARK BYUNG JUN Date: Wed, 31 Aug 2022 02:41:53 +0000 Subject: [PATCH] synchronization buf fixed --- src/main.rs | 2 +- src/repositories/synchronization/models.rs | 1 + .../betting_history/synchronizer.rs | 129 ++++--- src/synchronizations/game/synchronizer.rs | 129 ++++--- src/synchronizations/member/synchronizer.rs | 146 ++++--- .../member_account/synchronizer.rs | 355 ++++++++++++------ src/synchronizations/vendor/synchronizer.rs | 96 +++-- 7 files changed, 557 insertions(+), 301 deletions(-) diff --git a/src/main.rs b/src/main.rs index f689308..8f3e2f8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -59,7 +59,7 @@ async fn main() -> Result<(), Box> { let manager = ConnectionManager::::new(url_db); let pool = Pool::builder() - .max_size(4) + .max_size(10) .test_on_check_out(true) .build(manager)?; let conn = pool.get()?; diff --git a/src/repositories/synchronization/models.rs b/src/repositories/synchronization/models.rs index 384d9d3..1b79fa3 100644 --- a/src/repositories/synchronization/models.rs +++ b/src/repositories/synchronization/models.rs @@ -2,6 +2,7 @@ use super::schema::api_kgon_synchronizations; pub static ITEM_MEMBERS: &str = "members"; pub static ITEM_BALANCE_PARTNER: &str = "balance_partner"; +pub static ITEM_BALANCE_USER_ALL: &str = "balance_user_all"; pub static ITEM_BALANCE_USER: &str = "balance_user"; pub static ITEM_VENDORS: &str = "vendors"; pub static ITEM_GAMES: &str = "games"; diff --git a/src/synchronizations/betting_history/synchronizer.rs b/src/synchronizations/betting_history/synchronizer.rs index 761066f..81898a0 100644 --- a/src/synchronizations/betting_history/synchronizer.rs +++ b/src/synchronizations/betting_history/synchronizer.rs @@ -54,18 +54,35 @@ impl Synchronizer { let start_at = (chrono::Utc::now()).timestamp(); if let Err(e) = async { - let conn = self.pool.get().expect("conn"); + let conn = match self.pool.get() { + Ok(c) => c, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("pool.get error: {}", e)), + }); + } + }; let sdate = match self .synchronization_history_repository .select_latest_by_item( &conn, repositories::synchronization::models::ITEM_BETTING_HISTORY.to_string(), - ) - .expect("synchronization_repository.select_by_item") - { - Some(s) => s.data, - None => None, + ) { + Ok(v) => match v { + Some(s) => s.data, + None => None, + }, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!( + "synchronization_history_repository.select_latest_by_item error: {}", + e + )), + }); + } }; let req = api::betting::models::ListBettingsRequest { @@ -76,7 +93,15 @@ impl Synchronizer { username: None, limit: 2000, }; - let res = self.betting_api.list_bettings(req).await?; + let res = match self.betting_api.list_bettings(req).await { + Ok(v) => v, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("betting_api.list_bettings error: {:?}", e)), + }); + } + }; let last_object_id = res.last_object_id; let mut last_utc_created_at: Option = None; @@ -94,15 +119,22 @@ impl Synchronizer { match chrono::NaiveDateTime::parse_from_str(&b.created_at, "%Y-%m-%d %H:%M:%S") { Ok(t) => t.timestamp(), Err(e) => { - println!("NaiveDateTime::parse_from_str error: {}", e); - continue; + return Err(api::core::models::Error { + code: -1, + msg: Some(format!( + "chrono::NaiveDateTime::parse_from_str error: {}", + e + )), + }); } }; let utc_created_at = match chrono::DateTime::parse_from_rfc3339(b.utc_created_at.as_str()) { Ok(t) => t.timestamp(), Err(e) => { - println!("chrono::DateTime::parse_from_rfc3339 error: {}", e); - continue; + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("chrono::DateTime::parse_from_rfc3339 error: {}", e)), + }); } }; @@ -134,46 +166,59 @@ impl Synchronizer { } if !new_betting_history.is_empty() { - self + match self .betting_history_repository .inserts(&conn, &new_betting_history) - .expect("betting_history_repository.inserts"); + { + Ok(v) => v, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("betting_history_repository.inserts error: {:?}", e)), + }); + } + }; } - self - .synchronization_history_repository - .insert( - &conn, - &repositories::synchronization_history::models::NewSynchronizationHistory { - item: repositories::synchronization::models::ITEM_BETTING_HISTORY.to_string(), - start_at, - complete_at: (chrono::Utc::now()).timestamp(), - code: 0, - message: None, - data: last_utc_created_at, - }, - ) - .expect("synchronization_history insert"); + if let Err(e) = self.synchronization_history_repository.insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_BETTING_HISTORY.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: 0, + message: None, + data: last_utc_created_at, + }, + ) { + println!("synchronization_history.insert error: {}", e); + }; Ok::<(), api::core::models::Error>(()) } .await { - let conn = self.pool.get().expect("conn"); - self - .synchronization_history_repository - .insert( - &conn, - &repositories::synchronization_history::models::NewSynchronizationHistory { - item: repositories::synchronization::models::ITEM_BETTING_HISTORY.to_string(), - start_at, - complete_at: (chrono::Utc::now()).timestamp(), - code: e.code, - message: e.msg, - data: None, - }, - ) - .expect("synchronization_history insert"); + let conn = match self.pool.get() { + Ok(c) => c, + Err(e) => { + println!("pool.get error: {}", e); + return Ok(()); + } + }; + + if let Err(e) = self.synchronization_history_repository.insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_BETTING_HISTORY.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: e.code, + message: e.msg, + data: None, + }, + ) { + println!("synchronization_history.insert error: {}", e); + }; } Ok(()) diff --git a/src/synchronizations/game/synchronizer.rs b/src/synchronizations/game/synchronizer.rs index 10b270d..b15ed8b 100644 --- a/src/synchronizations/game/synchronizer.rs +++ b/src/synchronizations/game/synchronizer.rs @@ -57,19 +57,32 @@ impl Synchronizer { let start_at = (chrono::Utc::now()).timestamp(); if let Err(e) = async { - let conn = self.pool.get().expect("conn"); + let conn = match self.pool.get() { + Ok(c) => c, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("pool.get error: {}", e)), + }); + } + }; - let vendors = self - .vendor_repository - .select_all( - &conn, - &repositories::vendor::models::FindAll { - pagination: None, - sorts: None, - search: None, - }, - ) - .expect("vendor select_all"); + let vendors = match self.vendor_repository.select_all( + &conn, + &repositories::vendor::models::FindAll { + pagination: None, + sorts: None, + search: None, + }, + ) { + Ok(v) => v, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("vendor_repository.select_all error: {}", e)), + }); + } + }; let mut upsert_games: Vec = vec![]; @@ -77,13 +90,23 @@ impl Synchronizer { let req = api::game::models::ListGamesRequest { vendor_key: v.key.clone(), }; - let res = self.game_api.list_games(req).await?; + let res = match self.game_api.list_games(req).await { + Ok(v) => v, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("game_api.list_games error: {:?}", e)), + }); + } + }; for g in res.games { match upsert_games.iter().find(|v| v.id.eq(&g.id)) { Some(d) => { - println!("id is duplicated. e: {:?}, n: {:?}", d, g); - continue; + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("id is duplicated. e: {:?}, n: {:?}", d, g)), + }); } None => {} }; @@ -101,45 +124,55 @@ impl Synchronizer { } } - let _affected = self - .game_repository - .upserts(&conn, upsert_games) - .expect("game upsert"); + let _affected = match self.game_repository.upserts(&conn, upsert_games) { + Ok(c) => c, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("game_repository.upserts error: {}", e)), + }); + } + }; - self - .synchronization_history_repository - .insert( - &conn, - &repositories::synchronization_history::models::NewSynchronizationHistory { - item: repositories::synchronization::models::ITEM_GAMES.to_string(), - start_at, - complete_at: (chrono::Utc::now()).timestamp(), - code: 0, - message: None, - data: None, - }, - ) - .expect("synchronization_history insert"); + if let Err(e) = self.synchronization_history_repository.insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_GAMES.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: 0, + message: None, + data: None, + }, + ) { + println!("synchronization_history.insert error: {}", e); + }; Ok::<(), api::core::models::Error>(()) } .await { - let conn = self.pool.get().expect("conn"); - self - .synchronization_history_repository - .insert( - &conn, - &repositories::synchronization_history::models::NewSynchronizationHistory { - item: repositories::synchronization::models::ITEM_GAMES.to_string(), - start_at, - complete_at: (chrono::Utc::now()).timestamp(), - code: e.code, - message: e.msg, - data: None, - }, - ) - .expect("synchronization_history insert"); + let conn = match self.pool.get() { + Ok(c) => c, + Err(e) => { + println!("pool.get error: {}", e); + return Ok(()); + } + }; + + if let Err(e) = self.synchronization_history_repository.insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_GAMES.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: e.code, + message: e.msg, + data: None, + }, + ) { + println!("synchronization_history.insert error: {}", e); + }; } Ok(()) diff --git a/src/synchronizations/member/synchronizer.rs b/src/synchronizations/member/synchronizer.rs index 05fd736..ba9482d 100644 --- a/src/synchronizations/member/synchronizer.rs +++ b/src/synchronizations/member/synchronizer.rs @@ -63,17 +63,38 @@ impl Synchronizer { let start_at = (chrono::Utc::now()).timestamp(); if let Err(e) = async { - let conn = self.pool.get().expect("conn"); + let conn = match self.pool.get() { + Ok(c) => c, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("pool.get error: {}", e)), + }); + } + }; let req = api::member::models::ListMembersRequest { group_key: None }; - let res = self.member_api.list_members(req).await?; + let res = match self.member_api.list_members(req).await { + Ok(c) => c, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("member_api.list_members: {:?}", e)), + }); + } + }; for u in res.users { - if self - .member_repository - .select(&conn, u.id) - .expect("member_repository.select") - .is_none() + if match self.member_repository.select(&conn, u.id) { + Ok(c) => c, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("member_repository.select error: {}", e)), + }); + } + } + .is_none() { let ss_get_member_by_username_req = bpr::ss::member::GetMemberByUsernameRequest { client: Some(bpr::models::core::network::Client { @@ -100,8 +121,13 @@ impl Synchronizer { { Ok(r) => r, Err(e) => { - println!("bpr::ss::member::SUBJECT_GET_MEMBER_BY_USERNAME: {}", e); - continue; + return Err(api::core::models::Error { + code: -1, + msg: Some(format!( + "bpr::ss::member::SUBJECT_GET_MEMBER_BY_USERNAME: {}", + e + )), + }); } }; @@ -111,17 +137,24 @@ impl Synchronizer { ) { Ok(r) => r, Err(e) => { - println!("bpr::ss::member::GetMemberByUsernameResponse: {}", e); - continue; + return Err(api::core::models::Error { + code: -1, + msg: Some(format!( + "bpr::ss::member::GetMemberByUsernameResponse: {}", + e + )), + }); } }; if let Some(e) = ss_get_member_by_username_res.error { - println!( - "bpr::ss::member::SUBJECT_GET_MEMBER_BY_USERNAME error: {}", - e - ); - continue; + return Err(api::core::models::Error { + code: -1, + msg: Some(format!( + "bpr::ss::member::SUBJECT_GET_MEMBER_BY_USERNAME error: {}", + e + )), + }); } match ss_get_member_by_username_res.result { @@ -138,19 +171,25 @@ impl Synchronizer { ) { Ok(r) => r, Err(e) => { - println!("member_repository.insert: {}", e); - continue; + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("member_repository.insert error: {}", e)), + }); } }; } None => { - println!("member is not exist: {}", u.site_username); - continue; + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("member is not exist: {}", u.site_username)), + }); } }, None => { - println!("member is not exist: {}", u.site_username); - continue; + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("member is not exist: {}", u.site_username)), + }); } }; } @@ -169,40 +208,45 @@ impl Synchronizer { .expect("member update"); } - self - .synchronization_history_repository - .insert( - &conn, - &repositories::synchronization_history::models::NewSynchronizationHistory { - item: repositories::synchronization::models::ITEM_MEMBERS.to_string(), - start_at, - complete_at: (chrono::Utc::now()).timestamp(), - code: 0, - message: None, - data: None, - }, - ) - .expect("synchronization_history insert"); + if let Err(e) = self.synchronization_history_repository.insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_MEMBERS.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: 0, + message: None, + data: None, + }, + ) { + println!("synchronization_history.insert error: {}", e); + }; Ok::<(), api::core::models::Error>(()) } .await { - let conn = self.pool.get().expect("conn"); - self - .synchronization_history_repository - .insert( - &conn, - &repositories::synchronization_history::models::NewSynchronizationHistory { - item: repositories::synchronization::models::ITEM_MEMBERS.to_string(), - start_at, - complete_at: (chrono::Utc::now()).timestamp(), - code: e.code, - message: e.msg, - data: None, - }, - ) - .expect("synchronization_history insert"); + let conn = match self.pool.get() { + Ok(c) => c, + Err(e) => { + println!("pool.get error: {}", e); + return Ok(()); + } + }; + + if let Err(e) = self.synchronization_history_repository.insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_MEMBERS.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: e.code, + message: e.msg, + data: None, + }, + ) { + println!("synchronization_history.insert error: {}", e); + }; } Ok(()) diff --git a/src/synchronizations/member_account/synchronizer.rs b/src/synchronizations/member_account/synchronizer.rs index 4750f70..5f6386d 100644 --- a/src/synchronizations/member_account/synchronizer.rs +++ b/src/synchronizations/member_account/synchronizer.rs @@ -60,12 +60,26 @@ impl Synchronizer { let start_at = (chrono::Utc::now()).timestamp(); if let Err(e) = async { - let conn = self.pool.get().expect("conn"); + let conn = match self.pool.get() { + Ok(c) => c, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("pool.get error: {}", e)), + }); + } + }; let req = api::member::models::ListMembersRequest { group_key: None }; - let res = self.member_api.list_members(req).await?; - - let mut messages: Vec = vec![]; + let res = match self.member_api.list_members(req).await { + Ok(c) => c, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("member_api.list_members: {:?}", e)), + }); + } + }; for u in res.users { let req = api::member_account::models::GetBalanceForUserRequest { @@ -74,11 +88,13 @@ impl Synchronizer { let res = match self.member_account_api.get_balance_for_user(req).await { Ok(r) => r, Err(e) => { - messages.push(format!( - "api.get_balance_for_user code: {}, msg: {:?}", - e.code, e.msg - )); - continue; + return Err(api::core::models::Error { + code: -1, + msg: Some(format!( + "member_account_api.get_balance_for_user error: {:?}", + e + )), + }); } }; @@ -89,56 +105,59 @@ impl Synchronizer { companies: res.companies, }; - if let Err(e) = self + match self .member_repository .update_balance(&conn, u.id, &modify_member) { - messages.push(format!("repository.update_balance error: {}", e)); + Ok(c) => c, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("member_repository.update_balance error: {}", e)), + }); + } }; } - if !messages.is_empty() { - return Err(api::core::models::Error { - code: -1, - msg: Some(serde_json::to_string(&messages).expect("json")), - }); - } - - self - .synchronization_history_repository - .insert( - &conn, - &repositories::synchronization_history::models::NewSynchronizationHistory { - item: repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), - start_at, - complete_at: (chrono::Utc::now()).timestamp(), - code: 0, - message: None, - data: None, - }, - ) - .expect("synchronization_history insert"); + if let Err(e) = self.synchronization_history_repository.insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_BALANCE_USER_ALL.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: 0, + message: None, + data: None, + }, + ) { + println!("synchronization_history.insert error: {}", e); + }; Ok::<(), api::core::models::Error>(()) } .await { - let conn = self.pool.get().expect("conn"); + let conn = match self.pool.get() { + Ok(c) => c, + Err(e) => { + println!("pool.get error: {}", e); + return Ok(()); + } + }; - self - .synchronization_history_repository - .insert( - &conn, - &repositories::synchronization_history::models::NewSynchronizationHistory { - item: repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), - start_at, - complete_at: (chrono::Utc::now()).timestamp(), - code: e.code, - message: e.msg, - data: None, - }, - ) - .expect("synchronization_history insert"); + if let Err(e) = self.synchronization_history_repository.insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_BALANCE_USER_ALL.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: e.code, + message: e.msg, + data: None, + }, + ) { + println!("synchronization_history.insert error: {}", e); + }; } Ok(()) @@ -152,23 +171,46 @@ impl Synchronizer { let start_at = (chrono::Utc::now()).timestamp(); if let Err(e) = async { - let conn = self.pool.get().expect("conn"); + let conn = match self.pool.get() { + Ok(c) => c, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("pool.get error: {}", e)), + }); + } + }; - let m = match self - .member_repository - .select_by_member_id(&conn, member_id) - .expect("member_repository.select_by_member_id") - { - Some(m) => m, - None => { - return Ok(()); + let m = match self.member_repository.select_by_member_id(&conn, member_id) { + Ok(c) => match c { + Some(m) => m, + None => { + return Ok(()); + } + }, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!( + "member_repository.select_by_member_id error: {}", + e + )), + }); } }; let req = api::member_account::models::GetBalanceForUserRequest { username: usename.to_string(), }; - let res = self.member_account_api.get_balance_for_user(req).await?; + let res = match self.member_account_api.get_balance_for_user(req).await { + Ok(c) => c, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("member_account_api.get_balance_for_user: {:?}", e)), + }); + } + }; let modify_member = repositories::member::models::ModifyMemberForBalance { balance: res.balance, @@ -177,31 +219,58 @@ impl Synchronizer { companies: res.companies, }; - self + match self .member_repository .update_balance(&conn, m.id, &modify_member) - .expect("member_repository.update_balance"); + { + Ok(c) => c, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("member_repository.update_balance: {}", e)), + }); + } + }; + + if let Err(e) = self.synchronization_history_repository.insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: 0, + message: None, + data: None, + }, + ) { + println!("synchronization_history.insert error: {}", e); + }; Ok::<(), api::core::models::Error>(()) } .await { - let conn = self.pool.get().expect("conn"); + let conn = match self.pool.get() { + Ok(c) => c, + Err(e) => { + println!("pool.get error: {}", e); + return Ok(()); + } + }; - self - .synchronization_history_repository - .insert( - &conn, - &repositories::synchronization_history::models::NewSynchronizationHistory { - item: repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), - start_at, - complete_at: (chrono::Utc::now()).timestamp(), - code: e.code, - message: e.msg, - data: None, - }, - ) - .expect("synchronization_history insert"); + if let Err(e) = self.synchronization_history_repository.insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: e.code, + message: e.msg, + data: None, + }, + ) { + println!("synchronization_history.insert error: {}", e); + }; } Ok(()) @@ -211,77 +280,115 @@ impl Synchronizer { let start_at = (chrono::Utc::now()).timestamp(); if let Err(e) = async { - let conn = self.pool.get().expect("conn"); + let conn = match self.pool.get() { + Ok(c) => c, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("pool.get error: {}", e)), + }); + } + }; let req = api::member_account::models::GetBalanceForPartnerRequest {}; - let res = self.member_account_api.get_balance_for_partner(req).await?; + let res = match self.member_account_api.get_balance_for_partner(req).await { + Ok(c) => c, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!( + "member_account_api.get_balance_for_partner: {:?}", + e + )), + }); + } + }; - match self - .balance_repository - .select(&conn) - .expect("balance select") - { - Some(b) => { - self - .balance_repository - .update( + match self.balance_repository.select(&conn) { + Ok(c) => match c { + Some(b) => { + match self.balance_repository.update( &conn, b.id, &repositories::balance::models::ModifyBalance { balance: res.balance, balance_bota: res.balance_bota, }, - ) - .expect("balance update"); - } - None => { - self - .balance_repository - .insert( + ) { + Ok(_c) => {} + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("balance_repository.update error: {}", e)), + }); + } + } + } + None => { + match self.balance_repository.insert( &conn, &repositories::balance::models::NewBalance { balance: res.balance, balance_bota: res.balance_bota, }, - ) - .expect("balance insert"); + ) { + Ok(_c) => {} + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("balance_repository.insert error: {}", e)), + }); + } + }; + } + }, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("member_repository.select error: {}", e)), + }); } - } + }; - self - .synchronization_history_repository - .insert( - &conn, - &repositories::synchronization_history::models::NewSynchronizationHistory { - item: repositories::synchronization::models::ITEM_BALANCE_PARTNER.to_string(), - start_at, - complete_at: (chrono::Utc::now()).timestamp(), - code: 0, - message: None, - data: None, - }, - ) - .expect("synchronization_history insert"); + if let Err(e) = self.synchronization_history_repository.insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_BALANCE_PARTNER.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: 0, + message: None, + data: None, + }, + ) { + println!("synchronization_history.insert error: {}", e); + }; Ok::<(), api::core::models::Error>(()) } .await { - let conn = self.pool.get().expect("conn"); - self - .synchronization_history_repository - .insert( - &conn, - &repositories::synchronization_history::models::NewSynchronizationHistory { - item: repositories::synchronization::models::ITEM_BALANCE_PARTNER.to_string(), - start_at, - complete_at: (chrono::Utc::now()).timestamp(), - code: e.code, - message: e.msg, - data: None, - }, - ) - .expect("synchronization_history insert"); + let conn = match self.pool.get() { + Ok(c) => c, + Err(e) => { + println!("pool.get error: {}", e); + return Ok(()); + } + }; + + if let Err(e) = self.synchronization_history_repository.insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_BALANCE_PARTNER.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: e.code, + message: e.msg, + data: None, + }, + ) { + println!("synchronization_history.insert error: {}", e); + }; } Ok(()) diff --git a/src/synchronizations/vendor/synchronizer.rs b/src/synchronizations/vendor/synchronizer.rs index 9403fa4..9f2944e 100644 --- a/src/synchronizations/vendor/synchronizer.rs +++ b/src/synchronizations/vendor/synchronizer.rs @@ -54,10 +54,26 @@ impl Synchronizer { let start_at = (chrono::Utc::now()).timestamp(); if let Err(e) = async { - let conn = self.pool.get().expect("conn"); + let conn = match self.pool.get() { + Ok(c) => c, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("pool.get error: {}", e)), + }); + } + }; let req = api::vendor::models::ListVendorsRequest {}; - let res = self.vendor_api.list_vendors(req).await?; + let res = match self.vendor_api.list_vendors(req).await { + Ok(c) => c, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("vendor_api.list_vendors: {:?}", e)), + }); + } + }; let upsert_vendors: Vec = res .vendors @@ -76,45 +92,55 @@ impl Synchronizer { }) .collect(); - let _affected = self - .vendor_repository - .upserts(&conn, upsert_vendors) - .expect("vendor upsert"); + let _affected = match self.vendor_repository.upserts(&conn, upsert_vendors) { + Ok(c) => c, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("vendor_repository.upserts: {:?}", e)), + }); + } + }; - self - .synchronization_history_repository - .insert( - &conn, - &repositories::synchronization_history::models::NewSynchronizationHistory { - item: repositories::synchronization::models::ITEM_VENDORS.to_string(), - start_at, - complete_at: (chrono::Utc::now()).timestamp(), - code: 0, - message: None, - data: None, - }, - ) - .expect("synchronization_history insert"); + if let Err(e) = self.synchronization_history_repository.insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_VENDORS.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: 0, + message: None, + data: None, + }, + ) { + println!("synchronization_history.insert error: {}", e); + }; Ok::<(), api::core::models::Error>(()) } .await { - let conn = self.pool.get().expect("conn"); - self - .synchronization_history_repository - .insert( - &conn, - &repositories::synchronization_history::models::NewSynchronizationHistory { - item: repositories::synchronization::models::ITEM_VENDORS.to_string(), - start_at, - complete_at: (chrono::Utc::now()).timestamp(), - code: e.code, - message: e.msg, - data: None, - }, - ) - .expect("synchronization_history insert"); + let conn = match self.pool.get() { + Ok(c) => c, + Err(e) => { + println!("pool.get error: {}", e); + return Ok(()); + } + }; + + if let Err(e) = self.synchronization_history_repository.insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_VENDORS.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: e.code, + message: e.msg, + data: None, + }, + ) { + println!("synchronization_history.insert error: {}", e); + }; } Ok(())