1128 lines
41 KiB
Rust
1128 lines
41 KiB
Rust
use crate::db::{ConnectionPool, DbError, Row};
|
||
use crate::message_broker::MessageBroker;
|
||
use std::collections::HashSet;
|
||
use std::sync::atomic::Ordering;
|
||
use std::sync::Arc;
|
||
use std::time::{Duration, Instant};
|
||
use chrono::{Local, Timelike};
|
||
|
||
use super::base::{BaseWorker, Worker, WorkerState};
|
||
use crate::worker::sql::{
|
||
QUERY_COUNT_OFFICES_PER_REGION,
|
||
QUERY_FIND_OFFICE_GAPS,
|
||
QUERY_SELECT_NEEDED_ELECTIONS,
|
||
QUERY_INSERT_CANDIDATES,
|
||
QUERY_SELECT_ELECTIONS_NEEDING_CANDIDATES,
|
||
QUERY_PROCESS_EXPIRED_AND_FILL,
|
||
QUERY_USERS_IN_CITIES_OF_REGIONS,
|
||
QUERY_NOTIFY_OFFICE_EXPIRATION,
|
||
QUERY_NOTIFY_ELECTION_CREATED,
|
||
QUERY_NOTIFY_OFFICE_FILLED,
|
||
QUERY_GET_USERS_WITH_EXPIRING_OFFICES,
|
||
QUERY_GET_USERS_IN_REGIONS_WITH_ELECTIONS,
|
||
QUERY_GET_USERS_WITH_FILLED_OFFICES,
|
||
QUERY_PROCESS_ELECTIONS,
|
||
QUERY_TRIM_EXCESS_OFFICES_GLOBAL,
|
||
QUERY_FIND_AVAILABLE_CHURCH_OFFICES,
|
||
QUERY_FIND_CHURCH_SUPERVISOR,
|
||
QUERY_GET_CHURCH_OFFICE_REQUIREMENTS,
|
||
QUERY_GET_PENDING_CHURCH_APPLICATIONS,
|
||
QUERY_CHECK_CHARACTER_ELIGIBILITY,
|
||
QUERY_APPROVE_CHURCH_APPLICATION,
|
||
QUERY_REJECT_CHURCH_APPLICATION,
|
||
QUERY_CREATE_CHURCH_APPLICATION_JOB,
|
||
QUERY_GET_CHARACTERS_FOR_CHURCH_OFFICE,
|
||
QUERY_REMOVE_LOWER_RANKED_POLITICAL_OFFICES,
|
||
QUERY_REMOVE_LOWER_RANKED_CHURCH_OFFICES,
|
||
};
|
||
|
||
pub struct PoliticsWorker {
|
||
base: BaseWorker,
|
||
}
|
||
|
||
/// Per-region tally of political offices, as read by `evaluate_political_positions`.
#[derive(Clone, Debug)]
struct OfficeCounts {
    /// Region the tally belongs to (`region_id` column).
    region_id: i32,
    /// Value of the `required_count` column.
    required: i32,
    /// Value of the `occupied_count` column.
    occupied: i32,
}
|
||
|
||
/// One election row, reduced to the columns this worker passes on to the
/// candidate-insertion query.
#[derive(Clone, Debug)]
struct Election {
    /// Value of the `election_id` column.
    election_id: i32,
    /// Region in which the election takes place.
    region_id: i32,
    /// Number of posts the election is meant to fill.
    posts_to_fill: i32,
}
|
||
|
||
/// A shortfall of seats for one office type in one region, as reported by the
/// office-gap query.
#[derive(Clone, Debug)]
struct OfficeGap {
    /// Office type that has unfilled seats.
    office_type_id: i32,
    /// Region in which the seats are missing.
    region_id: i32,
    /// Number of missing seats; used as `posts_to_fill` for the gap election.
    gaps: i32,
}
|
||
|
||
/// A political office that has just been filled, as returned by the
/// "process expired" and "process elections" queries.
#[derive(Clone, Debug)]
struct Office {
    /// Identifier of the office row.
    office_id: i32,
    /// Type of the office.
    office_type_id: i32,
    /// Character now holding the office; passed to the "office filled" notification query.
    character_id: i32,
    /// Region the office belongs to.
    region_id: i32,
}
|
||
|
||
/// A church office type in a region together with its seat usage.
///
/// The church-office task computes the free seats as
/// `seats_per_region - occupied_seats`.
#[derive(Clone, Debug)]
struct AvailableChurchOffice {
    /// Church office type.
    office_type_id: i32,
    /// Configured number of seats for this office type per region.
    seats_per_region: i32,
    /// Region the row refers to.
    region_id: i32,
    /// Seats that are currently taken.
    occupied_seats: i32,
}
|
||
|
||
/// The character that supervises a church office in a region.
#[derive(Clone, Debug)]
struct ChurchSupervisor {
    /// Character id of the supervisor (`supervisor_character_id` column).
    supervisor_character_id: i32,
}
|
||
|
||
/// One prerequisite row for a church office type.
///
/// Both fields are `None` when the column is absent or does not parse as `i32`.
#[derive(Clone, Debug)]
struct ChurchOfficeRequirement {
    /// Office type the applicant must already hold, if any.
    prerequisite_office_type_id: Option<i32>,
    /// Minimum title level the applicant needs, if any.
    min_title_level: Option<i32>,
}
|
||
|
||
/// A pending application for a church office.
#[derive(Clone, Debug)]
struct ChurchApplication {
    /// Identifier of the application row; used to approve or reject it.
    application_id: i32,
    /// Church office type applied for.
    office_type_id: i32,
    /// Character who applied.
    applicant_character_id: i32,
}
|
||
|
||
// --- SQL constants (taken over 1:1 from politics_worker.h) are imported above from crate::worker::sql ---
|
||
|
||
|
||
impl PoliticsWorker {
|
||
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
|
||
Self {
|
||
base: BaseWorker::new("PoliticsWorker", pool, broker),
|
||
}
|
||
}
|
||
|
||
fn run_loop(pool: ConnectionPool, broker: MessageBroker, state: Arc<WorkerState>) {
|
||
let mut last_execution: Option<Instant> = None;
|
||
let mut last_church_office_run: Option<Instant> = None;
|
||
|
||
while state.running_worker.load(Ordering::Relaxed) {
|
||
let now = Instant::now();
|
||
let should_run = match last_execution {
|
||
None => true,
|
||
Some(prev) => now.saturating_duration_since(prev) >= Duration::from_secs(24 * 3600),
|
||
};
|
||
|
||
if should_run {
|
||
if let Err(err) = Self::perform_daily_politics_task(&pool, &broker) {
|
||
eprintln!("[PoliticsWorker] Fehler bei performDailyPoliticsTask: {err}");
|
||
}
|
||
last_execution = Some(now);
|
||
}
|
||
|
||
// Church Office Job um 13 Uhr
|
||
if Self::is_time_13_00() {
|
||
let should_run_church = match last_church_office_run {
|
||
None => true,
|
||
Some(prev) => {
|
||
// Prüfe ob seit letztem Lauf mindestens 23 Stunden vergangen sind
|
||
// (um sicherzustellen, dass es nur einmal pro Tag läuft)
|
||
now.saturating_duration_since(prev) >= Duration::from_secs(23 * 3600)
|
||
}
|
||
};
|
||
|
||
if should_run_church {
|
||
if let Err(err) = Self::perform_church_office_task(&pool, &broker) {
|
||
eprintln!("[PoliticsWorker] Fehler bei performChurchOfficeTask: {err}");
|
||
}
|
||
last_church_office_run = Some(now);
|
||
}
|
||
}
|
||
|
||
// Entspricht ungefähr der 5-Sekunden-Schleife im C++-Code
|
||
for _ in 0..5 {
|
||
if !state.running_worker.load(Ordering::Relaxed) {
|
||
break;
|
||
}
|
||
std::thread::sleep(Duration::from_secs(1));
|
||
}
|
||
}
|
||
}
|
||
|
||
/// Prüft ob die aktuelle Uhrzeit 13:00 ist (mit Toleranz von ±1 Minute)
|
||
fn is_time_13_00() -> bool {
|
||
let now = Local::now();
|
||
let hour = now.hour();
|
||
let minute = now.minute();
|
||
hour == 13 && minute <= 1
|
||
}
|
||
|
||
fn perform_daily_politics_task(
|
||
pool: &ConnectionPool,
|
||
broker: &MessageBroker,
|
||
) -> Result<(), DbError> {
|
||
// 1) Optional: Positionen evaluieren (aktuell nur Logging/Struktur)
|
||
Self::evaluate_political_positions(pool)?;
|
||
|
||
// 2) Schema-Änderungen abgleichen: neue / zusätzliche Ämter anlegen,
|
||
// ohne bestehende Amtsinhaber bei Reduktion zu entfernen.
|
||
Self::sync_offices_with_types(pool)?;
|
||
|
||
// 3) Ämter, die bald auslaufen, benachrichtigen
|
||
Self::notify_office_expirations(pool, broker)?;
|
||
|
||
// 4) Abgelaufene Ämter verarbeiten und neue besetzen
|
||
let new_offices_direct = Self::process_expired_offices_and_fill(pool)?;
|
||
if !new_offices_direct.is_empty() {
|
||
// Entferne niedrigerrangige politische Ämter wenn Character mehrere hat
|
||
Self::remove_lower_ranked_political_offices(pool)?;
|
||
Self::notify_office_filled(pool, broker, &new_offices_direct)?;
|
||
}
|
||
|
||
// 5) Neue Wahlen planen
|
||
let new_elections = Self::schedule_elections(pool)?;
|
||
|
||
// 6) Für alle Wahlen ohne Kandidaten (inkl. manuell
|
||
// angelegter) Kandidaten eintragen.
|
||
let mut elections_to_fill = Self::find_elections_needing_candidates(pool)?;
|
||
// Die gerade neu angelegten Wahlen sind typischerweise auch
|
||
// in obiger Liste enthalten. Falls nicht, fügen wir sie
|
||
// sicherheitshalber hinzu.
|
||
for e in new_elections.iter() {
|
||
if !elections_to_fill.iter().any(|x| x.election_id == e.election_id) {
|
||
elections_to_fill.push(e.clone());
|
||
}
|
||
}
|
||
|
||
if !elections_to_fill.is_empty() {
|
||
Self::insert_candidates_for_elections(pool, &elections_to_fill)?;
|
||
|
||
// Benachrichtige User in betroffenen Regionen
|
||
let region_ids: HashSet<i32> =
|
||
elections_to_fill.iter().map(|e| e.region_id).collect();
|
||
let user_ids =
|
||
Self::get_user_ids_in_cities_of_regions(pool, ®ion_ids)?;
|
||
Self::notify_election_created(pool, broker, &user_ids)?;
|
||
}
|
||
|
||
// 7) Wahlen auswerten und neu besetzte Ämter melden
|
||
let new_offices_from_elections = Self::process_elections(pool)?;
|
||
if !new_offices_from_elections.is_empty() {
|
||
// Entferne niedrigerrangige politische Ämter wenn Character mehrere hat
|
||
Self::remove_lower_ranked_political_offices(pool)?;
|
||
Self::notify_office_filled(pool, broker, &new_offices_from_elections)?;
|
||
}
|
||
|
||
// 8) Als letzter Schritt sicherstellen, dass es für keinen
|
||
// Amtstyp/Region-Kombi mehr besetzte Ämter gibt als laut
|
||
// `seats_per_region` erlaubt. Dieser Abgleich wird nach allen
|
||
// Lösch- und Besetzungsvorgängen ausgeführt.
|
||
Self::trim_excess_offices_global(pool)?;
|
||
|
||
Ok(())
|
||
}
|
||
|
||
fn evaluate_political_positions(
|
||
pool: &ConnectionPool,
|
||
) -> Result<Vec<OfficeCounts>, DbError> {
|
||
let mut conn = pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
conn.prepare(
|
||
"count_offices_per_region",
|
||
QUERY_COUNT_OFFICES_PER_REGION,
|
||
).map_err(|e| DbError::new(format!("[PoliticsWorker] prepare count_offices_per_region: {e}")))?;
|
||
let rows = conn.execute("count_offices_per_region", &[])
|
||
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec count_offices_per_region: {e}")))?;
|
||
|
||
let mut result = Vec::with_capacity(rows.len());
|
||
for row in rows {
|
||
let region_id = parse_i32(&row, "region_id", -1);
|
||
let required = parse_i32(&row, "required_count", 0);
|
||
let occupied = parse_i32(&row, "occupied_count", 0);
|
||
if region_id >= 0 {
|
||
let oc = OfficeCounts {
|
||
region_id,
|
||
required,
|
||
occupied,
|
||
};
|
||
// Felder aktiv nutzen: einfache Debug-Ausgabe kann später
|
||
// durch Logging ersetzt werden.
|
||
eprintln!(
|
||
"[PoliticsWorker] Region {}: required={}, occupied={}",
|
||
oc.region_id, oc.required, oc.occupied
|
||
);
|
||
result.push(oc);
|
||
}
|
||
}
|
||
|
||
Ok(result)
|
||
}
|
||
|
||
/// Entfernt global alle überzähligen Ämter in Relation zur
|
||
/// konfigurierten Sitzzahl pro Amtstyp/Region.
|
||
fn trim_excess_offices_global(pool: &ConnectionPool) -> Result<(), DbError> {
|
||
let mut conn = pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
conn.prepare(
|
||
"trim_excess_offices_global",
|
||
QUERY_TRIM_EXCESS_OFFICES_GLOBAL,
|
||
).map_err(|e| DbError::new(format!("[PoliticsWorker] prepare trim_excess_offices_global: {e}")))?;
|
||
|
||
if let Err(err) = conn.execute("trim_excess_offices_global", &[]) {
|
||
eprintln!(
|
||
"[PoliticsWorker] Fehler bei trim_excess_offices_global: {err}"
|
||
);
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
/// Gleicht die konfigurierte Anzahl der Ämter (seats_per_region pro
|
||
/// Amtstyp/Region) mit den tatsächlich vorhandenen Einträgen in
|
||
/// `falukant_data.political_office` ab und legt für fehlende Sitze
|
||
/// zusätzliche Wahlen an.
|
||
///
|
||
/// Wichtig: Wenn `seats_per_region` gesenkt wurde und damit aktuell
|
||
/// mehr Amtsinhaber existieren als Sitze vorgesehen sind, werden
|
||
/// **keine** Amtsinhaber entfernt oder Wahlen zum Abbau erzeugt – die
|
||
/// entsprechenden Kombinationen tauchen schlicht nicht in der Gaps‑Liste
|
||
/// auf. Erst wenn durch natürliches Auslaufen weniger Ämter existieren
|
||
/// als vorgesehen, entstehen wieder Gaps.
|
||
fn sync_offices_with_types(pool: &ConnectionPool) -> Result<(), DbError> {
|
||
let mut conn = pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
conn.prepare("find_office_gaps", QUERY_FIND_OFFICE_GAPS)
|
||
.map_err(|e| DbError::new(format!("[PoliticsWorker] prepare find_office_gaps: {e}")))?;
|
||
let rows = conn.execute("find_office_gaps", &[])
|
||
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec find_office_gaps: {e}")))?;
|
||
|
||
if rows.is_empty() {
|
||
return Ok(());
|
||
}
|
||
|
||
let mut gaps: Vec<OfficeGap> = Vec::with_capacity(rows.len());
|
||
for row in rows {
|
||
let office_type_id = parse_i32(&row, "office_type_id", -1);
|
||
let region_id = parse_i32(&row, "region_id", -1);
|
||
let gaps_count = parse_i32(&row, "gaps", 0);
|
||
if office_type_id >= 0 && region_id >= 0 && gaps_count > 0 {
|
||
gaps.push(OfficeGap {
|
||
office_type_id,
|
||
region_id,
|
||
gaps: gaps_count,
|
||
});
|
||
}
|
||
}
|
||
|
||
if gaps.is_empty() {
|
||
return Ok(());
|
||
}
|
||
|
||
// Für jede Lücke eine Wahl am aktuellen Tag anlegen, sofern es nicht
|
||
// bereits eine Wahl für diesen Amtstyp/Region an einem heutigen oder
|
||
// zukünftigen Datum gibt. Das eigentliche Anlegen der Wahl und das
|
||
// Eintragen von Kandidaten läuft weiterhin über die bestehende
|
||
// Logik in der Datenbank (process_elections / schedule_elections).
|
||
//
|
||
// Hier nutzen wir bewusst eine einfache, wiederholbare Logik im
|
||
// Rust‑Code, statt alles in eine riesige DB‑Funktion zu packen.
|
||
|
||
// Wir lehnen uns an die Struktur von `schedule_elections` an,
|
||
// erzeugen aber unsere eigenen Einträge in `falukant_data.election`.
|
||
let insert_sql = r#"
|
||
INSERT INTO falukant_data.election
|
||
(office_type_id, date, posts_to_fill, created_at, updated_at, region_id)
|
||
SELECT
|
||
$1::int AS office_type_id,
|
||
CURRENT_DATE AS date,
|
||
$2::int AS posts_to_fill,
|
||
NOW() AS created_at,
|
||
NOW() AS updated_at,
|
||
$3::int AS region_id
|
||
WHERE NOT EXISTS (
|
||
SELECT 1
|
||
FROM falukant_data.election e
|
||
WHERE e.office_type_id = $1::int
|
||
AND e.region_id = $3::int
|
||
AND e.date::date >= CURRENT_DATE
|
||
);
|
||
"#;
|
||
|
||
conn.prepare("insert_gap_election", insert_sql)?;
|
||
|
||
for gap in gaps {
|
||
// Sicherheitshalber nur positive Gaps berücksichtigen.
|
||
if gap.gaps <= 0 {
|
||
continue;
|
||
}
|
||
|
||
if let Err(err) = conn.execute(
|
||
"insert_gap_election",
|
||
&[&gap.office_type_id, &gap.gaps, &gap.region_id],
|
||
) {
|
||
eprintln!(
|
||
"[PoliticsWorker] Fehler beim Anlegen von Gap‑Wahlen \
|
||
(office_type_id={}, region_id={}): {err}",
|
||
gap.office_type_id, gap.region_id
|
||
);
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
fn schedule_elections(pool: &ConnectionPool) -> Result<Vec<Election>, DbError> {
|
||
let mut conn = pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
conn.prepare("select_needed_elections", QUERY_SELECT_NEEDED_ELECTIONS)
|
||
.map_err(|e| DbError::new(format!("[PoliticsWorker] prepare select_needed_elections: {e}")))?;
|
||
let rows = conn.execute("select_needed_elections", &[])
|
||
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec select_needed_elections: {e}")))?;
|
||
|
||
let mut elections = Vec::with_capacity(rows.len());
|
||
for row in rows {
|
||
let election_id = parse_i32(&row, "election_id", -1);
|
||
let region_id = parse_i32(&row, "region_id", -1);
|
||
let posts_to_fill = parse_i32(&row, "posts_to_fill", 0);
|
||
if election_id >= 0 && region_id >= 0 {
|
||
elections.push(Election {
|
||
election_id,
|
||
region_id,
|
||
posts_to_fill,
|
||
});
|
||
}
|
||
}
|
||
|
||
Ok(elections)
|
||
}
|
||
|
||
/// Findet alle existierenden Wahlen (inkl. manuell angelegter),
|
||
/// für die noch keine Kandidaten eingetragen wurden.
|
||
fn find_elections_needing_candidates(pool: &ConnectionPool) -> Result<Vec<Election>, DbError> {
|
||
let mut conn = pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
conn.prepare(
|
||
"select_elections_needing_candidates",
|
||
QUERY_SELECT_ELECTIONS_NEEDING_CANDIDATES,
|
||
).map_err(|e| DbError::new(format!("[PoliticsWorker] prepare select_elections_needing_candidates: {e}")))?;
|
||
let rows = conn.execute("select_elections_needing_candidates", &[])
|
||
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec select_elections_needing_candidates: {e}")))?;
|
||
|
||
let mut elections = Vec::with_capacity(rows.len());
|
||
for row in rows {
|
||
let election_id = parse_i32(&row, "election_id", -1);
|
||
let region_id = parse_i32(&row, "region_id", -1);
|
||
let posts_to_fill = parse_i32(&row, "posts_to_fill", 0);
|
||
if election_id >= 0 && region_id >= 0 && posts_to_fill > 0 {
|
||
elections.push(Election {
|
||
election_id,
|
||
region_id,
|
||
posts_to_fill,
|
||
});
|
||
}
|
||
}
|
||
|
||
Ok(elections)
|
||
}
|
||
|
||
fn insert_candidates_for_elections(
|
||
pool: &ConnectionPool,
|
||
elections: &[Election],
|
||
) -> Result<(), DbError> {
|
||
let mut conn = pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
conn.prepare("insert_candidates", QUERY_INSERT_CANDIDATES)
|
||
.map_err(|e| DbError::new(format!("[PoliticsWorker] prepare insert_candidates: {e}")))?;
|
||
|
||
for e in elections {
|
||
conn.execute(
|
||
"insert_candidates",
|
||
&[&e.election_id, &e.region_id, &e.posts_to_fill],
|
||
).map_err(|err| DbError::new(format!("[PoliticsWorker] exec insert_candidates eid={} rid={}: {}", e.election_id, e.region_id, err)))?;
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
fn process_expired_offices_and_fill(
|
||
pool: &ConnectionPool,
|
||
) -> Result<Vec<Office>, DbError> {
|
||
let mut conn = pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
conn.prepare("process_expired_and_fill", QUERY_PROCESS_EXPIRED_AND_FILL)
|
||
.map_err(|e| DbError::new(format!("[PoliticsWorker] prepare process_expired_and_fill: {e}")))?;
|
||
let rows = conn.execute("process_expired_and_fill", &[])
|
||
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec process_expired_and_fill: {e}")))?;
|
||
|
||
Ok(rows
|
||
.into_iter()
|
||
.filter_map(map_row_to_office)
|
||
.collect())
|
||
}
|
||
|
||
fn get_user_ids_in_cities_of_regions(
|
||
pool: &ConnectionPool,
|
||
region_ids: &HashSet<i32>,
|
||
) -> Result<Vec<i32>, DbError> {
|
||
if region_ids.is_empty() {
|
||
return Ok(Vec::new());
|
||
}
|
||
|
||
let mut conn = pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
conn.prepare("get_users_in_cities", QUERY_USERS_IN_CITIES_OF_REGIONS)?;
|
||
|
||
let mut user_ids = Vec::new();
|
||
for rid in region_ids {
|
||
let rows = conn.execute("get_users_in_cities", &[rid])?;
|
||
for row in rows {
|
||
if let Some(uid) = row.get("user_id").and_then(|v| v.parse::<i32>().ok()) {
|
||
user_ids.push(uid);
|
||
}
|
||
}
|
||
}
|
||
|
||
Ok(user_ids)
|
||
}
|
||
|
||
fn notify_office_expirations(
|
||
pool: &ConnectionPool,
|
||
broker: &MessageBroker,
|
||
) -> Result<(), DbError> {
|
||
let mut conn = pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
conn.prepare("notify_office_expiration", QUERY_NOTIFY_OFFICE_EXPIRATION)
|
||
.map_err(|e| DbError::new(format!("[PoliticsWorker] prepare notify_office_expiration: {e}")))?;
|
||
conn.execute("notify_office_expiration", &[])
|
||
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec notify_office_expiration: {e}")))?;
|
||
|
||
conn.prepare(
|
||
"get_users_with_expiring_offices",
|
||
QUERY_GET_USERS_WITH_EXPIRING_OFFICES,
|
||
).map_err(|e| DbError::new(format!("[PoliticsWorker] prepare get_users_with_expiring_offices: {e}")))?;
|
||
let rows = conn.execute("get_users_with_expiring_offices", &[])
|
||
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec get_users_with_expiring_offices: {e}")))?;
|
||
|
||
for row in rows {
|
||
if let Some(user_id) = row.get("user_id").and_then(|v| v.parse::<i32>().ok()) {
|
||
let msg =
|
||
format!(r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#, user_id);
|
||
broker.publish(msg);
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
fn notify_election_created(
|
||
pool: &ConnectionPool,
|
||
broker: &MessageBroker,
|
||
user_ids: &[i32],
|
||
) -> Result<(), DbError> {
|
||
let mut conn = pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
conn.prepare("notify_election_created", QUERY_NOTIFY_ELECTION_CREATED)
|
||
.map_err(|e| DbError::new(format!("[PoliticsWorker] prepare notify_election_created: {e}")))?;
|
||
|
||
for uid in user_ids {
|
||
conn.execute("notify_election_created", &[uid])
|
||
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec notify_election_created uid={}: {}", uid, e)))?;
|
||
}
|
||
|
||
conn.prepare(
|
||
"get_users_in_regions_with_elections",
|
||
QUERY_GET_USERS_IN_REGIONS_WITH_ELECTIONS,
|
||
).map_err(|e| DbError::new(format!("[PoliticsWorker] prepare get_users_in_regions_with_elections: {e}")))?;
|
||
let rows = conn.execute("get_users_in_regions_with_elections", &[])
|
||
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec get_users_in_regions_with_elections: {e}")))?;
|
||
|
||
for row in rows {
|
||
if let Some(user_id) = row.get("user_id").and_then(|v| v.parse::<i32>().ok()) {
|
||
let msg =
|
||
format!(r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#, user_id);
|
||
broker.publish(msg);
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
fn notify_office_filled(
|
||
pool: &ConnectionPool,
|
||
broker: &MessageBroker,
|
||
new_offices: &[Office],
|
||
) -> Result<(), DbError> {
|
||
let mut conn = pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
conn.prepare("notify_office_filled", QUERY_NOTIFY_OFFICE_FILLED)
|
||
.map_err(|e| DbError::new(format!("[PoliticsWorker] prepare notify_office_filled: {e}")))?;
|
||
|
||
for office in new_offices {
|
||
// Debug-Logging mit allen Feldern, damit sie aktiv genutzt werden
|
||
eprintln!(
|
||
"[PoliticsWorker] Office filled: id={}, type={}, character={}, region={}",
|
||
office.office_id, office.office_type_id, office.character_id, office.region_id
|
||
);
|
||
conn.execute("notify_office_filled", &[&office.character_id])
|
||
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec notify_office_filled office_id={} character_id={}: {}", office.office_id, office.character_id, e)))?;
|
||
}
|
||
|
||
conn.prepare(
|
||
"get_users_with_filled_offices",
|
||
QUERY_GET_USERS_WITH_FILLED_OFFICES,
|
||
).map_err(|e| DbError::new(format!("[PoliticsWorker] prepare get_users_with_filled_offices: {e}")))?;
|
||
let rows = conn.execute("get_users_with_filled_offices", &[])
|
||
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec get_users_with_filled_offices: {e}")))?;
|
||
|
||
for row in rows {
|
||
if let Some(user_id) = row.get("user_id").and_then(|v| v.parse::<i32>().ok()) {
|
||
let msg =
|
||
format!(r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#, user_id);
|
||
broker.publish(msg);
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
fn process_elections(pool: &ConnectionPool) -> Result<Vec<Office>, DbError> {
|
||
let mut conn = pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
conn.prepare("process_elections", QUERY_PROCESS_ELECTIONS)?;
|
||
let rows = conn.execute("process_elections", &[])?;
|
||
|
||
Ok(rows
|
||
.into_iter()
|
||
.filter_map(map_row_to_office)
|
||
.collect())
|
||
}
|
||
|
||
/// Verarbeitet Church Office Jobs um 13 Uhr:
|
||
/// - Findet verfügbare Positionen
|
||
/// - Verarbeitet bestehende Bewerbungen
|
||
/// - Erstellt neue Bewerbungen falls nötig
|
||
fn perform_church_office_task(
|
||
pool: &ConnectionPool,
|
||
broker: &MessageBroker,
|
||
) -> Result<(), DbError> {
|
||
eprintln!("[PoliticsWorker] Starte Church Office Task um 13 Uhr");
|
||
|
||
// 1) Verfügbare Church Office Positionen finden
|
||
let available_offices = Self::find_available_church_offices(pool)?;
|
||
eprintln!(
|
||
"[PoliticsWorker] Gefunden: {} verfügbare Church Office Positionen",
|
||
available_offices.len()
|
||
);
|
||
|
||
// 2) Für jede verfügbare Position Bewerbungen verarbeiten
|
||
for office in &available_offices {
|
||
// Supervisor finden
|
||
if let Some(supervisor) = Self::find_church_supervisor(pool, office.region_id, office.office_type_id)? {
|
||
// Bestehende Bewerbungen für diesen Supervisor verarbeiten
|
||
Self::process_church_applications(pool, broker, supervisor.supervisor_character_id)?;
|
||
|
||
// Falls noch Plätze frei sind, neue Bewerbungen erstellen
|
||
let remaining_seats = office.seats_per_region - office.occupied_seats;
|
||
if remaining_seats > 0 {
|
||
Self::create_church_application_jobs(
|
||
pool,
|
||
office.office_type_id,
|
||
office.region_id,
|
||
supervisor.supervisor_character_id,
|
||
remaining_seats,
|
||
)?;
|
||
}
|
||
} else {
|
||
eprintln!(
|
||
"[PoliticsWorker] Kein Supervisor gefunden für office_type_id={}, region_id={}",
|
||
office.office_type_id, office.region_id
|
||
);
|
||
}
|
||
}
|
||
|
||
eprintln!("[PoliticsWorker] Church Office Task abgeschlossen");
|
||
Ok(())
|
||
}
|
||
|
||
fn find_available_church_offices(
|
||
pool: &ConnectionPool,
|
||
) -> Result<Vec<AvailableChurchOffice>, DbError> {
|
||
let mut conn = pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
conn.prepare(
|
||
"find_available_church_offices",
|
||
QUERY_FIND_AVAILABLE_CHURCH_OFFICES,
|
||
)
|
||
.map_err(|e| {
|
||
DbError::new(format!(
|
||
"[PoliticsWorker] prepare find_available_church_offices: {e}"
|
||
))
|
||
})?;
|
||
let rows = conn
|
||
.execute("find_available_church_offices", &[])
|
||
.map_err(|e| {
|
||
DbError::new(format!(
|
||
"[PoliticsWorker] exec find_available_church_offices: {e}"
|
||
))
|
||
})?;
|
||
|
||
let mut offices = Vec::new();
|
||
for row in rows {
|
||
let office_type_id = parse_i32(&row, "office_type_id", -1);
|
||
let region_id = parse_i32(&row, "region_id", -1);
|
||
let seats_per_region = parse_i32(&row, "seats_per_region", 0);
|
||
let occupied_seats = parse_i32(&row, "occupied_seats", 0);
|
||
|
||
if office_type_id >= 0 && region_id >= 0 {
|
||
offices.push(AvailableChurchOffice {
|
||
office_type_id,
|
||
seats_per_region,
|
||
region_id,
|
||
occupied_seats,
|
||
});
|
||
}
|
||
}
|
||
|
||
Ok(offices)
|
||
}
|
||
|
||
fn find_church_supervisor(
|
||
pool: &ConnectionPool,
|
||
region_id: i32,
|
||
office_type_id: i32,
|
||
) -> Result<Option<ChurchSupervisor>, DbError> {
|
||
let mut conn = pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
conn.prepare("find_church_supervisor", QUERY_FIND_CHURCH_SUPERVISOR)
|
||
.map_err(|e| {
|
||
DbError::new(format!(
|
||
"[PoliticsWorker] prepare find_church_supervisor: {e}"
|
||
))
|
||
})?;
|
||
let rows = conn
|
||
.execute("find_church_supervisor", &[®ion_id, &office_type_id])
|
||
.map_err(|e| {
|
||
DbError::new(format!(
|
||
"[PoliticsWorker] exec find_church_supervisor: {e}"
|
||
))
|
||
})?;
|
||
|
||
for row in rows {
|
||
let supervisor_character_id = parse_i32(&row, "supervisor_character_id", -1);
|
||
|
||
if supervisor_character_id >= 0 {
|
||
return Ok(Some(ChurchSupervisor {
|
||
supervisor_character_id,
|
||
}));
|
||
}
|
||
}
|
||
|
||
Ok(None)
|
||
}
|
||
|
||
fn process_church_applications(
|
||
pool: &ConnectionPool,
|
||
broker: &MessageBroker,
|
||
supervisor_id: i32,
|
||
) -> Result<(), DbError> {
|
||
let mut conn = pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
// Bewerbungen für diesen Supervisor abrufen
|
||
conn.prepare(
|
||
"get_pending_church_applications",
|
||
QUERY_GET_PENDING_CHURCH_APPLICATIONS,
|
||
)
|
||
.map_err(|e| {
|
||
DbError::new(format!(
|
||
"[PoliticsWorker] prepare get_pending_church_applications: {e}"
|
||
))
|
||
})?;
|
||
let rows = conn
|
||
.execute("get_pending_church_applications", &[&supervisor_id])
|
||
.map_err(|e| {
|
||
DbError::new(format!(
|
||
"[PoliticsWorker] exec get_pending_church_applications: {e}"
|
||
))
|
||
})?;
|
||
|
||
let mut applications = Vec::new();
|
||
for row in rows {
|
||
let application_id = parse_i32(&row, "application_id", -1);
|
||
let office_type_id = parse_i32(&row, "office_type_id", -1);
|
||
let applicant_character_id = parse_i32(&row, "applicant_character_id", -1);
|
||
|
||
if application_id >= 0 {
|
||
applications.push(ChurchApplication {
|
||
application_id,
|
||
office_type_id,
|
||
applicant_character_id,
|
||
});
|
||
}
|
||
}
|
||
|
||
// Voraussetzungen prüfen und Bewerbungen verarbeiten
|
||
conn.prepare(
|
||
"get_church_office_requirements",
|
||
QUERY_GET_CHURCH_OFFICE_REQUIREMENTS,
|
||
)
|
||
.map_err(|e| {
|
||
DbError::new(format!(
|
||
"[PoliticsWorker] prepare get_church_office_requirements: {e}"
|
||
))
|
||
})?;
|
||
|
||
conn.prepare(
|
||
"check_character_eligibility",
|
||
QUERY_CHECK_CHARACTER_ELIGIBILITY,
|
||
)
|
||
.map_err(|e| {
|
||
DbError::new(format!(
|
||
"[PoliticsWorker] prepare check_character_eligibility: {e}"
|
||
))
|
||
})?;
|
||
|
||
conn.prepare("approve_church_application", QUERY_APPROVE_CHURCH_APPLICATION)
|
||
.map_err(|e| {
|
||
DbError::new(format!(
|
||
"[PoliticsWorker] prepare approve_church_application: {e}"
|
||
))
|
||
})?;
|
||
|
||
conn.prepare("reject_church_application", QUERY_REJECT_CHURCH_APPLICATION)
|
||
.map_err(|e| {
|
||
DbError::new(format!(
|
||
"[PoliticsWorker] prepare reject_church_application: {e}"
|
||
))
|
||
})?;
|
||
|
||
for app in &applications {
|
||
// Voraussetzungen für dieses Amt abrufen
|
||
let req_rows = conn
|
||
.execute("get_church_office_requirements", &[&app.office_type_id])
|
||
.map_err(|e| {
|
||
DbError::new(format!(
|
||
"[PoliticsWorker] exec get_church_office_requirements: {e}"
|
||
))
|
||
})?;
|
||
|
||
let mut requirements = Vec::new();
|
||
for req_row in req_rows {
|
||
let prerequisite_office_type_id = req_row
|
||
.get("prerequisite_office_type_id")
|
||
.and_then(|v| v.parse::<i32>().ok());
|
||
let min_title_level = req_row
|
||
.get("min_title_level")
|
||
.and_then(|v| v.parse::<i32>().ok());
|
||
|
||
requirements.push(ChurchOfficeRequirement {
|
||
prerequisite_office_type_id,
|
||
min_title_level,
|
||
});
|
||
}
|
||
|
||
// Prüfe ob Character die Voraussetzungen erfüllt
|
||
let mut eligible = true;
|
||
for req in &requirements {
|
||
let elig_rows = conn
|
||
.execute(
|
||
"check_character_eligibility",
|
||
&[
|
||
&app.applicant_character_id,
|
||
&req.prerequisite_office_type_id,
|
||
&req.min_title_level,
|
||
],
|
||
)
|
||
.map_err(|e| {
|
||
DbError::new(format!(
|
||
"[PoliticsWorker] exec check_character_eligibility: {e}"
|
||
))
|
||
})?;
|
||
|
||
for elig_row in elig_rows {
|
||
let has_prerequisite: bool = elig_row
|
||
.get("has_prerequisite")
|
||
.and_then(|v| v.parse::<bool>().ok())
|
||
.unwrap_or(false);
|
||
let meets_title_requirement: bool = elig_row
|
||
.get("meets_title_requirement")
|
||
.and_then(|v| v.parse::<bool>().ok())
|
||
.unwrap_or(false);
|
||
|
||
if !has_prerequisite || !meets_title_requirement {
|
||
eligible = false;
|
||
break;
|
||
}
|
||
}
|
||
}
|
||
|
||
// Bewerbung genehmigen oder ablehnen
|
||
if eligible {
|
||
let approve_rows = conn
|
||
.execute("approve_church_application", &[&app.application_id])
|
||
.map_err(|e| {
|
||
DbError::new(format!(
|
||
"[PoliticsWorker] exec approve_church_application: {e}"
|
||
))
|
||
})?;
|
||
|
||
if !approve_rows.is_empty() {
|
||
eprintln!(
|
||
"[PoliticsWorker] Church Application {} genehmigt (office_type_id={}, character_id={})",
|
||
app.application_id, app.office_type_id, app.applicant_character_id
|
||
);
|
||
|
||
// Entferne niedrigerrangige Church Offices wenn Character mehrere hat
|
||
Self::remove_lower_ranked_church_offices(pool)?;
|
||
|
||
// Benachrichtigung senden
|
||
if let Some(user_id) = Self::get_user_id_for_character(pool, app.applicant_character_id)? {
|
||
let msg = format!(
|
||
r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#,
|
||
user_id
|
||
);
|
||
broker.publish(msg);
|
||
}
|
||
}
|
||
} else {
|
||
conn.execute("reject_church_application", &[&app.application_id])
|
||
.map_err(|e| {
|
||
DbError::new(format!(
|
||
"[PoliticsWorker] exec reject_church_application: {e}"
|
||
))
|
||
})?;
|
||
eprintln!(
|
||
"[PoliticsWorker] Church Application {} abgelehnt (Voraussetzungen nicht erfüllt)",
|
||
app.application_id
|
||
);
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
fn create_church_application_jobs(
|
||
pool: &ConnectionPool,
|
||
office_type_id: i32,
|
||
region_id: i32,
|
||
supervisor_id: i32,
|
||
count: i32,
|
||
) -> Result<(), DbError> {
|
||
let mut conn = pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
// Charaktere für diese Region finden
|
||
conn.prepare(
|
||
"get_characters_for_church_office",
|
||
QUERY_GET_CHARACTERS_FOR_CHURCH_OFFICE,
|
||
)
|
||
.map_err(|e| {
|
||
DbError::new(format!(
|
||
"[PoliticsWorker] prepare get_characters_for_church_office: {e}"
|
||
))
|
||
})?;
|
||
|
||
let rows = conn
|
||
.execute("get_characters_for_church_office", &[®ion_id, &count])
|
||
.map_err(|e| {
|
||
DbError::new(format!(
|
||
"[PoliticsWorker] exec get_characters_for_church_office: {e}"
|
||
))
|
||
})?;
|
||
|
||
conn.prepare(
|
||
"create_church_application_job",
|
||
QUERY_CREATE_CHURCH_APPLICATION_JOB,
|
||
)
|
||
.map_err(|e| {
|
||
DbError::new(format!(
|
||
"[PoliticsWorker] prepare create_church_application_job: {e}"
|
||
))
|
||
})?;
|
||
|
||
let mut created = 0;
|
||
for row in rows {
|
||
if let Some(character_id) = row
|
||
.get("character_id")
|
||
.and_then(|v| v.parse::<i32>().ok())
|
||
{
|
||
let app_rows = conn
|
||
.execute(
|
||
"create_church_application_job",
|
||
&[&office_type_id, &character_id, ®ion_id, &supervisor_id],
|
||
)
|
||
.map_err(|e| {
|
||
DbError::new(format!(
|
||
"[PoliticsWorker] exec create_church_application_job: {e}"
|
||
))
|
||
})?;
|
||
|
||
if !app_rows.is_empty() {
|
||
created += 1;
|
||
eprintln!(
|
||
"[PoliticsWorker] Church Application Job erstellt: office_type_id={}, character_id={}, region_id={}",
|
||
office_type_id, character_id, region_id
|
||
);
|
||
}
|
||
}
|
||
}
|
||
|
||
eprintln!(
|
||
"[PoliticsWorker] {} Church Application Jobs erstellt für office_type_id={}, region_id={}",
|
||
created, office_type_id, region_id
|
||
);
|
||
|
||
Ok(())
|
||
}
|
||
|
||
fn get_user_id_for_character(
|
||
pool: &ConnectionPool,
|
||
character_id: i32,
|
||
) -> Result<Option<i32>, DbError> {
|
||
let mut conn = pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
let query = "SELECT user_id FROM falukant_data.character WHERE id = $1";
|
||
conn.prepare("get_user_id_for_character", query)
|
||
.map_err(|e| DbError::new(format!("[PoliticsWorker] prepare get_user_id_for_character: {e}")))?;
|
||
let rows = conn
|
||
.execute("get_user_id_for_character", &[&character_id])
|
||
.map_err(|e| {
|
||
DbError::new(format!(
|
||
"[PoliticsWorker] exec get_user_id_for_character: {e}"
|
||
))
|
||
})?;
|
||
|
||
for row in rows {
|
||
if let Some(user_id) = row.get("user_id").and_then(|v| v.parse::<i32>().ok()) {
|
||
return Ok(Some(user_id));
|
||
}
|
||
}
|
||
|
||
Ok(None)
|
||
}
|
||
|
||
/// Entfernt niedrigerrangige politische Ämter wenn ein Character mehrere hat
|
||
fn remove_lower_ranked_political_offices(pool: &ConnectionPool) -> Result<(), DbError> {
|
||
let mut conn = pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
conn.prepare(
|
||
"remove_lower_ranked_political_offices",
|
||
QUERY_REMOVE_LOWER_RANKED_POLITICAL_OFFICES,
|
||
)
|
||
.map_err(|e| {
|
||
DbError::new(format!(
|
||
"[PoliticsWorker] prepare remove_lower_ranked_political_offices: {e}"
|
||
))
|
||
})?;
|
||
|
||
conn.execute("remove_lower_ranked_political_offices", &[])
|
||
.map_err(|e| {
|
||
DbError::new(format!(
|
||
"[PoliticsWorker] exec remove_lower_ranked_political_offices: {e}"
|
||
))
|
||
})?;
|
||
|
||
Ok(())
|
||
}
|
||
|
||
/// Entfernt niedrigerrangige Church Offices wenn ein Character mehrere hat
|
||
fn remove_lower_ranked_church_offices(pool: &ConnectionPool) -> Result<(), DbError> {
|
||
let mut conn = pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
conn.prepare(
|
||
"remove_lower_ranked_church_offices",
|
||
QUERY_REMOVE_LOWER_RANKED_CHURCH_OFFICES,
|
||
)
|
||
.map_err(|e| {
|
||
DbError::new(format!(
|
||
"[PoliticsWorker] prepare remove_lower_ranked_church_offices: {e}"
|
||
))
|
||
})?;
|
||
|
||
conn.execute("remove_lower_ranked_church_offices", &[])
|
||
.map_err(|e| {
|
||
DbError::new(format!(
|
||
"[PoliticsWorker] exec remove_lower_ranked_church_offices: {e}"
|
||
))
|
||
})?;
|
||
|
||
Ok(())
|
||
}
|
||
}
|
||
|
||
impl Worker for PoliticsWorker {
|
||
fn start_worker_thread(&mut self) {
|
||
let pool = self.base.pool.clone();
|
||
let broker = self.base.broker.clone();
|
||
|
||
self.base
|
||
.start_worker_with_loop(move |state: Arc<WorkerState>| {
|
||
PoliticsWorker::run_loop(pool.clone(), broker.clone(), state);
|
||
});
|
||
}
|
||
|
||
fn stop_worker_thread(&mut self) {
|
||
self.base.stop_worker();
|
||
}
|
||
|
||
fn enable_watchdog(&mut self) {
|
||
self.base.start_watchdog();
|
||
}
|
||
}
|
||
|
||
fn parse_i32(row: &Row, key: &str, default: i32) -> i32 {
|
||
row.get(key)
|
||
.and_then(|v| v.parse::<i32>().ok())
|
||
.unwrap_or(default)
|
||
}
|
||
|
||
fn map_row_to_office(row: Row) -> Option<Office> {
|
||
Some(Office {
|
||
office_id: row.get("office_id")?.parse().ok()?,
|
||
office_type_id: row.get("office_type_id")?.parse().ok()?,
|
||
character_id: row.get("character_id")?.parse().ok()?,
|
||
region_id: row.get("region_id")?.parse().ok()?,
|
||
})
|
||
}
|
||
|
||
|