member_balance is added

This commit is contained in:
병준 박 2022-09-16 16:44:45 +00:00
parent aa44d6aab1
commit c3b1ad2023
3 changed files with 83 additions and 2 deletions

View File

@ -36,7 +36,7 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tokio-cron-scheduler = { version = "0" } tokio-cron-scheduler = { version = "0" }
uuid = { version = "0", features = ["serde", "v4", "v5"] } uuid = { version = "0", features = ["serde", "v4", "v5"] }
beteran-protobuf-rust = { git = "https://gitlab.loafle.net/bet/beteran-protobuf-rust.git", tag = "v0.1.97-snapshot" } beteran-protobuf-rust = { git = "https://gitlab.loafle.net/bet/beteran-protobuf-rust.git", tag = "v0.1.99-snapshot" }
beteran-common-rust = { git = "https://gitlab.loafle.net/bet/beteran-common-rust.git", tag = "v0.1.83-snapshot" } beteran-common-rust = { git = "https://gitlab.loafle.net/bet/beteran-common-rust.git", tag = "v0.1.85-snapshot" }
[build-dependencies] [build-dependencies]

View File

@ -80,6 +80,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
); );
let member_account_synchronizer = let member_account_synchronizer =
synchronizations::member_account::synchronizer::Synchronizer::new( synchronizations::member_account::synchronizer::Synchronizer::new(
connection_broker.clone(),
pool.clone(), pool.clone(),
api_config.clone(), api_config.clone(),
); );

View File

@ -1,13 +1,18 @@
use crate::api; use crate::api;
use crate::core; use crate::core;
use crate::repositories; use crate::repositories;
use beteran_protobuf_rust as bpr;
use diesel::{ use diesel::{
r2d2::{ConnectionManager, Pool}, r2d2::{ConnectionManager, Pool},
PgConnection, PgConnection,
}; };
use prost::Message;
use std::net::IpAddr;
use std::net::Ipv4Addr;
/// ///
pub struct Synchronizer { pub struct Synchronizer {
connection_broker: nats::asynk::Connection,
pool: Pool<ConnectionManager<PgConnection>>, pool: Pool<ConnectionManager<PgConnection>>,
api_config: core::config::ApiConfig, api_config: core::config::ApiConfig,
synchronization_history_repository: repositories::synchronization_history::repository::Repository, synchronization_history_repository: repositories::synchronization_history::repository::Repository,
@ -26,6 +31,7 @@ impl std::fmt::Debug for Synchronizer {
impl std::clone::Clone for Synchronizer { impl std::clone::Clone for Synchronizer {
fn clone(&self) -> Self { fn clone(&self) -> Self {
Self { Self {
connection_broker: self.connection_broker.clone(),
pool: self.pool.clone(), pool: self.pool.clone(),
api_config: self.api_config.clone(), api_config: self.api_config.clone(),
synchronization_history_repository: synchronization_history_repository:
@ -41,10 +47,12 @@ impl std::clone::Clone for Synchronizer {
impl Synchronizer { impl Synchronizer {
/// ///
pub fn new( pub fn new(
connection_broker: nats::asynk::Connection,
pool: Pool<ConnectionManager<PgConnection>>, pool: Pool<ConnectionManager<PgConnection>>,
api_config: core::config::ApiConfig, api_config: core::config::ApiConfig,
) -> Synchronizer { ) -> Synchronizer {
Synchronizer { Synchronizer {
connection_broker,
pool, pool,
api_config: api_config.clone(), api_config: api_config.clone(),
synchronization_history_repository: synchronization_history_repository:
@ -98,6 +106,16 @@ impl Synchronizer {
} }
}; };
let m = match self.member_repository.select(&conn, u.id) {
Ok(c) => c,
Err(e) => {
return Err(api::core::models::Error {
code: -1,
msg: Some(format!("member_repository.select error: {}", e)),
});
}
};
let modify_member = repositories::member::models::ModifyMemberForBalance { let modify_member = repositories::member::models::ModifyMemberForBalance {
balance: res.balance, balance: res.balance,
balance_bota: res.balance_bota, balance_bota: res.balance_bota,
@ -117,6 +135,38 @@ impl Synchronizer {
}); });
} }
}; };
if let Some(_m) = m {
if let Err(e) = self
.connection_broker
.publish(
bpr::ss::member_balance::EVENT_SUBJECT_AFTER_UPDATE_MEMBER_BALANCE,
bpr::ss::member_balance::AfterUpdateMemberBalanceEvent {
client: Some(bpr::models::core::network::Client {
client_ip: self
.connection_broker
.client_ip()
.unwrap_or(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))
.to_string(),
site_url: None,
access_token: None,
}),
event: Some(
bpr::ss::member_balance::after_update_member_balance_event::Event {
member_id: _m.member_id.to_string(),
balance: res.balance,
balance_bota: res.balance_bota,
balance_sum: res.balance_sum,
},
),
}
.encode_to_vec(),
)
.await
{
println!("EVENT_SUBJECT_AFTER_UPDATE_MEMBER_BALANCE error: {}", e);
}
}
} }
if let Err(e) = self.synchronization_history_repository.insert( if let Err(e) = self.synchronization_history_repository.insert(
@ -232,6 +282,36 @@ impl Synchronizer {
} }
}; };
if let Err(e) = self
.connection_broker
.publish(
bpr::ss::member_balance::EVENT_SUBJECT_AFTER_UPDATE_MEMBER_BALANCE,
bpr::ss::member_balance::AfterUpdateMemberBalanceEvent {
client: Some(bpr::models::core::network::Client {
client_ip: self
.connection_broker
.client_ip()
.unwrap_or(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))
.to_string(),
site_url: None,
access_token: None,
}),
event: Some(
bpr::ss::member_balance::after_update_member_balance_event::Event {
member_id: m.member_id.to_string(),
balance: res.balance,
balance_bota: res.balance_bota,
balance_sum: res.balance_sum,
},
),
}
.encode_to_vec(),
)
.await
{
println!("EVENT_SUBJECT_AFTER_UPDATE_MEMBER_BALANCE error: {}", e);
}
if let Err(e) = self.synchronization_history_repository.insert( if let Err(e) = self.synchronization_history_repository.insert(
&conn, &conn,
&repositories::synchronization_history::models::NewSynchronizationHistory { &repositories::synchronization_history::models::NewSynchronizationHistory {