use beteran_common_rust as bcr; use beteran_protobuf_rust as bpr; use prost::Message; /// pub struct Service { connection_broker: nats::asynk::Connection, queue_broker: String, } impl std::fmt::Debug for Service { fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { f.debug_struct("beteran-backend-server-edge::member_class::service::Service") .finish() } } impl Service { /// pub fn new(connection_broker: nats::asynk::Connection, queue_broker: String) -> Service { Service { connection_broker, queue_broker, } } pub async fn subscribe(&self) -> std::result::Result<(), std::boxed::Box> { futures::try_join!( self.create_member_class(), self.list_member_classes(), self.get_member_class(), self.update_member_class(), self.delete_member_class(), ) .map(|_| ()) } fn get_client_in_header( &self, message: &nats::asynk::Message, ) -> Result { match &message.headers { Some(headers) => { let client = match headers.get(bpr::c2se::core::network::HEADER_CLIENT) { Some(c) => { let msg = base64::decode(c).map_err(|e| { bcr::error::rpc::Error::Parse(bcr::error::rpc::Parse { message: format!("invalid header: {}", e), }) })?; bpr::models::core::network::Client::decode(msg.as_slice()).map_err(|e| { bcr::error::rpc::Error::Parse(bcr::error::rpc::Parse { message: format!("invalid header: {}", e), }) })? } None => { return Err(bcr::error::rpc::Error::Parse(bcr::error::rpc::Parse { message: "invalid client information".to_string(), })); } }; Ok(client) } None => Err(bcr::error::rpc::Error::Parse(bcr::error::rpc::Parse { message: "invalid header".to_string(), })), } } async fn create_member_class(&self) -> Result<(), Box> { let s = self .connection_broker .queue_subscribe( bpr::c2se::backend::member_class::SUBJECT_CREATE_MEMBER_CLASS, self.queue_broker.as_str(), ) .await?; while let Some(message) = s.next().await { if let Err(e) = async { let client = self.get_client_in_header(&message)?; let req = bpr::c2se::member_class::CreateMemberClassRequest::decode(message.data.as_slice()) .map_err(|e| { bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest { message: format!("invalid request: {}", e), }) })?; let ss_create_member_class_req = bpr::ss::member_class::CreateMemberClassRequest { client: Some(client), request: Some( bpr::ss::member_class::create_member_class_request::Request { parent_id: req.parent_id, name: req.name, show: req.show, }, ), }; let ss_create_member_class_res_msg = self .connection_broker .request( bpr::ss::member_class::SUBJECT_CREATE_MEMBER_CLASS, ss_create_member_class_req.encode_to_vec(), ) .await .map_err(|e| { bcr::error::rpc::Error::Server(bcr::error::rpc::Server { code: bpr::protobuf::rpc::Error::SERVER_00, message: format!("server {}", e), data: None, }) })?; let ss_create_member_class_res = bpr::ss::member_class::CreateMemberClassResponse::decode( ss_create_member_class_res_msg.data.as_slice(), ) .map_err(|e| { bcr::error::rpc::Error::Server(bcr::error::rpc::Server { code: bpr::protobuf::rpc::Error::SERVER_00, message: format!("server {}", e), data: None, }) })?; if let Some(e) = ss_create_member_class_res.error { return Err(bcr::error::rpc::Error::Server(bcr::error::rpc::Server { code: bpr::protobuf::rpc::Error::SERVER_00, message: format!("server {}", e), data: None, })); } if let Some(r) = ss_create_member_class_res.result { message .respond( bpr::c2se::member_class::CreateMemberClassResponse { error: None, result: Some( bpr::c2se::member_class::create_member_class_response::Result { member_class: r.member_class, }, ), } .encode_to_vec(), ) .await .map_err(|e| { bcr::error::rpc::Error::Server(bcr::error::rpc::Server { code: bpr::protobuf::rpc::Error::SERVER_00, message: format!("server {}", e), data: None, }) })?; } Ok::<(), bcr::error::rpc::Error>(()) } .await { message .respond( bpr::c2se::member_class::CreateMemberClassResponse { error: Some(bpr::protobuf::rpc::Error::from(e)), result: None, } .encode_to_vec(), ) .await?; } } Ok(()) } async fn list_member_classes(&self) -> Result<(), Box> { let s = self .connection_broker .queue_subscribe( bpr::c2se::backend::member_class::SUBJECT_LIST_MEMBER_CLASSES, self.queue_broker.as_str(), ) .await?; while let Some(message) = s.next().await { if let Err(e) = async { let client = self.get_client_in_header(&message)?; let req = bpr::c2se::member_class::ListMemberClassesRequest::decode(message.data.as_slice()) .map_err(|e| { bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest { message: format!("invalid request: {}", e), }) })?; let ss_list_member_classes_req = bpr::ss::member_class::ListMemberClassesRequest { client: Some(client), request: Some( bpr::ss::member_class::list_member_classes_request::Request { pagination: req.pagination, sorts: req.sorts, search: match req.search { Some(s) => Some( bpr::ss::member_class::list_member_classes_request::request::Search { name_like: s.name_like, show: s.show, }, ), None => None, }, }, ), }; let ss_list_member_classes_res_msg = self .connection_broker .request( bpr::ss::member_class::SUBJECT_LIST_MEMBER_CLASSES, ss_list_member_classes_req.encode_to_vec(), ) .await .map_err(|e| { bcr::error::rpc::Error::Server(bcr::error::rpc::Server { code: bpr::protobuf::rpc::Error::SERVER_00, message: format!("server {}", e), data: None, }) })?; let ss_list_member_classes_res = bpr::ss::member_class::ListMemberClassesResponse::decode( ss_list_member_classes_res_msg.data.as_slice(), ) .map_err(|e| { bcr::error::rpc::Error::Server(bcr::error::rpc::Server { code: bpr::protobuf::rpc::Error::SERVER_00, message: format!("server {}", e), data: None, }) })?; if let Some(e) = ss_list_member_classes_res.error { return Err(bcr::error::rpc::Error::Server(bcr::error::rpc::Server { code: bpr::protobuf::rpc::Error::SERVER_00, message: format!("server {}", e), data: None, })); } if let Some(r) = ss_list_member_classes_res.result { message .respond( bpr::c2se::member_class::ListMemberClassesResponse { error: None, result: Some( bpr::c2se::member_class::list_member_classes_response::Result { total_count: r.total_count, member_classes: r.member_classes, }, ), } .encode_to_vec(), ) .await .map_err(|e| { bcr::error::rpc::Error::Server(bcr::error::rpc::Server { code: bpr::protobuf::rpc::Error::SERVER_00, message: format!("server {}", e), data: None, }) })?; } Ok::<(), bcr::error::rpc::Error>(()) } .await { message .respond( bpr::c2se::member_class::ListMemberClassesResponse { error: Some(bpr::protobuf::rpc::Error::from(e)), result: None, } .encode_to_vec(), ) .await?; } } Ok(()) } async fn get_member_class(&self) -> Result<(), Box> { let s = self .connection_broker .queue_subscribe( bpr::c2se::backend::member_class::SUBJECT_GET_MEMBER_CLASS, self.queue_broker.as_str(), ) .await?; while let Some(message) = s.next().await { if let Err(e) = async { let client = self.get_client_in_header(&message)?; let req = bpr::c2se::member_class::GetMemberClassRequest::decode(message.data.as_slice()) .map_err(|e| { bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest { message: format!("invalid request: {}", e), }) })?; let ss_get_member_class_req = bpr::ss::member_class::GetMemberClassRequest { client: Some(client), request: Some(bpr::ss::member_class::get_member_class_request::Request { id: req.id }), }; let ss_get_member_class_res_msg = self .connection_broker .request( bpr::ss::member_class::SUBJECT_GET_MEMBER_CLASS, ss_get_member_class_req.encode_to_vec(), ) .await .map_err(|e| { bcr::error::rpc::Error::Server(bcr::error::rpc::Server { code: bpr::protobuf::rpc::Error::SERVER_00, message: format!("server {}", e), data: None, }) })?; let ss_get_member_class_res = bpr::ss::member_class::GetMemberClassResponse::decode( ss_get_member_class_res_msg.data.as_slice(), ) .map_err(|e| { bcr::error::rpc::Error::Server(bcr::error::rpc::Server { code: bpr::protobuf::rpc::Error::SERVER_00, message: format!("server {}", e), data: None, }) })?; if let Some(e) = ss_get_member_class_res.error { return Err(bcr::error::rpc::Error::Server(bcr::error::rpc::Server { code: bpr::protobuf::rpc::Error::SERVER_00, message: format!("server {}", e), data: None, })); } if let Some(r) = ss_get_member_class_res.result { message .respond( bpr::c2se::member_class::GetMemberClassResponse { error: None, result: Some(bpr::c2se::member_class::get_member_class_response::Result { member_class: r.member_class, }), } .encode_to_vec(), ) .await .map_err(|e| { bcr::error::rpc::Error::Server(bcr::error::rpc::Server { code: bpr::protobuf::rpc::Error::SERVER_00, message: format!("server {}", e), data: None, }) })?; } Ok::<(), bcr::error::rpc::Error>(()) } .await { message .respond( bpr::c2se::member_class::GetMemberClassResponse { error: Some(bpr::protobuf::rpc::Error::from(e)), result: None, } .encode_to_vec(), ) .await?; } } Ok(()) } async fn update_member_class(&self) -> Result<(), Box> { let s = self .connection_broker .queue_subscribe( bpr::c2se::backend::member_class::SUBJECT_UPDATE_MEMBER_CLASS, self.queue_broker.as_str(), ) .await?; while let Some(message) = s.next().await { if let Err(e) = async { let client = self.get_client_in_header(&message)?; let req = bpr::c2se::member_class::UpdateMemberClassRequest::decode(message.data.as_slice()) .map_err(|e| { bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest { message: format!("invalid request: {}", e), }) })?; let ss_update_member_class_req = bpr::ss::member_class::UpdateMemberClassRequest { client: Some(client), request: Some( bpr::ss::member_class::update_member_class_request::Request { id: req.id, parent_id: req.parent_id, name: req.name, show: req.show, }, ), }; let ss_update_member_class_res_msg = self .connection_broker .request( bpr::ss::member_class::SUBJECT_UPDATE_MEMBER_CLASS, ss_update_member_class_req.encode_to_vec(), ) .await .map_err(|e| { bcr::error::rpc::Error::Server(bcr::error::rpc::Server { code: bpr::protobuf::rpc::Error::SERVER_00, message: format!("server {}", e), data: None, }) })?; let ss_update_member_class_res = bpr::ss::member_class::UpdateMemberClassResponse::decode( ss_update_member_class_res_msg.data.as_slice(), ) .map_err(|e| { bcr::error::rpc::Error::Server(bcr::error::rpc::Server { code: bpr::protobuf::rpc::Error::SERVER_00, message: format!("server {}", e), data: None, }) })?; if let Some(e) = ss_update_member_class_res.error { return Err(bcr::error::rpc::Error::Server(bcr::error::rpc::Server { code: bpr::protobuf::rpc::Error::SERVER_00, message: format!("server {}", e), data: None, })); } if let Some(r) = ss_update_member_class_res.result { message .respond( bpr::c2se::member_class::UpdateMemberClassResponse { error: None, result: Some( bpr::c2se::member_class::update_member_class_response::Result { member_class: r.member_class, }, ), } .encode_to_vec(), ) .await .map_err(|e| { bcr::error::rpc::Error::Server(bcr::error::rpc::Server { code: bpr::protobuf::rpc::Error::SERVER_00, message: format!("server {}", e), data: None, }) })?; } Ok::<(), bcr::error::rpc::Error>(()) } .await { message .respond( bpr::c2se::member_class::UpdateMemberClassResponse { error: Some(bpr::protobuf::rpc::Error::from(e)), result: None, } .encode_to_vec(), ) .await?; } } Ok(()) } async fn delete_member_class(&self) -> Result<(), Box> { let s = self .connection_broker .queue_subscribe( bpr::c2se::backend::member_class::SUBJECT_DELETE_MEMBER_CLASS, self.queue_broker.as_str(), ) .await?; while let Some(message) = s.next().await { if let Err(e) = async { let client = self.get_client_in_header(&message)?; let req = bpr::c2se::member_class::DeleteMemberClassRequest::decode(message.data.as_slice()) .map_err(|e| { bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest { message: format!("invalid request: {}", e), }) })?; let ss_delete_member_class_req = bpr::ss::member_class::DeleteMemberClassRequest { client: Some(client), request: Some(bpr::ss::member_class::delete_member_class_request::Request { id: req.id }), }; let ss_delete_member_class_res_msg = self .connection_broker .request( bpr::ss::member_class::SUBJECT_DELETE_MEMBER_CLASS, ss_delete_member_class_req.encode_to_vec(), ) .await .map_err(|e| { bcr::error::rpc::Error::Server(bcr::error::rpc::Server { code: bpr::protobuf::rpc::Error::SERVER_00, message: format!("server {}", e), data: None, }) })?; let ss_delete_member_class_res = bpr::ss::member_class::DeleteMemberClassResponse::decode( ss_delete_member_class_res_msg.data.as_slice(), ) .map_err(|e| { bcr::error::rpc::Error::Server(bcr::error::rpc::Server { code: bpr::protobuf::rpc::Error::SERVER_00, message: format!("server {}", e), data: None, }) })?; if let Some(e) = ss_delete_member_class_res.error { return Err(bcr::error::rpc::Error::Server(bcr::error::rpc::Server { code: bpr::protobuf::rpc::Error::SERVER_00, message: format!("server {}", e), data: None, })); } if let Some(r) = ss_delete_member_class_res.result { message .respond( bpr::c2se::member_class::DeleteMemberClassResponse { error: None, result: Some(bpr::c2se::member_class::delete_member_class_response::Result {}), } .encode_to_vec(), ) .await .map_err(|e| { bcr::error::rpc::Error::Server(bcr::error::rpc::Server { code: bpr::protobuf::rpc::Error::SERVER_00, message: format!("server {}", e), data: None, }) })?; } Ok::<(), bcr::error::rpc::Error>(()) } .await { message .respond( bpr::c2se::member_class::DeleteMemberClassResponse { error: Some(bpr::protobuf::rpc::Error::from(e)), result: None, } .encode_to_vec(), ) .await?; } } Ok(()) } }