use crate::db::{ConnectionPool, DbError, Row}; use crate::message_broker::MessageBroker; use std::collections::{HashMap, HashSet}; use std::sync::atomic::Ordering; use std::sync::Arc; use std::time::{Duration, Instant}; use super::base::{BaseWorker, Worker, WorkerState}; use crate::worker::sql::{ QUERY_COUNT_OFFICES_PER_REGION, QUERY_FIND_OFFICE_GAPS, QUERY_SELECT_NEEDED_ELECTIONS, QUERY_INSERT_CANDIDATES, QUERY_SELECT_ELECTIONS_NEEDING_CANDIDATES, QUERY_PROCESS_EXPIRED_AND_FILL, QUERY_USERS_IN_CITIES_OF_REGIONS, QUERY_NOTIFY_OFFICE_EXPIRATION, QUERY_NOTIFY_ELECTION_CREATED, QUERY_NOTIFY_OFFICE_FILLED, QUERY_GET_USERS_WITH_EXPIRING_OFFICES, QUERY_GET_USERS_IN_REGIONS_WITH_ELECTIONS, QUERY_GET_USERS_WITH_FILLED_OFFICES, QUERY_PROCESS_ELECTIONS, QUERY_TRIM_EXCESS_OFFICES_GLOBAL, QUERY_FIND_AVAILABLE_CHURCH_OFFICES, QUERY_FIND_CHURCH_SUPERVISOR, QUERY_GET_CHURCH_OFFICE_REQUIREMENTS, QUERY_CHECK_CHARACTER_ELIGIBILITY, QUERY_APPROVE_CHURCH_APPLICATION, QUERY_REJECT_CHURCH_APPLICATION, QUERY_CREATE_CHURCH_APPLICATION_JOB, QUERY_GET_CHARACTERS_FOR_CHURCH_OFFICE, QUERY_GET_OLD_PENDING_CHURCH_APPLICATIONS, QUERY_AUTO_APPROVE_CHURCH_APPLICATION, QUERY_COUNT_PENDING_CHURCH_APPS_BY_OFFICE_REGION, QUERY_GET_CHURCH_OFFICE_OCCUPIED_COUNT, QUERY_IS_CHARACTER_NPC, QUERY_GET_PENDING_CHURCH_APPLICATIONS_FOR_SCORING, QUERY_INTERIM_APPOINT_CHURCH_OFFICE, QUERY_UPDATE_CHARACTER_HIGHEST_CHURCH_FROM_OFFICE_TYPE, QUERY_FIND_INTERIM_CHURCH_NPC_CANDIDATE, QUERY_REMOVE_LOWER_CHURCH_OFFICES_FOR_CHARACTER, }; pub struct PoliticsWorker { base: BaseWorker, } #[derive(Debug, Clone)] struct OfficeCounts { region_id: i32, required: i32, occupied: i32, } #[derive(Debug, Clone)] struct Election { election_id: i32, region_id: i32, posts_to_fill: i32, } #[derive(Debug, Clone)] struct OfficeGap { office_type_id: i32, region_id: i32, gaps: i32, } #[derive(Debug, Clone)] struct Office { office_id: i32, office_type_id: i32, character_id: i32, region_id: i32, } #[derive(Debug, Clone)] struct 
AvailableChurchOffice { office_type_id: i32, hierarchy_level: i32, seats_per_region: i32, region_id: i32, occupied_seats: i32, } #[derive(Debug, Clone)] struct ChurchSupervisor { supervisor_character_id: i32, } /// Bis einschließlich dieser `hierarchy_level` (church_office_type): Interimsbesetzung ohne Vorgesetzten. const INTERIM_MAX_CHURCH_HIERARCHY: i32 = 6; struct ChurchAppScoreRow { application_id: i32, office_type_id: i32, applicant_character_id: i32, region_id: i32, seats_per_region: i32, score: f64, } // --- SQL-Konstanten (1:1 aus politics_worker.h übernommen) ------------------ impl PoliticsWorker { pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self { Self { base: BaseWorker::new("PoliticsWorker", pool, broker), } } fn run_loop(pool: ConnectionPool, broker: MessageBroker, state: Arc) { let mut last_execution: Option = None; let mut last_auto_approve_run: Option = None; while state.running_worker.load(Ordering::Relaxed) { let now = Instant::now(); let should_run = match last_execution { None => true, Some(prev) => now.saturating_duration_since(prev) >= Duration::from_secs(24 * 3600), }; if should_run { if let Err(err) = Self::perform_daily_politics_task(&pool, &broker) { eprintln!("[PoliticsWorker] Fehler bei performDailyPoliticsTask: {err}"); } last_execution = Some(now); } // Automatische Annahme alter Applications (stündlich) let should_run_auto_approve = match last_auto_approve_run { None => true, Some(prev) => { now.saturating_duration_since(prev) >= Duration::from_secs(3600) } }; if should_run_auto_approve { if let Err(err) = Self::auto_approve_old_church_applications(&pool, &broker) { eprintln!("[PoliticsWorker] Fehler bei auto_approve_old_church_applications: {err}"); } last_auto_approve_run = Some(now); } // Entspricht ungefähr der 5-Sekunden-Schleife im C++-Code for _ in 0..5 { if !state.running_worker.load(Ordering::Relaxed) { break; } std::thread::sleep(Duration::from_secs(1)); } } } fn perform_daily_politics_task( pool: 
&ConnectionPool, broker: &MessageBroker, ) -> Result<(), DbError> { // 0) Täglich: Kirchenämter (NPC-Bewerbungen, NPC-Vorgesetzte, Interimsbesetzung) if let Err(err) = Self::perform_church_office_task(pool, broker) { eprintln!("[PoliticsWorker] Fehler bei perform_church_office_task: {err}"); } // 1) Optional: Positionen evaluieren (aktuell nur Logging/Struktur) Self::evaluate_political_positions(pool)?; // 2) Schema-Änderungen abgleichen: neue / zusätzliche Ämter anlegen, // ohne bestehende Amtsinhaber bei Reduktion zu entfernen. Self::sync_offices_with_types(pool)?; // 3) Ämter, die bald auslaufen, benachrichtigen Self::notify_office_expirations(pool, broker)?; // 4) Abgelaufene Ämter verarbeiten und neue besetzen let new_offices_direct = Self::process_expired_offices_and_fill(pool)?; if !new_offices_direct.is_empty() { Self::notify_office_filled(pool, broker, &new_offices_direct)?; } // 5) Neue Wahlen planen let new_elections = Self::schedule_elections(pool)?; // 6) Für alle Wahlen ohne Kandidaten (inkl. manuell // angelegter) Kandidaten eintragen. let mut elections_to_fill = Self::find_elections_needing_candidates(pool)?; // Die gerade neu angelegten Wahlen sind typischerweise auch // in obiger Liste enthalten. Falls nicht, fügen wir sie // sicherheitshalber hinzu. 
for e in new_elections.iter() { if !elections_to_fill.iter().any(|x| x.election_id == e.election_id) { elections_to_fill.push(e.clone()); } } if !elections_to_fill.is_empty() { Self::insert_candidates_for_elections(pool, &elections_to_fill)?; // Benachrichtige User in betroffenen Regionen let region_ids: HashSet = elections_to_fill.iter().map(|e| e.region_id).collect(); let user_ids = Self::get_user_ids_in_cities_of_regions(pool, ®ion_ids)?; Self::notify_election_created(pool, broker, &user_ids)?; } // 7) Wahlen auswerten und neu besetzte Ämter melden let new_offices_from_elections = Self::process_elections(pool)?; if !new_offices_from_elections.is_empty() { Self::notify_office_filled(pool, broker, &new_offices_from_elections)?; } // 8) Als letzter Schritt sicherstellen, dass es für keinen // Amtstyp/Region-Kombi mehr besetzte Ämter gibt als laut // `seats_per_region` erlaubt. Dieser Abgleich wird nach allen // Lösch- und Besetzungsvorgängen ausgeführt. Self::trim_excess_offices_global(pool)?; Ok(()) } fn evaluate_political_positions( pool: &ConnectionPool, ) -> Result, DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare( "count_offices_per_region", QUERY_COUNT_OFFICES_PER_REGION, ).map_err(|e| DbError::new(format!("[PoliticsWorker] prepare count_offices_per_region: {e}")))?; let rows = conn.execute("count_offices_per_region", &[]) .map_err(|e| DbError::new(format!("[PoliticsWorker] exec count_offices_per_region: {e}")))?; let mut result = Vec::with_capacity(rows.len()); for row in rows { let region_id = parse_i32(&row, "region_id", -1); let required = parse_i32(&row, "required_count", 0); let occupied = parse_i32(&row, "occupied_count", 0); if region_id >= 0 { let oc = OfficeCounts { region_id, required, occupied, }; // Felder aktiv nutzen: einfache Debug-Ausgabe kann später // durch Logging ersetzt werden. 
eprintln!( "[PoliticsWorker] Region {}: required={}, occupied={}", oc.region_id, oc.required, oc.occupied ); result.push(oc); } } Ok(result) } /// Entfernt global alle überzähligen Ämter in Relation zur /// konfigurierten Sitzzahl pro Amtstyp/Region. fn trim_excess_offices_global(pool: &ConnectionPool) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare( "trim_excess_offices_global", QUERY_TRIM_EXCESS_OFFICES_GLOBAL, ).map_err(|e| DbError::new(format!("[PoliticsWorker] prepare trim_excess_offices_global: {e}")))?; if let Err(err) = conn.execute("trim_excess_offices_global", &[]) { eprintln!( "[PoliticsWorker] Fehler bei trim_excess_offices_global: {err}" ); } Ok(()) } /// Gleicht die konfigurierte Anzahl der Ämter (seats_per_region pro /// Amtstyp/Region) mit den tatsächlich vorhandenen Einträgen in /// `falukant_data.political_office` ab und legt für fehlende Sitze /// zusätzliche Wahlen an. /// /// Wichtig: Wenn `seats_per_region` gesenkt wurde und damit aktuell /// mehr Amtsinhaber existieren als Sitze vorgesehen sind, werden /// **keine** Amtsinhaber entfernt oder Wahlen zum Abbau erzeugt – die /// entsprechenden Kombinationen tauchen schlicht nicht in der Gaps‑Liste /// auf. Erst wenn durch natürliches Auslaufen weniger Ämter existieren /// als vorgesehen, entstehen wieder Gaps. 
fn sync_offices_with_types(pool: &ConnectionPool) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("find_office_gaps", QUERY_FIND_OFFICE_GAPS) .map_err(|e| DbError::new(format!("[PoliticsWorker] prepare find_office_gaps: {e}")))?; let rows = conn.execute("find_office_gaps", &[]) .map_err(|e| DbError::new(format!("[PoliticsWorker] exec find_office_gaps: {e}")))?; if rows.is_empty() { return Ok(()); } let mut gaps: Vec = Vec::with_capacity(rows.len()); for row in rows { let office_type_id = parse_i32(&row, "office_type_id", -1); let region_id = parse_i32(&row, "region_id", -1); let gaps_count = parse_i32(&row, "gaps", 0); if office_type_id >= 0 && region_id >= 0 && gaps_count > 0 { gaps.push(OfficeGap { office_type_id, region_id, gaps: gaps_count, }); } } if gaps.is_empty() { return Ok(()); } // Für jede Lücke eine Wahl am aktuellen Tag anlegen, sofern es nicht // bereits eine Wahl für diesen Amtstyp/Region an einem heutigen oder // zukünftigen Datum gibt. Das eigentliche Anlegen der Wahl und das // Eintragen von Kandidaten läuft weiterhin über die bestehende // Logik in der Datenbank (process_elections / schedule_elections). // // Hier nutzen wir bewusst eine einfache, wiederholbare Logik im // Rust‑Code, statt alles in eine riesige DB‑Funktion zu packen. // Wir lehnen uns an die Struktur von `schedule_elections` an, // erzeugen aber unsere eigenen Einträge in `falukant_data.election`. 
let insert_sql = r#" INSERT INTO falukant_data.election (office_type_id, date, posts_to_fill, created_at, updated_at, region_id) SELECT $1::int AS office_type_id, CURRENT_DATE AS date, $2::int AS posts_to_fill, NOW() AS created_at, NOW() AS updated_at, $3::int AS region_id WHERE NOT EXISTS ( SELECT 1 FROM falukant_data.election e WHERE e.office_type_id = $1::int AND e.region_id = $3::int AND e.date::date >= CURRENT_DATE ); "#; conn.prepare("insert_gap_election", insert_sql)?; for gap in gaps { // Sicherheitshalber nur positive Gaps berücksichtigen. if gap.gaps <= 0 { continue; } if let Err(err) = conn.execute( "insert_gap_election", &[&gap.office_type_id, &gap.gaps, &gap.region_id], ) { eprintln!( "[PoliticsWorker] Fehler beim Anlegen von Gap‑Wahlen \ (office_type_id={}, region_id={}): {err}", gap.office_type_id, gap.region_id ); } } Ok(()) } fn schedule_elections(pool: &ConnectionPool) -> Result, DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("select_needed_elections", QUERY_SELECT_NEEDED_ELECTIONS) .map_err(|e| DbError::new(format!("[PoliticsWorker] prepare select_needed_elections: {e}")))?; let rows = conn.execute("select_needed_elections", &[]) .map_err(|e| DbError::new(format!("[PoliticsWorker] exec select_needed_elections: {e}")))?; let mut elections = Vec::with_capacity(rows.len()); for row in rows { let election_id = parse_i32(&row, "election_id", -1); let region_id = parse_i32(&row, "region_id", -1); let posts_to_fill = parse_i32(&row, "posts_to_fill", 0); if election_id >= 0 && region_id >= 0 { elections.push(Election { election_id, region_id, posts_to_fill, }); } } Ok(elections) } /// Findet alle existierenden Wahlen (inkl. manuell angelegter), /// für die noch keine Kandidaten eingetragen wurden. 
fn find_elections_needing_candidates(pool: &ConnectionPool) -> Result, DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare( "select_elections_needing_candidates", QUERY_SELECT_ELECTIONS_NEEDING_CANDIDATES, ).map_err(|e| DbError::new(format!("[PoliticsWorker] prepare select_elections_needing_candidates: {e}")))?; let rows = conn.execute("select_elections_needing_candidates", &[]) .map_err(|e| DbError::new(format!("[PoliticsWorker] exec select_elections_needing_candidates: {e}")))?; let mut elections = Vec::with_capacity(rows.len()); for row in rows { let election_id = parse_i32(&row, "election_id", -1); let region_id = parse_i32(&row, "region_id", -1); let posts_to_fill = parse_i32(&row, "posts_to_fill", 0); if election_id >= 0 && region_id >= 0 && posts_to_fill > 0 { elections.push(Election { election_id, region_id, posts_to_fill, }); } } Ok(elections) } fn insert_candidates_for_elections( pool: &ConnectionPool, elections: &[Election], ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("insert_candidates", QUERY_INSERT_CANDIDATES) .map_err(|e| DbError::new(format!("[PoliticsWorker] prepare insert_candidates: {e}")))?; for e in elections { conn.execute( "insert_candidates", &[&e.election_id, &e.region_id, &e.posts_to_fill], ).map_err(|err| DbError::new(format!("[PoliticsWorker] exec insert_candidates eid={} rid={}: {}", e.election_id, e.region_id, err)))?; } Ok(()) } fn process_expired_offices_and_fill( pool: &ConnectionPool, ) -> Result, DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("process_expired_and_fill", QUERY_PROCESS_EXPIRED_AND_FILL) .map_err(|e| DbError::new(format!("[PoliticsWorker] prepare process_expired_and_fill: {e}")))?; let rows = conn.execute("process_expired_and_fill", &[]) .map_err(|e| 
DbError::new(format!("[PoliticsWorker] exec process_expired_and_fill: {e}")))?; Ok(rows .into_iter() .filter_map(map_row_to_office) .collect()) } fn get_user_ids_in_cities_of_regions( pool: &ConnectionPool, region_ids: &HashSet, ) -> Result, DbError> { if region_ids.is_empty() { return Ok(Vec::new()); } let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("get_users_in_cities", QUERY_USERS_IN_CITIES_OF_REGIONS)?; let mut user_ids = Vec::new(); for rid in region_ids { let rows = conn.execute("get_users_in_cities", &[rid])?; for row in rows { if let Some(uid) = row.get("user_id").and_then(|v| v.parse::().ok()) { user_ids.push(uid); } } } Ok(user_ids) } fn notify_office_expirations( pool: &ConnectionPool, broker: &MessageBroker, ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("notify_office_expiration", QUERY_NOTIFY_OFFICE_EXPIRATION) .map_err(|e| DbError::new(format!("[PoliticsWorker] prepare notify_office_expiration: {e}")))?; conn.execute("notify_office_expiration", &[]) .map_err(|e| DbError::new(format!("[PoliticsWorker] exec notify_office_expiration: {e}")))?; conn.prepare( "get_users_with_expiring_offices", QUERY_GET_USERS_WITH_EXPIRING_OFFICES, ).map_err(|e| DbError::new(format!("[PoliticsWorker] prepare get_users_with_expiring_offices: {e}")))?; let rows = conn.execute("get_users_with_expiring_offices", &[]) .map_err(|e| DbError::new(format!("[PoliticsWorker] exec get_users_with_expiring_offices: {e}")))?; for row in rows { if let Some(user_id) = row.get("user_id").and_then(|v| v.parse::().ok()) { let msg = format!(r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#, user_id); broker.publish(msg); } } Ok(()) } fn notify_election_created( pool: &ConnectionPool, broker: &MessageBroker, user_ids: &[i32], ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung 
fehlgeschlagen: {e}")))?; conn.prepare("notify_election_created", QUERY_NOTIFY_ELECTION_CREATED) .map_err(|e| DbError::new(format!("[PoliticsWorker] prepare notify_election_created: {e}")))?; for uid in user_ids { conn.execute("notify_election_created", &[uid]) .map_err(|e| DbError::new(format!("[PoliticsWorker] exec notify_election_created uid={}: {}", uid, e)))?; } conn.prepare( "get_users_in_regions_with_elections", QUERY_GET_USERS_IN_REGIONS_WITH_ELECTIONS, ).map_err(|e| DbError::new(format!("[PoliticsWorker] prepare get_users_in_regions_with_elections: {e}")))?; let rows = conn.execute("get_users_in_regions_with_elections", &[]) .map_err(|e| DbError::new(format!("[PoliticsWorker] exec get_users_in_regions_with_elections: {e}")))?; for row in rows { if let Some(user_id) = row.get("user_id").and_then(|v| v.parse::().ok()) { let msg = format!(r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#, user_id); broker.publish(msg); } } Ok(()) } fn notify_office_filled( pool: &ConnectionPool, broker: &MessageBroker, new_offices: &[Office], ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("notify_office_filled", QUERY_NOTIFY_OFFICE_FILLED) .map_err(|e| DbError::new(format!("[PoliticsWorker] prepare notify_office_filled: {e}")))?; for office in new_offices { // Debug-Logging mit allen Feldern, damit sie aktiv genutzt werden eprintln!( "[PoliticsWorker] Office filled: id={}, type={}, character={}, region={}", office.office_id, office.office_type_id, office.character_id, office.region_id ); conn.execute("notify_office_filled", &[&office.character_id]) .map_err(|e| DbError::new(format!("[PoliticsWorker] exec notify_office_filled office_id={} character_id={}: {}", office.office_id, office.character_id, e)))?; } conn.prepare( "get_users_with_filled_offices", QUERY_GET_USERS_WITH_FILLED_OFFICES, ).map_err(|e| DbError::new(format!("[PoliticsWorker] prepare get_users_with_filled_offices: 
{e}")))?; let rows = conn.execute("get_users_with_filled_offices", &[]) .map_err(|e| DbError::new(format!("[PoliticsWorker] exec get_users_with_filled_offices: {e}")))?; for row in rows { if let Some(user_id) = row.get("user_id").and_then(|v| v.parse::().ok()) { let msg = format!(r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#, user_id); broker.publish(msg); } } Ok(()) } fn process_elections(pool: &ConnectionPool) -> Result, DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("process_elections", QUERY_PROCESS_ELECTIONS)?; let rows = conn.execute("process_elections", &[])?; Ok(rows .into_iter() .filter_map(map_row_to_office) .collect()) } /// Täglich: freie Sitze, Bewerbungen (Spieler bleiben offen), NPC-Vorgesetzte entscheiden per Score, /// NPC-Bewerbungen erzeugen, Interimsbesetzung ohne Vorgesetzten (niedrige Hierarchie). fn perform_church_office_task( pool: &ConnectionPool, broker: &MessageBroker, ) -> Result<(), DbError> { eprintln!("[PoliticsWorker] Starte Church Office Task (täglich)"); let available_offices = Self::find_available_church_offices(pool)?; eprintln!( "[PoliticsWorker] Gefunden: {} verfügbare Church Office Positionen", available_offices.len() ); for office in &available_offices { if let Some(supervisor) = Self::find_church_supervisor(pool, office.region_id, office.office_type_id)? 
{ Self::process_church_supervisor_queue(pool, broker, supervisor.supervisor_character_id)?; let remaining_seats = office.seats_per_region - office.occupied_seats; if remaining_seats > 0 { let pending_count = Self::count_pending_church_apps(pool, office.office_type_id, office.region_id)?; if pending_count < remaining_seats { let need = remaining_seats - pending_count; Self::create_church_application_jobs( pool, office.office_type_id, office.region_id, supervisor.supervisor_character_id, need, )?; } } } else if office.hierarchy_level <= INTERIM_MAX_CHURCH_HIERARCHY { Self::try_interim_church_appointment(pool, broker, office)?; } else { eprintln!( "[PoliticsWorker] Kein Supervisor, Interim deaktiviert (hierarchy_level={}): office_type_id={}, region_id={}", office.hierarchy_level, office.office_type_id, office.region_id ); } } eprintln!("[PoliticsWorker] Church Office Task abgeschlossen"); Ok(()) } fn publish_falukant_church_update(broker: &MessageBroker, user_id: i32, reason: &str) { let church = format!( r#"{{"event":"falukantUpdateChurch","user_id":{},"reason":"{}"}}"#, user_id, reason ); broker.publish(church); let status = format!(r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#, user_id); broker.publish(status); } fn count_pending_church_apps( pool: &ConnectionPool, office_type_id: i32, region_id: i32, ) -> Result { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare( "cnt_pending_church", QUERY_COUNT_PENDING_CHURCH_APPS_BY_OFFICE_REGION, ) .map_err(|e| DbError::new(format!("[PoliticsWorker] prepare cnt_pending_church: {e}")))?; let rows = conn .execute("cnt_pending_church", &[&office_type_id, ®ion_id]) .map_err(|e| DbError::new(format!("[PoliticsWorker] exec cnt_pending_church: {e}")))?; Ok(rows .first() .and_then(|r| r.get("cnt")) .and_then(|v| v.parse::().ok()) .unwrap_or(0)) } fn get_church_occupied_count( pool: &ConnectionPool, office_type_id: i32, region_id: i32, ) -> Result { let mut conn = 
pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare( "cnt_occ_church", QUERY_GET_CHURCH_OFFICE_OCCUPIED_COUNT, ) .map_err(|e| DbError::new(format!("[PoliticsWorker] prepare cnt_occ_church: {e}")))?; let rows = conn .execute("cnt_occ_church", &[&office_type_id, ®ion_id]) .map_err(|e| DbError::new(format!("[PoliticsWorker] exec cnt_occ_church: {e}")))?; Ok(rows .first() .and_then(|r| r.get("cnt")) .and_then(|v| v.parse::().ok()) .unwrap_or(0)) } fn church_candidate_score( supervisor_rep: f64, applicant_rep: f64, highest_ever: i32, current_max: i32, title_level: i32, age_days: i32, wait_days: f64, ) -> f64 { let age_years = age_days / 365; let age_bonus = if (25..=70).contains(&age_years) { 12.0 } else { 0.0 }; let base = (highest_ever as f64) * 4.0 + (current_max as f64) * 3.0 + applicant_rep * 0.45 + (title_level as f64) * 1.1 + age_bonus + wait_days * 0.15; let inf = (supervisor_rep / 100.0).clamp(0.0, 1.0); let noise = (1.0 - inf) * 28.0 * rand::random::(); base + noise } fn parse_pg_bool(v: Option<&str>) -> bool { match v { Some("t") | Some("true") => true, Some("f") | Some("false") => false, Some(s) => s.parse::().unwrap_or(false), None => false, } } fn character_eligible_for_church_office( pool: &ConnectionPool, character_id: i32, office_type_id: i32, ) -> Result { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare( "get_church_office_requirements", QUERY_GET_CHURCH_OFFICE_REQUIREMENTS, ) .map_err(|e| { DbError::new(format!( "[PoliticsWorker] prepare get_church_office_requirements: {e}" )) })?; conn.prepare( "check_character_eligibility", QUERY_CHECK_CHARACTER_ELIGIBILITY, ) .map_err(|e| { DbError::new(format!( "[PoliticsWorker] prepare check_character_eligibility: {e}" )) })?; let req_rows = conn .execute("get_church_office_requirements", &[&office_type_id]) .map_err(|e| { DbError::new(format!( "[PoliticsWorker] exec get_church_office_requirements: 
{e}" )) })?; for req_row in req_rows { let prerequisite_office_type_id = req_row .get("prerequisite_office_type_id") .and_then(|v| v.parse::().ok()); let min_title_level = req_row .get("min_title_level") .and_then(|v| v.parse::().ok()); let elig_rows = conn .execute( "check_character_eligibility", &[ &character_id, &prerequisite_office_type_id, &min_title_level, ], ) .map_err(|e| { DbError::new(format!( "[PoliticsWorker] exec check_character_eligibility: {e}" )) })?; for elig_row in elig_rows { let has_prerequisite = Self::parse_pg_bool(elig_row.get("has_prerequisite").map(|s| s.as_str())); let meets_title_requirement = Self::parse_pg_bool(elig_row.get("meets_title_requirement").map(|s| s.as_str())); if !has_prerequisite || !meets_title_requirement { return Ok(false); } } } Ok(true) } fn remove_lower_ranked_church_offices_for_character( pool: &ConnectionPool, character_id: i32, ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare( "rm_lower_church", QUERY_REMOVE_LOWER_CHURCH_OFFICES_FOR_CHARACTER, ) .map_err(|e| DbError::new(format!("[PoliticsWorker] prepare rm_lower_church: {e}")))?; conn.execute("rm_lower_church", &[&character_id]) .map_err(|e| DbError::new(format!("[PoliticsWorker] exec rm_lower_church: {e}")))?; Ok(()) } fn try_interim_church_appointment( pool: &ConnectionPool, broker: &MessageBroker, office: &AvailableChurchOffice, ) -> Result<(), DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare( "find_interim_npc", QUERY_FIND_INTERIM_CHURCH_NPC_CANDIDATE, ) .map_err(|e| DbError::new(format!("[PoliticsWorker] prepare find_interim_npc: {e}")))?; let rows = conn .execute("find_interim_npc", &[&office.region_id, &office.office_type_id]) .map_err(|e| DbError::new(format!("[PoliticsWorker] exec find_interim_npc: {e}")))?; let candidate_id = rows .first() .and_then(|r| r.get("character_id")) 
.and_then(|v| v.parse::().ok()) .unwrap_or(-1); if candidate_id < 0 { return Ok(()); } if !Self::character_eligible_for_church_office(pool, candidate_id, office.office_type_id)? { return Ok(()); } drop(conn); let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare( "interim_ins", QUERY_INTERIM_APPOINT_CHURCH_OFFICE, ) .map_err(|e| DbError::new(format!("[PoliticsWorker] prepare interim_ins: {e}")))?; let ins = conn .execute( "interim_ins", &[ &office.office_type_id, &candidate_id, &office.region_id, &office.seats_per_region, ], ) .map_err(|e| DbError::new(format!("[PoliticsWorker] exec interim_ins: {e}")))?; if ins.is_empty() { return Ok(()); } conn.prepare( "upd_hi_interim", QUERY_UPDATE_CHARACTER_HIGHEST_CHURCH_FROM_OFFICE_TYPE, ) .map_err(|e| DbError::new(format!("[PoliticsWorker] prepare upd_hi_interim: {e}")))?; conn.execute("upd_hi_interim", &[&candidate_id, &office.office_type_id]) .map_err(|e| DbError::new(format!("[PoliticsWorker] exec upd_hi_interim: {e}")))?; Self::remove_lower_ranked_church_offices_for_character(pool, candidate_id)?; eprintln!( "[PoliticsWorker] Interims-Kirchenamt: character_id={}, office_type_id={}, region_id={}", candidate_id, office.office_type_id, office.region_id ); if let Some(uid) = Self::get_user_id_for_character(pool, candidate_id)? 
{ Self::publish_falukant_church_update(broker, uid, "vacancy_fill"); } Ok(()) } fn find_available_church_offices( pool: &ConnectionPool, ) -> Result, DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare( "find_available_church_offices", QUERY_FIND_AVAILABLE_CHURCH_OFFICES, ) .map_err(|e| { DbError::new(format!( "[PoliticsWorker] prepare find_available_church_offices: {e}" )) })?; let rows = conn .execute("find_available_church_offices", &[]) .map_err(|e| { DbError::new(format!( "[PoliticsWorker] exec find_available_church_offices: {e}" )) })?; let mut offices = Vec::new(); for row in rows { let office_type_id = parse_i32(&row, "office_type_id", -1); let hierarchy_level = parse_i32(&row, "hierarchy_level", 99); let region_id = parse_i32(&row, "region_id", -1); let seats_per_region = parse_i32(&row, "seats_per_region", 0); let occupied_seats = parse_i32(&row, "occupied_seats", 0); if office_type_id >= 0 && region_id >= 0 { offices.push(AvailableChurchOffice { office_type_id, hierarchy_level, seats_per_region, region_id, occupied_seats, }); } } Ok(offices) } fn find_church_supervisor( pool: &ConnectionPool, region_id: i32, office_type_id: i32, ) -> Result, DbError> { let mut conn = pool .get() .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; conn.prepare("find_church_supervisor", QUERY_FIND_CHURCH_SUPERVISOR) .map_err(|e| { DbError::new(format!( "[PoliticsWorker] prepare find_church_supervisor: {e}" )) })?; let rows = conn .execute("find_church_supervisor", &[®ion_id, &office_type_id]) .map_err(|e| { DbError::new(format!( "[PoliticsWorker] exec find_church_supervisor: {e}" )) })?; for row in rows { let supervisor_character_id = parse_i32(&row, "supervisor_character_id", -1); if supervisor_character_id >= 0 { return Ok(Some(ChurchSupervisor { supervisor_character_id, })); } } Ok(None) } /// Spieler-Vorgesetzte: nur Event (keine automatische Entscheidung). 
/// NPC: score-based selection.
    ///
    /// Dispatches the pending church applications of one supervisor.
    ///
    /// Looks up whether `supervisor_id` is an NPC (`QUERY_IS_CHARACTER_NPC`, column
    /// `is_npc`, accepted truthy spellings: `"t"` / `"true"`; a missing row counts as
    /// "not an NPC").
    ///
    /// * Not an NPC: if a user id can be resolved for the supervisor, a church update
    ///   with reason `"applications"` is published for that user; nothing is decided.
    /// * NPC: the decision is delegated to
    ///   [`Self::npc_resolve_church_applications_for_supervisor`].
    ///
    /// # Errors
    /// Returns `DbError` if acquiring a connection, preparing or executing a statement
    /// fails, or if one of the delegated helpers fails.
    fn process_church_supervisor_queue(
        pool: &ConnectionPool,
        broker: &MessageBroker,
        supervisor_id: i32,
    ) -> Result<(), DbError> {
        let mut conn = pool
            .get()
            .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;

        conn.prepare("is_npc_sup", QUERY_IS_CHARACTER_NPC)
            .map_err(|e| DbError::new(format!("[PoliticsWorker] prepare is_npc_sup: {e}")))?;
        let rows = conn
            .execute("is_npc_sup", &[&supervisor_id])
            .map_err(|e| DbError::new(format!("[PoliticsWorker] exec is_npc_sup: {e}")))?;

        let is_npc = rows
            .first()
            .and_then(|r| r.get("is_npc"))
            .map(|v| v == "t" || v == "true")
            .unwrap_or(false);

        // Every helper called below fetches its own connection via `pool.get()`.
        // Hand this one back first so a single call chain does not pin several
        // pooled connections at once.
        drop(conn);

        if !is_npc {
            if let Some(uid) = Self::get_user_id_for_character(pool, supervisor_id)? {
                Self::publish_falukant_church_update(broker, uid, "applications");
            }
            return Ok(());
        }

        Self::npc_resolve_church_applications_for_supervisor(pool, broker, supervisor_id)
    }

    /// Lets an NPC supervisor decide all of its pending church applications.
    ///
    /// Phase 1 (one connection): loads the pending applications
    /// (`QUERY_GET_PENDING_CHURCH_APPLICATIONS_FOR_SCORING`). Rows without a valid
    /// `application_id` / `applicant_character_id` are skipped. Applicants failing
    /// `character_eligible_for_church_office` are rejected immediately; all others
    /// receive a score from `church_candidate_score`.
    ///
    /// Phase 2: applications are grouped by `(office_type_id, region_id)`, ordered by
    /// score (highest first, ties keep query order) and approved until the occupied
    /// count reaches `seats_per_region` of the best-scored row. Each approval that
    /// returns at least one row is logged and, if the applicant maps to a user,
    /// announced with reason `"npc_decision"`. Applications left over are not touched.
    ///
    /// # Errors
    /// Returns `DbError` on any connection / prepare / execute failure. Decisions made
    /// before the failure are not rolled back by this function.
    fn npc_resolve_church_applications_for_supervisor(
        pool: &ConnectionPool,
        broker: &MessageBroker,
        supervisor_id: i32,
    ) -> Result<(), DbError> {
        let mut conn = pool
            .get()
            .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;

        conn.prepare(
            "score_church_apps",
            QUERY_GET_PENDING_CHURCH_APPLICATIONS_FOR_SCORING,
        )
        .map_err(|e| {
            DbError::new(format!(
                "[PoliticsWorker] prepare score_church_apps: {e}"
            ))
        })?;
        let rows = conn
            .execute("score_church_apps", &[&supervisor_id])
            .map_err(|e| {
                DbError::new(format!(
                    "[PoliticsWorker] exec score_church_apps: {e}"
                ))
            })?;

        // Only the reject statement is needed on this connection: approvals run later
        // on connections that prepare the approve statement themselves.
        conn.prepare("reject_church_application", QUERY_REJECT_CHURCH_APPLICATION)
            .map_err(|e| {
                DbError::new(format!(
                    "[PoliticsWorker] prepare reject_church_application: {e}"
                ))
            })?;

        let mut scored: Vec<ChurchAppScoreRow> = Vec::new();
        for row in rows {
            let application_id = parse_i32(&row, "application_id", -1);
            let applicant_character_id = parse_i32(&row, "applicant_character_id", -1);
            if application_id < 0 || applicant_character_id < 0 {
                continue;
            }

            let office_type_id = parse_i32(&row, "office_type_id", -1);
            let region_id = parse_i32(&row, "region_id", -1);
            let seats_per_region = parse_i32(&row, "seats_per_region", 1);
            // Reputation falls back to a neutral 50.0 when absent or unparsable.
            let supervisor_reputation = row
                .get("supervisor_reputation")
                .and_then(|v| v.parse::<f64>().ok())
                .unwrap_or(50.0);
            let applicant_reputation = row
                .get("applicant_reputation")
                .and_then(|v| v.parse::<f64>().ok())
                .unwrap_or(50.0);
            let applicant_highest_ever = parse_i32(&row, "applicant_highest_ever", 0);
            let applicant_title_level = parse_i32(&row, "applicant_title_level", 0);
            let applicant_current_max = parse_i32(&row, "applicant_current_max_hierarchy", 0);
            let applicant_age_days = parse_i32(&row, "applicant_age_days", 0);

            if !Self::character_eligible_for_church_office(
                pool,
                applicant_character_id,
                office_type_id,
            )? {
                conn.execute("reject_church_application", &[&application_id])
                    .map_err(|e| {
                        DbError::new(format!(
                            "[PoliticsWorker] exec reject_church_application: {e}"
                        ))
                    })?;
                eprintln!(
                    "[PoliticsWorker] Church Application {} abgelehnt (NPC: nicht qualifiziert)",
                    application_id
                );
                continue;
            }

            // Waiting time is not read from the row; it is always passed as zero.
            let wait_days = 0.0_f64;
            let score = Self::church_candidate_score(
                supervisor_reputation,
                applicant_reputation,
                applicant_highest_ever,
                applicant_current_max,
                applicant_title_level,
                applicant_age_days,
                wait_days,
            );
            scored.push(ChurchAppScoreRow {
                application_id,
                office_type_id,
                applicant_character_id,
                region_id,
                seats_per_region,
                score,
            });
        }
        // Phase 2 works with short-lived connections only.
        drop(conn);

        let mut groups: HashMap<(i32, i32), Vec<ChurchAppScoreRow>> = HashMap::new();
        for s in scored {
            groups
                .entry((s.office_type_id, s.region_id))
                .or_default()
                .push(s);
        }

        for ((office_type_id, region_id), mut group) in groups {
            if office_type_id < 0 || region_id < 0 {
                continue;
            }

            // Highest score first. NaN scores are ranked last explicitly so the
            // comparator is a consistent total order (required by `sort_by`).
            group.sort_by(|a, b| match (a.score.is_nan(), b.score.is_nan()) {
                (true, true) => std::cmp::Ordering::Equal,
                (true, false) => std::cmp::Ordering::Greater,
                (false, true) => std::cmp::Ordering::Less,
                (false, false) => b
                    .score
                    .partial_cmp(&a.score)
                    .unwrap_or(std::cmp::Ordering::Equal),
            });

            let seats = group.first().map(|g| g.seats_per_region).unwrap_or(1);
            let mut occupied = Self::get_church_occupied_count(pool, office_type_id, region_id)?;
            let mut approved_here = 0usize;

            for app in group {
                if occupied >= seats {
                    break;
                }

                // The connection lives only for this block, so it is released again
                // before the user lookup below acquires its own.
                let approve_rows = {
                    let mut conn = pool
                        .get()
                        .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
                    conn.prepare(
                        "approve_church_application",
                        QUERY_APPROVE_CHURCH_APPLICATION,
                    )
                    .map_err(|e| {
                        DbError::new(format!(
                            "[PoliticsWorker] prepare approve_church_application: {e}"
                        ))
                    })?;
                    conn.execute("approve_church_application", &[&app.application_id])
                        .map_err(|e| {
                            DbError::new(format!(
                                "[PoliticsWorker] exec approve_church_application: {e}"
                            ))
                        })?
                };

                // An empty result means nothing was approved; the seat stays free.
                if approve_rows.is_empty() {
                    continue;
                }

                approved_here += 1;
                occupied += 1;
                eprintln!(
                    "[PoliticsWorker] Church Application {} genehmigt (NPC-Score, office_type_id={}, character_id={})",
                    app.application_id, app.office_type_id, app.applicant_character_id
                );
                if let Some(uid) =
                    Self::get_user_id_for_character(pool, app.applicant_character_id)?
                {
                    Self::publish_falukant_church_update(broker, uid, "npc_decision");
                }
            }

            if approved_here > 0 {
                eprintln!(
                    "[PoliticsWorker] NPC-Kirche: {} Zusagen für office_type_id={}, region_id={}",
                    approved_here, office_type_id, region_id
                );
            }
        }

        Ok(())
    }

    /// Creates church application jobs for characters of a region.
    ///
    /// Fetches candidates with `QUERY_GET_CHARACTERS_FOR_CHURCH_OFFICE` (parameters:
    /// `region_id`, `count`) and runs `QUERY_CREATE_CHURCH_APPLICATION_JOB` once per
    /// candidate with `(office_type_id, character_id, region_id, supervisor_id)`.
    /// Rows whose `character_id` is missing or not an integer are skipped. An insert
    /// counts as created only when the statement returns at least one row. The total
    /// is always logged, even when it is zero.
    ///
    /// # Errors
    /// Returns `DbError` on connection, prepare or execute failure; jobs created
    /// before the failure remain.
    fn create_church_application_jobs(
        pool: &ConnectionPool,
        office_type_id: i32,
        region_id: i32,
        supervisor_id: i32,
        count: i32,
    ) -> Result<(), DbError> {
        let mut conn = pool
            .get()
            .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;

        // Find characters for this region.
        conn.prepare(
            "get_characters_for_church_office",
            QUERY_GET_CHARACTERS_FOR_CHURCH_OFFICE,
        )
        .map_err(|e| {
            DbError::new(format!(
                "[PoliticsWorker] prepare get_characters_for_church_office: {e}"
            ))
        })?;
        let rows = conn
            .execute("get_characters_for_church_office", &[&region_id, &count])
            .map_err(|e| {
                DbError::new(format!(
                    "[PoliticsWorker] exec get_characters_for_church_office: {e}"
                ))
            })?;

        conn.prepare(
            "create_church_application_job",
            QUERY_CREATE_CHURCH_APPLICATION_JOB,
        )
        .map_err(|e| {
            DbError::new(format!(
                "[PoliticsWorker] prepare create_church_application_job: {e}"
            ))
        })?;

        let mut created = 0;
        for row in rows {
            let Some(character_id) = row
                .get("character_id")
                .and_then(|v| v.parse::<i32>().ok())
            else {
                continue;
            };

            let app_rows = conn
                .execute(
                    "create_church_application_job",
                    &[&office_type_id, &character_id, &region_id, &supervisor_id],
                )
                .map_err(|e| {
                    DbError::new(format!(
                        "[PoliticsWorker] exec create_church_application_job: {e}"
                    ))
                })?;

            if !app_rows.is_empty() {
                created += 1;
                eprintln!(
                    "[PoliticsWorker] Church Application Job erstellt: office_type_id={}, character_id={}, region_id={}",
                    office_type_id, character_id, region_id
                );
            }
        }

        eprintln!(
            "[PoliticsWorker] {} Church Application Jobs erstellt für office_type_id={}, region_id={}",
            created, office_type_id, region_id
        );
        Ok(())
    }

    /// Automatically accepts old pending church applications.
    ///
    /// Which applications count as "old" is decided by
    /// `QUERY_GET_OLD_PENDING_CHURCH_APPLICATIONS` (the log text speaks of 36h+; the
    /// threshold itself lives in the SQL, which is not part of this function).
    /// Rows without a valid `application_id` are ignored; a missing `character_id`
    /// is carried along as `-1`.
    ///
    /// Every application for which `QUERY_AUTO_APPROVE_CHURCH_APPLICATION` returns
    /// a row is logged and, if the character maps to a user, announced with reason
    /// `"appointment"`. Returns early without preparing the approve statement when
    /// there is nothing to do.
    ///
    /// # Errors
    /// Returns `DbError` on connection, prepare or execute failure; approvals made
    /// before the failure remain.
    fn auto_approve_old_church_applications(
        pool: &ConnectionPool,
        broker: &MessageBroker,
    ) -> Result<(), DbError> {
        let mut conn = pool
            .get()
            .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;

        // Fetch all old pending applications.
        conn.prepare(
            "get_old_pending_church_applications",
            QUERY_GET_OLD_PENDING_CHURCH_APPLICATIONS,
        )
        .map_err(|e| {
            DbError::new(format!(
                "[PoliticsWorker] prepare get_old_pending_church_applications: {e}"
            ))
        })?;
        let rows = conn
            .execute("get_old_pending_church_applications", &[])
            .map_err(|e| {
                DbError::new(format!(
                    "[PoliticsWorker] exec get_old_pending_church_applications: {e}"
                ))
            })?;

        let old_applications: Vec<(i32, i32)> = rows
            .into_iter()
            .filter_map(|row| {
                let application_id = parse_i32(&row, "application_id", -1);
                (application_id >= 0)
                    .then(|| (application_id, parse_i32(&row, "character_id", -1)))
            })
            .collect();

        if old_applications.is_empty() {
            return Ok(());
        }

        eprintln!(
            "[PoliticsWorker] Gefunden: {} alte Church Applications (36h+), automatische Annahme",
            old_applications.len()
        );

        conn.prepare(
            "auto_approve_church_application",
            QUERY_AUTO_APPROVE_CHURCH_APPLICATION,
        )
        .map_err(|e| {
            DbError::new(format!(
                "[PoliticsWorker] prepare auto_approve_church_application: {e}"
            ))
        })?;

        // Accept every collected application, one statement execution each.
        for (application_id, character_id) in &old_applications {
            let approve_rows = conn
                .execute("auto_approve_church_application", &[application_id])
                .map_err(|e| {
                    DbError::new(format!(
                        "[PoliticsWorker] exec auto_approve_church_application: {e}"
                    ))
                })?;

            if approve_rows.is_empty() {
                continue;
            }

            eprintln!(
                "[PoliticsWorker] Church Application {} automatisch angenommen (36h+ alt, character_id={})",
                application_id, character_id
            );
            if let Some(user_id) = Self::get_user_id_for_character(pool, *character_id)? {
                Self::publish_falukant_church_update(broker, user_id, "appointment");
            }
        }

        Ok(())
    }

    /// Resolves the user id stored for a character.
    ///
    /// Queries `falukant_data.character` by id and returns the first `user_id` value
    /// that parses as an integer. Returns `Ok(None)` when no row matches or when no
    /// row carries a parsable `user_id`.
    ///
    /// # Errors
    /// Returns `DbError` on connection, prepare or execute failure.
    fn get_user_id_for_character(
        pool: &ConnectionPool,
        character_id: i32,
    ) -> Result<Option<i32>, DbError> {
        let mut conn = pool
            .get()
            .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;

        let query = "SELECT user_id FROM falukant_data.character WHERE id = $1";
        conn.prepare("get_user_id_for_character", query)
            .map_err(|e| {
                DbError::new(format!(
                    "[PoliticsWorker] prepare get_user_id_for_character: {e}"
                ))
            })?;
        let rows = conn
            .execute("get_user_id_for_character", &[&character_id])
            .map_err(|e| {
                DbError::new(format!(
                    "[PoliticsWorker] exec get_user_id_for_character: {e}"
                ))
            })?;

        Ok(rows
            .into_iter()
            .find_map(|row| row.get("user_id").and_then(|v| v.parse::<i32>().ok())))
    }
}

/// Hooks `PoliticsWorker` into the generic worker lifecycle by delegating to `BaseWorker`.
impl Worker for PoliticsWorker {
    /// Starts the worker loop; the closure receives the shared state and runs
    /// `PoliticsWorker::run_loop` with clones of the pool and broker.
    fn start_worker_thread(&mut self) {
        let pool = self.base.pool.clone();
        let broker = self.base.broker.clone();

        self.base
            .start_worker_with_loop(move |state: Arc<WorkerState>| {
                PoliticsWorker::run_loop(pool.clone(), broker.clone(), state);
            });
    }

    /// Stops the worker via `BaseWorker::stop_worker`.
    fn stop_worker_thread(&mut self) {
        self.base.stop_worker();
    }

    /// Starts the watchdog via `BaseWorker::start_watchdog`.
    fn enable_watchdog(&mut self) {
        self.base.start_watchdog();
    }
}

/// Reads column `key` from `row` as `i32`.
///
/// Returns `default` when the column is absent or its text does not parse as `i32`.
fn parse_i32(row: &Row, key: &str, default: i32) -> i32 {
    row.get(key)
        .and_then(|v| v.parse::<i32>().ok())
        .unwrap_or(default)
}

/// Converts a result row into an [`Office`].
///
/// Returns `None` as soon as one of `office_id`, `office_type_id`, `character_id`
/// or `region_id` is missing or fails to parse.
fn map_row_to_office(row: Row) -> Option<Office> {
    Some(Office {
        office_id: row.get("office_id")?.parse().ok()?,
        office_type_id: row.get("office_type_id")?.parse().ok()?,
        character_id: row.get("character_id")?.parse().ok()?,
        region_id: row.get("region_id")?.parse().ok()?,
    })
}