From cddc1c2b68d9a819b8260feb9527830e4ab8982d Mon Sep 17 00:00:00 2001 From: PARK BYUNG JUN Date: Fri, 16 Sep 2022 16:23:37 +0000 Subject: [PATCH] member_balance is added --- Cargo.toml | 4 +- .../202207151000_member_balance/down.sql | 3 + migrations/202207151000_member_balance/up.sql | 23 + src/compositions/member/composition.rs | 44 +- src/compositions/member/models.rs | 30 +- src/events/member_balance/event.rs | 177 ++++ src/events/member_balance/mod.rs | 1 + src/events/mod.rs | 1 + src/main.rs | 6 + src/repositories/member_balance/mod.rs | 9 + src/repositories/member_balance/models.rs | 60 ++ src/repositories/member_balance/repository.rs | 189 +++++ src/repositories/member_balance/schema.rs | 22 + src/repositories/mod.rs | 1 + src/services/member/models.rs | 8 + src/services/member_balance/mod.rs | 2 + src/services/member_balance/models.rs | 18 + src/services/member_balance/service.rs | 785 ++++++++++++++++++ src/services/mod.rs | 1 + 19 files changed, 1376 insertions(+), 8 deletions(-) create mode 100644 migrations/202207151000_member_balance/down.sql create mode 100644 migrations/202207151000_member_balance/up.sql create mode 100644 src/events/member_balance/event.rs create mode 100644 src/events/member_balance/mod.rs create mode 100644 src/repositories/member_balance/mod.rs create mode 100644 src/repositories/member_balance/models.rs create mode 100644 src/repositories/member_balance/repository.rs create mode 100644 src/repositories/member_balance/schema.rs create mode 100644 src/services/member_balance/mod.rs create mode 100644 src/services/member_balance/models.rs create mode 100644 src/services/member_balance/service.rs diff --git a/Cargo.toml b/Cargo.toml index 47da750..5e9062b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,7 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread"] } tokio-cron-scheduler = { version = "0" } uuid = { version = "0", features = ["serde", "v4", "v5"] } -beteran-protobuf-rust = { git = "https://gitlab.loafle.net/bet/beteran-protobuf-rust.git", tag = "v0.1.97-snapshot" } -beteran-common-rust = { git = "https://gitlab.loafle.net/bet/beteran-common-rust.git", tag = "v0.1.83-snapshot" } +beteran-protobuf-rust = { git = "https://gitlab.loafle.net/bet/beteran-protobuf-rust.git", tag = "v0.1.99-snapshot" } +beteran-common-rust = { git = "https://gitlab.loafle.net/bet/beteran-common-rust.git", tag = "v0.1.85-snapshot" } [build-dependencies] diff --git a/migrations/202207151000_member_balance/down.sql b/migrations/202207151000_member_balance/down.sql new file mode 100644 index 0000000..91d491a --- /dev/null +++ b/migrations/202207151000_member_balance/down.sql @@ -0,0 +1,3 @@ +DROP INDEX idx_member_balances_member_id; +DROP TRIGGER tg_member_balances_updated_at; +DROP TABLE member_balances; diff --git a/migrations/202207151000_member_balance/up.sql b/migrations/202207151000_member_balance/up.sql new file mode 100644 index 0000000..0fa0807 --- /dev/null +++ b/migrations/202207151000_member_balance/up.sql @@ -0,0 +1,23 @@ +CREATE TABLE IF NOT EXISTS member_balances ( + id UUID DEFAULT uuid_generate_v4(), + member_id UUID NOT NULL, + balance DOUBLE PRECISION NOT NULL DEFAULT 0.00, + balance_bota DOUBLE PRECISION NOT NULL DEFAULT 0.00, + balance_sum DOUBLE PRECISION NOT NULL DEFAULT 0.00, + created_at BIGINT NOT NULL DEFAULT (extract(epoch from now()) * 1000), + updated_at BIGINT NOT NULL DEFAULT (extract(epoch from now()) * 1000), + PRIMARY KEY (id), + CONSTRAINT fk_member_balances_member_id + FOREIGN KEY(member_id) + REFERENCES members(id) + +); + +CREATE INDEX idx_member_balances_member_id ON member_balances (member_id); + +-- trigger (updated_at) +CREATE TRIGGER tg_member_balances_updated_at + BEFORE UPDATE + ON member_balances + FOR EACH ROW + EXECUTE PROCEDURE update_updated_at_column(); diff --git a/src/compositions/member/composition.rs b/src/compositions/member/composition.rs index 5017445..9a0b812 100644 --- a/src/compositions/member/composition.rs +++ b/src/compositions/member/composition.rs @@ -114,7 +114,15 @@ WITH RECURSIVE rec AS ( mss.rate_bogglepowerball_combo as mss_rate_bogglepowerball_combo, mss.created_at as mss_created_at, mss.updated_at as mss_updated_at, - + + mb.id as mb_id, + mb.member_id as mb_member_id, + mb.balance as mb_balance, + mb.balance_bota as mb_balance_bota, + mb.balance_sum as mb_balance_sum, + mb.created_at as mb_created_at, + mb.updated_at as mb_updated_at, + mba.id as mba_id, mba.member_id as mba_member_id, mba.bank_id as mba_bank_id, @@ -147,6 +155,8 @@ WITH RECURSIVE rec AS ( ON mss.member_id = m.id LEFT OUTER JOIN member_game_settings mgs ON mgs.member_id = m.id + LEFT OUTER JOIN member_balances mb + ON mb.member_id = m.id LEFT OUTER JOIN member_bank_accounts mba ON mba.member_id = m.id LEFT OUTER JOIN banks mba_b @@ -237,6 +247,14 @@ WITH RECURSIVE rec AS ( c_mss.created_at as mss_created_at, c_mss.updated_at as mss_updated_at, + c_mb.id as mb_id, + c_mb.member_id as mb_member_id, + c_mb.balance as mb_balance, + c_mb.balance_bota as mb_balance_bota, + c_mb.balance_sum as mb_balance_sum, + c_mb.created_at as mb_created_at, + c_mb.updated_at as mb_updated_at, + c_mba.id as mba_id, c_mba.member_id as mba_member_id, c_mba.bank_id as mba_bank_id, @@ -269,6 +287,8 @@ WITH RECURSIVE rec AS ( ON c_mss.member_id = c_m.id LEFT OUTER JOIN member_game_settings c_mgs ON c_mgs.member_id = c_m.id + LEFT OUTER JOIN member_balances c_mb + ON c_mb.member_id = c_m.id LEFT OUTER JOIN member_bank_accounts c_mba ON c_mba.member_id = c_m.id LEFT OUTER JOIN banks c_mba_b @@ -356,7 +376,15 @@ WITH RECURSIVE rec AS ( mss_rate_bogglepowerball_combo, mss_created_at, mss_updated_at, - + + mb_id, + mb_member_id, + mb_balance, + mb_balance_bota, + mb_balance_sum, + mb_created_at, + mb_updated_at, + mba_id, mba_member_id, mba_bank_id, @@ -397,6 +425,8 @@ FROM members as m ON mss.member_id = m.id LEFT OUTER JOIN member_game_settings mgs ON mgs.member_id = m.id + LEFT OUTER JOIN member_balances mb + ON mb.member_id = m.id LEFT OUTER JOIN member_bank_accounts mba ON mba.member_id = m.id LEFT OUTER JOIN banks mba_b @@ -484,6 +514,14 @@ static MEMBER_QUERY: &str = " mss.created_at as mss_created_at, mss.updated_at as mss_updated_at, + mb.id as mb_id, + mb.member_id as mb_member_id, + mb.balance as mb_balance, + mb.balance_bota as mb_balance_bota, + mb.balance_sum as mb_balance_sum, + mb.created_at as mb_created_at, + mb.updated_at as mb_updated_at, + mba.id as mba_id, mba.member_id as mba_member_id, mba.bank_id as mba_bank_id, @@ -514,6 +552,8 @@ static MEMBER_QUERY: &str = " ON mss.member_id = m.id LEFT OUTER JOIN member_game_settings mgs ON mgs.member_id = m.id + LEFT OUTER JOIN member_balances mb + ON mb.member_id = m.id LEFT OUTER JOIN member_bank_accounts mba ON mba.member_id = m.id LEFT OUTER JOIN banks mba_b diff --git a/src/compositions/member/models.rs b/src/compositions/member/models.rs index 4b69667..f27e871 100644 --- a/src/compositions/member/models.rs +++ b/src/compositions/member/models.rs @@ -1,9 +1,10 @@ //! //! use crate::repositories::{ - bank::models::Bank, member::schema::MemberState, member_class::models::MemberClass, - member_game_setting::models::MemberGameSetting, member_level::models::MemberLevel, - member_settlement_setting::models::MemberSettlementSetting, site::models::Site, + bank::models::Bank, member::schema::MemberState, member_balance::models::MemberBalance, + member_class::models::MemberClass, member_game_setting::models::MemberGameSetting, + member_level::models::MemberLevel, member_settlement_setting::models::MemberSettlementSetting, + site::models::Site, }; use diesel::deserialize::QueryableByName; @@ -41,6 +42,8 @@ pub struct _MemberModel { /// pub member_settlement_setting: Option, /// + pub member_balance: Option, + /// pub last_signined_ip: Option, /// pub last_signined_at: Option, @@ -165,6 +168,21 @@ impl QueryableByName for _MemberModel { None => None, }; + let member_balance = match row + .get::, Option>("mb_id")? + { + Some(_) => Some(MemberBalance { + id: row.get("mb_id")?, + member_id: row.get("mb_member_id")?, + balance: row.get("mb_balance")?, + balance_bota: row.get("mb_balance_bota")?, + balance_sum: row.get("mb_balance_sum")?, + created_at: row.get("mb_created_at")?, + updated_at: row.get("mb_updated_at")?, + }), + None => None, + }; + Ok(_MemberModel { id: row.get("m_id")?, site, @@ -178,6 +196,7 @@ impl QueryableByName for _MemberModel { state_changed_at: row.get("m_state_changed_at")?, member_game_setting, member_settlement_setting, + member_balance, bank_account: member_bank_account, parent_member_id: row.get("m_parent_member_id")?, child_member_count: row.get("m_child_member_count")?, @@ -191,7 +210,7 @@ impl QueryableByName for _MemberModel { } /// -#[derive(PartialEq, PartialOrd, Debug, Clone)] +#[derive(PartialEq, Debug, Clone)] pub struct MemberModel { /// pub id: uuid::Uuid, @@ -226,6 +245,8 @@ pub struct MemberModel { /// pub member_settlement_setting: Option, /// + pub member_balance: Option, + /// pub last_signined_ip: Option, /// pub last_signined_at: Option, @@ -255,6 +276,7 @@ pub fn from_member_model(_m: &_MemberModel, p: Option, c: Vec) state_changed_at: _m.state_changed_at, member_game_setting: _m.member_game_setting.clone(), member_settlement_setting: _m.member_settlement_setting.clone(), + member_balance: _m.member_balance.clone(), last_signined_ip: _m.last_signined_ip.clone(), last_signined_at: _m.last_signined_at, created_at: _m.created_at, diff --git a/src/events/member_balance/event.rs b/src/events/member_balance/event.rs new file mode 100644 index 0000000..6cb854a --- /dev/null +++ b/src/events/member_balance/event.rs @@ -0,0 +1,177 @@ +use super::super::super::repositories; +use beteran_common_rust as bcr; +use beteran_protobuf_rust as bpr; +use diesel::{ + r2d2::{ConnectionManager, Pool}, + PgConnection, +}; +use prost::Message; +use std::str::FromStr; + +pub struct EventHandler { + connection_broker: nats::asynk::Connection, + queue_broker: String, + pool: Pool>, + member_balance_repository: repositories::member_balance::repository::Repository, +} + +impl std::fmt::Debug for EventHandler { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("EventHandler of members").finish() + } +} + +impl EventHandler { + /// + pub fn new( + connection_broker: nats::asynk::Connection, + queue_broker: String, + pool: Pool>, + ) -> EventHandler { + EventHandler { + connection_broker, + queue_broker, + pool, + member_balance_repository: repositories::member_balance::repository::Repository::new(), + } + } + + pub async fn subscribe(&self) -> std::result::Result<(), std::boxed::Box> { + futures::try_join!(self.event_after_update_member_balance()).map(|_| ()) + } + + async fn event_after_update_member_balance(&self) -> Result<(), Box> { + let s = self + .connection_broker + .queue_subscribe( + bpr::ss::member_balance::EVENT_SUBJECT_AFTER_UPDATE_MEMBER_BALANCE, + self.queue_broker.as_str(), + ) + .await?; + + while let Some(message) = s.next().await { + if let Err(e) = async { + let eve = + bpr::ss::member_balance::AfterUpdateMemberBalanceEvent::decode(message.data.as_slice()) + .map_err(|e| { + bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest { + message: format!("invalid request: {}", e), + }) + })?; + + let client = match eve.client { + Some(c) => c, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid client information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "client".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + let event = match eve.event { + Some(r) => r, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid event information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "event".to_string(), + param: "event".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + + let member_id = uuid::Uuid::from_str(event.member_id.as_str()).map_err(|e| { + bcr::error::rpc::Error::InvalidParams(bcr::error::rpc::InvalidParams { + message: "invalid member_id param".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "member_id".to_string(), + value: event.member_id.clone(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: e.to_string(), + }, + }) + })?; + + let conn = self.pool.get().map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + let id = match self + .member_balance_repository + .select_by_member_id(&conn, member_id) + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })? { + Some(_mb) => _mb.id, + None => { + let _mb = self + .member_balance_repository + .insert( + &conn, + &repositories::member_balance::models::NewMemberBalance { member_id }, + ) + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + _mb.id + } + }; + + self + .member_balance_repository + .update( + &conn, + id, + &repositories::member_balance::models::ModifyMemberBalance { + balance: event.balance, + balance_bota: event.balance_bota, + balance_sum: event.balance_sum, + }, + ) + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + Ok::<(), bcr::error::rpc::Error>(()) + } + .await + { + println!("error: {}", e); + } + } + + Ok(()) + } +} diff --git a/src/events/member_balance/mod.rs b/src/events/member_balance/mod.rs new file mode 100644 index 0000000..53f1126 --- /dev/null +++ b/src/events/member_balance/mod.rs @@ -0,0 +1 @@ +pub mod event; diff --git a/src/events/mod.rs b/src/events/mod.rs index 0dd2a03..fd74dea 100644 --- a/src/events/mod.rs +++ b/src/events/mod.rs @@ -1 +1,2 @@ pub mod member; +pub mod member_balance; diff --git a/src/main.rs b/src/main.rs index 31c9b6d..7869e60 100644 --- a/src/main.rs +++ b/src/main.rs @@ -132,6 +132,11 @@ async fn main() -> Result<(), Box> { queue_server_broker.clone(), pool.clone(), ); + let member_balance_event_handler = events::member_balance::event::EventHandler::new( + connection_server_broker.clone(), + queue_server_broker.clone(), + pool.clone(), + ); println!("Server service [beteran-server-service] is started"); @@ -150,6 +155,7 @@ async fn main() -> Result<(), Box> { member_settlement_setting_service.subscribe(), site_service.subscribe(), member_event_handler.subscribe(), + member_balance_event_handler.subscribe(), )?; Ok(()) diff --git a/src/repositories/member_balance/mod.rs b/src/repositories/member_balance/mod.rs new file mode 100644 index 0000000..bef7c9e --- /dev/null +++ b/src/repositories/member_balance/mod.rs @@ -0,0 +1,9 @@ +//! +//! + +/// +pub mod models; +/// +pub mod repository; +/// +pub mod schema; diff --git a/src/repositories/member_balance/models.rs b/src/repositories/member_balance/models.rs new file mode 100644 index 0000000..4a8f049 --- /dev/null +++ b/src/repositories/member_balance/models.rs @@ -0,0 +1,60 @@ +use super::schema::member_balances; +use beteran_common_rust as bcr; + +/// +#[derive(Identifiable, Queryable, PartialEq, PartialOrd, Debug, Clone)] +#[table_name = "member_balances"] +pub struct MemberBalance { + /// + pub id: uuid::Uuid, + /// + pub member_id: uuid::Uuid, + /// + pub balance: f64, + /// + pub balance_bota: f64, + /// + pub balance_sum: f64, + /// + pub created_at: i64, + /// + pub updated_at: i64, +} + +/// +#[derive(Insertable, Debug, Clone)] +#[table_name = "member_balances"] +pub struct NewMemberBalance { + /// + pub member_id: uuid::Uuid, +} + +/// +#[derive(AsChangeset, Debug, Clone)] +#[table_name = "member_balances"] +pub struct ModifyMemberBalance { + /// + pub balance: f64, + /// + pub balance_bota: f64, + /// + pub balance_sum: f64, +} + +/// +#[derive(Debug, Clone)] +pub struct FindAllSearch { + /// + pub member_id: Option, +} + +/// +#[derive(Debug, Clone)] +pub struct FindAll { + /// + pub search: Option, + /// + pub pagination: Option, + /// + pub sorts: Option>, +} diff --git a/src/repositories/member_balance/repository.rs b/src/repositories/member_balance/repository.rs new file mode 100644 index 0000000..bf2bd96 --- /dev/null +++ b/src/repositories/member_balance/repository.rs @@ -0,0 +1,189 @@ +//! +//! +use super::{models, schema::member_balances}; +use beteran_common_rust as bcr; +use diesel::prelude::*; +use diesel::result::Error; + +/// +pub struct Repository {} + +impl std::fmt::Debug for Repository { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("Repository of member_balances").finish() + } +} + +impl Default for Repository { + fn default() -> Self { + Self::new() + } +} + +impl Repository { + /// + pub fn new() -> Repository { + Repository {} + } + + /// + pub fn insert( + &self, + conn: &diesel::PgConnection, + new_resource: &models::NewMemberBalance, + ) -> Result { + let inserted = diesel::insert_into(member_balances::table) + .values(new_resource) + .get_result::(conn)?; + + Ok(inserted) + } + + /// + pub fn select( + &self, + conn: &diesel::PgConnection, + id: uuid::Uuid, + ) -> Result, Error> { + match member_balances::table + .find(id as uuid::Uuid) + .first::(conn) + { + Ok(m) => Ok(Some(m)), + Err(e) => match e { + diesel::result::Error::NotFound => Ok(None), + _ => Err(e), + }, + } + } + + /// + pub fn select_by_member_id( + &self, + conn: &diesel::PgConnection, + member_id: uuid::Uuid, + ) -> Result, Error> { + use member_balances::dsl; + + match member_balances::table + .filter(dsl::member_id.eq(member_id)) + .first::(conn) + { + Ok(m) => Ok(Some(m)), + Err(e) => match e { + diesel::result::Error::NotFound => Ok(None), + _ => Err(e), + }, + } + } + + /// + pub fn select_all_count( + &self, + conn: &diesel::PgConnection, + find_all: &models::FindAll, + ) -> Result { + let mut q = member_balances::table.into_boxed(); + + if let Some(s) = &find_all.search { + if let Some(sp) = s.member_id { + q = q.filter(member_balances::dsl::member_id.eq(sp)); + } + } + + q.count().get_result(conn) + } + + /// + pub fn select_all( + &self, + conn: &diesel::PgConnection, + find_all: &models::FindAll, + ) -> Result, Error> { + let mut q = member_balances::table.into_boxed(); + + if let Some(s) = &find_all.search { + if let Some(sp) = s.member_id { + q = q.filter(member_balances::dsl::member_id.eq(sp)); + } + } + + if let Some(p) = &find_all.pagination { + let page = p.page.unwrap_or(1); + + if let Some(page_size) = p.page_size { + q = q.offset(((page - 1) * page_size) as i64); + q = q.limit(page_size as i64); + } + } + if let Some(orderbys) = &find_all.sorts { + for s in orderbys { + match s { + bcr::pagination::Sort::ASC(property) => match property.as_str() { + "balance" => { + q = q.order_by(member_balances::balance.asc()); + } + "balance_bota" => { + q = q.order_by(member_balances::balance_bota.asc()); + } + "balance_sum" => { + q = q.order_by(member_balances::balance_sum.asc()); + } + "created_at" => { + q = q.order_by(member_balances::created_at.asc()); + } + "updated_at" => { + q = q.order_by(member_balances::updated_at.asc()); + } + _ => {} + }, + bcr::pagination::Sort::DESC(property) => match property.as_str() { + "balance" => { + q = q.order_by(member_balances::balance.desc()); + } + "balance_bota" => { + q = q.order_by(member_balances::balance_bota.desc()); + } + "balance_sum" => { + q = q.order_by(member_balances::balance_sum.desc()); + } + "created_at" => { + q = q.order_by(member_balances::created_at.desc()); + } + "updated_at" => { + q = q.order_by(member_balances::updated_at.desc()); + } + + _ => {} + }, + }; + } + } + + q.load::(conn) + } + + /// + pub fn update( + &self, + conn: &diesel::PgConnection, + id: uuid::Uuid, + modify: &models::ModifyMemberBalance, + ) -> Result { + use member_balances::dsl; + + diesel::update(dsl::member_balances.filter(dsl::id.eq(id))) + .set(modify) + .execute(conn) + .map(|c| c as u64) + } + + /// + pub fn delete(&self, conn: &diesel::PgConnection, id: uuid::Uuid) -> Result { + use member_balances::dsl; + + diesel::delete(member_balances::table.filter(dsl::id.eq(id))) + .execute(conn) + .map(|c| c as u64) + } +} diff --git a/src/repositories/member_balance/schema.rs b/src/repositories/member_balance/schema.rs new file mode 100644 index 0000000..c9511cf --- /dev/null +++ b/src/repositories/member_balance/schema.rs @@ -0,0 +1,22 @@ +//! +//! + +table! { + /// + member_balances(id) { + /// + id -> Uuid, + /// + member_id -> Uuid, + /// + balance -> Double, + /// + balance_bota -> Double, + /// + balance_sum -> Double, + /// + created_at -> BigInt, + /// + updated_at -> BigInt, + } +} diff --git a/src/repositories/mod.rs b/src/repositories/mod.rs index c683dbb..426271a 100644 --- a/src/repositories/mod.rs +++ b/src/repositories/mod.rs @@ -1,5 +1,6 @@ pub mod bank; pub mod member; +pub mod member_balance; pub mod member_bank_account; pub mod member_bank_deposit; pub mod member_bank_withdraw; diff --git a/src/services/member/models.rs b/src/services/member/models.rs index c1b5e91..eed88d3 100644 --- a/src/services/member/models.rs +++ b/src/services/member/models.rs @@ -85,6 +85,10 @@ impl From<&compositions::member::models::MemberModel> for bpr::models::member::M .member_settlement_setting .as_ref() .map(bpr::models::member_settlement_setting::MemberSettlementSetting::from), + member_balance: d + .member_balance + .as_ref() + .map(bpr::models::member_balance::MemberBalance::from), last_signined_ip: d.last_signined_ip.clone(), last_signined_at: d.last_signined_at.map(|d| d as u64), created_at: d.created_at as u64, @@ -130,6 +134,10 @@ impl From<&Box> .member_settlement_setting .as_ref() .map(bpr::models::member_settlement_setting::MemberSettlementSetting::from), + member_balance: d + .member_balance + .as_ref() + .map(bpr::models::member_balance::MemberBalance::from), last_signined_ip: d.last_signined_ip.clone(), last_signined_at: d.last_signined_at.map(|d| d as u64), created_at: d.created_at as u64, diff --git a/src/services/member_balance/mod.rs b/src/services/member_balance/mod.rs new file mode 100644 index 0000000..b9e52db --- /dev/null +++ b/src/services/member_balance/mod.rs @@ -0,0 +1,2 @@ +pub mod models; +pub mod service; diff --git a/src/services/member_balance/models.rs b/src/services/member_balance/models.rs new file mode 100644 index 0000000..edfe458 --- /dev/null +++ b/src/services/member_balance/models.rs @@ -0,0 +1,18 @@ +use crate::repositories; +use beteran_protobuf_rust as bpr; + +impl From<&repositories::member_balance::models::MemberBalance> + for bpr::models::member_balance::MemberBalance +{ + fn from(d: &repositories::member_balance::models::MemberBalance) -> Self { + bpr::models::member_balance::MemberBalance { + id: d.id.to_string(), + member_id: d.member_id.to_string(), + balance: d.balance, + balance_bota: d.balance_bota, + balance_sum: d.balance_sum, + created_at: d.created_at as u64, + updated_at: d.updated_at as u64, + } + } +} diff --git a/src/services/member_balance/service.rs b/src/services/member_balance/service.rs new file mode 100644 index 0000000..2211fa1 --- /dev/null +++ b/src/services/member_balance/service.rs @@ -0,0 +1,785 @@ +//! +//! + +use std::str::FromStr; + +use crate::compositions; +use crate::repositories; +use beteran_common_rust as bcr; +use beteran_protobuf_rust as bpr; +use diesel::{ + r2d2::{ConnectionManager, Pool}, + PgConnection, +}; +use prost::Message; + +/// +pub struct Service<'a> { + connection_broker: nats::asynk::Connection, + queue_broker: String, + pool: Pool>, + site_repository: repositories::site::repository::Repository, + site_composition: compositions::site::composition::Composition, + member_balance_repository: repositories::member_balance::repository::Repository, + argon2_config: argon2::Config<'a>, + password_salt: String, +} + +impl std::fmt::Debug for Service<'_> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("Service of service.member.service.identity") + .finish() + } +} + +impl Service<'_> { + /// + pub fn new( + connection_broker: nats::asynk::Connection, + queue_broker: String, + pool: Pool>, + password_salt: String, + ) -> Service<'static> { + Service { + connection_broker, + queue_broker, + pool, + site_repository: repositories::site::repository::Repository::new(), + site_composition: compositions::site::composition::Composition::new(), + member_balance_repository: repositories::member_balance::repository::Repository::new(), + argon2_config: argon2::Config::default(), + password_salt, + } + } + + pub async fn subscribe(&self) -> std::result::Result<(), std::boxed::Box> { + futures::try_join!( + self.create_member_balance(), + self.list_member_balances(), + self.get_member_balance(), + self.update_member_balance(), + self.delete_member_balance(), + ) + .map(|_| ()) + } + fn check_site( + &self, + conn: &diesel::PgConnection, + url: Option, + site_id: uuid::Uuid, + ) -> Result { + match self + .site_composition + .select_by_url(conn, url, site_id) + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })? { + Some(s) => Ok(s), + None => Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid site_url information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "client.site_url".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::None, + message: "".to_string(), + }, + }, + )), + } + } + + async fn create_member_balance(&self) -> Result<(), Box> { + let s = self + .connection_broker + .queue_subscribe( + bpr::ss::member_balance::SUBJECT_CREATE_MEMBER_BALANCE, + self.queue_broker.as_str(), + ) + .await?; + + while let Some(message) = s.next().await { + if let Err(e) = async { + let req = + bpr::ss::member_balance::CreateMemberBalanceRequest::decode(message.data.as_slice()) + .map_err(|e| { + bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest { + message: format!("invalid request: {}", e), + }) + })?; + + let client = match req.client { + Some(c) => c, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid client information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "client".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + let request = match req.request { + Some(r) => r, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid request information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "request".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + + let member_id = uuid::Uuid::from_str(request.member_id.as_str()).map_err(|e| { + bcr::error::rpc::Error::InvalidParams(bcr::error::rpc::InvalidParams { + message: "invalid member_id param".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "member_id".to_string(), + value: request.member_id.clone(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: e.to_string(), + }, + }) + })?; + + let conn = self.pool.get().map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + let s = self + .member_balance_repository + .insert( + &conn, + &repositories::member_balance::models::NewMemberBalance { member_id }, + ) + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + message + .respond( + bpr::ss::member_balance::CreateMemberBalanceResponse { + error: None, + result: Some( + bpr::ss::member_balance::create_member_balance_response::Result { + member_balance: Some(bpr::models::member_balance::MemberBalance::from(&s)), + }, + ), + } + .encode_to_vec(), + ) + .await + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + Ok::<(), bcr::error::rpc::Error>(()) + } + .await + { + message + .respond( + bpr::ss::member_balance::CreateMemberBalanceResponse { + error: Some(bpr::protobuf::rpc::Error::from(e)), + result: None, + } + .encode_to_vec(), + ) + .await?; + } + } + + Ok(()) + } + + async fn list_member_balances(&self) -> Result<(), Box> { + let s = self + .connection_broker + .queue_subscribe( + bpr::ss::member_balance::SUBJECT_LIST_MEMBER_BALANCES, + self.queue_broker.as_str(), + ) + .await?; + + while let Some(message) = s.next().await { + if let Err(e) = async { + let req = + bpr::ss::member_balance::ListMemberBalancesRequest::decode(message.data.as_slice()) + .map_err(|e| { + bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest { + message: format!("invalid request: {}", e), + }) + })?; + + let client = match req.client { + Some(c) => c, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid client information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "client".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + let request = match req.request { + Some(r) => r, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid request information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "request".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + + let search = match request.search { + Some(s) => { + let member_id = match s.member_id { + Some(d) => match uuid::Uuid::from_str(d.as_str()) { + Ok(dd) => Some(dd), + Err(e) => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid member_id param".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "member_id".to_string(), + value: d.clone(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: e.to_string(), + }, + }, + )); + } + }, + None => None, + }; + + Some(repositories::member_balance::models::FindAllSearch { member_id }) + } + None => None, + }; + + let conn = self.pool.get().map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + let find_all = repositories::member_balance::models::FindAll { + search, + pagination: request + .pagination + .as_ref() + .map(bcr::pagination::Pagination::from), + sorts: Some( + request + .sorts + .iter() + .map(bcr::pagination::Sort::from) + .collect(), + ), + }; + + let total_count = self + .member_balance_repository + .select_all_count(&conn, &find_all) + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + let list = self + .member_balance_repository + .select_all(&conn, &find_all) + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + message + .respond( + bpr::ss::member_balance::ListMemberBalancesResponse { + error: None, + result: Some( + bpr::ss::member_balance::list_member_balances_response::Result { + total_count: total_count as u64, + member_balances: list + .iter() + .map(|d| bpr::models::member_balance::MemberBalance::from(d)) + .collect(), + }, + ), + } + .encode_to_vec(), + ) + .await + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + Ok::<(), bcr::error::rpc::Error>(()) + } + .await + { + message + .respond( + bpr::ss::member_balance::ListMemberBalancesResponse { + error: Some(bpr::protobuf::rpc::Error::from(e)), + result: None, + } + .encode_to_vec(), + ) + .await?; + } + } + + Ok(()) + } + + async fn get_member_balance(&self) -> Result<(), Box> { + let s = self + .connection_broker + .queue_subscribe( + bpr::ss::member_balance::SUBJECT_GET_MEMBER_BALANCE, + self.queue_broker.as_str(), + ) + .await?; + + while let Some(message) = s.next().await { + if let Err(e) = async { + let req = bpr::ss::member_balance::GetMemberBalanceRequest::decode(message.data.as_slice()) + .map_err(|e| { + bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest { + message: format!("invalid request: {}", e), + }) + })?; + + let client = match req.client { + Some(c) => c, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid client information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "client".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + let request = match req.request { + Some(r) => r, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid request information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "request".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + let id = uuid::Uuid::from_str(request.id.as_str()).map_err(|e| { + bcr::error::rpc::Error::InvalidParams(bcr::error::rpc::InvalidParams { + message: "invalid id param".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "id".to_string(), + value: request.id.clone(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: e.to_string(), + }, + }) + })?; + + let conn = self.pool.get().map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + let s = self + .member_balance_repository + .select(&conn, id) + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + message + .respond( + bpr::ss::member_balance::GetMemberBalanceResponse { + error: None, + result: Some( + bpr::ss::member_balance::get_member_balance_response::Result { + member_balance: s.map(|d| bpr::models::member_balance::MemberBalance::from(&d)), + }, + ), + } + .encode_to_vec(), + ) + .await + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + Ok::<(), bcr::error::rpc::Error>(()) + } + .await + { + message + .respond( + bpr::ss::member_balance::GetMemberBalanceResponse { + error: Some(bpr::protobuf::rpc::Error::from(e)), + result: None, + } + .encode_to_vec(), + ) + .await?; + } + } + + Ok(()) + } + + async fn update_member_balance(&self) -> Result<(), Box> { + let s = self + .connection_broker + .queue_subscribe( + bpr::ss::member_balance::SUBJECT_UPDATE_MEMBER_BALANCE, + self.queue_broker.as_str(), + ) + .await?; + + while let Some(message) = s.next().await { + if let Err(e) = async { + let req = + bpr::ss::member_balance::UpdateMemberBalanceRequest::decode(message.data.as_slice()) + .map_err(|e| { + bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest { + message: format!("invalid request: {}", e), + }) + })?; + + let client = match req.client { + Some(c) => c, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid client information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "client".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + let request = match req.request { + Some(r) => r, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid request information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "request".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + let id = uuid::Uuid::from_str(request.id.as_str()).map_err(|e| { + bcr::error::rpc::Error::InvalidParams(bcr::error::rpc::InvalidParams { + message: "invalid id param".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "id".to_string(), + value: request.id.clone(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: e.to_string(), + }, + }) + })?; + + let conn = self.pool.get().map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + let _affected = self + .member_balance_repository + .update( + &conn, + id, + &repositories::member_balance::models::ModifyMemberBalance { + balance: request.balance, + balance_bota: request.balance_bota, + balance_sum: request.balance_sum, + }, + ) + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + let s = self + .member_balance_repository + .select(&conn, id) + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + message + .respond( + bpr::ss::member_balance::UpdateMemberBalanceResponse { + error: None, + result: Some( + bpr::ss::member_balance::update_member_balance_response::Result { + member_balance: s.map(|d| bpr::models::member_balance::MemberBalance::from(&d)), + }, + ), + } + .encode_to_vec(), + ) + .await + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + Ok::<(), bcr::error::rpc::Error>(()) + } + .await + { + message + .respond( + bpr::ss::member_balance::UpdateMemberBalanceResponse { + error: Some(bpr::protobuf::rpc::Error::from(e)), + result: None, + } + .encode_to_vec(), + ) + .await?; + } + } + + Ok(()) + } + + async fn delete_member_balance(&self) -> Result<(), Box> { + let s = self + .connection_broker + .queue_subscribe( + bpr::ss::member_balance::SUBJECT_DELETE_MEMBER_BALANCE, + self.queue_broker.as_str(), + ) + .await?; + + while let Some(message) = s.next().await { + if let Err(e) = async { + let req = + bpr::ss::member_balance::DeleteMemberBalanceRequest::decode(message.data.as_slice()) + .map_err(|e| { + bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest { + message: format!("invalid request: {}", e), + }) + })?; + + let client = match req.client { + Some(c) => c, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid client information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "client".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + let request = match req.request { + Some(r) => r, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid request information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "request".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + let id = uuid::Uuid::from_str(request.id.as_str()).map_err(|e| { + bcr::error::rpc::Error::InvalidParams(bcr::error::rpc::InvalidParams { + message: "invalid id param".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "id".to_string(), + value: request.id.clone(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: e.to_string(), + }, + }) + })?; + + let conn = self.pool.get().map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + let _affected = self + .member_balance_repository + .delete(&conn, id) + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + message + .respond( + bpr::ss::member_balance::DeleteMemberBalanceResponse { + error: None, + result: Some(bpr::ss::member_balance::delete_member_balance_response::Result {}), + } + .encode_to_vec(), + ) + .await + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + Ok::<(), bcr::error::rpc::Error>(()) + } + .await + { + message + .respond( + bpr::ss::member_balance::DeleteMemberBalanceResponse { + error: Some(bpr::protobuf::rpc::Error::from(e)), + result: None, + } + .encode_to_vec(), + ) + .await?; + } + } + + Ok(()) + } +} diff --git a/src/services/mod.rs b/src/services/mod.rs index 09f3bfb..0315ed6 100644 --- a/src/services/mod.rs +++ b/src/services/mod.rs @@ -1,6 +1,7 @@ pub mod bank; pub mod identity; pub mod member; +pub mod member_balance; pub mod member_bank_account; pub mod member_bank_deposit; pub mod member_bank_withdraw;