events of member are modified

This commit is contained in:
병준 박 2022-08-15 06:59:22 +00:00
parent 6828925563
commit f8dcce0eba
6 changed files with 177 additions and 127 deletions

View File

@ -31,7 +31,7 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tokio-cron-scheduler = { version = "0" } tokio-cron-scheduler = { version = "0" }
uuid = { version = "0", features = ["serde", "v4", "v5"] } uuid = { version = "0", features = ["serde", "v4", "v5"] }
beteran-protobuf-rust = { git = "https://gitlab.loafle.net/bet/beteran-protobuf-rust.git", tag = "v0.1.60-snapshot" } beteran-protobuf-rust = { git = "https://gitlab.loafle.net/bet/beteran-protobuf-rust.git", tag = "v0.1.61-snapshot" }
beteran-common-rust = { git = "https://gitlab.loafle.net/bet/beteran-common-rust.git", tag = "v0.1.44-snapshot" } beteran-common-rust = { git = "https://gitlab.loafle.net/bet/beteran-common-rust.git", tag = "v0.1.45-snapshot" }
[build-dependencies] [build-dependencies]

164
src/events/member/event.rs Normal file
View File

@ -0,0 +1,164 @@
use super::super::super::repositories;
use beteran_common_rust as bcr;
use beteran_protobuf_rust as bpr;
use diesel::{
r2d2::{ConnectionManager, Pool},
PgConnection,
};
use prost::Message;
use std::str::FromStr;
pub struct EventHandler {
connection_broker: nats::asynk::Connection,
queue_broker: String,
pool: Pool<ConnectionManager<PgConnection>>,
member_repository: repositories::member::repository::Repository,
}
impl std::fmt::Debug for EventHandler {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("EventHandler of members").finish()
}
}
impl EventHandler {
///
pub fn new(
connection_broker: nats::asynk::Connection,
queue_broker: String,
pool: Pool<ConnectionManager<PgConnection>>,
) -> EventHandler {
EventHandler {
connection_broker,
queue_broker,
pool,
member_repository: repositories::member::repository::Repository::new(),
}
}
pub async fn subscribe(&self) -> std::result::Result<(), std::boxed::Box<dyn std::error::Error>> {
futures::try_join!(self.event_after_signin()).map(|_| ())
}
async fn event_after_signin(&self) -> Result<(), Box<dyn std::error::Error>> {
let s = self
.connection_broker
.queue_subscribe(
bpr::ss::identity::EVENT_SUBJECT_AFTER_SIGNIN,
self.queue_broker.as_str(),
)
.await?;
while let Some(message) = s.next().await {
if let Err(e) = async {
let eve =
bpr::ss::identity::AfterSigninEvent::decode(message.data.as_slice()).map_err(|e| {
bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest {
message: format!("invalid request: {}", e),
})
})?;
let client = match eve.client {
Some(c) => c,
None => {
return Err(bcr::error::rpc::Error::InvalidParams(
bcr::error::rpc::InvalidParams {
message: "invalid client information".to_string(),
detail: bcr::error::rpc::InvalidParamsDetail {
location: "request".to_string(),
param: "client".to_string(),
value: "".to_string(),
error_type: bcr::error::rpc::InvalidParamsType::Required,
message: "".to_string(),
},
},
));
}
};
let event = match eve.event {
Some(r) => r,
None => {
return Err(bcr::error::rpc::Error::InvalidParams(
bcr::error::rpc::InvalidParams {
message: "invalid event information".to_string(),
detail: bcr::error::rpc::InvalidParamsDetail {
location: "event".to_string(),
param: "event".to_string(),
value: "".to_string(),
error_type: bcr::error::rpc::InvalidParamsType::Required,
message: "".to_string(),
},
},
));
}
};
let member = match event.member {
Some(c) => c,
None => {
return Err(bcr::error::rpc::Error::InvalidParams(
bcr::error::rpc::InvalidParams {
message: "invalid client information".to_string(),
detail: bcr::error::rpc::InvalidParamsDetail {
location: "request".to_string(),
param: "client".to_string(),
value: "".to_string(),
error_type: bcr::error::rpc::InvalidParamsType::Required,
message: "".to_string(),
},
},
));
}
};
let id = uuid::Uuid::from_str(member.id.as_str()).map_err(|e| {
bcr::error::rpc::Error::InvalidParams(bcr::error::rpc::InvalidParams {
message: "invalid member.id param".to_string(),
detail: bcr::error::rpc::InvalidParamsDetail {
location: "request".to_string(),
param: "member.id".to_string(),
value: member.id.clone(),
error_type: bcr::error::rpc::InvalidParamsType::Required,
message: e.to_string(),
},
})
})?;
let conn = self.pool.get().map_err(|e| {
bcr::error::rpc::Error::Server(bcr::error::rpc::Server {
code: bpr::protobuf::rpc::Error::SERVER_00,
message: format!("server {}", e),
data: None,
})
})?;
let last_signined_at = chrono::Utc::now().timestamp();
self
.member_repository
.update_last_signined_ip(
&conn,
id,
&repositories::member::models::ModifyMember4LastSignined {
last_signined_ip: client.client_ip,
last_signined_at,
},
)
.map_err(|e| {
bcr::error::rpc::Error::Server(bcr::error::rpc::Server {
code: bpr::protobuf::rpc::Error::SERVER_00,
message: format!("server {}", e),
data: None,
})
})?;
Ok::<(), bcr::error::rpc::Error>(())
}
.await
{
println!("error: {}", e);
}
}
Ok(())
}
}

