Updated politics worker
All checks were successful
Deploy yourpart (blue-green) / deploy (push) Successful in 2m2s

This commit is contained in:
Torsten Schulz (local)
2026-06-05 20:28:06 +02:00
parent facd11b14f
commit e5e17c2c5e
2 changed files with 38 additions and 148 deletions

View File

@@ -13,7 +13,7 @@ use crate::worker::sql::{
QUERY_ENFORCE_ELECTION_LEAD_TIME, QUERY_ENFORCE_ELECTION_LEAD_TIME,
QUERY_INSERT_CANDIDATES, QUERY_INSERT_CANDIDATES,
QUERY_SELECT_ELECTIONS_NEEDING_CANDIDATES, QUERY_SELECT_ELECTIONS_NEEDING_CANDIDATES,
QUERY_PROCESS_EXPIRED_AND_FILL, QUERY_PROCESS_EXPIRED_AND_SCHEDULE,
QUERY_USERS_IN_CITIES_OF_REGIONS, QUERY_USERS_IN_CITIES_OF_REGIONS,
QUERY_NOTIFY_OFFICE_EXPIRATION, QUERY_NOTIFY_OFFICE_EXPIRATION,
QUERY_NOTIFY_ELECTION_CREATED, QUERY_NOTIFY_ELECTION_CREATED,
@@ -167,20 +167,17 @@ impl PoliticsWorker {
} }
// 1) Optional: Positionen evaluieren (aktuell nur Logging/Struktur) // 1) Optional: Positionen evaluieren (aktuell nur Logging/Struktur)
Self::evaluate_political_positions(pool)?; Self::evaluate_political_positions(pool)?;
// 2) Schema-Änderungen abgleichen: neue / zusätzliche Ämter anlegen, // 2) Schema-Änderungen abgleichen: neue / zusätzliche Ämter anlegen,
// ohne bestehende Amtsinhaber bei Reduktion zu entfernen. // ohne bestehende Amtsinhaber bei Reduktion zu entfernen.
Self::sync_offices_with_types(pool)?; Self::sync_offices_with_types(pool)?;
// 3) Ämter, die bald auslaufen, benachrichtigen // 3) Ämter, die bald auslaufen, benachrichtigen
Self::notify_office_expirations(pool, broker)?; Self::notify_office_expirations(pool, broker)?;
// 4) Abgelaufene Ämter verarbeiten und neue besetzen // 4) Abgelaufene Ämter archivieren und als Neuwahl ausschreiben.
let new_offices_direct = Self::process_expired_offices_and_fill(pool)?; Self::process_expired_offices_and_schedule(pool)?;
if !new_offices_direct.is_empty() {
Self::notify_office_filled(pool, broker, &new_offices_direct)?;
}
// 4b) Nach dem Entfernen abgelaufener Aemter erneut Luecken // 4b) Nach dem Entfernen abgelaufener Aemter erneut Luecken
// synchronisieren. Das behebt Rueckstaende, wenn Aemter schon vor // synchronisieren. Das behebt Rueckstaende, wenn Aemter schon vor
@@ -482,22 +479,17 @@ impl PoliticsWorker {
Ok(()) Ok(())
} }
fn process_expired_offices_and_fill( fn process_expired_offices_and_schedule(pool: &ConnectionPool) -> Result<(), DbError> {
pool: &ConnectionPool,
) -> Result<Vec<Office>, DbError> {
let mut conn = pool let mut conn = pool
.get() .get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?; .map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("process_expired_and_fill", QUERY_PROCESS_EXPIRED_AND_FILL) conn.prepare("process_expired_and_schedule", QUERY_PROCESS_EXPIRED_AND_SCHEDULE)
.map_err(|e| DbError::new(format!("[PoliticsWorker] prepare process_expired_and_fill: {e}")))?; .map_err(|e| DbError::new(format!("[PoliticsWorker] prepare process_expired_and_schedule: {e}")))?;
let rows = conn.execute("process_expired_and_fill", &[]) conn.execute("process_expired_and_schedule", &[])
.map_err(|e| DbError::new(format!("[PoliticsWorker] exec process_expired_and_fill: {e}")))?; .map_err(|e| DbError::new(format!("[PoliticsWorker] exec process_expired_and_schedule: {e}")))?;
Ok(rows Ok(())
.into_iter()
.filter_map(map_row_to_office)
.collect())
} }
fn get_user_ids_in_cities_of_regions( fn get_user_ids_in_cities_of_regions(
@@ -1565,5 +1557,3 @@ fn map_row_to_office(row: Row) -> Option<Office> {
region_id: row.get("region_id")?.parse().ok()?, region_id: row.get("region_id")?.parse().ok()?,
}) })
} }

View File

