event handler of member is added

This commit is contained in:
병준 박 2022-08-19 03:20:45 +00:00
parent 0142b2ead3
commit 61bab7ef6a
8 changed files with 258 additions and 40 deletions

View File

@ -4,7 +4,7 @@ CREATE TABLE IF NOT EXISTS api_kgon_members (
balance_bota BIGINT NOT NULL DEFAULT 0,
balance_sum BIGINT NOT NULL DEFAULT 0,
companies BIGINT NOT NULL DEFAULT 0,
oriental_play CHAR(1) NOT NULL,
oriental_play CHAR(1) NOT NULL DEFAULT 'n',
member_id UUID NOT NULL,
created_at BIGINT NOT NULL DEFAULT (extract(epoch from now()) * 1000),
updated_at BIGINT NOT NULL DEFAULT (extract(epoch from now()) * 1000),

View File

@ -83,10 +83,10 @@ impl Api {
}
///
pub async fn save_member(
pub async fn create_member(
&self,
data: models::SaveMemberRequest,
) -> Result<models::SaveMemberResponse, Error> {
data: models::CreateMemberRequest,
) -> Result<models::CreateMemberResponse, Error> {
let mut params = HashMap::new();
params.insert("username", data.username);
@ -112,17 +112,41 @@ impl Api {
.await
{
Ok(res) => res,
Err(err) => {
return Err(Error { code: 0, msg: None });
Err(e) => {
return Err(Error {
code: -1,
msg: Some(e.to_string()),
});
}
};
match res.status() {
reqwest::StatusCode::OK => match res.json::<models::SaveMemberResponse>().await {
Ok(r) => Ok(r),
Err(e) => Err(Error { code: 0, msg: None }),
reqwest::StatusCode::OK => match res.json::<models::_CreateMemberResponse>().await {
Ok(r) => {
if r.code != 0 {
return Err(Error {
code: r.code,
msg: r.msg,
});
}
match r.id {
Some(id) => Ok(models::CreateMemberResponse { id }),
None => Err(Error {
code: -1,
msg: Some("id is not exist in response of api".to_string()),
}),
}
}
Err(e) => Err(Error {
code: -1,
msg: Some(e.to_string()),
}),
},
_ => Err(Error { code: 0, msg: None }),
_ => Err(Error {
code: -1,
msg: None,
}),
}
}
}

View File

@ -30,7 +30,7 @@ pub struct ListMembersResponse {
}
#[derive(Serialize, Deserialize, Debug)]
pub struct SaveMemberRequest {
pub struct CreateMemberRequest {
pub username: String,
pub nickname: String,
pub site_username: String,
@ -38,8 +38,13 @@ pub struct SaveMemberRequest {
}
#[derive(Serialize, Deserialize, Debug)]
pub struct SaveMemberResponse {
pub struct _CreateMemberResponse {
pub code: i64,
pub msg: Option<String>,
pub id: Option<i64>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct CreateMemberResponse {
pub id: i64,
}

188
src/events/member/event.rs Normal file
View File

@ -0,0 +1,188 @@
use crate::api;
use crate::core;
use crate::repositories;
use beteran_common_rust as bcr;
use beteran_protobuf_rust as bpr;
use diesel::{
r2d2::{ConnectionManager, Pool},
PgConnection,
};
use prost::Message;
use std::str::FromStr;
pub struct EventHandler {
connection_broker: nats::asynk::Connection,
queue_broker: String,
pool: Pool<ConnectionManager<PgConnection>>,
api_config: core::config::ApiConfig,
member_repository: repositories::member::repository::Repository,
member_api: api::member::api::Api,
}
impl std::fmt::Debug for EventHandler {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("EventHandler of members").finish()
}
}
impl EventHandler {
///
pub fn new(
connection_broker: nats::asynk::Connection,
queue_broker: String,
pool: Pool<ConnectionManager<PgConnection>>,
api_config: core::config::ApiConfig,
) -> EventHandler {
EventHandler {
connection_broker,
queue_broker,
pool,
api_config: api_config.clone(),
member_repository: repositories::member::repository::Repository::new(),
member_api: api::member::api::Api::new(api_config.clone()),
}
}
pub async fn subscribe(&self) -> std::result::Result<(), std::boxed::Box<dyn std::error::Error>> {
futures::try_join!(self.event_after_create_member()).map(|_| ())
}
async fn event_after_create_member(&self) -> Result<(), Box<dyn std::error::Error>> {
let s = self
.connection_broker
.queue_subscribe(
bpr::ss::member::EVENT_SUBJECT_AFTER_CREATE_MEMBER,
self.queue_broker.as_str(),
)
.await?;
while let Some(message) = s.next().await {
if let Err(e) = async {
let eve = bpr::ss::member::AfterCreateMemberEvent::decode(message.data.as_slice())
.map_err(|e| {
bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest {
message: format!("invalid request: {}", e),
})
})?;
let client = match eve.client {
Some(c) => c,
None => {
return Err(bcr::error::rpc::Error::InvalidParams(
bcr::error::rpc::InvalidParams {
message: "invalid client information".to_string(),
detail: bcr::error::rpc::InvalidParamsDetail {
location: "request".to_string(),
param: "client".to_string(),
value: "".to_string(),
error_type: bcr::error::rpc::InvalidParamsType::Required,
message: "".to_string(),
},
},
));
}
};
let event = match eve.event {
Some(r) => r,
None => {
return Err(bcr::error::rpc::Error::InvalidParams(
bcr::error::rpc::InvalidParams {
message: "invalid event information".to_string(),
detail: bcr::error::rpc::InvalidParamsDetail {
location: "event".to_string(),
param: "event".to_string(),
value: "".to_string(),
error_type: bcr::error::rpc::InvalidParamsType::Required,
message: "".to_string(),
},
},
));
}
};
let member = match event.member {
Some(c) => c,
None => {
return Err(bcr::error::rpc::Error::InvalidParams(
bcr::error::rpc::InvalidParams {
message: "invalid client information".to_string(),
detail: bcr::error::rpc::InvalidParamsDetail {
location: "request".to_string(),
param: "client".to_string(),
value: "".to_string(),
error_type: bcr::error::rpc::InvalidParamsType::Required,
message: "".to_string(),
},
},
));
}
};
let member_id = uuid::Uuid::from_str(member.id.as_str()).map_err(|e| {
bcr::error::rpc::Error::InvalidParams(bcr::error::rpc::InvalidParams {
message: "invalid member.id param".to_string(),
detail: bcr::error::rpc::InvalidParamsDetail {
location: "request".to_string(),
param: "member.id".to_string(),
value: member.id.clone(),
error_type: bcr::error::rpc::InvalidParamsType::Required,
message: e.to_string(),
},
})
})?;
let conn = self.pool.get().map_err(|e| {
bcr::error::rpc::Error::Server(bcr::error::rpc::Server {
code: bpr::protobuf::rpc::Error::SERVER_00,
message: format!("server {}", e),
data: None,
})
})?;
let api_create_res = self
.member_api
.create_member(api::member::models::CreateMemberRequest {
username: member.username.clone(),
nickname: member.nickname.clone(),
site_username: member.username.clone(),
group_key: match member.referrer_member {
Some(m) => Some(m.username),
None => None,
},
})
.await
.map_err(|e| {
bcr::error::rpc::Error::Server(bcr::error::rpc::Server {
code: bpr::protobuf::rpc::Error::SERVER_00,
message: format!("server {:?}", e),
data: None,
})
})?;
self
.member_repository
.insert(
&conn,
&repositories::member::models::NewMember {
id: api_create_res.id,
member_id,
},
)
.map_err(|e| {
bcr::error::rpc::Error::Server(bcr::error::rpc::Server {
code: bpr::protobuf::rpc::Error::SERVER_00,
message: format!("server {}", e),
data: None,
})
})?;
Ok::<(), bcr::error::rpc::Error>(())
}
.await
{
println!("error: {}", e);
}
}
Ok(())
}
}

1
src/events/member/mod.rs Normal file
View File

@ -0,0 +1 @@
pub mod event;

1
src/events/mod.rs Normal file
View File

@ -0,0 +1 @@
pub mod member;

View File

@ -14,6 +14,7 @@ use std::env;
mod api;
mod compositions;
mod core;
mod events;
mod repositories;
mod schedulers;
mod services;
@ -67,24 +68,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let server_broker_opts = nats::asynk::Options::new();
let connection_server_broker = server_broker_opts.connect(url_server_broker).await?;
let member_service = services::member::service::Service::new(
connection_server_broker.clone(),
queue_server_broker.clone(),
);
let member_account_service = services::member_account::service::Service::new(
connection_server_broker.clone(),
queue_server_broker.clone(),
);
let game_service = services::game::service::Service::new(
connection_server_broker.clone(),
queue_server_broker.clone(),
pool.clone(),
);
let betting_service = services::betting::service::Service::new(
connection_server_broker.clone(),
queue_server_broker.clone(),
);
let mut sched = tokio_cron_scheduler::JobScheduler::new().await?;
let member_scheduler = schedulers::member::scheduler::Scheduler::get_instance(
@ -108,6 +91,31 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let _h_scheduler = sched.start().await?;
let member_service = services::member::service::Service::new(
connection_server_broker.clone(),
queue_server_broker.clone(),
);
let member_account_service = services::member_account::service::Service::new(
connection_server_broker.clone(),
queue_server_broker.clone(),
);
let game_service = services::game::service::Service::new(
connection_server_broker.clone(),
queue_server_broker.clone(),
pool.clone(),
);
let betting_service = services::betting::service::Service::new(
connection_server_broker.clone(),
queue_server_broker.clone(),
);
let member_event_handler = events::member::event::EventHandler::new(
connection_server_broker.clone(),
queue_server_broker.clone(),
pool.clone(),
api_config.clone(),
);
println!("Server service [beteran-api-kgon-server-service] is started");
futures::try_join!(
@ -115,6 +123,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
// member_account_service.subscribe(),
game_service.subscribe(),
// betting_service.subscribe(),
member_event_handler.subscribe(),
)?;
sched.shutdown().await?;

View File

@ -32,16 +32,6 @@ pub struct NewMember {
///
pub id: i64,
///
pub balance: i64,
///
pub balance_bota: i64,
///
pub balance_sum: i64,
///
pub companies: i64,
///
pub oriental_play: String,
///
pub member_id: uuid::Uuid,
}