diff --git a/Cargo.toml b/Cargo.toml index 45d8807..96939d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,7 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread"] } tokio-cron-scheduler = { version = "0" } uuid = { version = "0", features = ["serde", "v4", "v5"] } -beteran-protobuf-rust = { git = "https://gitlab.loafle.net/bet/beteran-protobuf-rust.git", tag = "v0.1.97-snapshot" } -beteran-common-rust = { git = "https://gitlab.loafle.net/bet/beteran-common-rust.git", tag = "v0.1.83-snapshot" } +beteran-protobuf-rust = { git = "https://gitlab.loafle.net/bet/beteran-protobuf-rust.git", tag = "v0.1.99-snapshot" } +beteran-common-rust = { git = "https://gitlab.loafle.net/bet/beteran-common-rust.git", tag = "v0.1.85-snapshot" } [build-dependencies] diff --git a/src/main.rs b/src/main.rs index e70e2a9..d2b0ad3 100644 --- a/src/main.rs +++ b/src/main.rs @@ -80,6 +80,7 @@ async fn main() -> Result<(), Box> { ); let member_account_synchronizer = synchronizations::member_account::synchronizer::Synchronizer::new( + connection_broker.clone(), pool.clone(), api_config.clone(), ); diff --git a/src/synchronizations/member_account/synchronizer.rs b/src/synchronizations/member_account/synchronizer.rs index 5f6386d..9932d16 100644 --- a/src/synchronizations/member_account/synchronizer.rs +++ b/src/synchronizations/member_account/synchronizer.rs @@ -1,13 +1,18 @@ use crate::api; use crate::core; use crate::repositories; +use beteran_protobuf_rust as bpr; use diesel::{ r2d2::{ConnectionManager, Pool}, PgConnection, }; +use prost::Message; +use std::net::IpAddr; +use std::net::Ipv4Addr; /// pub struct Synchronizer { + connection_broker: nats::asynk::Connection, pool: Pool>, api_config: core::config::ApiConfig, synchronization_history_repository: repositories::synchronization_history::repository::Repository, @@ -26,6 +31,7 @@ impl std::fmt::Debug for Synchronizer { impl std::clone::Clone for Synchronizer { fn clone(&self) -> Self { Self { + connection_broker: self.connection_broker.clone(), pool: self.pool.clone(), api_config: self.api_config.clone(), synchronization_history_repository: @@ -41,10 +47,12 @@ impl std::clone::Clone for Synchronizer { impl Synchronizer { /// pub fn new( + connection_broker: nats::asynk::Connection, pool: Pool>, api_config: core::config::ApiConfig, ) -> Synchronizer { Synchronizer { + connection_broker, pool, api_config: api_config.clone(), synchronization_history_repository: @@ -98,6 +106,16 @@ impl Synchronizer { } }; + let m = match self.member_repository.select(&conn, u.id) { + Ok(c) => c, + Err(e) => { + return Err(api::core::models::Error { + code: -1, + msg: Some(format!("member_repository.select error: {}", e)), + }); + } + }; + let modify_member = repositories::member::models::ModifyMemberForBalance { balance: res.balance, balance_bota: res.balance_bota, @@ -117,6 +135,38 @@ impl Synchronizer { }); } }; + + if let Some(_m) = m { + if let Err(e) = self + .connection_broker + .publish( + bpr::ss::member_balance::EVENT_SUBJECT_AFTER_UPDATE_MEMBER_BALANCE, + bpr::ss::member_balance::AfterUpdateMemberBalanceEvent { + client: Some(bpr::models::core::network::Client { + client_ip: self + .connection_broker + .client_ip() + .unwrap_or(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))) + .to_string(), + site_url: None, + access_token: None, + }), + event: Some( + bpr::ss::member_balance::after_update_member_balance_event::Event { + member_id: _m.member_id.to_string(), + balance: res.balance, + balance_bota: res.balance_bota, + balance_sum: res.balance_sum, + }, + ), + } + .encode_to_vec(), + ) + .await + { + println!("EVENT_SUBJECT_AFTER_UPDATE_MEMBER_BALANCE error: {}", e); + } + } } if let Err(e) = self.synchronization_history_repository.insert( @@ -232,6 +282,36 @@ impl Synchronizer { } }; + if let Err(e) = self + .connection_broker + .publish( + bpr::ss::member_balance::EVENT_SUBJECT_AFTER_UPDATE_MEMBER_BALANCE, + bpr::ss::member_balance::AfterUpdateMemberBalanceEvent { + client: Some(bpr::models::core::network::Client { + client_ip: self + .connection_broker + .client_ip() + .unwrap_or(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1))) + .to_string(), + site_url: None, + access_token: None, + }), + event: Some( + bpr::ss::member_balance::after_update_member_balance_event::Event { + member_id: m.member_id.to_string(), + balance: res.balance, + balance_bota: res.balance_bota, + balance_sum: res.balance_sum, + }, + ), + } + .encode_to_vec(), + ) + .await + { + println!("EVENT_SUBJECT_AFTER_UPDATE_MEMBER_BALANCE error: {}", e); + } + if let Err(e) = self.synchronization_history_repository.insert( &conn, &repositories::synchronization_history::models::NewSynchronizationHistory {