@@ -1251,7 +1251,7 @@ pub const QUERY_SELECT_ELECTIONS_NEEDING_CANDIDATES: &str = r#"
); );
"#; "#;
pub const QUERY_PROCESS_EXPIRED_AND_FILL: &str = r#" pub const QUERY_PROCESS_EXPIRED_AND_SCHEDULE: &str = r#"
WITH WITH
doomed AS ( doomed AS (
SELECT po.id, SELECT po.id,
@@ -1275,144 +1275,45 @@ pub const QUERY_PROCESS_EXPIRED_AND_FILL: &str = r#"
RETURNING po.office_type_id AS office_type_id, RETURNING po.office_type_id AS office_type_id,
po.region_id AS region_id po.region_id AS region_id
), ),
distinct_types AS ( gaps_per_region AS (
SELECT DISTINCT office_type_id, region_id FROM expired_offices
),
votes_per_candidate AS (
SELECT
dt.office_type_id,
dt.region_id,
c.character_id,
COUNT(v.id) AS vote_count
FROM distinct_types AS dt
JOIN falukant_data.election AS e
ON e.office_type_id = dt.office_type_id
JOIN falukant_data.vote AS v
ON v.election_id = e.id
JOIN falukant_data.candidate AS c
ON c.election_id = e.id
AND c.id = v.candidate_id
WHERE e.date >= (NOW() - INTERVAL '30 days')
GROUP BY dt.office_type_id, dt.region_id, c.character_id
),
ranked_winners AS (
SELECT
vpc.office_type_id,
vpc.region_id,
vpc.character_id,
ROW_NUMBER() OVER (
PARTITION BY vpc.office_type_id, vpc.region_id
ORDER BY vpc.vote_count DESC
) AS rn
FROM votes_per_candidate AS vpc
),
selected_winners AS (
SELECT
rw.office_type_id,
rw.region_id,
rw.character_id
FROM ranked_winners AS rw
JOIN falukant_type.political_office_type AS pot
ON pot.id = rw.office_type_id
WHERE rw.rn <= pot.seats_per_region
),
insert_winners AS (
INSERT INTO falukant_data.political_office
(office_type_id, character_id, created_at, updated_at, region_id)
SELECT
sw.office_type_id,
sw.character_id,
NOW(),
NOW(),
sw.region_id
FROM selected_winners AS sw
RETURNING id AS new_office_id, office_type_id, character_id, region_id
),
count_inserted AS (
SELECT SELECT
office_type_id, office_type_id,
region_id, region_id,
COUNT(*) AS inserted_count COUNT(*) AS gaps
FROM insert_winners FROM expired_offices
GROUP BY office_type_id, region_id GROUP BY office_type_id, region_id
), ),
needed_to_fill AS ( to_schedule AS (
SELECT SELECT
dt.office_type_id, g.office_type_id,
dt.region_id, g.region_id,
(pot.seats_per_region - COALESCE(ci.inserted_count, 0)) AS gaps g.gaps,
FROM distinct_types AS dt (CURRENT_DATE + INTERVAL '2 days')::date AS election_date
JOIN falukant_type.political_office_type AS pot FROM gaps_per_region AS g
ON pot.id = dt.office_type_id WHERE NOT EXISTS (
LEFT JOIN count_inserted AS ci SELECT 1
ON ci.office_type_id = dt.office_type_id FROM falukant_data.election AS e
AND ci.region_id = dt.region_id WHERE e.office_type_id = g.office_type_id
WHERE (pot.seats_per_region - COALESCE(ci.inserted_count, 0)) > 0 AND e.region_id = g.region_id
), AND e.date::date >= (CURRENT_DATE + INTERVAL '2 days')::date
random_candidates AS (
SELECT
rtf.office_type_id,
rtf.region_id,
ch.id AS character_id,
ROW_NUMBER() OVER (
PARTITION BY rtf.office_type_id, rtf.region_id
ORDER BY RANDOM()
) AS rn
FROM needed_to_fill AS rtf
JOIN falukant_data.character AS ch
ON ch.region_id IN (
WITH RECURSIVE region_tree AS (
SELECT id FROM falukant_data.region WHERE id = rtf.region_id
UNION ALL
SELECT r2.id FROM falukant_data.region r2
JOIN region_tree rt ON r2.parent_id = rt.id
)
SELECT id FROM region_tree
) )
AND ch.user_id IS NULL
AND ch.birthdate <= NOW() - INTERVAL '21 days'
AND ch.title_of_nobility IN (
SELECT id FROM falukant_type.title WHERE label_tr != 'noncivil'
)
AND NOT EXISTS (
SELECT 1
FROM falukant_data.political_office AS po2
JOIN falukant_type.political_office_type AS pot2
ON pot2.id = po2.office_type_id
WHERE po2.character_id = ch.id
AND (po2.created_at + (pot2.term_length * INTERVAL '1 day')) >
NOW() + INTERVAL '2 days'
)
), ),
insert_random AS ( scheduled AS (
INSERT INTO falukant_data.political_office INSERT INTO falukant_data.election
(office_type_id, character_id, created_at, updated_at, region_id) (office_type_id, date, posts_to_fill, created_at, updated_at, region_id)
SELECT SELECT
rc.office_type_id, ts.office_type_id,
rc.character_id, ts.election_date,
ts.gaps,
NOW(), NOW(),
NOW(), NOW(),
rc.region_id ts.region_id
FROM random_candidates AS rc FROM to_schedule AS ts
JOIN needed_to_fill AS rtf RETURNING id
ON rtf.office_type_id = rc.office_type_id
AND rtf.region_id = rc.region_id
WHERE rc.rn <= rtf.gaps
RETURNING id AS new_office_id, office_type_id, character_id, region_id
) )
SELECT SELECT
new_office_id AS office_id, COUNT(*) AS scheduled_count
office_type_id, FROM scheduled;
character_id,
region_id
FROM insert_winners
UNION ALL
SELECT
new_office_id AS office_id,
office_type_id,
character_id,
region_id
FROM insert_random;
"#; "#;
pub const QUERY_USERS_IN_CITIES_OF_REGIONS: &str = r#" pub const QUERY_USERS_IN_CITIES_OF_REGIONS: &str = r#"
@@ -4229,4 +4130,3 @@ pub const QUERY_GET_FALUKANT_USER_CERT_AND_EVENT: &str = r#"
FROM falukant_data.falukant_user FROM falukant_data.falukant_user
WHERE id = $1::int; WHERE id = $1::int;
"#; "#;