bug fixed
This commit is contained in:
parent
0db6f75d61
commit
b2c439de7c
|
@ -1,5 +1,5 @@
|
||||||
CREATE TABLE IF NOT EXISTS api_kgon_synchronizations (
|
CREATE TABLE IF NOT EXISTS api_kgon_synchronizations (
|
||||||
id SERIAL PRIMARY KEY;
|
id SERIAL PRIMARY KEY,
|
||||||
item TEXT NOT NULL,
|
item TEXT NOT NULL,
|
||||||
last_code BIGINT NOT NULL,
|
last_code BIGINT NOT NULL,
|
||||||
synchronized_at BIGINT NOT NULL DEFAULT (extract(epoch from now()) * 1000)
|
synchronized_at BIGINT NOT NULL DEFAULT (extract(epoch from now()) * 1000)
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
CREATE TABLE IF NOT EXISTS api_kgon_synchronization_history (
|
CREATE TABLE IF NOT EXISTS api_kgon_synchronization_history (
|
||||||
id SERIAL PRIMARY KEY;
|
id SERIAL PRIMARY KEY,
|
||||||
item TEXT NOT NULL,
|
item TEXT NOT NULL,
|
||||||
start_at BIGINT NOT NULL,
|
start_at BIGINT NOT NULL,
|
||||||
complete_at BIGINT NOT NULL,
|
complete_at BIGINT NOT NULL,
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
CREATE TABLE IF NOT EXISTS api_kgon_balances (
|
CREATE TABLE IF NOT EXISTS api_kgon_balances (
|
||||||
id SERIAL PRIMARY KEY;
|
id SERIAL PRIMARY KEY,
|
||||||
balance BIGINT NOT NULL DEFAULT 0,
|
balance BIGINT NOT NULL DEFAULT 0,
|
||||||
balance_bota BIGINT NOT NULL DEFAULT 0,
|
balance_bota BIGINT NOT NULL DEFAULT 0,
|
||||||
created_at BIGINT NOT NULL DEFAULT (extract(epoch from now()) * 1000),
|
created_at BIGINT NOT NULL DEFAULT (extract(epoch from now()) * 1000),
|
||||||
|
|
|
@ -66,18 +66,18 @@ impl Scheduler {
|
||||||
|
|
||||||
async fn add_history(
|
async fn add_history(
|
||||||
&'static self,
|
&'static self,
|
||||||
conn: &diesel::PgConnection,
|
|
||||||
item: String,
|
item: String,
|
||||||
start_at: i64,
|
start_at: i64,
|
||||||
code: i64,
|
code: i64,
|
||||||
message: Option<String>,
|
message: Option<String>,
|
||||||
) -> Result<(), Box<dyn std::error::Error>> {
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let conn = self.pool.get().expect("conn");
|
||||||
let complete_at = (chrono::Utc::now()).timestamp();
|
let complete_at = (chrono::Utc::now()).timestamp();
|
||||||
|
|
||||||
self
|
self
|
||||||
.synchronization_history_repository
|
.synchronization_history_repository
|
||||||
.insert(
|
.insert(
|
||||||
conn,
|
&conn,
|
||||||
&repositories::synchronization_history::models::NewSynchronizationHistory {
|
&repositories::synchronization_history::models::NewSynchronizationHistory {
|
||||||
item,
|
item,
|
||||||
start_at,
|
start_at,
|
||||||
|
@ -101,13 +101,15 @@ impl Scheduler {
|
||||||
let res = match self.member_api.list_members(req).await {
|
let res = match self.member_api.list_members(req).await {
|
||||||
Ok(r) => Ok(r),
|
Ok(r) => Ok(r),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
self.add_history(
|
self
|
||||||
&conn,
|
.add_history(
|
||||||
repositories::synchronization::models::ITEM_BALANCE_USER.to_string(),
|
repositories::synchronization::models::ITEM_BALANCE_USER.to_string(),
|
||||||
start_at,
|
start_at,
|
||||||
e.code,
|
e.code,
|
||||||
e.msg.clone(),
|
e.msg.clone(),
|
||||||
);
|
)
|
||||||
|
.await
|
||||||
|
.expect("add_history");
|
||||||
|
|
||||||
Err(e)
|
Err(e)
|
||||||
}
|
}
|
||||||
|
@ -121,13 +123,15 @@ impl Scheduler {
|
||||||
let res = match self.member_account_api.get_balance_for_user(req).await {
|
let res = match self.member_account_api.get_balance_for_user(req).await {
|
||||||
Ok(r) => Ok(r),
|
Ok(r) => Ok(r),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
self.add_history(
|
self
|
||||||
&conn,
|
.add_history(
|
||||||
repositories::synchronization::models::ITEM_BALANCE_USER.to_string(),
|
repositories::synchronization::models::ITEM_BALANCE_USER.to_string(),
|
||||||
start_at,
|
start_at,
|
||||||
e.code,
|
e.code,
|
||||||
e.msg.clone(),
|
e.msg.clone(),
|
||||||
);
|
)
|
||||||
|
.await
|
||||||
|
.expect("add_history");
|
||||||
|
|
||||||
Err(e)
|
Err(e)
|
||||||
}
|
}
|
||||||
|
@ -147,17 +151,19 @@ impl Scheduler {
|
||||||
.expect("member update_balance");
|
.expect("member update_balance");
|
||||||
}
|
}
|
||||||
|
|
||||||
self.add_history(
|
self
|
||||||
&conn,
|
.add_history(
|
||||||
repositories::synchronization::models::ITEM_BALANCE_USER.to_string(),
|
repositories::synchronization::models::ITEM_BALANCE_USER.to_string(),
|
||||||
start_at,
|
start_at,
|
||||||
0,
|
0,
|
||||||
None,
|
None,
|
||||||
);
|
)
|
||||||
|
.await
|
||||||
|
.expect("add_history");
|
||||||
})
|
})
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
self.sched.add(j_synchronization).await;
|
self.sched.add(j_synchronization).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -172,13 +178,15 @@ impl Scheduler {
|
||||||
let res = match self.member_account_api.get_balance_for_partner(req).await {
|
let res = match self.member_account_api.get_balance_for_partner(req).await {
|
||||||
Ok(r) => Ok(r),
|
Ok(r) => Ok(r),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
self.add_history(
|
self
|
||||||
&conn,
|
.add_history(
|
||||||
repositories::synchronization::models::ITEM_BALANCE_PARTNER.to_string(),
|
repositories::synchronization::models::ITEM_BALANCE_PARTNER.to_string(),
|
||||||
start_at,
|
start_at,
|
||||||
e.code,
|
e.code,
|
||||||
e.msg.clone(),
|
e.msg.clone(),
|
||||||
);
|
)
|
||||||
|
.await
|
||||||
|
.expect("add_history");
|
||||||
|
|
||||||
Err(e)
|
Err(e)
|
||||||
}
|
}
|
||||||
|
@ -217,17 +225,19 @@ impl Scheduler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
self.add_history(
|
self
|
||||||
&conn,
|
.add_history(
|
||||||
repositories::synchronization::models::ITEM_BALANCE_PARTNER.to_string(),
|
repositories::synchronization::models::ITEM_BALANCE_PARTNER.to_string(),
|
||||||
start_at,
|
start_at,
|
||||||
0,
|
0,
|
||||||
None,
|
None,
|
||||||
);
|
)
|
||||||
|
.await
|
||||||
|
.expect("add_history");
|
||||||
})
|
})
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
self.sched.add(j_synchronization).await;
|
self.sched.add(j_synchronization).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -63,18 +63,18 @@ impl Scheduler {
|
||||||
|
|
||||||
async fn add_history(
|
async fn add_history(
|
||||||
&'static self,
|
&'static self,
|
||||||
conn: &diesel::PgConnection,
|
|
||||||
item: String,
|
item: String,
|
||||||
start_at: i64,
|
start_at: i64,
|
||||||
code: i64,
|
code: i64,
|
||||||
message: Option<String>,
|
message: Option<String>,
|
||||||
) -> Result<(), Box<dyn std::error::Error>> {
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let complete_at = (chrono::Utc::now()).timestamp();
|
let complete_at = (chrono::Utc::now()).timestamp();
|
||||||
|
let conn = self.pool.get().expect("conn");
|
||||||
|
|
||||||
self
|
self
|
||||||
.synchronization_history_repository
|
.synchronization_history_repository
|
||||||
.insert(
|
.insert(
|
||||||
conn,
|
&conn,
|
||||||
&repositories::synchronization_history::models::NewSynchronizationHistory {
|
&repositories::synchronization_history::models::NewSynchronizationHistory {
|
||||||
item,
|
item,
|
||||||
start_at,
|
start_at,
|
||||||
|
@ -115,13 +115,15 @@ impl Scheduler {
|
||||||
let res = match self.game_api.list_games(req).await {
|
let res = match self.game_api.list_games(req).await {
|
||||||
Ok(r) => Ok(r),
|
Ok(r) => Ok(r),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
self.add_history(
|
self
|
||||||
&conn,
|
.add_history(
|
||||||
repositories::synchronization::models::ITEM_VENDORS.to_string(),
|
repositories::synchronization::models::ITEM_VENDORS.to_string(),
|
||||||
start_at,
|
start_at,
|
||||||
e.code,
|
e.code,
|
||||||
e.msg.clone(),
|
e.msg.clone(),
|
||||||
);
|
)
|
||||||
|
.await
|
||||||
|
.expect("add_history");
|
||||||
|
|
||||||
Err(e)
|
Err(e)
|
||||||
}
|
}
|
||||||
|
@ -142,22 +144,24 @@ impl Scheduler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let affected = self
|
let _affected = self
|
||||||
.game_repository
|
.game_repository
|
||||||
.upserts(&conn, upsert_games)
|
.upserts(&conn, upsert_games)
|
||||||
.expect("game upsert");
|
.expect("game upsert");
|
||||||
|
|
||||||
self.add_history(
|
self
|
||||||
&conn,
|
.add_history(
|
||||||
repositories::synchronization::models::ITEM_GAMES.to_string(),
|
repositories::synchronization::models::ITEM_GAMES.to_string(),
|
||||||
start_at,
|
start_at,
|
||||||
0,
|
0,
|
||||||
None,
|
None,
|
||||||
);
|
)
|
||||||
|
.await
|
||||||
|
.expect("add_history");
|
||||||
})
|
})
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
self.sched.add(j_synchronization).await;
|
self.sched.add(j_synchronization).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -61,18 +61,18 @@ impl Scheduler {
|
||||||
|
|
||||||
async fn add_history(
|
async fn add_history(
|
||||||
&'static self,
|
&'static self,
|
||||||
conn: &diesel::PgConnection,
|
|
||||||
item: String,
|
item: String,
|
||||||
start_at: i64,
|
start_at: i64,
|
||||||
code: i64,
|
code: i64,
|
||||||
message: Option<String>,
|
message: Option<String>,
|
||||||
) -> Result<(), Box<dyn std::error::Error>> {
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let conn = self.pool.get().expect("conn");
|
||||||
let complete_at = (chrono::Utc::now()).timestamp();
|
let complete_at = (chrono::Utc::now()).timestamp();
|
||||||
|
|
||||||
self
|
self
|
||||||
.synchronization_history_repository
|
.synchronization_history_repository
|
||||||
.insert(
|
.insert(
|
||||||
conn,
|
&conn,
|
||||||
&repositories::synchronization_history::models::NewSynchronizationHistory {
|
&repositories::synchronization_history::models::NewSynchronizationHistory {
|
||||||
item,
|
item,
|
||||||
start_at,
|
start_at,
|
||||||
|
@ -96,13 +96,15 @@ impl Scheduler {
|
||||||
let res = match self.member_api.list_members(req).await {
|
let res = match self.member_api.list_members(req).await {
|
||||||
Ok(r) => Ok(r),
|
Ok(r) => Ok(r),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
self.add_history(
|
self
|
||||||
&conn,
|
.add_history(
|
||||||
repositories::synchronization::models::ITEM_MEMBERS.to_string(),
|
repositories::synchronization::models::ITEM_MEMBERS.to_string(),
|
||||||
start_at,
|
start_at,
|
||||||
e.code,
|
e.code,
|
||||||
e.msg.clone(),
|
e.msg.clone(),
|
||||||
);
|
)
|
||||||
|
.await
|
||||||
|
.expect("add_history");
|
||||||
|
|
||||||
Err(e)
|
Err(e)
|
||||||
}
|
}
|
||||||
|
@ -122,17 +124,19 @@ impl Scheduler {
|
||||||
.expect("member update");
|
.expect("member update");
|
||||||
}
|
}
|
||||||
|
|
||||||
self.add_history(
|
self
|
||||||
&conn,
|
.add_history(
|
||||||
repositories::synchronization::models::ITEM_MEMBERS.to_string(),
|
repositories::synchronization::models::ITEM_MEMBERS.to_string(),
|
||||||
start_at,
|
start_at,
|
||||||
0,
|
0,
|
||||||
None,
|
None,
|
||||||
);
|
)
|
||||||
|
.await
|
||||||
|
.expect("add_history");
|
||||||
})
|
})
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
self.sched.add(j_synchronization).await;
|
self.sched.add(j_synchronization).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
24
src/schedulers/vendor/scheduler.rs
vendored
24
src/schedulers/vendor/scheduler.rs
vendored
|
@ -61,18 +61,18 @@ impl Scheduler {
|
||||||
|
|
||||||
async fn add_history(
|
async fn add_history(
|
||||||
&'static self,
|
&'static self,
|
||||||
conn: &diesel::PgConnection,
|
|
||||||
item: String,
|
item: String,
|
||||||
start_at: i64,
|
start_at: i64,
|
||||||
code: i64,
|
code: i64,
|
||||||
message: Option<String>,
|
message: Option<String>,
|
||||||
) -> Result<(), Box<dyn std::error::Error>> {
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let complete_at = (chrono::Utc::now()).timestamp();
|
let complete_at = (chrono::Utc::now()).timestamp();
|
||||||
|
let conn = self.pool.get().expect("conn");
|
||||||
|
|
||||||
self
|
self
|
||||||
.synchronization_history_repository
|
.synchronization_history_repository
|
||||||
.insert(
|
.insert(
|
||||||
conn,
|
&conn,
|
||||||
&repositories::synchronization_history::models::NewSynchronizationHistory {
|
&repositories::synchronization_history::models::NewSynchronizationHistory {
|
||||||
item,
|
item,
|
||||||
start_at,
|
start_at,
|
||||||
|
@ -96,13 +96,15 @@ impl Scheduler {
|
||||||
let res = match self.vendor_api.list_vendors(req).await {
|
let res = match self.vendor_api.list_vendors(req).await {
|
||||||
Ok(r) => Ok(r),
|
Ok(r) => Ok(r),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
self.add_history(
|
self
|
||||||
&conn,
|
.add_history(
|
||||||
repositories::synchronization::models::ITEM_VENDORS.to_string(),
|
repositories::synchronization::models::ITEM_VENDORS.to_string(),
|
||||||
start_at,
|
start_at,
|
||||||
e.code,
|
e.code,
|
||||||
e.msg.clone(),
|
e.msg.clone(),
|
||||||
);
|
)
|
||||||
|
.await
|
||||||
|
.expect("add_history");
|
||||||
|
|
||||||
Err(e)
|
Err(e)
|
||||||
}
|
}
|
||||||
|
@ -126,22 +128,24 @@ impl Scheduler {
|
||||||
})
|
})
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
let affected = self
|
let _affected = self
|
||||||
.vendor_repository
|
.vendor_repository
|
||||||
.upserts(&conn, upsert_vendors)
|
.upserts(&conn, upsert_vendors)
|
||||||
.expect("vendor upsert");
|
.expect("vendor upsert");
|
||||||
|
|
||||||
self.add_history(
|
self
|
||||||
&conn,
|
.add_history(
|
||||||
repositories::synchronization::models::ITEM_VENDORS.to_string(),
|
repositories::synchronization::models::ITEM_VENDORS.to_string(),
|
||||||
start_at,
|
start_at,
|
||||||
0,
|
0,
|
||||||
None,
|
None,
|
||||||
);
|
)
|
||||||
|
.await
|
||||||
|
.expect("add_history");
|
||||||
})
|
})
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
self.sched.add(j_synchronization).await;
|
self.sched.add(j_synchronization).await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue
Block a user