diff --git a/migrations/202208051000_api_kgon_synchronization/up.sql b/migrations/202208051000_api_kgon_synchronization/up.sql index 88601ad..ee53c7b 100644 --- a/migrations/202208051000_api_kgon_synchronization/up.sql +++ b/migrations/202208051000_api_kgon_synchronization/up.sql @@ -1,5 +1,5 @@ CREATE TABLE IF NOT EXISTS api_kgon_synchronizations ( - id SERIAL PRIMARY KEY; + id SERIAL PRIMARY KEY, item TEXT NOT NULL, last_code BIGINT NOT NULL, synchronized_at BIGINT NOT NULL DEFAULT (extract(epoch from now()) * 1000) diff --git a/migrations/202208051010_api_kgon_synchronization_history/up.sql b/migrations/202208051010_api_kgon_synchronization_history/up.sql index 320fabe..f1c7d99 100644 --- a/migrations/202208051010_api_kgon_synchronization_history/up.sql +++ b/migrations/202208051010_api_kgon_synchronization_history/up.sql @@ -1,5 +1,5 @@ CREATE TABLE IF NOT EXISTS api_kgon_synchronization_history ( - id SERIAL PRIMARY KEY; + id SERIAL PRIMARY KEY, item TEXT NOT NULL, start_at BIGINT NOT NULL, complete_at BIGINT NOT NULL, diff --git a/migrations/202208051400_api_kgon_balance/up.sql b/migrations/202208051400_api_kgon_balance/up.sql index a605cae..14fa1d3 100644 --- a/migrations/202208051400_api_kgon_balance/up.sql +++ b/migrations/202208051400_api_kgon_balance/up.sql @@ -1,5 +1,5 @@ CREATE TABLE IF NOT EXISTS api_kgon_balances ( - id SERIAL PRIMARY KEY; + id SERIAL PRIMARY KEY, balance BIGINT NOT NULL DEFAULT 0, balance_bota BIGINT NOT NULL DEFAULT 0, created_at BIGINT NOT NULL DEFAULT (extract(epoch from now()) * 1000), diff --git a/migrations/202208071200_api_kgon_betting/down.sql b/migrations/202208071200_api_kgon_betting/down.sql deleted file mode 100644 index e69de29..0000000 diff --git a/migrations/202208071200_api_kgon_betting/up.sql b/migrations/202208071200_api_kgon_betting/up.sql deleted file mode 100644 index e69de29..0000000 diff --git a/src/schedulers/balance/scheduler.rs b/src/schedulers/balance/scheduler.rs index b580aef..9d78432 100644 --- a/src/schedulers/balance/scheduler.rs +++ b/src/schedulers/balance/scheduler.rs @@ -66,18 +66,18 @@ impl Scheduler { async fn add_history( &'static self, - conn: &diesel::PgConnection, item: String, start_at: i64, code: i64, message: Option, ) -> Result<(), Box> { + let conn = self.pool.get().expect("conn"); let complete_at = (chrono::Utc::now()).timestamp(); self .synchronization_history_repository .insert( - conn, + &conn, &repositories::synchronization_history::models::NewSynchronizationHistory { item, start_at, @@ -101,13 +101,15 @@ impl Scheduler { let res = match self.member_api.list_members(req).await { Ok(r) => Ok(r), Err(e) => { - self.add_history( - &conn, - repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), - start_at, - e.code, - e.msg.clone(), - ); + self + .add_history( + repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), + start_at, + e.code, + e.msg.clone(), + ) + .await + .expect("add_history"); Err(e) } @@ -121,13 +123,15 @@ impl Scheduler { let res = match self.member_account_api.get_balance_for_user(req).await { Ok(r) => Ok(r), Err(e) => { - self.add_history( - &conn, - repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), - start_at, - e.code, - e.msg.clone(), - ); + self + .add_history( + repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), + start_at, + e.code, + e.msg.clone(), + ) + .await + .expect("add_history"); Err(e) } @@ -147,17 +151,19 @@ impl Scheduler { .expect("member update_balance"); } - self.add_history( - &conn, - repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), - start_at, - 0, - None, - ); + self + .add_history( + repositories::synchronization::models::ITEM_BALANCE_USER.to_string(), + start_at, + 0, + None, + ) + .await + .expect("add_history"); }) })?; - self.sched.add(j_synchronization).await; + self.sched.add(j_synchronization).await?; Ok(()) } @@ -172,13 +178,15 @@ impl Scheduler { let res = match self.member_account_api.get_balance_for_partner(req).await { Ok(r) => Ok(r), Err(e) => { - self.add_history( - &conn, - repositories::synchronization::models::ITEM_BALANCE_PARTNER.to_string(), - start_at, - e.code, - e.msg.clone(), - ); + self + .add_history( + repositories::synchronization::models::ITEM_BALANCE_PARTNER.to_string(), + start_at, + e.code, + e.msg.clone(), + ) + .await + .expect("add_history"); Err(e) } @@ -217,17 +225,19 @@ impl Scheduler { } } - self.add_history( - &conn, - repositories::synchronization::models::ITEM_BALANCE_PARTNER.to_string(), - start_at, - 0, - None, - ); + self + .add_history( + repositories::synchronization::models::ITEM_BALANCE_PARTNER.to_string(), + start_at, + 0, + None, + ) + .await + .expect("add_history"); }) })?; - self.sched.add(j_synchronization).await; + self.sched.add(j_synchronization).await?; Ok(()) } diff --git a/src/schedulers/game/scheduler.rs b/src/schedulers/game/scheduler.rs index 5cdb035..44445d8 100644 --- a/src/schedulers/game/scheduler.rs +++ b/src/schedulers/game/scheduler.rs @@ -63,18 +63,18 @@ impl Scheduler { async fn add_history( &'static self, - conn: &diesel::PgConnection, item: String, start_at: i64, code: i64, message: Option, ) -> Result<(), Box> { let complete_at = (chrono::Utc::now()).timestamp(); + let conn = self.pool.get().expect("conn"); self .synchronization_history_repository .insert( - conn, + &conn, &repositories::synchronization_history::models::NewSynchronizationHistory { item, start_at, @@ -115,13 +115,15 @@ impl Scheduler { let res = match self.game_api.list_games(req).await { Ok(r) => Ok(r), Err(e) => { - self.add_history( - &conn, - repositories::synchronization::models::ITEM_VENDORS.to_string(), - start_at, - e.code, - e.msg.clone(), - ); + self + .add_history( + repositories::synchronization::models::ITEM_VENDORS.to_string(), + start_at, + e.code, + e.msg.clone(), + ) + .await + .expect("add_history"); Err(e) } @@ -142,22 +144,24 @@ impl Scheduler { } } - let affected = self + let _affected = self .game_repository .upserts(&conn, upsert_games) .expect("game upsert"); - self.add_history( - &conn, - repositories::synchronization::models::ITEM_GAMES.to_string(), - start_at, - 0, - None, - ); + self + .add_history( + repositories::synchronization::models::ITEM_GAMES.to_string(), + start_at, + 0, + None, + ) + .await + .expect("add_history"); }) })?; - self.sched.add(j_synchronization).await; + self.sched.add(j_synchronization).await?; Ok(()) } diff --git a/src/schedulers/member/scheduler.rs b/src/schedulers/member/scheduler.rs index 43ed92b..02e2f37 100644 --- a/src/schedulers/member/scheduler.rs +++ b/src/schedulers/member/scheduler.rs @@ -61,18 +61,18 @@ impl Scheduler { async fn add_history( &'static self, - conn: &diesel::PgConnection, item: String, start_at: i64, code: i64, message: Option, ) -> Result<(), Box> { + let conn = self.pool.get().expect("conn"); let complete_at = (chrono::Utc::now()).timestamp(); self .synchronization_history_repository .insert( - conn, + &conn, &repositories::synchronization_history::models::NewSynchronizationHistory { item, start_at, @@ -96,13 +96,15 @@ impl Scheduler { let res = match self.member_api.list_members(req).await { Ok(r) => Ok(r), Err(e) => { - self.add_history( - &conn, - repositories::synchronization::models::ITEM_MEMBERS.to_string(), - start_at, - e.code, - e.msg.clone(), - ); + self + .add_history( + repositories::synchronization::models::ITEM_MEMBERS.to_string(), + start_at, + e.code, + e.msg.clone(), + ) + .await + .expect("add_history"); Err(e) } @@ -122,17 +124,19 @@ impl Scheduler { .expect("member update"); } - self.add_history( - &conn, - repositories::synchronization::models::ITEM_MEMBERS.to_string(), - start_at, - 0, - None, - ); + self + .add_history( + repositories::synchronization::models::ITEM_MEMBERS.to_string(), + start_at, + 0, + None, + ) + .await + .expect("add_history"); }) })?; - self.sched.add(j_synchronization).await; + self.sched.add(j_synchronization).await?; Ok(()) } diff --git a/src/schedulers/vendor/scheduler.rs b/src/schedulers/vendor/scheduler.rs index 68452cb..13cda79 100644 --- a/src/schedulers/vendor/scheduler.rs +++ b/src/schedulers/vendor/scheduler.rs @@ -61,18 +61,18 @@ impl Scheduler { async fn add_history( &'static self, - conn: &diesel::PgConnection, item: String, start_at: i64, code: i64, message: Option, ) -> Result<(), Box> { let complete_at = (chrono::Utc::now()).timestamp(); + let conn = self.pool.get().expect("conn"); self .synchronization_history_repository .insert( - conn, + &conn, &repositories::synchronization_history::models::NewSynchronizationHistory { item, start_at, @@ -96,13 +96,15 @@ impl Scheduler { let res = match self.vendor_api.list_vendors(req).await { Ok(r) => Ok(r), Err(e) => { - self.add_history( - &conn, - repositories::synchronization::models::ITEM_VENDORS.to_string(), - start_at, - e.code, - e.msg.clone(), - ); + self + .add_history( + repositories::synchronization::models::ITEM_VENDORS.to_string(), + start_at, + e.code, + e.msg.clone(), + ) + .await + .expect("add_history"); Err(e) } @@ -126,22 +128,24 @@ impl Scheduler { }) .collect(); - let affected = self + let _affected = self .vendor_repository .upserts(&conn, upsert_vendors) .expect("vendor upsert"); - self.add_history( - &conn, - repositories::synchronization::models::ITEM_VENDORS.to_string(), - start_at, - 0, - None, - ); + self + .add_history( + repositories::synchronization::models::ITEM_VENDORS.to_string(), + start_at, + 0, + None, + ) + .await + .expect("add_history"); }) })?; - self.sched.add(j_synchronization).await; + self.sched.add(j_synchronization).await?; Ok(()) }