From 4637972b10ff60ff3b4fc991b32aaede63d31103 Mon Sep 17 00:00:00 2001 From: PARK BYUNG JUN Date: Sun, 28 Aug 2022 14:59:35 +0000 Subject: [PATCH] events are added --- .devcontainer/Dockerfile | 10 +- .devcontainer/devcontainer.json | 7 +- Cargo.toml | 4 +- src/events/member_bank_deposit/event.rs | 208 ++++++++++++++++++ src/events/member_bank_deposit/mod.rs | 1 + src/events/member_bank_withdraw/event.rs | 206 +++++++++++++++++ src/events/member_bank_withdraw/mod.rs | 1 + src/events/mod.rs | 2 + src/schedulers/balance/scheduler.rs | 2 +- .../member_account/synchronizer.rs | 70 +++++- 10 files changed, 498 insertions(+), 13 deletions(-) create mode 100644 src/events/member_bank_deposit/event.rs create mode 100644 src/events/member_bank_deposit/mod.rs create mode 100644 src/events/member_bank_withdraw/event.rs create mode 100644 src/events/member_bank_withdraw/mod.rs diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index e109d35..c381773 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -1,9 +1,11 @@ -# See here for image contents: https://github.com/microsoft/vscode-dev-containers/tree/v0.245.0/containers/rust/.devcontainer/base.Dockerfile +# See here for image contents: https://github.com/microsoft/vscode-dev-containers/tree/v0.245.0/containers/debian/.devcontainer/base.Dockerfile -# [Choice] Debian OS version (use bullseye on local arm64/Apple Silicon): buster, bullseye +# [Choice] Debian version (use bullseye on local arm64/Apple Silicon): bullseye, buster ARG VARIANT="buster" -FROM mcr.microsoft.com/vscode/devcontainers/rust:0-${VARIANT} +FROM mcr.microsoft.com/vscode/devcontainers/base:0-${VARIANT} -# [Optional] Uncomment this section to install additional packages. +# ** [Optional] Uncomment this section to install additional packages. ** # RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \ # && apt-get -y install --no-install-recommends +RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \ + && apt-get -y install --no-install-recommends libpq-dev diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index 79bfb9e..75647a6 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -7,7 +7,7 @@ "args": { // Use the VARIANT arg to pick a Debian OS version: buster, bullseye // Use bullseye when on local on arm64/Apple Silicon. - "VARIANT": "buster" + "VARIANT": "bullseye" } }, "runArgs": [ @@ -40,6 +40,10 @@ }, // Add the IDs of extensions you want installed when the container is created. "extensions": [ + "donjayamanne.githistory", + "eamodio.gitlens", + "mhutchie.git-graph", + "ms-azuretools.vscode-docker", "mutantdino.resourcemonitor", "rust-lang.rust-analyzer", "serayuzgur.crates", @@ -56,6 +60,7 @@ // Comment out to connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root. "remoteUser": "vscode", "features": { + "git": "latest", "rust": "latest" } } \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index de589ce..c49e0ea 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -29,7 +29,7 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread"] } tokio-cron-scheduler = { version = "*" } uuid = { version = "0", features = ["serde", "v4", "v5"] } -beteran-protobuf-rust = { git = "https://gitlab.loafle.net/bet/beteran-protobuf-rust.git", tag = "v0.1.78-snapshot" } -beteran-common-rust = { git = "https://gitlab.loafle.net/bet/beteran-common-rust.git", tag = "v0.1.63-snapshot" } +beteran-protobuf-rust = { git = "https://gitlab.loafle.net/bet/beteran-protobuf-rust.git", tag = "v0.1.79-snapshot" } +beteran-common-rust = { git = "https://gitlab.loafle.net/bet/beteran-common-rust.git", tag = "v0.1.64-snapshot" } [build-dependencies] diff --git a/src/events/member_bank_deposit/event.rs b/src/events/member_bank_deposit/event.rs new file mode 100644 index 0000000..97f4c10 --- /dev/null +++ b/src/events/member_bank_deposit/event.rs @@ -0,0 +1,208 @@ +use crate::api; +use crate::core; +use crate::repositories; +use crate::synchronizations; +use beteran_common_rust as bcr; +use beteran_protobuf_rust as bpr; +use diesel::{ + r2d2::{ConnectionManager, Pool}, + PgConnection, +}; +use prost::Message; +use std::str::FromStr; + +pub struct EventHandler { + connection_broker: nats::asynk::Connection, + queue_broker: String, + pool: Pool>, + api_config: core::config::ApiConfig, + member_repository: repositories::member::repository::Repository, + member_account_api: api::member_account::api::Api, + member_account_synchronizer: synchronizations::member_account::synchronizer::Synchronizer, +} + +impl std::fmt::Debug for EventHandler { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("EventHandler of members").finish() + } +} + +impl EventHandler { + /// + pub fn new( + connection_broker: nats::asynk::Connection, + queue_broker: String, + pool: Pool>, + member_account_synchronizer: synchronizations::member_account::synchronizer::Synchronizer, + api_config: core::config::ApiConfig, + ) -> EventHandler { + EventHandler { + connection_broker, + queue_broker, + pool, + member_account_synchronizer, + api_config: api_config.clone(), + member_repository: repositories::member::repository::Repository::new(), + member_account_api: api::member_account::api::Api::new(api_config.clone()), + } + } + + pub async fn subscribe(&self) -> std::result::Result<(), std::boxed::Box> { + futures::try_join!(self.event_after_update_member_bank_deposit_for_state()).map(|_| ()) + } + + async fn event_after_update_member_bank_deposit_for_state( + &self, + ) -> Result<(), Box> { + let s = self + .connection_broker + .queue_subscribe( + bpr::ss::member_bank_deposit::EVENT_SUBJECT_AFTER_UPDATE_MEMBER_BANK_DEPOSIT_FOR_STATE, + self.queue_broker.as_str(), + ) + .await?; + + while let Some(message) = s.next().await { + if let Err(e) = async { + let eve = bpr::ss::member_bank_deposit::AfterUpdateMemberBankDepositForState::decode( + message.data.as_slice(), + ) + .map_err(|e| { + bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest { + message: format!("invalid request: {}", e), + }) + })?; + + let client = match eve.client { + Some(c) => c, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid client information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "client".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + let event = match eve.event { + Some(r) => r, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid event information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "event".to_string(), + param: "event".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + let member_bank_deposit = match event.member_bank_deposit { + Some(c) => c, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid event information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "event".to_string(), + param: "member_bank_deposit".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + + let member = match member_bank_deposit.member { + Some(c) => c, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid event information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "event".to_string(), + param: "member_bank_deposit.member".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + let member_id = uuid::Uuid::from_str(member.id.as_str()).map_err(|e| { + bcr::error::rpc::Error::InvalidParams(bcr::error::rpc::InvalidParams { + message: "invalid member.id param".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "member.id".to_string(), + value: member.id.clone(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: e.to_string(), + }, + }) + })?; + + let conn = self.pool.get().map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + match beteran_protobuf_rust::models::member_bank_deposit::MemberBankDepositState::from_i32( + member_bank_deposit.state, + ) { + Some(s) => { + if s == beteran_protobuf_rust::models::member_bank_deposit::MemberBankDepositState::Complete { + self.member_account_api.create_deposit(api::member_account::models::CreateDepositRequest{ + username: member.username.clone(), + cash_type: None, + amount: member_bank_deposit.amount as i64, + request_key: None, + }).await + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {:?}", e), + data: None, + }) + })?; + + self.member_account_synchronizer.balance_for_user(member_id, &member.username).await + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {:?}", e), + data: None, + }) + })?; + } + } + None => {} + }; + + Ok::<(), bcr::error::rpc::Error>(()) + } + .await + { + println!("error: {}", e); + } + } + + Ok(()) + } +} diff --git a/src/events/member_bank_deposit/mod.rs b/src/events/member_bank_deposit/mod.rs new file mode 100644 index 0000000..53f1126 --- /dev/null +++ b/src/events/member_bank_deposit/mod.rs @@ -0,0 +1 @@ +pub mod event; diff --git a/src/events/member_bank_withdraw/event.rs b/src/events/member_bank_withdraw/event.rs new file mode 100644 index 0000000..e894ed1 --- /dev/null +++ b/src/events/member_bank_withdraw/event.rs @@ -0,0 +1,206 @@ +use crate::api; +use crate::core; +use crate::repositories; +use crate::synchronizations; +use beteran_common_rust as bcr; +use beteran_protobuf_rust as bpr; +use diesel::{ + r2d2::{ConnectionManager, Pool}, + PgConnection, +}; +use prost::Message; +use std::str::FromStr; + +pub struct EventHandler { + connection_broker: nats::asynk::Connection, + queue_broker: String, + pool: Pool>, + api_config: core::config::ApiConfig, + member_repository: repositories::member::repository::Repository, + member_account_api: api::member_account::api::Api, + member_account_synchronizer: synchronizations::member_account::synchronizer::Synchronizer, +} + +impl std::fmt::Debug for EventHandler { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("EventHandler of members").finish() + } +} + +impl EventHandler { + /// + pub fn new( + connection_broker: nats::asynk::Connection, + queue_broker: String, + pool: Pool>, + member_account_synchronizer: synchronizations::member_account::synchronizer::Synchronizer, + api_config: core::config::ApiConfig, + ) -> EventHandler { + EventHandler { + connection_broker, + queue_broker, + pool, + member_account_synchronizer, + api_config: api_config.clone(), + member_repository: repositories::member::repository::Repository::new(), + member_account_api: api::member_account::api::Api::new(api_config.clone()), + } + } + + pub async fn subscribe(&self) -> std::result::Result<(), std::boxed::Box> { + futures::try_join!(self.event_after_update_member_bank_withdraw_for_state()).map(|_| ()) + } + + async fn event_after_update_member_bank_withdraw_for_state( + &self, + ) -> Result<(), Box> { + let s = self + .connection_broker + .queue_subscribe( + bpr::ss::member_bank_withdraw::EVENT_SUBJECT_AFTER_UPDATE_MEMBER_BANK_WITHDRAW_FOR_STATE, + self.queue_broker.as_str(), + ) + .await?; + + while let Some(message) = s.next().await { + if let Err(e) = async { + let eve = bpr::ss::member_bank_withdraw::AfterUpdateMemberBankWithdrawForState::decode( + message.data.as_slice(), + ) + .map_err(|e| { + bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest { + message: format!("invalid request: {}", e), + }) + })?; + + let client = match eve.client { + Some(c) => c, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid client information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "client".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + let event = match eve.event { + Some(r) => r, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid event information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "event".to_string(), + param: "event".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + let member_bank_withdraw = match event.member_bank_withdraw { + Some(c) => c, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid event information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "event".to_string(), + param: "member_bank_withdraw".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + + let member = match member_bank_withdraw.member { + Some(c) => c, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid event information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "event".to_string(), + param: "member_bank_withdraw.member".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + let member_id = uuid::Uuid::from_str(member.id.as_str()).map_err(|e| { + bcr::error::rpc::Error::InvalidParams(bcr::error::rpc::InvalidParams { + message: "invalid member.id param".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "member.id".to_string(), + value: member.id.clone(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: e.to_string(), + }, + }) + })?; + + let conn = self.pool.get().map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + match beteran_protobuf_rust::models::member_bank_withdraw::MemberBankWithdrawState::from_i32( + member_bank_withdraw.state, + ) { + Some(s) => { + if s == beteran_protobuf_rust::models::member_bank_withdraw::MemberBankWithdrawState::Complete { + self.member_account_api.create_withdraw(api::member_account::models::CreateWithdrawRequest{ + username: member.username.clone(), + request_key: None, + }).await + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {:?}", e), + data: None, + }) + })?; + + self.member_account_synchronizer.balance_for_user(member_id, &member.username).await + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {:?}", e), + data: None, + }) + })?; + } + } + None => {} + }; + + Ok::<(), bcr::error::rpc::Error>(()) + } + .await + { + println!("error: {}", e); + } + } + + Ok(()) + } +} diff --git a/src/events/member_bank_withdraw/mod.rs b/src/events/member_bank_withdraw/mod.rs new file mode 100644 index 0000000..53f1126 --- /dev/null +++ b/src/events/member_bank_withdraw/mod.rs @@ -0,0 +1 @@ +pub mod event; diff --git a/src/events/mod.rs b/src/events/mod.rs index 0dd2a03..eaf2aec 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -1 +1,3 @@ pub mod member; +pub mod member_bank_withdraw; +pub mod member_bank_deposit; diff --git a/src/schedulers/balance/scheduler.rs b/src/schedulers/balance/scheduler.rs index cdbef2a..a35c075 100644 --- a/src/schedulers/balance/scheduler.rs +++ b/src/schedulers/balance/scheduler.rs @@ -49,7 +49,7 @@ impl Scheduler { Box::pin(async move { self .member_account_synchronizer - .balance_for_user() + .balance_for_user_all() .await .expect("member_account_synchronizer.balance_for_user"); }) diff --git a/src/synchronizations/member_account/synchronizer.rs b/src/synchronizations/member_account/synchronizer.rs index 0c68b2d..cc8677e 100644 --- a/src/synchronizations/member_account/synchronizer.rs +++ b/src/synchronizations/member_account/synchronizer.rs @@ -30,7 +30,7 @@ impl std::clone::Clone for Synchronizer { api_config: self.api_config.clone(), synchronization_history_repository: repositories::synchronization_history::repository::Repository::new(), - balance_repository: repositories::balance::repository::Repository::new(), + balance_repository: repositories::balance::repository::Repository::new(), member_repository: repositories::member::repository::Repository::new(), member_api: api::member::api::Api::new(self.api_config.clone()), member_account_api: api::member_account::api::Api::new(self.api_config.clone()), @@ -49,14 +49,14 @@ impl Synchronizer { api_config: api_config.clone(), synchronization_history_repository: repositories::synchronization_history::repository::Repository::new(), - balance_repository: repositories::balance::repository::Repository::new(), + balance_repository: repositories::balance::repository::Repository::new(), member_repository: repositories::member::repository::Repository::new(), member_api: api::member::api::Api::new(api_config.clone()), member_account_api: api::member_account::api::Api::new(api_config), } } - pub async fn balance_for_user(&self) -> Result<(), Box> { + pub async fn balance_for_user_all(&self) -> Result<(), Box> { let start_at = (chrono::Utc::now()).timestamp(); if let Err(e) = async { @@ -142,6 +142,68 @@ impl Synchronizer { Ok(()) } + pub async fn balance_for_user( + &self, + member_id: uuid::Uuid, + usename: &str, + ) -> Result<(), Box> { + let start_at = (chrono::Utc::now()).timestamp(); + + if let Err(e) = async { + let conn = self.pool.get().expect("conn"); + + let m = match self + .member_repository + .select_by_member_id(&conn, member_id) + .expect("member_repository.select_by_member_id") + { + Some(m) => m, + None => { + return Ok(()); + } + }; + + let req = api::member_account::models::GetBalanceForUserRequest { + username: usename.to_string(), + }; + let res = self.member_account_api.get_balance_for_user(req).await?; + + let modify_member = repositories::member::models::ModifyMemberForBalance { + balance: res.balance, + balance_bota: res.balance_bota, + balance_sum: res.balance_sum, + companies: res.companies, + }; + + self + .member_repository + .update_balance(&conn, m.id, &modify_member) + .expect("member_repository.update_balance"); + + Ok::<(), api::core::models::Error>(()) + } + .await + { + let conn = self.pool.get().expect("conn"); + + self + .synchronization_history_repository + .insert( + &conn, + &repositories::synchronization_history::models::NewSynchronizationHistory { + item: repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), + start_at, + complete_at: (chrono::Utc::now()).timestamp(), + code: e.code, + message: e.msg, + }, + ) + .expect("synchronization_history insert"); + } + + Ok(()) + } + pub async fn balance_for_partner(&self) -> Result<(), Box> { let start_at = (chrono::Utc::now()).timestamp(); @@ -219,6 +281,4 @@ impl Synchronizer { Ok(()) } - - }