From 7a46f34f1d40046c6fad4c4e0b6747350dca7fb9 Mon Sep 17 00:00:00 2001 From: PARK BYUNG JUN Date: Thu, 11 Aug 2022 06:25:25 +0000 Subject: [PATCH] event is added --- Cargo.toml | 4 +- src/compositions/member/composition.rs | 94 ++++++++ src/services/identity/service.rs | 19 ++ src/services/member/service.rs | 318 ++++++++++++++++++++++++- 4 files changed, 429 insertions(+), 6 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ec80925..c59a744 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,7 +30,7 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread"] } tokio-cron-scheduler = { version = "0" } uuid = { version = "0", features = ["serde", "v4", "v5"] } -beteran-protobuf-rust = { git = "https://gitlab.loafle.net/bet/beteran-protobuf-rust.git", tag = "v0.1.32-snapshot" } -beteran-common-rust = { git = "https://gitlab.loafle.net/bet/beteran-common-rust.git", tag = "v0.1.15-snapshot" } +beteran-protobuf-rust = { git = "https://gitlab.loafle.net/bet/beteran-protobuf-rust.git", tag = "v0.1.33-snapshot" } +beteran-common-rust = { git = "https://gitlab.loafle.net/bet/beteran-common-rust.git", tag = "v0.1.16-snapshot" } [build-dependencies] diff --git a/src/compositions/member/composition.rs b/src/compositions/member/composition.rs index 83bc0fc..8b08e1a 100644 --- a/src/compositions/member/composition.rs +++ b/src/compositions/member/composition.rs @@ -119,6 +119,100 @@ impl Composition { } } + /// + pub fn select_by_username( + &self, + conn: &diesel::PgConnection, + username: &str, + ) -> Result, Error> { + match sql_query( + " + SELECT + m.id as m_id, + m.site_id as m_site_id, + m.member_class_id as m_member_class_id, + m.member_level_id as m_member_level_id, + m.username as m_username, + m.password as m_password, + m.nickname as m_nickname, + m.mobile_phone_number as m_mobile_phone_number, + m.state as m_state, + m.state_changed_at as m_state_changed_at, + m.referrer_member_id as m_referrer_member_id, + m.referred_count as m_referred_count, + m.last_signined_ip as m_last_signined_ip, + m.last_signined_at as m_last_signined_at, + m.created_at as m_created_at, + m.updated_at as m_updated_at, + m.deleted_at as m_deleted_at, + + s.id as s_id, + s.url as s_url, + s.name as s_name, + s.path as s_path, + s.show as s_show, + s.can_use as s_can_use, + s.memo as s_memo, + s.expires_at as s_expires_at, + s.created_at as s_created_at, + s.updated_at as s_updated_at, + + mc.id as mc_id, + mc.parent_id as mc_parent_id, + mc.name as mc_name, + mc.created_at as mc_created_at, + mc.updated_at as mc_updated_at, + mc.deleted_at as mc_deleted_at, + + ml.id as ml_id, + ml.name as ml_name, + ml.sort_order as ml_sort_order, + ml.created_at as ml_created_at, + ml.updated_at as ml_updated_at, + ml.deleted_at as ml_deleted_at, + + _m.id as _m_id, + _m.site_id as _m_site_id, + _m.member_class_id as _m_member_class_id, + _m.member_level_id as _m_member_level_id, + _m.username as _m_username, + _m.password as _m_password, + _m.nickname as _m_nickname, + _m.mobile_phone_number as _m_mobile_phone_number, + _m.state as _m_state, + _m.state_changed_at as _m_state_changed_at, + _m.referrer_member_id as _m_referrer_member_id, + _m.referred_count as _m_referred_count, + _m.last_signined_ip as _m_last_signined_ip, + _m.last_signined_at as _m_last_signined_at, + _m.created_at as _m_created_at, + _m.updated_at as _m_updated_at, + _m.deleted_at as _m_deleted_at + + FROM members as m + JOIN sites s + ON s.id = m.site_id + JOIN member_classes mc + ON mc.id = m.member_class_id + JOIN member_levels ml + ON ml.id = m.member_level_id + JOIN members _m + ON _m.id = m.referrer_member_id + WHERE + m.username = $1 + ", + ) + .bind::(username) + .get_result::(conn) + { + Ok(m) => Ok(Some(m)), + Err(e) => match e { + diesel::result::Error::NotFound => Ok(None), + _ => Err(e), + }, + } + } + /// pub fn select_all( &self, diff --git a/src/services/identity/service.rs b/src/services/identity/service.rs index 4c6836e..85161b5 100644 --- a/src/services/identity/service.rs +++ b/src/services/identity/service.rs @@ -560,6 +560,25 @@ impl Service<'_> { }) })?; + self + .connection_broker + .publish( + bpr::ss::member::identity::EVENT_SUBJECT_AFTER_SIGNIN, + bpr::ss::member::identity::AfterSigninEvent { + client: Some(client), + member: Some(bpr::models::member::Member::from(&m)), + } + .encode_to_vec(), + ) + .await + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + message .respond( bpr::ss::member::identity::SigninResponse { diff --git a/src/services/member/service.rs b/src/services/member/service.rs index 22fa6db..17f0606 100644 --- a/src/services/member/service.rs +++ b/src/services/member/service.rs @@ -1,6 +1,8 @@ //! //! +use std::str::FromStr; + use super::super::super::compositions; use super::super::super::repositories; use beteran_common_rust as bcr; @@ -48,7 +50,13 @@ impl Service { } pub async fn subscribe(&self) -> std::result::Result<(), std::boxed::Box> { - futures::try_join!(self.list_members(),).map(|_| ()) + futures::try_join!( + self.list_members(), + self.get_member(), + self.get_member_by_username(), + self.event_after_signin() + ) + .map(|_| ()) } fn check_site( @@ -141,12 +149,12 @@ impl Service { pagination: req .pagination .as_ref() - .map(|d| bcr::models::pagination::Pagination::from(d)), + .map(bcr::models::pagination::Pagination::from), sorts: Some( req .sorts .iter() - .map(|d| beteran_common_rust::models::pagination::Sort::from(d)) + .map(beteran_common_rust::models::pagination::Sort::from) .collect(), ), }; @@ -180,7 +188,7 @@ impl Service { result: Some(bpr::ss::member::member::list_members_response::Result { members: member_list .iter() - .map(|d| bpr::models::member::MemberModel::from(d)) + .map(bpr::models::member::MemberModel::from) .collect(), }), } @@ -213,4 +221,306 @@ impl Service { Ok(()) } + + async fn get_member(&self) -> Result<(), Box> { + let s = self + .connection_broker + .queue_subscribe( + bpr::ss::member::member::SUBJECT_GET_MEMBER, + self.queue_broker.as_str(), + ) + .await?; + + while let Some(message) = s.next().await { + if let Err(e) = async { + let req = bpr::ss::member::member::GetMemberRequest::decode(message.data.as_slice()) + .map_err(|e| { + bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest { + message: format!("invalid request: {}", e), + }) + })?; + + let id = uuid::Uuid::from_str(req.id.as_str()).map_err(|e| { + bcr::error::rpc::Error::InvalidParams(bcr::error::rpc::InvalidParams { + message: "invalid id param".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "id".to_string(), + value: req.id.clone(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: e.to_string(), + }, + }) + })?; + + let client = match req.client { + Some(c) => c, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid client information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "client".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + + let conn = self.pool.get().map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + let m = self.member_composition.select(&conn, id).map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + message + .respond( + bpr::ss::member::member::GetMemberResponse { + error: None, + result: Some(bpr::ss::member::member::get_member_response::Result { + member: m.map(|d| bpr::models::member::MemberModel::from(&d)), + }), + } + .encode_to_vec(), + ) + .await + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + Ok::<(), bcr::error::rpc::Error>(()) + } + .await + { + message + .respond( + bpr::ss::member::member::GetMemberResponse { + error: Some(bpr::protobuf::rpc::Error::from(e)), + result: None, + } + .encode_to_vec(), + ) + .await?; + } + } + + Ok(()) + } + + async fn get_member_by_username(&self) -> Result<(), Box> { + let s = self + .connection_broker + .queue_subscribe( + bpr::ss::member::member::SUBJECT_GET_MEMBER_BY_USERNAME, + self.queue_broker.as_str(), + ) + .await?; + + while let Some(message) = s.next().await { + if let Err(e) = async { + let req = + bpr::ss::member::member::GetMemberByUsernameRequest::decode(message.data.as_slice()) + .map_err(|e| { + bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest { + message: format!("invalid request: {}", e), + }) + })?; + + let client = match req.client { + Some(c) => c, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid client information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "client".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + + let conn = self.pool.get().map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + let m = self + .member_composition + .select_by_username(&conn, req.username.as_str()) + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + message + .respond( + bpr::ss::member::member::GetMemberByUsernameResponse { + error: None, + result: Some( + bpr::ss::member::member::get_member_by_username_response::Result { + member: m.map(|d| bpr::models::member::MemberModel::from(&d)), + }, + ), + } + .encode_to_vec(), + ) + .await + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + Ok::<(), bcr::error::rpc::Error>(()) + } + .await + { + message + .respond( + bpr::ss::member::member::GetMemberByUsernameResponse { + error: Some(bpr::protobuf::rpc::Error::from(e)), + result: None, + } + .encode_to_vec(), + ) + .await?; + } + } + + Ok(()) + } + + async fn event_after_signin(&self) -> Result<(), Box> { + let s = self + .connection_broker + .queue_subscribe( + bpr::ss::member::identity::EVENT_SUBJECT_AFTER_SIGNIN, + self.queue_broker.as_str(), + ) + .await?; + + while let Some(message) = s.next().await { + if let Err(e) = async { + let event = bpr::ss::member::identity::AfterSigninEvent::decode(message.data.as_slice()) + .map_err(|e| { + bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest { + message: format!("invalid request: {}", e), + }) + })?; + + let client = match event.client { + Some(c) => c, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid client information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "client".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + + let member = match event.member { + Some(c) => c, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid client information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "client".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + + let id = uuid::Uuid::from_str(member.id.as_str()).map_err(|e| { + bcr::error::rpc::Error::InvalidParams(bcr::error::rpc::InvalidParams { + message: "invalid member.id param".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "member.id".to_string(), + value: member.id.clone(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: e.to_string(), + }, + }) + })?; + + let conn = self.pool.get().map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + self + .member_repository + .update_last_signined_ip( + &conn, + id, + &repositories::member::models::ModifyMember4LastSignined { + last_signined_ip: client.client_ip, + }, + ) + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + Ok::<(), bcr::error::rpc::Error>(()) + } + .await + { + println!("error: {}", e); + } + } + + Ok(()) + } }