synchronization buf fixed

This commit is contained in:
병준 박 2022-08-31 02:41:53 +00:00
parent 0b16750e01
commit 6edcc2d615
7 changed files with 557 additions and 301 deletions

View File

@ -59,7 +59,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let manager = ConnectionManager::<PgConnection>::new(url_db); let manager = ConnectionManager::<PgConnection>::new(url_db);
let pool = Pool::builder() let pool = Pool::builder()
.max_size(4) .max_size(10)
.test_on_check_out(true) .test_on_check_out(true)
.build(manager)?; .build(manager)?;
let conn = pool.get()?; let conn = pool.get()?;

View File

@ -2,6 +2,7 @@ use super::schema::api_kgon_synchronizations;
pub static ITEM_MEMBERS: &str = "members"; pub static ITEM_MEMBERS: &str = "members";
pub static ITEM_BALANCE_PARTNER: &str = "balance_partner"; pub static ITEM_BALANCE_PARTNER: &str = "balance_partner";
pub static ITEM_BALANCE_USER_ALL: &str = "balance_user_all";
pub static ITEM_BALANCE_USER: &str = "balance_user"; pub static ITEM_BALANCE_USER: &str = "balance_user";
pub static ITEM_VENDORS: &str = "vendors"; pub static ITEM_VENDORS: &str = "vendors";
pub static ITEM_GAMES: &str = "games"; pub static ITEM_GAMES: &str = "games";

View File

