use crate::db::{ConnectionPool, DbError, Row}; use crate::message_broker::MessageBroker; use std::collections::HashSet; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::{Duration, Instant}; use super::base::{BaseWorker, Worker, WorkerState}; pub struct PoliticsWorker { base: BaseWorker, } #[derive(Debug, Clone)] struct OfficeCounts { region_id: i32, required: i32, occupied: i32, } #[derive(Debug, Clone)] struct Election { election_id: i32, region_id: i32, posts_to_fill: i32, } #[derive(Debug, Clone)] struct OfficeGap { office_type_id: i32, region_id: i32, gaps: i32, } #[derive(Debug, Clone)] struct Office { office_id: i32, office_type_id: i32, character_id: i32, region_id: i32, } // --- SQL-Konstanten (1:1 aus politics_worker.h übernommen) ------------------ const QUERY_COUNT_OFFICES_PER_REGION: &str = r#" WITH seats_per_region AS ( SELECT pot.id AS office_type_id, rt.id AS region_id, pot.seats_per_region AS seats_total FROM falukant_type.political_office_type AS pot JOIN falukant_type.region AS rt ON pot.region_type = rt.label_tr ), occupied AS ( SELECT po.office_type_id, po.region_id, COUNT(*) AS occupied_count FROM falukant_data.political_office AS po GROUP BY po.office_type_id, po.region_id ), combined AS ( SELECT spr.region_id, spr.seats_total AS required_count, COALESCE(o.occupied_count, 0) AS occupied_count FROM seats_per_region AS spr LEFT JOIN occupied AS o ON spr.office_type_id = o.office_type_id AND spr.region_id = o.region_id ) SELECT region_id, SUM(required_count) AS required_count, SUM(occupied_count) AS occupied_count FROM combined GROUP BY region_id; "#; /// Findet alle Kombinationen aus Amtstyp und Region, für die laut /// `seats_per_region` mehr Sitze existieren sollten, als aktuell in /// `falukant_data.political_office` belegt sind. Es werden ausschließlich /// positive Differenzen (Gaps) zurückgegeben – wenn `occupied > required` /// (z. B. nach Reduktion der Sitzzahl), wird **nichts** gelöscht und die /// Kombination erscheint hier nicht. const QUERY_FIND_OFFICE_GAPS: &str = r#" WITH seats AS ( SELECT pot.id AS office_type_id, rt.id AS region_id, pot.seats_per_region AS seats_total FROM falukant_type.political_office_type AS pot JOIN falukant_type.region AS rt ON pot.region_type = rt.label_tr ), occupied AS ( SELECT po.office_type_id, po.region_id, COUNT(*) AS occupied_count FROM falukant_data.political_office AS po GROUP BY po.office_type_id, po.region_id ) SELECT s.office_type_id, s.region_id, (s.seats_total - COALESCE(o.occupied_count, 0)) AS gaps FROM seats AS s LEFT JOIN occupied AS o ON s.office_type_id = o.office_type_id AND s.region_id = o.region_id WHERE (s.seats_total - COALESCE(o.occupied_count, 0)) > 0; "#; const QUERY_SELECT_NEEDED_ELECTIONS: &str = r#" WITH target_date AS ( SELECT NOW()::date AS election_date ), expired_today AS ( DELETE FROM falukant_data.political_office AS po USING falukant_type.political_office_type AS pot WHERE po.office_type_id = pot.id AND (po.created_at + (pot.term_length * INTERVAL '1 day'))::date = (SELECT election_date FROM target_date) RETURNING pot.id AS office_type_id, po.region_id AS region_id ), gaps_per_region AS ( SELECT office_type_id, region_id, COUNT(*) AS gaps FROM expired_today GROUP BY office_type_id, region_id ), to_schedule AS ( SELECT g.office_type_id, g.region_id, g.gaps, td.election_date FROM gaps_per_region AS g CROSS JOIN target_date AS td WHERE NOT EXISTS ( SELECT 1 FROM falukant_data.election AS e WHERE e.office_type_id = g.office_type_id AND e.region_id = g.region_id AND e.date::date = td.election_date ) ), new_elections AS ( INSERT INTO falukant_data.election (office_type_id, date, posts_to_fill, created_at, updated_at, region_id) SELECT ts.office_type_id, ts.election_date, ts.gaps, NOW(), NOW(), ts.region_id FROM to_schedule AS ts RETURNING id AS election_id, region_id, posts_to_fill ) SELECT ne.election_id, ne.region_id, ne.posts_to_fill FROM new_elections AS ne ORDER BY ne.region_id, ne.election_id; "#; const QUERY_INSERT_CANDIDATES: &str = r#" INSERT INTO falukant_data.candidate (election_id, character_id, created_at, updated_at) SELECT $1 AS election_id, sub.id AS character_id, NOW() AS created_at, NOW() AS updated_at FROM ( WITH RECURSIVE region_tree AS ( SELECT r.id FROM falukant_data.region AS r WHERE r.id = $2 UNION ALL SELECT r2.id FROM falukant_data.region AS r2 JOIN region_tree AS rt ON r2.parent_id = rt.id ) SELECT ch.id FROM falukant_data.character AS ch JOIN region_tree AS rt2 ON ch.region_id = rt2.id WHERE ch.user_id IS NULL AND ch.birthdate <= NOW() - INTERVAL '21 days' AND ch.title_of_nobility IN ( SELECT id FROM falukant_type.title WHERE label_tr != 'noncivil' ) ORDER BY RANDOM() LIMIT ($3 * 2) ) AS sub(id); "#; const QUERY_PROCESS_EXPIRED_AND_FILL: &str = r#" WITH expired_offices AS ( DELETE FROM falukant_data.political_office AS po USING falukant_type.political_office_type AS pot WHERE po.office_type_id = pot.id AND (po.created_at + (pot.term_length * INTERVAL '1 day')) <= NOW() RETURNING pot.id AS office_type_id, po.region_id AS region_id ), distinct_types AS ( SELECT DISTINCT office_type_id, region_id FROM expired_offices ), votes_per_candidate AS ( SELECT dt.office_type_id, dt.region_id, c.character_id, COUNT(v.id) AS vote_count FROM distinct_types AS dt JOIN falukant_data.election AS e ON e.office_type_id = dt.office_type_id JOIN falukant_data.vote AS v ON v.election_id = e.id JOIN falukant_data.candidate AS c ON c.election_id = e.id AND c.id = v.candidate_id WHERE e.date >= (NOW() - INTERVAL '30 days') GROUP BY dt.office_type_id, dt.region_id, c.character_id ), ranked_winners AS ( SELECT vpc.office_type_id, vpc.region_id, vpc.character_id, ROW_NUMBER() OVER ( PARTITION BY vpc.office_type_id, vpc.region_id ORDER BY vpc.vote_count DESC ) AS rn FROM votes_per_candidate AS vpc ), selected_winners AS ( SELECT rw.office_type_id, rw.region_id, rw.character_id FROM ranked_winners AS rw JOIN falukant_type.political_office_type AS pot ON pot.id = rw.office_type_id WHERE rw.rn <= pot.seats_per_region ), insert_winners AS ( INSERT INTO falukant_data.political_office (office_type_id, character_id, created_at, updated_at, region_id) SELECT sw.office_type_id, sw.character_id, NOW(), NOW(), sw.region_id FROM selected_winners AS sw RETURNING id AS new_office_id, office_type_id, character_id, region_id ), count_inserted AS ( SELECT office_type_id, region_id, COUNT(*) AS inserted_count FROM insert_winners GROUP BY office_type_id, region_id ), needed_to_fill AS ( SELECT dt.office_type_id, dt.region_id, (pot.seats_per_region - COALESCE(ci.inserted_count, 0)) AS gaps FROM distinct_types AS dt JOIN falukant_type.political_office_type AS pot ON pot.id = dt.office_type_id LEFT JOIN count_inserted AS ci ON ci.office_type_id = dt.office_type_id AND ci.region_id = dt.region_id WHERE (pot.seats_per_region - COALESCE(ci.inserted_count, 0)) > 0 ), random_candidates AS ( SELECT rtf.office_type_id, rtf.region_id, ch.id AS character_id, ROW_NUMBER() OVER ( PARTITION BY rtf.office_type_id, rtf.region_id ORDER BY RANDOM() ) AS rn FROM needed_to_fill AS rtf JOIN falukant_data.character AS ch ON ch.region_id = rtf.region_id AND ch.user_id IS NULL AND ch.birthdate <= NOW() - INTERVAL '21 days' AND ch.title_of_nobility IN ( SELECT id FROM falukant_type.title WHERE label_tr != 'noncivil' ) AND NOT EXISTS ( SELECT 1 FROM falukant_data.political_office AS po2 JOIN falukant_type.political_office_type AS pot2 ON pot2.id = po2.office_type_id WHERE po2.character_id = ch.id AND (po2.created_at + (pot2.term_length * INTERVAL '1 day')) > NOW() + INTERVAL '2 days' ) ), insert_random AS ( INSERT INTO falukant_data.political_office (office_type_id, character_id, created_at, updated_at, region_id) SELECT rc.office_type_id, rc.character_id, NOW(), NOW(), rc.region_id FROM random_candidates AS rc JOIN needed_to_fill AS rtf ON rtf.office_type_id = rc.office_type_id AND rtf.region_id = rc.region_id WHERE rc.rn <= rtf.gaps RETURNING id AS new_office_id, office_type_id, character_id, region_id ) SELECT new_office_id AS office_id, office_type_id, character_id, region_id FROM insert_winners UNION ALL SELECT new_office_id AS office_id, office_type_id, character_id, region_id FROM insert_random; "#; const QUERY_USERS_IN_CITIES_OF_REGIONS: &str = r#" WITH RECURSIVE region_tree AS ( SELECT id FROM falukant_data.region WHERE id = $1 UNION ALL SELECT r2.id FROM falukant_data.region AS r2 JOIN region_tree AS rt ON r2.parent_id = rt.id ) SELECT DISTINCT ch.user_id FROM falukant_data.character AS ch JOIN region_tree AS rt2 ON ch.region_id = rt2.id WHERE ch.user_id IS NOT NULL; "#; const QUERY_NOTIFY_OFFICE_EXPIRATION: &str = r#" INSERT INTO falukant_log.notification (user_id, tr, created_at, updated_at) SELECT po.character_id, 'notify_office_expiring', NOW(), NOW() FROM falukant_data.political_office AS po JOIN falukant_type.political_office_type AS pot ON po.office_type_id = pot.id WHERE (po.created_at + (pot.term_length * INTERVAL '1 day')) BETWEEN (NOW() + INTERVAL '2 days') AND (NOW() + INTERVAL '2 days' + INTERVAL '1 second'); "#; const QUERY_NOTIFY_ELECTION_CREATED: &str = r#" INSERT INTO falukant_log.notification (user_id, tr, created_at, updated_at) VALUES ($1, 'notify_election_created', NOW(), NOW()); "#; const QUERY_NOTIFY_OFFICE_FILLED: &str = r#" INSERT INTO falukant_log.notification (user_id, tr, created_at, updated_at) VALUES ($1, 'notify_office_filled', NOW(), NOW()); "#; const QUERY_GET_USERS_WITH_EXPIRING_OFFICES: &str = r#" SELECT DISTINCT ch.user_id FROM falukant_data.political_office AS po JOIN falukant_type.political_office_type AS pot ON po.office_type_id = pot.id JOIN falukant_data.character AS ch ON po.character_id = ch.id WHERE ch.user_id IS NOT NULL AND (po.created_at + (pot.term_length * INTERVAL '1 day')) BETWEEN (NOW() + INTERVAL '2 days') AND (NOW() + INTERVAL '2 days' + INTERVAL '1 second'); "#; const QUERY_GET_USERS_IN_REGIONS_WITH_ELECTIONS: &str = r#" SELECT DISTINCT ch.user_id FROM falukant_data.election AS e JOIN falukant_data.character AS ch ON ch.region_id = e.region_id WHERE ch.user_id IS NOT NULL AND e.date >= NOW() - INTERVAL '1 day'; "#; const QUERY_GET_USERS_WITH_FILLED_OFFICES: &str = r#" SELECT DISTINCT ch.user_id FROM falukant_data.political_office AS po JOIN falukant_data.character AS ch ON po.character_id = ch.id WHERE ch.user_id IS NOT NULL AND po.created_at >= NOW() - INTERVAL '1 minute'; "#; const QUERY_PROCESS_ELECTIONS: &str = r#" SELECT office_id, office_type_id, character_id, region_id FROM falukant_data.process_elections(); "#; /// Schneidet für alle Amtstyp/Region-Kombinationen überzählige Einträge in /// `falukant_data.political_office` ab, so dass höchstens /// `seats_per_region` Ämter pro Kombination übrig bleiben. /// /// Die Auswahl, welche Ämter entfernt werden, erfolgt deterministisch über /// `created_at DESC`: die **neuesten** Ämter bleiben bevorzugt im Amt, /// ältere Einträge werden zuerst entfernt. Damit lässt sich das Verhalten /// später leicht anpassen (z. B. nach bestimmten Prioritäten). const QUERY_TRIM_EXCESS_OFFICES_GLOBAL: &str = r#" WITH seats AS ( SELECT pot.id AS office_type_id, rt.id AS region_id, pot.seats_per_region AS seats_total FROM falukant_type.political_office_type AS pot JOIN falukant_type.region AS rt ON pot.region_type = rt.label_tr ), ranked AS ( SELECT po.id, po.office_type_id, po.region_id, s.seats_total, ROW_NUMBER() OVER ( PARTITION BY po.office_type_id, po.region_id ORDER BY po.created_at DESC ) AS rn FROM falukant_data.political_office AS po JOIN seats AS s ON s.office_type_id = po.office_type_id AND s.region_id = po.region_id ), to_delete AS ( SELECT id FROM ranked WHERE rn > seats_total ) DELETE FROM falukant_data.political_office WHERE id IN (SELECT id FROM to_delete); "#; impl PoliticsWorker { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { Self { base: BaseWorker::new("PoliticsWorker", pool, broker), } } fn run_loop(pool: ConnectionPool, broker: MessageBroker, state: Arc) { let mut last_execution: Option = None; while state.running_worker.load(Ordering::Relaxed) { let now = Instant::now(); let should_run = match last_execution { None => true, Some(prev) => now.saturating_duration_since(prev) >= Duration::from_secs(24 * 3600), }; if should_run { if let Err(err) = Self::perform_daily_politics_task(&pool, &broker) { eprintln!("[PoliticsWorker] Fehler bei performDailyPoliticsTask: {err}"); } last_execution = Some(now); } // Entspricht ungefähr der 5-Sekunden-Schleife im C++-Code for _ in 0..5 { if !state.running_worker.load(Ordering::Relaxed) { break; } std::thread::sleep(Duration::from_secs(1)); } } } fn perform_daily_politics_task( pool: &ConnectionPool, broker: &MessageBroker, ) -> Result<(), DbError> { // 1) Optional: Positionen evaluieren (aktuell nur Logging/Struktur) let _ = Self::evaluate_political_positions(pool)?; // 2) Schema-Änderungen abgleichen: neue / zusätzliche Ämter anlegen, // ohne bestehende Amtsinhaber bei Reduktion zu entfernen. let _ = Self::sync_offices_with_types(pool)?; // 3) Ämter, die bald auslaufen, benachrichtigen Self::notify_office_expirations(pool, broker)?; // 4) Abgelaufene Ämter verarbeiten und neue besetzen let new_offices_direct = Self::process_expired_offices_and_fill(pool)?; if !new_offices_direct.is_empty() { Self::notify_office_filled(pool, broker, &new_offices_direct)?; } // 5) Neue Wahlen planen und Kandidaten eintragen let elections = Self::schedule_elections(pool)?; if !elections.is_empty() { Self::insert_candidates_for_elections(pool, &elections)?; // Benachrichtige User in betroffenen Regionen let region_ids: HashSet = elections.iter().map(|e| e.region_id).collect(); let user_ids = Self::get_user_ids_in_cities_of_regions(pool, ®ion_ids)?; Self::notify_election_created(pool, broker, &user_ids)?; } // 6) Wahlen auswerten und neu besetzte Ämter melden let new_offices_from_elections = Self::process_elections(pool)?; if !new_offices_from_elections.is_empty() { Self::notify_office_filled(pool, broker, &new_offices_from_elections)?; } // 7) Als letzter Schritt sicherstellen, dass es für keinen // Amtstyp/Region-Kombi mehr besetzte Ämter gibt als laut // `seats_per_region` erlaubt. Dieser Abgleich wird nach allen // Lösch- und Besetzungsvorgängen ausgeführt. Self::trim_excess_offices_global(pool)?; Ok(()) } fn evaluate_political_positions( pool: &ConnectionPool, ) -> Result, DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare( "count_offices_per_region", QUERY_COUNT_OFFICES_PER_REGION, )?; let rows = conn.execute("count_offices_per_region", &[])?; let mut result = Vec::with_capacity(rows.len()); for row in rows { let region_id = parse_i32(&row, "region_id", -1); let required = parse_i32(&row, "required_count", 0); let occupied = parse_i32(&row, "occupied_count", 0); if region_id >= 0 { let oc = OfficeCounts { region_id, required, occupied, }; // Felder aktiv nutzen: einfache Debug-Ausgabe kann später // durch Logging ersetzt werden. eprintln!( "[PoliticsWorker] Region {}: required={}, occupied={}", oc.region_id, oc.required, oc.occupied ); result.push(oc); } } Ok(result) } /// Entfernt global alle überzähligen Ämter in Relation zur /// konfigurierten Sitzzahl pro Amtstyp/Region. fn trim_excess_offices_global(pool: &ConnectionPool) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare( "trim_excess_offices_global", QUERY_TRIM_EXCESS_OFFICES_GLOBAL, )?; if let Err(err) = conn.execute("trim_excess_offices_global", &[]) { eprintln!( "[PoliticsWorker] Fehler bei trim_excess_offices_global: {err}" ); } Ok(()) } /// Gleicht die konfigurierte Anzahl der Ämter (seats_per_region pro /// Amtstyp/Region) mit den tatsächlich vorhandenen Einträgen in /// `falukant_data.political_office` ab und legt für fehlende Sitze /// zusätzliche Wahlen an. /// /// Wichtig: Wenn `seats_per_region` gesenkt wurde und damit aktuell /// mehr Amtsinhaber existieren als Sitze vorgesehen sind, werden /// **keine** Amtsinhaber entfernt oder Wahlen zum Abbau erzeugt – die /// entsprechenden Kombinationen tauchen schlicht nicht in der Gaps‑Liste /// auf. Erst wenn durch natürliches Auslaufen weniger Ämter existieren /// als vorgesehen, entstehen wieder Gaps. fn sync_offices_with_types(pool: &ConnectionPool) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("find_office_gaps", QUERY_FIND_OFFICE_GAPS)?; let rows = conn.execute("find_office_gaps", &[])?; if rows.is_empty() { return Ok(()); } let mut gaps: Vec = Vec::with_capacity(rows.len()); for row in rows { let office_type_id = parse_i32(&row, "office_type_id", -1); let region_id = parse_i32(&row, "region_id", -1); let gaps_count = parse_i32(&row, "gaps", 0); if office_type_id >= 0 && region_id >= 0 && gaps_count > 0 { gaps.push(OfficeGap { office_type_id, region_id, gaps: gaps_count, }); } } if gaps.is_empty() { return Ok(()); } // Für jede Lücke eine Wahl am aktuellen Tag anlegen, sofern es nicht // bereits eine Wahl für diesen Amtstyp/Region an einem heutigen oder // zukünftigen Datum gibt. Das eigentliche Anlegen der Wahl und das // Eintragen von Kandidaten läuft weiterhin über die bestehende // Logik in der Datenbank (process_elections / schedule_elections). // // Hier nutzen wir bewusst eine einfache, wiederholbare Logik im // Rust‑Code, statt alles in eine riesige DB‑Funktion zu packen. // Wir lehnen uns an die Struktur von `schedule_elections` an, // erzeugen aber unsere eigenen Einträge in `falukant_data.election`. let insert_sql = r#" INSERT INTO falukant_data.election (office_type_id, date, posts_to_fill, created_at, updated_at, region_id) SELECT $1::int AS office_type_id, CURRENT_DATE AS date, $2::int AS posts_to_fill, NOW() AS created_at, NOW() AS updated_at, $3::int AS region_id WHERE NOT EXISTS ( SELECT 1 FROM falukant_data.election e WHERE e.office_type_id = $1::int AND e.region_id = $3::int AND e.date::date >= CURRENT_DATE ); "#; conn.prepare("insert_gap_election", insert_sql)?; for gap in gaps { // Sicherheitshalber nur positive Gaps berücksichtigen. if gap.gaps <= 0 { continue; } if let Err(err) = conn.execute( "insert_gap_election", &[&gap.office_type_id, &gap.gaps, &gap.region_id], ) { eprintln!( "[PoliticsWorker] Fehler beim Anlegen von Gap‑Wahlen \ (office_type_id={}, region_id={}): {err}", gap.office_type_id, gap.region_id ); } } Ok(()) } fn schedule_elections(pool: &ConnectionPool) -> Result, DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("select_needed_elections", QUERY_SELECT_NEEDED_ELECTIONS)?; let rows = conn.execute("select_needed_elections", &[])?; let mut elections = Vec::with_capacity(rows.len()); for row in rows { let election_id = parse_i32(&row, "election_id", -1); let region_id = parse_i32(&row, "region_id", -1); let posts_to_fill = parse_i32(&row, "posts_to_fill", 0); if election_id >= 0 && region_id >= 0 { elections.push(Election { election_id, region_id, posts_to_fill, }); } } Ok(elections) } fn insert_candidates_for_elections( pool: &ConnectionPool, elections: &[Election], ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("insert_candidates", QUERY_INSERT_CANDIDATES)?; for e in elections { conn.execute( "insert_candidates", &[&e.election_id, &e.region_id, &e.posts_to_fill], )?; } Ok(()) } fn process_expired_offices_and_fill( pool: &ConnectionPool, ) -> Result, DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("process_expired_and_fill", QUERY_PROCESS_EXPIRED_AND_FILL)?; let rows = conn.execute("process_expired_and_fill", &[])?; Ok(rows .into_iter() .filter_map(map_row_to_office) .collect()) } fn get_user_ids_in_cities_of_regions( pool: &ConnectionPool, region_ids: &HashSet, ) -> Result, DbError> { if region_ids.is_empty() { return Ok(Vec::new()); } let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("get_users_in_cities", QUERY_USERS_IN_CITIES_OF_REGIONS)?; let mut user_ids = Vec::new(); for rid in region_ids { let rows = conn.execute("get_users_in_cities", &[rid])?; for row in rows { if let Some(uid) = row.get("user_id").and_then(|v| v.parse::().ok()) { user_ids.push(uid); } } } Ok(user_ids) } fn notify_office_expirations( pool: &ConnectionPool, broker: &MessageBroker, ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("notify_office_expiration", QUERY_NOTIFY_OFFICE_EXPIRATION)?; conn.execute("notify_office_expiration", &[])?; conn.prepare( "get_users_with_expiring_offices", QUERY_GET_USERS_WITH_EXPIRING_OFFICES, )?; let rows = conn.execute("get_users_with_expiring_offices", &[])?; for row in rows { if let Some(user_id) = row.get("user_id").and_then(|v| v.parse::().ok()) { let msg = format!(r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#, user_id); broker.publish(msg); } } Ok(()) } fn notify_election_created( pool: &ConnectionPool, broker: &MessageBroker, user_ids: &[i32], ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("notify_election_created", QUERY_NOTIFY_ELECTION_CREATED)?; for uid in user_ids { conn.execute("notify_election_created", &[uid])?; } conn.prepare( "get_users_in_regions_with_elections", QUERY_GET_USERS_IN_REGIONS_WITH_ELECTIONS, )?; let rows = conn.execute("get_users_in_regions_with_elections", &[])?; for row in rows { if let Some(user_id) = row.get("user_id").and_then(|v| v.parse::().ok()) { let msg = format!(r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#, user_id); broker.publish(msg); } } Ok(()) } fn notify_office_filled( pool: &ConnectionPool, broker: &MessageBroker, new_offices: &[Office], ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("notify_office_filled", QUERY_NOTIFY_OFFICE_FILLED)?; for office in new_offices { // Debug-Logging mit allen Feldern, damit sie aktiv genutzt werden eprintln!( "[PoliticsWorker] Office filled: id={}, type={}, character={}, region={}", office.office_id, office.office_type_id, office.character_id, office.region_id ); conn.execute("notify_office_filled", &[&office.character_id])?; } conn.prepare( "get_users_with_filled_offices", QUERY_GET_USERS_WITH_FILLED_OFFICES, )?; let rows = conn.execute("get_users_with_filled_offices", &[])?; for row in rows { if let Some(user_id) = row.get("user_id").and_then(|v| v.parse::().ok()) { let msg = format!(r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#, user_id); broker.publish(msg); } } Ok(()) } fn process_elections(pool: &ConnectionPool) -> Result, DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("process_elections", QUERY_PROCESS_ELECTIONS)?; let rows = conn.execute("process_elections", &[])?; Ok(rows .into_iter() .filter_map(map_row_to_office) .collect()) } } impl Worker for PoliticsWorker { fn start_worker_thread(&mut self) { let pool = self.base.pool.clone(); let broker = self.base.broker.clone(); self.base .start_worker_with_loop(move |state: Arc| { PoliticsWorker::run_loop(pool.clone(), broker.clone(), state); }); } fn stop_worker_thread(&mut self) { self.base.stop_worker(); } fn enable_watchdog(&mut self) { self.base.start_watchdog(); } } fn parse_i32(row: &Row, key: &str, default: i32) -> i32 { row.get(key) .and_then(|v| v.parse::().ok()) .unwrap_or(default) } fn map_row_to_office(row: Row) -> Option { Some(Office { office_id: row.get("office_id")?.parse().ok()?, office_type_id: row.get("office_type_id")?.parse().ok()?, character_id: row.get("character_id")?.parse().ok()?, region_id: row.get("region_id")?.parse().ok()?, }) }