Add daily cleanup of old notifications in EventsWorker: Implement a new method to delete notifications older than 30 days, enhancing database management and reducing clutter. Update run loop to trigger this cleanup daily, ensuring timely maintenance of notification records.

This commit is contained in:
Torsten Schulz (local)
2026-01-15 13:45:31 +01:00
parent fc1b0c2259
commit 0083d7be23
2 changed files with 33 additions and 3 deletions

View File

@@ -402,6 +402,7 @@ impl EventsWorker {
fn run_loop(pool: ConnectionPool, broker: MessageBroker, state: Arc<WorkerState>) { fn run_loop(pool: ConnectionPool, broker: MessageBroker, state: Arc<WorkerState>) {
let mut last_event_check = None; let mut last_event_check = None;
let mut last_notification_cleanup: Option<Instant> = None;
let mut rng = rand::thread_rng(); let mut rng = rand::thread_rng();
let events = Self::initialize_events(); let events = Self::initialize_events();
eprintln!("[EventsWorker] Worker-Thread gestartet"); eprintln!("[EventsWorker] Worker-Thread gestartet");
@@ -414,7 +415,6 @@ impl EventsWorker {
let now = Instant::now(); let now = Instant::now();
// Heartbeat im Run-Loop (unabhängig davon, ob Events triggern) // Heartbeat im Run-Loop (unabhängig davon, ob Events triggern)
// Damit man sofort sieht, ob der Worker überhaupt läuft.
static LAST_RUNLOOP_HEARTBEAT: Mutex<Option<Instant>> = Mutex::new(None); static LAST_RUNLOOP_HEARTBEAT: Mutex<Option<Instant>> = Mutex::new(None);
{ {
let mut last = LAST_RUNLOOP_HEARTBEAT.lock().unwrap(); let mut last = LAST_RUNLOOP_HEARTBEAT.lock().unwrap();
@@ -441,8 +441,13 @@ impl EventsWorker {
last_event_check = Some(now); last_event_check = Some(now);
} }
// 10-Sekunden-Wartezeit in kurze Scheiben aufteilen, damit ein Shutdown // Alte Notifications einmal täglich aufräumen (alle 24 Stunden)
// (running_worker = false) schnell greift. if should_run_interval(last_notification_cleanup, now, Duration::from_secs(86400)) {
Self::cleanup_old_notifications(&pool);
last_notification_cleanup = Some(now);
}
// 10-Sekunden-Wartezeit in kurze Scheiben aufteilen
const SLICE_MS: u64 = 500; const SLICE_MS: u64 = 500;
let total_ms = 10_000; let total_ms = 10_000;
let mut slept = 0; let mut slept = 0;
@@ -458,6 +463,25 @@ impl EventsWorker {
} }
} }
fn cleanup_old_notifications(pool: &ConnectionPool) {
use crate::worker::sql::QUERY_DELETE_OLD_NOTIFICATIONS;
let result = (|| -> Result<(), DbError> {
let mut conn = pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("delete_old_notifications", QUERY_DELETE_OLD_NOTIFICATIONS)?;
conn.execute("delete_old_notifications", &[])?;
Ok(())
})();
match result {
Ok(()) => eprintln!("[EventsWorker] Alte Notifications (>30 Tage) aufgeräumt"),
Err(err) => eprintln!("[EventsWorker] Fehler beim Aufräumen alter Notifications: {err}"),
}
}
// Globaler Skalierungsfaktor für Ereignisfrequenz. // Globaler Skalierungsfaktor für Ereignisfrequenz.
// Default: 1.0 (unverändert). Optional per ENV `EVENT_RATE_SCALE` konfigurierbar. // Default: 1.0 (unverändert). Optional per ENV `EVENT_RATE_SCALE` konfigurierbar.
fn event_rate_scale() -> f64 { fn event_rate_scale() -> f64 {

View File

@@ -71,6 +71,12 @@ pub const QUERY_INSERT_NOTIFICATION_FULL: &str = r#"
) VALUES ($1, $2, $3, FALSE, NOW(), NOW()); ) VALUES ($1, $2, $3, FALSE, NOW(), NOW());
"#; "#;
/// Löscht alte Notifications (älter als 30 Tage)
pub const QUERY_DELETE_OLD_NOTIFICATIONS: &str = r#"
DELETE FROM falukant_log.notification
WHERE created_at < NOW() - INTERVAL '30 days';
"#;
// Product pricing // Product pricing
pub const QUERY_GET_PRODUCT_COST: &str = r#" pub const QUERY_GET_PRODUCT_COST: &str = r#"
SELECT sell_cost, sell_cost AS original_sell_cost FROM falukant_type.product WHERE id = $1; SELECT sell_cost, sell_cost AS original_sell_cost FROM falukant_type.product WHERE id = $1;