@ -54,18 +54,35 @@ impl Synchronizer {
let start_at = (chrono::Utc::now()).timestamp(); let start_at = (chrono::Utc::now()).timestamp();
if let Err(e) = async { if let Err(e) = async {
let conn = self.pool.get().expect("conn"); let conn = match self.pool.get() {
Ok(c) => c,
Err(e) => {
return Err(api::core::models::Error {
code: -1,
msg: Some(format!("pool.get error: {}", e)),
});
}
};
let sdate = match self let sdate = match self
.synchronization_history_repository .synchronization_history_repository
.select_latest_by_item( .select_latest_by_item(
&conn, &conn,
repositories::synchronization::models::ITEM_BETTING_HISTORY.to_string(), repositories::synchronization::models::ITEM_BETTING_HISTORY.to_string(),
) ) {
.expect("synchronization_repository.select_by_item") Ok(v) => match v {
{ Some(s) => s.data,
Some(s) => s.data, None => None,
None => None, },
Err(e) => {
return Err(api::core::models::Error {
code: -1,
msg: Some(format!(
"synchronization_history_repository.select_latest_by_item error: {}",
e
)),
});
}
}; };
let req = api::betting::models::ListBettingsRequest { let req = api::betting::models::ListBettingsRequest {
@ -76,7 +93,15 @@ impl Synchronizer {
username: None, username: None,
limit: 2000, limit: 2000,
}; };
let res = self.betting_api.list_bettings(req).await?; let res = match self.betting_api.list_bettings(req).await {
Ok(v) => v,
Err(e) => {
return Err(api::core::models::Error {
code: -1,
msg: Some(format!("betting_api.list_bettings error: {:?}", e)),
});
}
};
let last_object_id = res.last_object_id; let last_object_id = res.last_object_id;
let mut last_utc_created_at: Option<String> = None; let mut last_utc_created_at: Option<String> = None;
@ -94,15 +119,22 @@ impl Synchronizer {
match chrono::NaiveDateTime::parse_from_str(&b.created_at, "%Y-%m-%d %H:%M:%S") { match chrono::NaiveDateTime::parse_from_str(&b.created_at, "%Y-%m-%d %H:%M:%S") {
Ok(t) => t.timestamp(), Ok(t) => t.timestamp(),
Err(e) => { Err(e) => {
println!("NaiveDateTime::parse_from_str error: {}", e); return Err(api::core::models::Error {
continue; code: -1,
msg: Some(format!(
"chrono::NaiveDateTime::parse_from_str error: {}",
e
)),
});
} }
}; };
let utc_created_at = match chrono::DateTime::parse_from_rfc3339(b.utc_created_at.as_str()) { let utc_created_at = match chrono::DateTime::parse_from_rfc3339(b.utc_created_at.as_str()) {
Ok(t) => t.timestamp(), Ok(t) => t.timestamp(),
Err(e) => { Err(e) => {
println!("chrono::DateTime::parse_from_rfc3339 error: {}", e); return Err(api::core::models::Error {
continue; code: -1,
msg: Some(format!("chrono::DateTime::parse_from_rfc3339 error: {}", e)),
});
} }
}; };
@ -134,46 +166,59 @@ impl Synchronizer {
} }
if !new_betting_history.is_empty() { if !new_betting_history.is_empty() {
self match self
.betting_history_repository .betting_history_repository
.inserts(&conn, &new_betting_history) .inserts(&conn, &new_betting_history)
.expect("betting_history_repository.inserts"); {
Ok(v) => v,
Err(e) => {
return Err(api::core::models::Error {
code: -1,
msg: Some(format!("betting_history_repository.inserts error: {:?}", e)),
});
}
};
} }
self if let Err(e) = self.synchronization_history_repository.insert(
.synchronization_history_repository &conn,
.insert( &repositories::synchronization_history::models::NewSynchronizationHistory {
&conn, item: repositories::synchronization::models::ITEM_BETTING_HISTORY.to_string(),
&repositories::synchronization_history::models::NewSynchronizationHistory { start_at,
item: repositories::synchronization::models::ITEM_BETTING_HISTORY.to_string(), complete_at: (chrono::Utc::now()).timestamp(),
start_at, code: 0,
complete_at: (chrono::Utc::now()).timestamp(), message: None,
code: 0, data: last_utc_created_at,
message: None, },
data: last_utc_created_at, ) {
}, println!("synchronization_history.insert error: {}", e);
) };
.expect("synchronization_history insert");
Ok::<(), api::core::models::Error>(()) Ok::<(), api::core::models::Error>(())
} }
.await .await
{ {
let conn = self.pool.get().expect("conn"); let conn = match self.pool.get() {
self Ok(c) => c,
.synchronization_history_repository Err(e) => {
.insert( println!("pool.get error: {}", e);
&conn, return Ok(());
&repositories::synchronization_history::models::NewSynchronizationHistory { }
item: repositories::synchronization::models::ITEM_BETTING_HISTORY.to_string(), };
start_at,
complete_at: (chrono::Utc::now()).timestamp(), if let Err(e) = self.synchronization_history_repository.insert(
code: e.code, &conn,
message: e.msg, &repositories::synchronization_history::models::NewSynchronizationHistory {
data: None, item: repositories::synchronization::models::ITEM_BETTING_HISTORY.to_string(),
}, start_at,
) complete_at: (chrono::Utc::now()).timestamp(),
.expect("synchronization_history insert"); code: e.code,
message: e.msg,
data: None,
},
) {
println!("synchronization_history.insert error: {}", e);
};
} }
Ok(()) Ok(())

View File

@ -57,19 +57,32 @@ impl Synchronizer {
let start_at = (chrono::Utc::now()).timestamp(); let start_at = (chrono::Utc::now()).timestamp();
if let Err(e) = async { if let Err(e) = async {
let conn = self.pool.get().expect("conn"); let conn = match self.pool.get() {
Ok(c) => c,
Err(e) => {
return Err(api::core::models::Error {
code: -1,
msg: Some(format!("pool.get error: {}", e)),
});
}
};
let vendors = self let vendors = match self.vendor_repository.select_all(
.vendor_repository &conn,
.select_all( &repositories::vendor::models::FindAll {
&conn, pagination: None,
&repositories::vendor::models::FindAll { sorts: None,
pagination: None, search: None,
sorts: None, },
search: None, ) {
}, Ok(v) => v,
) Err(e) => {
.expect("vendor select_all"); return Err(api::core::models::Error {
code: -1,
msg: Some(format!("vendor_repository.select_all error: {}", e)),
});
}
};
let mut upsert_games: Vec<repositories::game::models::UpsertGame> = vec![]; let mut upsert_games: Vec<repositories::game::models::UpsertGame> = vec![];
@ -77,13 +90,23 @@ impl Synchronizer {
let req = api::game::models::ListGamesRequest { let req = api::game::models::ListGamesRequest {
vendor_key: v.key.clone(), vendor_key: v.key.clone(),
}; };
let res = self.game_api.list_games(req).await?; let res = match self.game_api.list_games(req).await {
Ok(v) => v,
Err(e) => {
return Err(api::core::models::Error {
code: -1,
msg: Some(format!("game_api.list_games error: {:?}", e)),
});
}
};
for g in res.games { for g in res.games {
match upsert_games.iter().find(|v| v.id.eq(&g.id)) { match upsert_games.iter().find(|v| v.id.eq(&g.id)) {
Some(d) => { Some(d) => {
println!("id is duplicated. e: {:?}, n: {:?}", d, g); return Err(api::core::models::Error {
continue; code: -1,
msg: Some(format!("id is duplicated. e: {:?}, n: {:?}", d, g)),
});
} }
None => {} None => {}
}; };
@ -101,45 +124,55 @@ impl Synchronizer {
} }
} }
let _affected = self let _affected = match self.game_repository.upserts(&conn, upsert_games) {
.game_repository Ok(c) => c,
.upserts(&conn, upsert_games) Err(e) => {
.expect("game upsert"); return Err(api::core::models::Error {
code: -1,
msg: Some(format!("game_repository.upserts error: {}", e)),
});
}
};
self if let Err(e) = self.synchronization_history_repository.insert(
.synchronization_history_repository &conn,
.insert( &repositories::synchronization_history::models::NewSynchronizationHistory {
&conn, item: repositories::synchronization::models::ITEM_GAMES.to_string(),
&repositories::synchronization_history::models::NewSynchronizationHistory { start_at,
item: repositories::synchronization::models::ITEM_GAMES.to_string(), complete_at: (chrono::Utc::now()).timestamp(),
start_at, code: 0,
complete_at: (chrono::Utc::now()).timestamp(), message: None,
code: 0, data: None,
message: None, },
data: None, ) {
}, println!("synchronization_history.insert error: {}", e);
) };
.expect("synchronization_history insert");
Ok::<(), api::core::models::Error>(()) Ok::<(), api::core::models::Error>(())
} }
.await .await
{ {
let conn = self.pool.get().expect("conn"); let conn = match self.pool.get() {
self Ok(c) => c,
.synchronization_history_repository Err(e) => {
.insert( println!("pool.get error: {}", e);
&conn, return Ok(());
&repositories::synchronization_history::models::NewSynchronizationHistory { }
item: repositories::synchronization::models::ITEM_GAMES.to_string(), };
start_at,
complete_at: (chrono::Utc::now()).timestamp(), if let Err(e) = self.synchronization_history_repository.insert(
code: e.code, &conn,
message: e.msg, &repositories::synchronization_history::models::NewSynchronizationHistory {
data: None, item: repositories::synchronization::models::ITEM_GAMES.to_string(),
}, start_at,
) complete_at: (chrono::Utc::now()).timestamp(),
.expect("synchronization_history insert"); code: e.code,
message: e.msg,
data: None,
},
) {
println!("synchronization_history.insert error: {}", e);
};
} }
Ok(()) Ok(())

View File

@ -63,17 +63,38 @@ impl Synchronizer {
let start_at = (chrono::Utc::now()).timestamp(); let start_at = (chrono::Utc::now()).timestamp();
if let Err(e) = async { if let Err(e) = async {
let conn = self.pool.get().expect("conn"); let conn = match self.pool.get() {
Ok(c) => c,
Err(e) => {
return Err(api::core::models::Error {
code: -1,
msg: Some(format!("pool.get error: {}", e)),
});
}
};
let req = api::member::models::ListMembersRequest { group_key: None }; let req = api::member::models::ListMembersRequest { group_key: None };
let res = self.member_api.list_members(req).await?; let res = match self.member_api.list_members(req).await {
Ok(c) => c,
Err(e) => {
return Err(api::core::models::Error {
code: -1,
msg: Some(format!("member_api.list_members: {:?}", e)),
});
}
};
for u in res.users { for u in res.users {
if self if match self.member_repository.select(&conn, u.id) {
.member_repository Ok(c) => c,
.select(&conn, u.id) Err(e) => {
.expect("member_repository.select") return Err(api::core::models::Error {
.is_none() code: -1,
msg: Some(format!("member_repository.select error: {}", e)),
});
}
}
.is_none()
{ {
let ss_get_member_by_username_req = bpr::ss::member::GetMemberByUsernameRequest { let ss_get_member_by_username_req = bpr::ss::member::GetMemberByUsernameRequest {
client: Some(bpr::models::core::network::Client { client: Some(bpr::models::core::network::Client {
@ -100,8 +121,13 @@ impl Synchronizer {
{ {
Ok(r) => r, Ok(r) => r,
Err(e) => { Err(e) => {
println!("bpr::ss::member::SUBJECT_GET_MEMBER_BY_USERNAME: {}", e); return Err(api::core::models::Error {
continue; code: -1,
msg: Some(format!(
"bpr::ss::member::SUBJECT_GET_MEMBER_BY_USERNAME: {}",
e
)),
});
} }
}; };
@ -111,17 +137,24 @@ impl Synchronizer {
) { ) {
Ok(r) => r, Ok(r) => r,
Err(e) => { Err(e) => {
println!("bpr::ss::member::GetMemberByUsernameResponse: {}", e); return Err(api::core::models::Error {
continue; code: -1,
msg: Some(format!(
"bpr::ss::member::GetMemberByUsernameResponse: {}",
e
)),
});
} }
}; };
if let Some(e) = ss_get_member_by_username_res.error { if let Some(e) = ss_get_member_by_username_res.error {
println!( return Err(api::core::models::Error {
"bpr::ss::member::SUBJECT_GET_MEMBER_BY_USERNAME error: {}", code: -1,
e msg: Some(format!(
); "bpr::ss::member::SUBJECT_GET_MEMBER_BY_USERNAME error: {}",
continue; e
)),
});
} }
match ss_get_member_by_username_res.result { match ss_get_member_by_username_res.result {
@ -138,19 +171,25 @@ impl Synchronizer {
) { ) {
Ok(r) => r, Ok(r) => r,
Err(e) => { Err(e) => {
println!("member_repository.insert: {}", e); return Err(api::core::models::Error {
continue; code: -1,
msg: Some(format!("member_repository.insert error: {}", e)),
});
} }
}; };
} }
None => { None => {
println!("member is not exist: {}", u.site_username); return Err(api::core::models::Error {
continue; code: -1,
msg: Some(format!("member is not exist: {}", u.site_username)),
});
} }
}, },
None => { None => {
println!("member is not exist: {}", u.site_username); return Err(api::core::models::Error {
continue; code: -1,
msg: Some(format!("member is not exist: {}", u.site_username)),
});
} }
}; };
} }
@ -169,40 +208,45 @@ impl Synchronizer {
.expect("member update"); .expect("member update");
} }
self if let Err(e) = self.synchronization_history_repository.insert(
.synchronization_history_repository &conn,
.insert( &repositories::synchronization_history::models::NewSynchronizationHistory {
&conn, item: repositories::synchronization::models::ITEM_MEMBERS.to_string(),
&repositories::synchronization_history::models::NewSynchronizationHistory { start_at,
item: repositories::synchronization::models::ITEM_MEMBERS.to_string(), complete_at: (chrono::Utc::now()).timestamp(),
start_at, code: 0,
complete_at: (chrono::Utc::now()).timestamp(), message: None,
code: 0, data: None,
message: None, },
data: None, ) {
}, println!("synchronization_history.insert error: {}", e);
) };
.expect("synchronization_history insert");
Ok::<(), api::core::models::Error>(()) Ok::<(), api::core::models::Error>(())
} }
.await .await
{ {
let conn = self.pool.get().expect("conn"); let conn = match self.pool.get() {
self Ok(c) => c,
.synchronization_history_repository Err(e) => {
.insert( println!("pool.get error: {}", e);
&conn, return Ok(());
&repositories::synchronization_history::models::NewSynchronizationHistory { }
item: repositories::synchronization::models::ITEM_MEMBERS.to_string(), };
start_at,
complete_at: (chrono::Utc::now()).timestamp(), if let Err(e) = self.synchronization_history_repository.insert(
code: e.code, &conn,
message: e.msg, &repositories::synchronization_history::models::NewSynchronizationHistory {
data: None, item: repositories::synchronization::models::ITEM_MEMBERS.to_string(),
}, start_at,
) complete_at: (chrono::Utc::now()).timestamp(),
.expect("synchronization_history insert"); code: e.code,
message: e.msg,
data: None,
},
) {
println!("synchronization_history.insert error: {}", e);
};
} }
Ok(()) Ok(())

View File

@ -60,12 +60,26 @@ impl Synchronizer {
let start_at = (chrono::Utc::now()).timestamp(); let start_at = (chrono::Utc::now()).timestamp();
if let Err(e) = async { if let Err(e) = async {
let conn = self.pool.get().expect("conn"); let conn = match self.pool.get() {
Ok(c) => c,
Err(e) => {
return Err(api::core::models::Error {
code: -1,
msg: Some(format!("pool.get error: {}", e)),
});
}
};
let req = api::member::models::ListMembersRequest { group_key: None }; let req = api::member::models::ListMembersRequest { group_key: None };
let res = self.member_api.list_members(req).await?; let res = match self.member_api.list_members(req).await {
Ok(c) => c,
let mut messages: Vec<String> = vec![]; Err(e) => {
return Err(api::core::models::Error {
code: -1,
msg: Some(format!("member_api.list_members: {:?}", e)),
});
}
};
for u in res.users { for u in res.users {
let req = api::member_account::models::GetBalanceForUserRequest { let req = api::member_account::models::GetBalanceForUserRequest {
@ -74,11 +88,13 @@ impl Synchronizer {
let res = match self.member_account_api.get_balance_for_user(req).await { let res = match self.member_account_api.get_balance_for_user(req).await {
Ok(r) => r, Ok(r) => r,
Err(e) => { Err(e) => {
messages.push(format!( return Err(api::core::models::Error {
"api.get_balance_for_user code: {}, msg: {:?}", code: -1,
e.code, e.msg msg: Some(format!(
)); "member_account_api.get_balance_for_user error: {:?}",
continue; e
)),
});
} }
}; };
@ -89,56 +105,59 @@ impl Synchronizer {
companies: res.companies, companies: res.companies,
}; };
if let Err(e) = self match self
.member_repository .member_repository
.update_balance(&conn, u.id, &modify_member) .update_balance(&conn, u.id, &modify_member)
{ {
messages.push(format!("repository.update_balance error: {}", e)); Ok(c) => c,
Err(e) => {
return Err(api::core::models::Error {
code: -1,
msg: Some(format!("member_repository.update_balance error: {}", e)),
});
}
}; };
} }
if !messages.is_empty() { if let Err(e) = self.synchronization_history_repository.insert(
return Err(api::core::models::Error { &conn,
code: -1, &repositories::synchronization_history::models::NewSynchronizationHistory {
msg: Some(serde_json::to_string(&messages).expect("json")), item: repositories::synchronization::models::ITEM_BALANCE_USER_ALL.to_string(),
}); start_at,
} complete_at: (chrono::Utc::now()).timestamp(),
code: 0,
self message: None,
.synchronization_history_repository data: None,
.insert( },
&conn, ) {
&repositories::synchronization_history::models::NewSynchronizationHistory { println!("synchronization_history.insert error: {}", e);
item: repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), };
start_at,
complete_at: (chrono::Utc::now()).timestamp(),
code: 0,
message: None,
data: None,
},
)
.expect("synchronization_history insert");
Ok::<(), api::core::models::Error>(()) Ok::<(), api::core::models::Error>(())
} }
.await .await
{ {
let conn = self.pool.get().expect("conn"); let conn = match self.pool.get() {
Ok(c) => c,
Err(e) => {
println!("pool.get error: {}", e);
return Ok(());
}
};
self if let Err(e) = self.synchronization_history_repository.insert(
.synchronization_history_repository &conn,
.insert( &repositories::synchronization_history::models::NewSynchronizationHistory {
&conn, item: repositories::synchronization::models::ITEM_BALANCE_USER_ALL.to_string(),
&repositories::synchronization_history::models::NewSynchronizationHistory { start_at,
item: repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), complete_at: (chrono::Utc::now()).timestamp(),
start_at, code: e.code,
complete_at: (chrono::Utc::now()).timestamp(), message: e.msg,
code: e.code, data: None,
message: e.msg, },
data: None, ) {
}, println!("synchronization_history.insert error: {}", e);
) };
.expect("synchronization_history insert");
} }
Ok(()) Ok(())
@ -152,23 +171,46 @@ impl Synchronizer {
let start_at = (chrono::Utc::now()).timestamp(); let start_at = (chrono::Utc::now()).timestamp();
if let Err(e) = async { if let Err(e) = async {
let conn = self.pool.get().expect("conn"); let conn = match self.pool.get() {
Ok(c) => c,
Err(e) => {
return Err(api::core::models::Error {
code: -1,
msg: Some(format!("pool.get error: {}", e)),
});
}
};
let m = match self let m = match self.member_repository.select_by_member_id(&conn, member_id) {
.member_repository Ok(c) => match c {
.select_by_member_id(&conn, member_id) Some(m) => m,
.expect("member_repository.select_by_member_id") None => {
{ return Ok(());
Some(m) => m, }
None => { },
return Ok(()); Err(e) => {
return Err(api::core::models::Error {
code: -1,
msg: Some(format!(
"member_repository.select_by_member_id error: {}",
e
)),
});
} }
}; };
let req = api::member_account::models::GetBalanceForUserRequest { let req = api::member_account::models::GetBalanceForUserRequest {
username: usename.to_string(), username: usename.to_string(),
}; };
let res = self.member_account_api.get_balance_for_user(req).await?; let res = match self.member_account_api.get_balance_for_user(req).await {
Ok(c) => c,
Err(e) => {
return Err(api::core::models::Error {
code: -1,
msg: Some(format!("member_account_api.get_balance_for_user: {:?}", e)),
});
}
};
let modify_member = repositories::member::models::ModifyMemberForBalance { let modify_member = repositories::member::models::ModifyMemberForBalance {
balance: res.balance, balance: res.balance,
@ -177,31 +219,58 @@ impl Synchronizer {
companies: res.companies, companies: res.companies,
}; };
self match self
.member_repository .member_repository
.update_balance(&conn, m.id, &modify_member) .update_balance(&conn, m.id, &modify_member)
.expect("member_repository.update_balance"); {
Ok(c) => c,
Err(e) => {
return Err(api::core::models::Error {
code: -1,
msg: Some(format!("member_repository.update_balance: {}", e)),
});
}
};
if let Err(e) = self.synchronization_history_repository.insert(
&conn,
&repositories::synchronization_history::models::NewSynchronizationHistory {
item: repositories::synchronization::models::ITEM_BALANCE_USER.to_string(),
start_at,
complete_at: (chrono::Utc::now()).timestamp(),
code: 0,
message: None,
data: None,
},
) {
println!("synchronization_history.insert error: {}", e);
};
Ok::<(), api::core::models::Error>(()) Ok::<(), api::core::models::Error>(())
} }
.await .await
{ {
let conn = self.pool.get().expect("conn"); let conn = match self.pool.get() {
Ok(c) => c,
Err(e) => {
println!("pool.get error: {}", e);
return Ok(());
}
};
self if let Err(e) = self.synchronization_history_repository.insert(
.synchronization_history_repository &conn,
.insert( &repositories::synchronization_history::models::NewSynchronizationHistory {
&conn, item: repositories::synchronization::models::ITEM_BALANCE_USER.to_string(),
&repositories::synchronization_history::models::NewSynchronizationHistory { start_at,
item: repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), complete_at: (chrono::Utc::now()).timestamp(),
start_at, code: e.code,
complete_at: (chrono::Utc::now()).timestamp(), message: e.msg,
code: e.code, data: None,
message: e.msg, },
data: None, ) {
}, println!("synchronization_history.insert error: {}", e);
) };
.expect("synchronization_history insert");
} }
Ok(()) Ok(())
@ -211,77 +280,115 @@ impl Synchronizer {
let start_at = (chrono::Utc::now()).timestamp(); let start_at = (chrono::Utc::now()).timestamp();
if let Err(e) = async { if let Err(e) = async {
let conn = self.pool.get().expect("conn"); let conn = match self.pool.get() {
Ok(c) => c,
Err(e) => {
return Err(api::core::models::Error {
code: -1,
msg: Some(format!("pool.get error: {}", e)),
});
}
};
let req = api::member_account::models::GetBalanceForPartnerRequest {}; let req = api::member_account::models::GetBalanceForPartnerRequest {};
let res = self.member_account_api.get_balance_for_partner(req).await?; let res = match self.member_account_api.get_balance_for_partner(req).await {
Ok(c) => c,
Err(e) => {
return Err(api::core::models::Error {
code: -1,
msg: Some(format!(
"member_account_api.get_balance_for_partner: {:?}",
e
)),
});
}
};
match self match self.balance_repository.select(&conn) {
.balance_repository Ok(c) => match c {
.select(&conn) Some(b) => {
.expect("balance select") match self.balance_repository.update(
{
Some(b) => {
self
.balance_repository
.update(
&conn, &conn,
b.id, b.id,
&repositories::balance::models::ModifyBalance { &repositories::balance::models::ModifyBalance {
balance: res.balance, balance: res.balance,
balance_bota: res.balance_bota, balance_bota: res.balance_bota,
}, },
) ) {
.expect("balance update"); Ok(_c) => {}
} Err(e) => {
None => { return Err(api::core::models::Error {
self code: -1,
.balance_repository msg: Some(format!("balance_repository.update error: {}", e)),
.insert( });
}
}
}
None => {
match self.balance_repository.insert(
&conn, &conn,
&repositories::balance::models::NewBalance { &repositories::balance::models::NewBalance {
balance: res.balance, balance: res.balance,
balance_bota: res.balance_bota, balance_bota: res.balance_bota,
}, },
) ) {
.expect("balance insert"); Ok(_c) => {}
Err(e) => {
return Err(api::core::models::Error {
code: -1,
msg: Some(format!("balance_repository.insert error: {}", e)),
});
}
};
}
},
Err(e) => {
return Err(api::core::models::Error {
code: -1,
msg: Some(format!("member_repository.select error: {}", e)),
});
} }
} };
self if let Err(e) = self.synchronization_history_repository.insert(
.synchronization_history_repository &conn,
.insert( &repositories::synchronization_history::models::NewSynchronizationHistory {
&conn, item: repositories::synchronization::models::ITEM_BALANCE_PARTNER.to_string(),
&repositories::synchronization_history::models::NewSynchronizationHistory { start_at,
item: repositories::synchronization::models::ITEM_BALANCE_PARTNER.to_string(), complete_at: (chrono::Utc::now()).timestamp(),
start_at, code: 0,
complete_at: (chrono::Utc::now()).timestamp(), message: None,
code: 0, data: None,
message: None, },
data: None, ) {
}, println!("synchronization_history.insert error: {}", e);
) };
.expect("synchronization_history insert");
Ok::<(), api::core::models::Error>(()) Ok::<(), api::core::models::Error>(())
} }
.await .await
{ {
let conn = self.pool.get().expect("conn"); let conn = match self.pool.get() {
self Ok(c) => c,
.synchronization_history_repository Err(e) => {
.insert( println!("pool.get error: {}", e);
&conn, return Ok(());
&repositories::synchronization_history::models::NewSynchronizationHistory { }
item: repositories::synchronization::models::ITEM_BALANCE_PARTNER.to_string(), };
start_at,
complete_at: (chrono::Utc::now()).timestamp(), if let Err(e) = self.synchronization_history_repository.insert(
code: e.code, &conn,
message: e.msg, &repositories::synchronization_history::models::NewSynchronizationHistory {
data: None, item: repositories::synchronization::models::ITEM_BALANCE_PARTNER.to_string(),
}, start_at,
) complete_at: (chrono::Utc::now()).timestamp(),
.expect("synchronization_history insert"); code: e.code,
message: e.msg,
data: None,
},
) {
println!("synchronization_history.insert error: {}", e);
};
} }
Ok(()) Ok(())

View File

@ -54,10 +54,26 @@ impl Synchronizer {
let start_at = (chrono::Utc::now()).timestamp(); let start_at = (chrono::Utc::now()).timestamp();
if let Err(e) = async { if let Err(e) = async {
let conn = self.pool.get().expect("conn"); let conn = match self.pool.get() {
Ok(c) => c,
Err(e) => {
return Err(api::core::models::Error {
code: -1,
msg: Some(format!("pool.get error: {}", e)),
});
}
};
let req = api::vendor::models::ListVendorsRequest {}; let req = api::vendor::models::ListVendorsRequest {};
let res = self.vendor_api.list_vendors(req).await?; let res = match self.vendor_api.list_vendors(req).await {
Ok(c) => c,
Err(e) => {
return Err(api::core::models::Error {
code: -1,
msg: Some(format!("vendor_api.list_vendors: {:?}", e)),
});
}
};
let upsert_vendors: Vec<repositories::vendor::models::UpsertVendor> = res let upsert_vendors: Vec<repositories::vendor::models::UpsertVendor> = res
.vendors .vendors
@ -76,45 +92,55 @@ impl Synchronizer {
}) })
.collect(); .collect();
let _affected = self let _affected = match self.vendor_repository.upserts(&conn, upsert_vendors) {
.vendor_repository Ok(c) => c,
.upserts(&conn, upsert_vendors) Err(e) => {
.expect("vendor upsert"); return Err(api::core::models::Error {
code: -1,
msg: Some(format!("vendor_repository.upserts: {:?}", e)),
});
}
};
self if let Err(e) = self.synchronization_history_repository.insert(
.synchronization_history_repository &conn,
.insert( &repositories::synchronization_history::models::NewSynchronizationHistory {
&conn, item: repositories::synchronization::models::ITEM_VENDORS.to_string(),
&repositories::synchronization_history::models::NewSynchronizationHistory { start_at,
item: repositories::synchronization::models::ITEM_VENDORS.to_string(), complete_at: (chrono::Utc::now()).timestamp(),
start_at, code: 0,
complete_at: (chrono::Utc::now()).timestamp(), message: None,
code: 0, data: None,
message: None, },
data: None, ) {
}, println!("synchronization_history.insert error: {}", e);
) };
.expect("synchronization_history insert");
Ok::<(), api::core::models::Error>(()) Ok::<(), api::core::models::Error>(())
} }
.await .await
{ {
let conn = self.pool.get().expect("conn"); let conn = match self.pool.get() {
self Ok(c) => c,
.synchronization_history_repository Err(e) => {
.insert( println!("pool.get error: {}", e);
&conn, return Ok(());
&repositories::synchronization_history::models::NewSynchronizationHistory { }
item: repositories::synchronization::models::ITEM_VENDORS.to_string(), };
start_at,
complete_at: (chrono::Utc::now()).timestamp(), if let Err(e) = self.synchronization_history_repository.insert(
code: e.code, &conn,
message: e.msg, &repositories::synchronization_history::models::NewSynchronizationHistory {
data: None, item: repositories::synchronization::models::ITEM_VENDORS.to_string(),
}, start_at,
) complete_at: (chrono::Utc::now()).timestamp(),
.expect("synchronization_history insert"); code: e.code,
message: e.msg,
data: None,
},
) {
println!("synchronization_history.insert error: {}", e);
};
} }
Ok(()) Ok(())