diff --git a/Cargo.toml b/Cargo.toml index 86810d2..be7e85c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,7 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread"] } tokio-cron-scheduler = { version = "0" } uuid = { version = "0", features = ["serde", "v4", "v5"] } -beteran-protobuf-rust = { git = "https://gitlab.loafle.net/bet/beteran-protobuf-rust.git", tag = "v0.1.60-snapshot" } -beteran-common-rust = { git = "https://gitlab.loafle.net/bet/beteran-common-rust.git", tag = "v0.1.44-snapshot" } +beteran-protobuf-rust = { git = "https://gitlab.loafle.net/bet/beteran-protobuf-rust.git", tag = "v0.1.61-snapshot" } +beteran-common-rust = { git = "https://gitlab.loafle.net/bet/beteran-common-rust.git", tag = "v0.1.45-snapshot" } [build-dependencies] diff --git a/src/events/member/event.rs b/src/events/member/event.rs new file mode 100644 index 0000000..3e5e93f --- /dev/null +++ b/src/events/member/event.rs @@ -0,0 +1,164 @@ +use super::super::super::repositories; +use beteran_common_rust as bcr; +use beteran_protobuf_rust as bpr; +use diesel::{ + r2d2::{ConnectionManager, Pool}, + PgConnection, +}; +use prost::Message; +use std::str::FromStr; + +pub struct EventHandler { + connection_broker: nats::asynk::Connection, + queue_broker: String, + pool: Pool>, + member_repository: repositories::member::repository::Repository, +} + +impl std::fmt::Debug for EventHandler { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("EventHandler of members").finish() + } +} + +impl EventHandler { + /// + pub fn new( + connection_broker: nats::asynk::Connection, + queue_broker: String, + pool: Pool>, + ) -> EventHandler { + EventHandler { + connection_broker, + queue_broker, + pool, + member_repository: repositories::member::repository::Repository::new(), + } + } + + pub async fn subscribe(&self) -> std::result::Result<(), std::boxed::Box> { + futures::try_join!(self.event_after_signin()).map(|_| ()) + } + + async fn event_after_signin(&self) -> Result<(), Box> { + let s = self + .connection_broker + .queue_subscribe( + bpr::ss::identity::EVENT_SUBJECT_AFTER_SIGNIN, + self.queue_broker.as_str(), + ) + .await?; + + while let Some(message) = s.next().await { + if let Err(e) = async { + let eve = + bpr::ss::identity::AfterSigninEvent::decode(message.data.as_slice()).map_err(|e| { + bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest { + message: format!("invalid request: {}", e), + }) + })?; + + let client = match eve.client { + Some(c) => c, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid client information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "client".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + let event = match eve.event { + Some(r) => r, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid event information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "event".to_string(), + param: "event".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + let member = match event.member { + Some(c) => c, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid client information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "client".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + + let id = uuid::Uuid::from_str(member.id.as_str()).map_err(|e| { + bcr::error::rpc::Error::InvalidParams(bcr::error::rpc::InvalidParams { + message: "invalid member.id param".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "member.id".to_string(), + value: member.id.clone(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: e.to_string(), + }, + }) + })?; + + let conn = self.pool.get().map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + let last_signined_at = chrono::Utc::now().timestamp(); + + self + .member_repository + .update_last_signined_ip( + &conn, + id, + &repositories::member::models::ModifyMember4LastSignined { + last_signined_ip: client.client_ip, + last_signined_at, + }, + ) + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + Ok::<(), bcr::error::rpc::Error>(()) + } + .await + { + println!("error: {}", e); + } + } + + Ok(()) + } +} diff --git a/src/events/member/mod.rs b/src/events/member/mod.rs new file mode 100644 index 0000000..53f1126 --- /dev/null +++ b/src/events/member/mod.rs @@ -0,0 +1 @@ +pub mod event; diff --git a/src/events/mod.rs b/src/events/mod.rs new file mode 100644 index 0000000..0dd2a03 --- /dev/null +++ b/src/events/mod.rs @@ -0,0 +1 @@ +pub mod member; diff --git a/src/main.rs b/src/main.rs index 37bd224..21621c7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,6 +13,7 @@ use diesel::{ use std::env; mod compositions; +mod events; mod repositories; mod services; @@ -110,6 +111,12 @@ async fn main() -> Result<(), Box> { pool.clone(), ); + let member_event_handler = events::member::event::EventHandler::new( + connection_server_broker.clone(), + queue_server_broker.clone(), + pool.clone(), + ); + println!("Server service [beteran-server-service] is started"); futures::try_join!( @@ -122,6 +129,7 @@ async fn main() -> Result<(), Box> { member_class_service.subscribe(), member_level_service.subscribe(), site_service.subscribe(), + member_event_handler.subscribe(), )?; Ok(()) diff --git a/src/services/member/service.rs b/src/services/member/service.rs index baf4c78..a5a32b2 100644 --- a/src/services/member/service.rs +++ b/src/services/member/service.rs @@ -1,8 +1,6 @@ //! //! -use std::str::FromStr; - use super::super::super::compositions; use super::super::super::repositories; use beteran_common_rust as bcr; @@ -12,6 +10,7 @@ use diesel::{ PgConnection, }; use prost::Message; +use std::str::FromStr; /// pub struct Service<'a> { @@ -66,7 +65,6 @@ impl Service<'_> { self.update_member(), self.update_member_for_state(), self.delete_member(), - self.event_after_signin() ) .map(|_| ()) } @@ -1397,126 +1395,4 @@ impl Service<'_> { Ok(()) } - - async fn event_after_signin(&self) -> Result<(), Box> { - let s = self - .connection_broker - .queue_subscribe( - bpr::ss::identity::EVENT_SUBJECT_AFTER_SIGNIN, - self.queue_broker.as_str(), - ) - .await?; - - while let Some(message) = s.next().await { - if let Err(e) = async { - let eve = - bpr::ss::identity::AfterSigninEvent::decode(message.data.as_slice()).map_err(|e| { - bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest { - message: format!("invalid request: {}", e), - }) - })?; - - let client = match eve.client { - Some(c) => c, - None => { - return Err(bcr::error::rpc::Error::InvalidParams( - bcr::error::rpc::InvalidParams { - message: "invalid client information".to_string(), - detail: bcr::error::rpc::InvalidParamsDetail { - location: "request".to_string(), - param: "client".to_string(), - value: "".to_string(), - error_type: bcr::error::rpc::InvalidParamsType::Required, - message: "".to_string(), - }, - }, - )); - } - }; - let event = match eve.event { - Some(r) => r, - None => { - return Err(bcr::error::rpc::Error::InvalidParams( - bcr::error::rpc::InvalidParams { - message: "invalid event information".to_string(), - detail: bcr::error::rpc::InvalidParamsDetail { - location: "event".to_string(), - param: "event".to_string(), - value: "".to_string(), - error_type: bcr::error::rpc::InvalidParamsType::Required, - message: "".to_string(), - }, - }, - )); - } - }; - let member = match event.member { - Some(c) => c, - None => { - return Err(bcr::error::rpc::Error::InvalidParams( - bcr::error::rpc::InvalidParams { - message: "invalid client information".to_string(), - detail: bcr::error::rpc::InvalidParamsDetail { - location: "request".to_string(), - param: "client".to_string(), - value: "".to_string(), - error_type: bcr::error::rpc::InvalidParamsType::Required, - message: "".to_string(), - }, - }, - )); - } - }; - - let id = uuid::Uuid::from_str(member.id.as_str()).map_err(|e| { - bcr::error::rpc::Error::InvalidParams(bcr::error::rpc::InvalidParams { - message: "invalid member.id param".to_string(), - detail: bcr::error::rpc::InvalidParamsDetail { - location: "request".to_string(), - param: "member.id".to_string(), - value: member.id.clone(), - error_type: bcr::error::rpc::InvalidParamsType::Required, - message: e.to_string(), - }, - }) - })?; - - let conn = self.pool.get().map_err(|e| { - bcr::error::rpc::Error::Server(bcr::error::rpc::Server { - code: bpr::protobuf::rpc::Error::SERVER_00, - message: format!("server {}", e), - data: None, - }) - })?; - - let last_signined_at = chrono::Utc::now().timestamp(); - - self - .member_repository - .update_last_signined_ip( - &conn, - id, - &repositories::member::models::ModifyMember4LastSignined { - last_signined_ip: client.client_ip, - last_signined_at, - }, - ) - .map_err(|e| { - bcr::error::rpc::Error::Server(bcr::error::rpc::Server { - code: bpr::protobuf::rpc::Error::SERVER_00, - message: format!("server {}", e), - data: None, - }) - })?; - - Ok::<(), bcr::error::rpc::Error>(()) - } - .await - { - println!("error: {}", e); - } - } - - Ok(()) - } }