use crate::api; use crate::core; use crate::repositories; use beteran_common_rust as bcr; use beteran_protobuf_rust as bpr; use diesel::{ r2d2::{ConnectionManager, Pool}, PgConnection, }; use prost::Message; use std::str::FromStr; static MEMBER_CLASS_ID_MAIN_OFFICE: &str = "4b014ef5-3bab-4413-aaf9-b0040a70ec77"; static MEMBER_CLASS_ID_BRANCH: &str = "ae9b874e-5d0e-4c4d-8432-f45f02691ceb"; static MEMBER_CLASS_ID_DIVISION: &str = "f25a17e9-5c5f-4e9c-bf80-92a9cedf829c"; static MEMBER_CLASS_ID_OFFICE: &str = "cac7b897-2549-4f04-8415-8868f1dcb1da"; static MEMBER_CLASS_ID_STORE: &str = "e11cac11-3825-4f4e-9cd5-39367f23f973"; static MEMBER_CLASS_ID_USER: &str = "4598f07a-86d1-42a4-b038-25706683a7cd"; pub struct EventHandler { connection_broker: nats::asynk::Connection, queue_broker: String, pool: Pool>, api_config: core::config::ApiConfig, member_repository: repositories::member::repository::Repository, member_api: api::member::api::Api, } impl std::fmt::Debug for EventHandler { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { f.debug_struct("EventHandler of members").finish() } } impl EventHandler { /// pub fn new( connection_broker: nats::asynk::Connection, queue_broker: String, pool: Pool>, api_config: core::config::ApiConfig, ) -> EventHandler { EventHandler { connection_broker, queue_broker, pool, api_config: api_config.clone(), member_repository: repositories::member::repository::Repository::new(), member_api: api::member::api::Api::new(api_config.clone()), } } pub async fn subscribe(&self) -> std::result::Result<(), std::boxed::Box> { futures::try_join!(self.event_after_create_member()).map(|_| ()) } async fn event_after_create_member(&self) -> Result<(), Box> { let s = self .connection_broker .queue_subscribe( bpr::ss::member::EVENT_SUBJECT_AFTER_CREATE_MEMBER, self.queue_broker.as_str(), ) .await?; while let Some(message) = s.next().await { if let Err(e) = async { let eve = bpr::ss::member::AfterCreateMemberEvent::decode(message.data.as_slice()) .map_err(|e| { bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest { message: format!("invalid request: {}", e), }) })?; let client = match eve.client { Some(c) => c, None => { return Err(bcr::error::rpc::Error::InvalidParams( bcr::error::rpc::InvalidParams { message: "invalid client information".to_string(), detail: bcr::error::rpc::InvalidParamsDetail { location: "request".to_string(), param: "client".to_string(), value: "".to_string(), error_type: bcr::error::rpc::InvalidParamsType::Required, message: "".to_string(), }, }, )); } }; let event = match eve.event { Some(r) => r, None => { return Err(bcr::error::rpc::Error::InvalidParams( bcr::error::rpc::InvalidParams { message: "invalid event information".to_string(), detail: bcr::error::rpc::InvalidParamsDetail { location: "event".to_string(), param: "event".to_string(), value: "".to_string(), error_type: bcr::error::rpc::InvalidParamsType::Required, message: "".to_string(), }, }, )); } }; let member = match event.member { Some(c) => c, None => { return Err(bcr::error::rpc::Error::InvalidParams( bcr::error::rpc::InvalidParams { message: "invalid client information".to_string(), detail: bcr::error::rpc::InvalidParamsDetail { location: "request".to_string(), param: "client".to_string(), value: "".to_string(), error_type: bcr::error::rpc::InvalidParamsType::Required, message: "".to_string(), }, }, )); } }; let member_id = uuid::Uuid::from_str(member.id.as_str()).map_err(|e| { bcr::error::rpc::Error::InvalidParams(bcr::error::rpc::InvalidParams { message: "invalid member.id param".to_string(), detail: bcr::error::rpc::InvalidParamsDetail { location: "request".to_string(), param: "member.id".to_string(), value: member.id.clone(), error_type: bcr::error::rpc::InvalidParamsType::Required, message: e.to_string(), }, }) })?; let conn = self.pool.get().map_err(|e| { bcr::error::rpc::Error::Server(bcr::error::rpc::Server { code: bpr::protobuf::rpc::Error::SERVER_00, message: format!("server {}", e), data: None, }) })?; if let Some(mc) = member.member_class { if mc.id.eq(MEMBER_CLASS_ID_USER) { let api_create_res = self .member_api .create_member(api::member::models::CreateMemberRequest { username: member.username.clone(), nickname: member.nickname.clone(), site_username: member.username.clone(), group_key: match member.parent_member { Some(m) => Some(m.username), None => None, }, }) .await .map_err(|e| { bcr::error::rpc::Error::Server(bcr::error::rpc::Server { code: bpr::protobuf::rpc::Error::SERVER_00, message: format!("server {:?}", e), data: None, }) })?; self .member_repository .insert( &conn, &repositories::member::models::NewMember { id: api_create_res.id, member_id, }, ) .map_err(|e| { bcr::error::rpc::Error::Server(bcr::error::rpc::Server { code: bpr::protobuf::rpc::Error::SERVER_00, message: format!("server {}", e), data: None, }) })?; } } Ok::<(), bcr::error::rpc::Error>(()) } .await { println!("error: {}", e); } } Ok(()) } }