1
src/events/member/mod.rs Normal file
View File

@ -0,0 +1 @@
pub mod event;

1
src/events/mod.rs Normal file
View File

@ -0,0 +1 @@
pub mod member;

View File

@ -13,6 +13,7 @@ use diesel::{
use std::env; use std::env;
mod compositions; mod compositions;
mod events;
mod repositories; mod repositories;
mod services; mod services;
@ -110,6 +111,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
pool.clone(), pool.clone(),
); );
let member_event_handler = events::member::event::EventHandler::new(
connection_server_broker.clone(),
queue_server_broker.clone(),
pool.clone(),
);
println!("Server service [beteran-server-service] is started"); println!("Server service [beteran-server-service] is started");
futures::try_join!( futures::try_join!(
@ -122,6 +129,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
member_class_service.subscribe(), member_class_service.subscribe(),
member_level_service.subscribe(), member_level_service.subscribe(),
site_service.subscribe(), site_service.subscribe(),
member_event_handler.subscribe(),
)?; )?;
Ok(()) Ok(())

View File

@ -1,8 +1,6 @@
//! //!
//! //!
use std::str::FromStr;
use super::super::super::compositions; use super::super::super::compositions;
use super::super::super::repositories; use super::super::super::repositories;
use beteran_common_rust as bcr; use beteran_common_rust as bcr;
@ -12,6 +10,7 @@ use diesel::{
PgConnection, PgConnection,
}; };
use prost::Message; use prost::Message;
use std::str::FromStr;
/// ///
pub struct Service<'a> { pub struct Service<'a> {
@ -66,7 +65,6 @@ impl Service<'_> {
self.update_member(), self.update_member(),
self.update_member_for_state(), self.update_member_for_state(),
self.delete_member(), self.delete_member(),
self.event_after_signin()
) )
.map(|_| ()) .map(|_| ())
} }
@ -1397,126 +1395,4 @@ impl Service<'_> {
Ok(()) Ok(())
} }
async fn event_after_signin(&self) -> Result<(), Box<dyn std::error::Error>> {
let s = self
.connection_broker
.queue_subscribe(
bpr::ss::identity::EVENT_SUBJECT_AFTER_SIGNIN,
self.queue_broker.as_str(),
)
.await?;
while let Some(message) = s.next().await {
if let Err(e) = async {
let eve =
bpr::ss::identity::AfterSigninEvent::decode(message.data.as_slice()).map_err(|e| {
bcr::error::rpc::Error::InvalidRequest(bcr::error::rpc::InvalidRequest {
message: format!("invalid request: {}", e),
})
})?;
let client = match eve.client {
Some(c) => c,
None => {
return Err(bcr::error::rpc::Error::InvalidParams(
bcr::error::rpc::InvalidParams {
message: "invalid client information".to_string(),
detail: bcr::error::rpc::InvalidParamsDetail {
location: "request".to_string(),
param: "client".to_string(),
value: "".to_string(),
error_type: bcr::error::rpc::InvalidParamsType::Required,
message: "".to_string(),
},
},
));
}
};
let event = match eve.event {
Some(r) => r,
None => {
return Err(bcr::error::rpc::Error::InvalidParams(
bcr::error::rpc::InvalidParams {
message: "invalid event information".to_string(),
detail: bcr::error::rpc::InvalidParamsDetail {
location: "event".to_string(),
param: "event".to_string(),
value: "".to_string(),
error_type: bcr::error::rpc::InvalidParamsType::Required,
message: "".to_string(),
},
},
));
}
};
let member = match event.member {
Some(c) => c,
None => {
return Err(bcr::error::rpc::Error::InvalidParams(
bcr::error::rpc::InvalidParams {
message: "invalid client information".to_string(),
detail: bcr::error::rpc::InvalidParamsDetail {
location: "request".to_string(),
param: "client".to_string(),
value: "".to_string(),
error_type: bcr::error::rpc::InvalidParamsType::Required,
message: "".to_string(),
},
},
));
}
};
let id = uuid::Uuid::from_str(member.id.as_str()).map_err(|e| {
bcr::error::rpc::Error::InvalidParams(bcr::error::rpc::InvalidParams {
message: "invalid member.id param".to_string(),
detail: bcr::error::rpc::InvalidParamsDetail {
location: "request".to_string(),
param: "member.id".to_string(),
value: member.id.clone(),
error_type: bcr::error::rpc::InvalidParamsType::Required,
message: e.to_string(),
},
})
})?;
let conn = self.pool.get().map_err(|e| {
bcr::error::rpc::Error::Server(bcr::error::rpc::Server {
code: bpr::protobuf::rpc::Error::SERVER_00,
message: format!("server {}", e),
data: None,
})
})?;
let last_signined_at = chrono::Utc::now().timestamp();
self
.member_repository
.update_last_signined_ip(
&conn,
id,
&repositories::member::models::ModifyMember4LastSignined {
last_signined_ip: client.client_ip,
last_signined_at,
},
)
.map_err(|e| {
bcr::error::rpc::Error::Server(bcr::error::rpc::Server {
code: bpr::protobuf::rpc::Error::SERVER_00,
message: format!("server {}", e),
data: None,
})
})?;
Ok::<(), bcr::error::rpc::Error>(())
}
.await
{
println!("error: {}", e);
}
}
Ok(())
}
} }