From 61bab7ef6a675b115c953c82c263d72813d3c83d Mon Sep 17 00:00:00 2001 From: PARK BYUNG JUN Date: Fri, 19 Aug 2022 03:20:45 +0000 Subject: [PATCH] event handler of member is added --- .../202208051200_api_kgon_member/up.sql | 2 +- src/api/member/api.rs | 42 +++- src/api/member/models.rs | 9 +- src/events/member/event.rs | 188 ++++++++++++++++++ src/events/member/mod.rs | 1 + src/events/mod.rs | 1 + src/main.rs | 45 +++-- src/repositories/member/models.rs | 10 - 8 files changed, 258 insertions(+), 40 deletions(-) create mode 100644 src/events/member/event.rs create mode 100644 src/events/member/mod.rs create mode 100644 src/events/mod.rs diff --git a/migrations/202208051200_api_kgon_member/up.sql b/migrations/202208051200_api_kgon_member/up.sql index ad97f37..da46f27 100644 --- a/migrations/202208051200_api_kgon_member/up.sql +++ b/migrations/202208051200_api_kgon_member/up.sql @@ -4,7 +4,7 @@ CREATE TABLE IF NOT EXISTS api_kgon_members ( balance_bota BIGINT NOT NULL DEFAULT 0, balance_sum BIGINT NOT NULL DEFAULT 0, companies BIGINT NOT NULL DEFAULT 0, - oriental_play CHAR(1) NOT NULL, + oriental_play CHAR(1) NOT NULL DEFAULT 'n', member_id UUID NOT NULL, created_at BIGINT NOT NULL DEFAULT (extract(epoch from now()) * 1000), updated_at BIGINT NOT NULL DEFAULT (extract(epoch from now()) * 1000), diff --git a/src/api/member/api.rs b/src/api/member/api.rs index 3d7a905..1d9634c 100644 --- a/src/api/member/api.rs +++ b/src/api/member/api.rs @@ -83,10 +83,10 @@ impl Api { } /// - pub async fn save_member( + pub async fn create_member( &self, - data: models::SaveMemberRequest, - ) -> Result { + data: models::CreateMemberRequest, + ) -> Result { let mut params = HashMap::new(); params.insert("username", data.username); @@ -112,17 +112,41 @@ impl Api { .await { Ok(res) => res, - Err(err) => { - return Err(Error { code: 0, msg: None }); + Err(e) => { + return Err(Error { + code: -1, + msg: Some(e.to_string()), + }); } }; match res.status() { - reqwest::StatusCode::OK => match res.json::().await { - Ok(r) => Ok(r), - Err(e) => Err(Error { code: 0, msg: None }), + reqwest::StatusCode::OK => match res.json::().await { + Ok(r) => { + if r.code != 0 { + return Err(Error { + code: r.code, + msg: r.msg, + }); + } + + match r.id { + Some(id) => Ok(models::CreateMemberResponse { id }), + None => Err(Error { + code: -1, + msg: Some("id is not exist in response of api".to_string()), + }), + } + } + Err(e) => Err(Error { + code: -1, + msg: Some(e.to_string()), + }), }, - _ => Err(Error { code: 0, msg: None }), + _ => Err(Error { + code: -1, + msg: None, + }), } } } diff --git a/src/api/member/models.rs b/src/api/member/models.rs index 5deb137..908fb76 100644 --- a/src/api/member/models.rs +++ b/src/api/member/models.rs @@ -30,7 +30,7 @@ pub struct ListMembersResponse { } #[derive(Serialize, Deserialize, Debug)] -pub struct SaveMemberRequest { +pub struct CreateMemberRequest { pub username: String, pub nickname: String, pub site_username: String, @@ -38,8 +38,13 @@ pub struct SaveMemberRequest { } #[derive(Serialize, Deserialize, Debug)] -pub struct SaveMemberResponse { +pub struct _CreateMemberResponse { pub code: i64, pub msg: Option, pub id: Option, } + +#[derive(Serialize, Deserialize, Debug)] +pub struct CreateMemberResponse { + pub id: i64, +} diff --git a/src/events/member/event.rs b/src/events/member/event.rs new file mode 100644 index 0000000..1816739 --- /dev/null +++ b/src/events/member/event.rs @@ -0,0 +1,188 @@ +use crate::api; +use crate::core; +use crate::repositories; +use beteran_common_rust as bcr; +use beteran_protobuf_rust as bpr; +use diesel::{ + r2d2::{ConnectionManager, Pool}, + PgConnection, +}; +use prost::Message; +use std::str::FromStr; + +pub struct EventHandler { + connection_broker: nats::asynk::Connection, + queue_broker: String, + pool: Pool>, + api_config: core::config::ApiConfig, + member_repository: repositories::member::repository::Repository, + member_api: api::member::api::Api, +} + +impl std::fmt::Debug for EventHandler { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("EventHandler of members").finish() + } +} + +impl EventHandler { + /// + pub fn new( + connection_broker: nats::asynk::Connection, + queue_broker: String, + pool: Pool>, + api_config: core::config::ApiConfig, + ) -> EventHandler { + EventHandler { + connection_broker, + queue_broker, + pool, + api_config: api_config.clone(), + member_repository: repositories::member::repository::Repository::new(), + member_api: api::member::api::Api::new(api_config.clone()), + } + } + + pub async fn subscribe(&self) -> std::result::Result<(), std::boxed::Box> { + futures::try_join!(self.event_after_create_member()).map(|_| ()) + } + + async fn event_after_create_member(&self) -> Result<(), Box> { + let s = self + .connection_broker + .queue_subscribe( + bpr::ss::member::EVENT_SUBJECT_AFTER_CREATE_MEMBER, + self.queue_broker.as_str(), + ) + .await?; + + while let Some(message) = s.next().await { + if let Err(e) = async { + let eve = bpr::ss::member::AfterCreateMemberEvent::decode(message.data.as_slice()) + .map_err(|e| { + bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest { + message: format!("invalid request: {}", e), + }) + })?; + + let client = match eve.client { + Some(c) => c, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid client information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "client".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + let event = match eve.event { + Some(r) => r, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid event information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "event".to_string(), + param: "event".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + let member = match event.member { + Some(c) => c, + None => { + return Err(bcr::error::rpc::Error::InvalidParams( + bcr::error::rpc::InvalidParams { + message: "invalid client information".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "client".to_string(), + value: "".to_string(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: "".to_string(), + }, + }, + )); + } + }; + + let member_id = uuid::Uuid::from_str(member.id.as_str()).map_err(|e| { + bcr::error::rpc::Error::InvalidParams(bcr::error::rpc::InvalidParams { + message: "invalid member.id param".to_string(), + detail: bcr::error::rpc::InvalidParamsDetail { + location: "request".to_string(), + param: "member.id".to_string(), + value: member.id.clone(), + error_type: bcr::error::rpc::InvalidParamsType::Required, + message: e.to_string(), + }, + }) + })?; + + let conn = self.pool.get().map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + let api_create_res = self + .member_api + .create_member(api::member::models::CreateMemberRequest { + username: member.username.clone(), + nickname: member.nickname.clone(), + site_username: member.username.clone(), + group_key: match member.referrer_member { + Some(m) => Some(m.username), + None => None, + }, + }) + .await + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {:?}", e), + data: None, + }) + })?; + + self + .member_repository + .insert( + &conn, + &repositories::member::models::NewMember { + id: api_create_res.id, + member_id, + }, + ) + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + Ok::<(), bcr::error::rpc::Error>(()) + } + .await + { + println!("error: {}", e); + } + } + + Ok(()) + } +} diff --git a/src/events/member/mod.rs b/src/events/member/mod.rs new file mode 100644 index 0000000..53f1126 --- /dev/null +++ b/src/events/member/mod.rs @@ -0,0 +1 @@ +pub mod event; diff --git a/src/events/mod.rs b/src/events/mod.rs new file mode 100644 index 0000000..0dd2a03 --- /dev/null +++ b/src/events/mod.rs @@ -0,0 +1 @@ +pub mod member; diff --git a/src/main.rs b/src/main.rs index ffe152b..439d04d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,6 +14,7 @@ use std::env; mod api; mod compositions; mod core; +mod events; mod repositories; mod schedulers; mod services; @@ -67,24 +68,6 @@ async fn main() -> Result<(), Box> { let server_broker_opts = nats::asynk::Options::new(); let connection_server_broker = server_broker_opts.connect(url_server_broker).await?; - let member_service = services::member::service::Service::new( - connection_server_broker.clone(), - queue_server_broker.clone(), - ); - let member_account_service = services::member_account::service::Service::new( - connection_server_broker.clone(), - queue_server_broker.clone(), - ); - let game_service = services::game::service::Service::new( - connection_server_broker.clone(), - queue_server_broker.clone(), - pool.clone(), - ); - let betting_service = services::betting::service::Service::new( - connection_server_broker.clone(), - queue_server_broker.clone(), - ); - let mut sched = tokio_cron_scheduler::JobScheduler::new().await?; let member_scheduler = schedulers::member::scheduler::Scheduler::get_instance( @@ -108,6 +91,31 @@ async fn main() -> Result<(), Box> { let _h_scheduler = sched.start().await?; + let member_service = services::member::service::Service::new( + connection_server_broker.clone(), + queue_server_broker.clone(), + ); + let member_account_service = services::member_account::service::Service::new( + connection_server_broker.clone(), + queue_server_broker.clone(), + ); + let game_service = services::game::service::Service::new( + connection_server_broker.clone(), + queue_server_broker.clone(), + pool.clone(), + ); + let betting_service = services::betting::service::Service::new( + connection_server_broker.clone(), + queue_server_broker.clone(), + ); + + let member_event_handler = events::member::event::EventHandler::new( + connection_server_broker.clone(), + queue_server_broker.clone(), + pool.clone(), + api_config.clone(), + ); + println!("Server service [beteran-api-kgon-server-service] is started"); futures::try_join!( @@ -115,6 +123,7 @@ async fn main() -> Result<(), Box> { // member_account_service.subscribe(), game_service.subscribe(), // betting_service.subscribe(), + member_event_handler.subscribe(), )?; sched.shutdown().await?; diff --git a/src/repositories/member/models.rs b/src/repositories/member/models.rs index 947bdd9..83c42a5 100644 --- a/src/repositories/member/models.rs +++ b/src/repositories/member/models.rs @@ -32,16 +32,6 @@ pub struct NewMember { /// pub id: i64, /// - pub balance: i64, - /// - pub balance_bota: i64, - /// - pub balance_sum: i64, - /// - pub companies: i64, - /// - pub oriental_play: String, - /// pub member_id: uuid::Uuid, }