diff --git a/src/member_referrer/service.rs b/src/member_referrer/service.rs index 1ae467f..fc29c9c 100644 --- a/src/member_referrer/service.rs +++ b/src/member_referrer/service.rs @@ -27,10 +27,13 @@ impl Service { pub async fn subscribe(&self) -> std::result::Result<(), std::boxed::Box> { futures::try_join!( self.create_member_referrer(), + self.create_member_referrers(), self.list_member_referrers(), self.get_member_referrer(), + self.get_member_referrer_by_code(), self.update_member_referrer(), self.delete_member_referrer(), + self.delete_member_referrers(), ) .map(|_| ()) } @@ -176,6 +179,114 @@ impl Service { Ok(()) } + async fn create_member_referrers(&self) -> Result<(), Box> { + let s = self + .connection_broker + .queue_subscribe( + bpr::c2se::backend::member_referrer::SUBJECT_CREATE_MEMBER_REFERRERS, + self.queue_broker.as_str(), + ) + .await?; + + while let Some(message) = s.next().await { + if let Err(e) = async { + let client = self.get_client_in_header(&message)?; + + let req = + bpr::c2se::member_referrer::CreateMemberReferrersRequest::decode(message.data.as_slice()) + .map_err(|e| { + bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest { + message: format!("invalid request: {}", e), + }) + })?; + + let ss_create_member_referrers_req = + bpr::ss::member_referrer::CreateMemberReferrersRequest { + client: Some(client), + request: Some( + bpr::ss::member_referrer::create_member_referrers_request::Request { + member_ids: req.member_ids, + }, + ), + }; + + let ss_create_member_referrers_res_msg = self + .connection_broker + .request( + bpr::ss::member_referrer::SUBJECT_CREATE_MEMBER_REFERRERS, + ss_create_member_referrers_req.encode_to_vec(), + ) + .await + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + let ss_create_member_referrers_res = + bpr::ss::member_referrer::CreateMemberReferrersResponse::decode( + ss_create_member_referrers_res_msg.data.as_slice(), + ) + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + if let Some(e) = ss_create_member_referrers_res.error { + return Err(bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + })); + } + + if let Some(r) = ss_create_member_referrers_res.result { + message + .respond( + bpr::c2se::member_referrer::CreateMemberReferrersResponse { + error: None, + result: Some( + bpr::c2se::member_referrer::create_member_referrers_response::Result { + member_referrers: r.member_referrers, + }, + ), + } + .encode_to_vec(), + ) + .await + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + } + + Ok::<(), bcr::error::rpc::Error>(()) + } + .await + { + message + .respond( + bpr::c2se::member_referrer::CreateMemberReferrersResponse { + error: Some(bpr::protobuf::rpc::Error::from(e)), + result: None, + } + .encode_to_vec(), + ) + .await?; + } + } + + Ok(()) + } + async fn list_member_referrers(&self) -> Result<(), Box> { let s = self .connection_broker @@ -325,7 +436,7 @@ impl Service { let ss_get_member_referrer_res_msg = self .connection_broker .request( - bpr::ss::member_referrer::SUBJECT_UPDATE_MEMBER_REFERRER, + bpr::ss::member_referrer::SUBJECT_GET_MEMBER_REFERRER, ss_get_member_referrer_req.encode_to_vec(), ) .await @@ -399,6 +510,115 @@ impl Service { Ok(()) } + async fn get_member_referrer_by_code(&self) -> Result<(), Box> { + let s = self + .connection_broker + .queue_subscribe( + bpr::c2se::backend::member_referrer::SUBJECT_GET_MEMBER_REFERRER_BY_CODE, + self.queue_broker.as_str(), + ) + .await?; + + while let Some(message) = s.next().await { + if let Err(e) = async { + let client = self.get_client_in_header(&message)?; + + let req = bpr::c2se::member_referrer::GetMemberReferrerByCodeRequest::decode( + message.data.as_slice(), + ) + .map_err(|e| { + bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest { + message: format!("invalid request: {}", e), + }) + })?; + + let ss_get_member_referrer_by_code_req = + bpr::ss::member_referrer::GetMemberReferrerByCodeRequest { + client: Some(client), + request: Some( + bpr::ss::member_referrer::get_member_referrer_by_code_request::Request { + code: req.code, + }, + ), + }; + + let ss_get_member_referrer_by_code_res_msg = self + .connection_broker + .request( + bpr::ss::member_referrer::SUBJECT_GET_MEMBER_REFERRER_BY_CODE, + ss_get_member_referrer_by_code_req.encode_to_vec(), + ) + .await + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + let ss_get_member_referrer_by_code_res = + bpr::ss::member_referrer::GetMemberReferrerByCodeResponse::decode( + ss_get_member_referrer_by_code_res_msg.data.as_slice(), + ) + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + if let Some(e) = ss_get_member_referrer_by_code_res.error { + return Err(bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + })); + } + + if let Some(r) = ss_get_member_referrer_by_code_res.result { + message + .respond( + bpr::c2se::member_referrer::GetMemberReferrerByCodeResponse { + error: None, + result: Some( + bpr::c2se::member_referrer::get_member_referrer_by_code_response::Result { + member_referrer: r.member_referrer, + }, + ), + } + .encode_to_vec(), + ) + .await + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + } + + Ok::<(), bcr::error::rpc::Error>(()) + } + .await + { + message + .respond( + bpr::c2se::member_referrer::GetMemberReferrerByCodeResponse { + error: Some(bpr::protobuf::rpc::Error::from(e)), + result: None, + } + .encode_to_vec(), + ) + .await?; + } + } + + Ok(()) + } + async fn update_member_referrer(&self) -> Result<(), Box> { let s = self .connection_broker @@ -610,4 +830,108 @@ impl Service { Ok(()) } + + async fn delete_member_referrers(&self) -> Result<(), Box> { + let s = self + .connection_broker + .queue_subscribe( + bpr::c2se::backend::member_referrer::SUBJECT_DELETE_MEMBER_REFERRERS, + self.queue_broker.as_str(), + ) + .await?; + + while let Some(message) = s.next().await { + if let Err(e) = async { + let client = self.get_client_in_header(&message)?; + + let req = + bpr::c2se::member_referrer::DeleteMemberReferrersRequest::decode(message.data.as_slice()) + .map_err(|e| { + bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest { + message: format!("invalid request: {}", e), + }) + })?; + + let ss_delete_member_referrers_req = + bpr::ss::member_referrer::DeleteMemberReferrersRequest { + client: Some(client), + request: Some( + bpr::ss::member_referrer::delete_member_referrers_request::Request { ids: req.ids }, + ), + }; + + let ss_delete_member_referrers_res_msg = self + .connection_broker + .request( + bpr::ss::member_referrer::SUBJECT_DELETE_MEMBER_REFERRERS, + ss_delete_member_referrers_req.encode_to_vec(), + ) + .await + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + let ss_delete_member_referrers_res = + bpr::ss::member_referrer::DeleteMemberReferrersResponse::decode( + ss_delete_member_referrers_res_msg.data.as_slice(), + ) + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + + if let Some(e) = ss_delete_member_referrers_res.error { + return Err(bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + })); + } + + if let Some(r) = ss_delete_member_referrers_res.result { + message + .respond( + bpr::c2se::member_referrer::DeleteMemberReferrersResponse { + error: None, + result: Some( + bpr::c2se::member_referrer::delete_member_referrers_response::Result {}, + ), + } + .encode_to_vec(), + ) + .await + .map_err(|e| { + bcr::error::rpc::Error::Server(bcr::error::rpc::Server { + code: bpr::protobuf::rpc::Error::SERVER_00, + message: format!("server {}", e), + data: None, + }) + })?; + } + + Ok::<(), bcr::error::rpc::Error>(()) + } + .await + { + message + .respond( + bpr::c2se::member_referrer::DeleteMemberReferrersResponse { + error: Some(bpr::protobuf::rpc::Error::from(e)), + result: None, + } + .encode_to_vec(), + ) + .await?; + } + } + + Ok(()) + } }