diff --git a/docs/FALUKANT_DIRECTOR_AUTO_ADJUST_INCOME.md b/docs/FALUKANT_DIRECTOR_AUTO_ADJUST_INCOME.md new file mode 100644 index 0000000..b1df9a3 --- /dev/null +++ b/docs/FALUKANT_DIRECTOR_AUTO_ADJUST_INCOME.md @@ -0,0 +1,31 @@ +# Director: Auto-Adjust Income + +Dieses Feature koppelt das Gehalt (`income`) eines Direktors optional automatisch an sein Wunschgehalt (`wished_income`). + +## Aktivierung + +Feld am Director: + +- `auto_adjust_income` (`boolean`, default `false`) + +Bei `false` bleibt die normale Zufriedenheitslogik aktiv. + +## Verhalten im Daemon + +Die Prüfung läuft im täglichen Director-Satisfaction-Tick. + +- Wenn `auto_adjust_income = true`: + - der Daemon setzt `income = wished_income` (untere Grenze `0`). + - die Zufriedenheit wird in diesem Tick **nicht** abgesenkt. +- Wenn `auto_adjust_income = false`: + - Zufriedenheit sinkt täglich abhängig vom Gehaltsgap um `1..3` Punkte. + +## Zusammenhang mit Kündigungen + +- Kündigungsprüfung bleibt separat (daily resignation check). +- Zusätzlich gilt Karenzzeit: kein Rücktritt in den ersten 3 Tagen nach Einstellung/Update. + +## UI-/API-Hinweis + +Der Toggle „Gehalt automatisch anpassen“ wird über die bestehenden Director-Settings gespeichert. +Nach erfolgreicher Änderung aktualisiert der nächste tägliche Tick den effektiven `income`-Wert. diff --git a/docs/FALUKANT_WORKER_SCHEDULE_SOCKET.md b/docs/FALUKANT_WORKER_SCHEDULE_SOCKET.md new file mode 100644 index 0000000..99779a6 --- /dev/null +++ b/docs/FALUKANT_WORKER_SCHEDULE_SOCKET.md @@ -0,0 +1,95 @@ +# Worker-Schedules per WebSocket + +Dieses Feature stellt eine Übersicht bereit, wann Worker-Tasks voraussichtlich wieder laufen. + +## Zugriffsschutz + +Der Zugriff ist nur erlaubt für Nutzer mit mindestens einem dieser Rechte: + +- `admin` +- `worker_schedule_read` + +Migration für das neue Recht: + +- `migrations/018_worker_schedule_socket_right.sql` + +## WebSocket-Events + +Voraussetzung: Der Client hat vorher `setUserId` gesendet. + +### 1) Übersicht + +Request: + +```json +{ + "event": "getWorkerSchedules", + "data": {} +} +``` + +Response: + +```json +{ + "event": "getWorkerSchedulesResponse", + "ok": true, + "generated_at": 1715157600, + "schedules": [ + { + "worker": "DirectorWorker", + "tasks": [ + { + "task": "salary_payout", + "cadence_seconds": 86400, + "cadence_label": "86400s interval", + "next_run_latest_ts": 1715244000, + "next_run_latest_in_seconds": 86400 + } + ] + } + ] +} +``` + +Hinweis: Sekündliche/minütliche Tasks sind absichtlich nicht enthalten. + +### 2) Detaillierte Übersicht + +Request: + +```json +{ + "event": "getWorkerSchedulesDetailed", + "data": {} +} +``` + +Response (zusätzlich Live-Status aus dem Worker-Runtime-Registry): + +```json +{ + "event": "getWorkerSchedulesDetailedResponse", + "ok": true, + "generated_at": 1715157600, + "schedules": [ + { + "worker": "DirectorWorker", + "running_worker": true, + "running_watchdog": true, + "current_step": "DirectorWorker: pay_salary", + "last_step_change_ts": 1715157000, + "tasks": [] + } + ] +} +``` + +## Rechteprüfung (technisch) + +Der Daemon akzeptiert sowohl: + +- `setUserId = community.user.id` +- `setUserId = falukant_data.falukant_user.id` + +In beiden Fällen wird auf `community.user_right` + `"type".user_right` geprüft. diff --git a/migrations/018_worker_schedule_socket_right.sql b/migrations/018_worker_schedule_socket_right.sql new file mode 100644 index 0000000..954f355 --- /dev/null +++ b/migrations/018_worker_schedule_socket_right.sql @@ -0,0 +1,9 @@ +-- Recht für WebSocket-Endpoint `getWorkerSchedules`. +-- Zugriff nur für Admins oder User mit diesem Recht. +INSERT INTO "type".user_right (title) +SELECT 'worker_schedule_read' +WHERE NOT EXISTS ( + SELECT 1 + FROM "type".user_right + WHERE LOWER(title) = 'worker_schedule_read' +); diff --git a/migrations/README.md b/migrations/README.md index d03e458..3dcf669 100644 --- a/migrations/README.md +++ b/migrations/README.md @@ -45,3 +45,11 @@ Spalte **`falukant_log.production.completion_count`**: zählt **abgeschlossene P ## `016_falukant_log_political_office_history.sql` Tabelle **`falukant_log.political_office_history`**: Archiv abgeschlossener politischer Amtszeiten (`character_id`, `office_type_id`, `region_id`, `start_date`, `end_date`). Der Daemon schreibt **vor** jedem relevanten `DELETE` auf **`falukant_data.political_office`** (Amtsende/Neuwahl-Pfad, Übersitz-Trim, Charaktertod). **`falukant_data.process_elections()`** (PostgreSQL) liegt außerhalb des Rust-Repos — falls dort Zeilen gelöscht werden, analog **`INSERT` in diese Historie** in der DB-Funktion ergänzen. + +## `018_worker_schedule_socket_right.sql` + +Neues Benutzerrecht **`worker_schedule_read`** in **`type.user_right`**. +Dieses Recht erlaubt den WebSocket-Endpoint `getWorkerSchedules` (Daemon-Laufplan für Worker-Tasks mit Intervallen > 60 Sekunden). Zugriff haben: + +- Nutzer mit Rechtstitel `admin` +- Nutzer mit Rechtstitel `worker_schedule_read` diff --git a/src/websocket_server.rs b/src/websocket_server.rs index 6435ee8..484cfac 100644 --- a/src/websocket_server.rs +++ b/src/websocket_server.rs @@ -1,6 +1,6 @@ use crate::db::ConnectionPool; use crate::message_broker::MessageBroker; -use crate::worker::Worker; +use crate::worker::{get_worker_runtime_snapshot, Worker, WorkerRuntimeSnapshot}; use futures_util::{FutureExt, SinkExt, StreamExt}; use serde::{Deserialize, Serialize}; use serde_json::Value as Json; @@ -60,12 +60,29 @@ struct WebSocketLogEntry { event: Option, // event-Feld aus der Nachricht (falls JSON) } +#[derive(Debug, Clone, Serialize)] +struct WorkerTaskSchedule { + task: String, + cadence_seconds: u64, + cadence_label: String, + next_run_latest_ts: u64, + next_run_latest_in_seconds: u64, +} + +#[derive(Debug, Clone, Serialize)] +struct WorkerSchedule { + worker: String, + tasks: Vec, +} + #[derive(Default)] struct WebSocketLog { entries: Vec, } const WS_LOG_MAX_ENTRIES: usize = 50_000; +const RIGHT_ADMIN: &str = "admin"; +const RIGHT_WORKER_SCHEDULE: &str = "worker_schedule_read"; async fn append_ws_log( log: &Arc>, @@ -281,7 +298,7 @@ async fn run_accept_loop( addr: String, running: Arc, tx: broadcast::Sender, - _pool: ConnectionPool, + pool: ConnectionPool, registry: Arc>, ws_log: Arc>, tls_acceptor: Option, @@ -312,11 +329,19 @@ async fn run_accept_loop( let ws_log_clone = ws_log.clone(); let tls_acceptor_clone = tls_acceptor.clone(); + let pool_clone = pool.clone(); tokio::spawn(async move { if let Some(acc) = tls_acceptor_clone { match acc.accept(stream).await { Ok(tls_stream) => { - handle_connection(tls_stream, peer_addr, rx, registry_clone, ws_log_clone) + handle_connection( + tls_stream, + peer_addr, + rx, + pool_clone, + registry_clone, + ws_log_clone, + ) .await } Err(err) => { @@ -326,7 +351,8 @@ async fn run_accept_loop( } } } else { - handle_connection(stream, peer_addr, rx, registry_clone, ws_log_clone).await; + handle_connection(stream, peer_addr, rx, pool_clone, registry_clone, ws_log_clone) + .await; } }); } @@ -336,6 +362,7 @@ async fn handle_connection( stream: S, peer_addr: SocketAddr, mut broker_rx: broadcast::Receiver, + pool: ConnectionPool, registry: Arc>, ws_log: Arc>, ) where @@ -367,6 +394,7 @@ async fn handle_connection( let user_id = Arc::new(tokio::sync::Mutex::new(Option::::None)); let user_id_for_incoming = user_id.clone(); let user_id_for_broker = user_id.clone(); + let pool_for_incoming = pool.clone(); let registry_for_incoming = registry.clone(); let client_tx_incoming = client_tx.clone(); let ws_log_for_incoming = ws_log.clone(); @@ -446,6 +474,77 @@ async fn handle_connection( .to_string(); let _ = client_tx_incoming.send(payload).await; } + "getWorkerSchedules" => { + let uid_opt = { + let guard = user_id_for_incoming.lock().await; + guard.clone() + }; + let allowed = uid_opt + .as_deref() + .map(|uid| user_can_read_worker_schedules(&pool_for_incoming, uid)) + .unwrap_or(false); + if !allowed { + let payload = serde_json::json!({ + "event": "getWorkerSchedulesResponse", + "ok": false, + "error": "forbidden" + }) + .to_string(); + let _ = client_tx_incoming.send(payload).await; + continue; + } + + let now_secs = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + let schedules = build_worker_schedule_overview(now_secs); + let payload = serde_json::json!({ + "event": "getWorkerSchedulesResponse", + "ok": true, + "generated_at": now_secs, + "schedules": schedules + }) + .to_string(); + let _ = client_tx_incoming.send(payload).await; + } + "getWorkerSchedulesDetailed" => { + let uid_opt = { + let guard = user_id_for_incoming.lock().await; + guard.clone() + }; + let allowed = uid_opt + .as_deref() + .map(|uid| user_can_read_worker_schedules(&pool_for_incoming, uid)) + .unwrap_or(false); + if !allowed { + let payload = serde_json::json!({ + "event": "getWorkerSchedulesDetailedResponse", + "ok": false, + "error": "forbidden" + }) + .to_string(); + let _ = client_tx_incoming.send(payload).await; + continue; + } + + let now_secs = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs(); + let schedules = build_worker_schedule_overview(now_secs); + let runtime = get_worker_runtime_snapshot(); + let detailed = build_worker_schedule_detailed(schedules, runtime); + + let payload = serde_json::json!({ + "event": "getWorkerSchedulesDetailedResponse", + "ok": true, + "generated_at": now_secs, + "schedules": detailed + }) + .to_string(); + let _ = client_tx_incoming.send(payload).await; + } _ => { // Unbekannte Events ignorieren } @@ -604,3 +703,141 @@ async fn handle_connection( println!("[WebSocketServer] Verbindung geschlossen: {}", peer_addr); } +fn user_can_read_worker_schedules(pool: &ConnectionPool, user_id_raw: &str) -> bool { + let uid = match user_id_raw.parse::() { + Ok(v) if v > 0 => v, + _ => return false, + }; + + let mut conn = match pool.get() { + Ok(c) => c, + Err(_) => return false, + }; + + let sql = r#" +SELECT EXISTS ( + SELECT 1 + FROM community.user_right ur + JOIN "type".user_right tr ON tr.id = ur.right_type_id + WHERE ur.user_id = $1::int + AND LOWER(COALESCE(tr.title, '')) IN ($2::text, $3::text) +) AS allowed_direct, +EXISTS ( + SELECT 1 + FROM falukant_data.falukant_user fu + JOIN community.user_right ur ON ur.user_id = fu.user_id + JOIN "type".user_right tr ON tr.id = ur.right_type_id + WHERE fu.id = $1::int + AND LOWER(COALESCE(tr.title, '')) IN ($2::text, $3::text) +) AS allowed_falukant; +"#; + + if conn.prepare("ws_can_read_worker_schedules", sql).is_err() { + return false; + } + let admin = RIGHT_ADMIN.to_string(); + let allowed_right = RIGHT_WORKER_SCHEDULE.to_string(); + let rows = match conn.execute( + "ws_can_read_worker_schedules", + &[&uid, &admin, &allowed_right], + ) { + Ok(r) => r, + Err(_) => return false, + }; + let Some(row) = rows.first() else { + return false; + }; + let direct = row + .get("allowed_direct") + .map(|v| v == "true" || v == "t" || v == "1") + .unwrap_or(false); + let falukant = row + .get("allowed_falukant") + .map(|v| v == "true" || v == "t" || v == "1") + .unwrap_or(false); + direct || falukant +} + +fn build_worker_schedule_overview(now_secs: u64) -> Vec { + fn task(name: &str, cadence_seconds: u64, now_secs: u64) -> WorkerTaskSchedule { + WorkerTaskSchedule { + task: name.to_string(), + cadence_seconds, + cadence_label: format!("{}s interval", cadence_seconds), + next_run_latest_ts: now_secs.saturating_add(cadence_seconds), + next_run_latest_in_seconds: cadence_seconds, + } + } + + vec![ + WorkerSchedule { + worker: "DirectorWorker".to_string(), + tasks: vec![ + task("salary_payout", 24 * 60 * 60, now_secs), + task("satisfaction_or_auto_income_adjust", 24 * 60 * 60, now_secs), + task("resignation_check", 24 * 60 * 60, now_secs), + ], + }, + WorkerSchedule { + worker: "ValueRecalculationWorker".to_string(), + tasks: vec![ + task("calculate_product_knowledge", 24 * 60 * 60, now_secs), + task("calculate_regional_sell_price", 24 * 60 * 60, now_secs), + task("calculate_hourly_price_recalculation", 60 * 60, now_secs), + ], + }, + WorkerSchedule { + worker: "FalukantFamilyWorker".to_string(), + tasks: vec![ + task("family_daily", 24 * 60 * 60, now_secs), + task("family_monthly", 24 * 60 * 60, now_secs), + task("lover_installment_and_servants_month_slice", 2 * 60 * 60, now_secs), + ], + }, + WorkerSchedule { + worker: "PoliticsWorker".to_string(), + tasks: vec![ + task("daily_politics", 24 * 60 * 60, now_secs), + task("auto_approve_church_applications", 60 * 60, now_secs), + ], + }, + WorkerSchedule { + worker: "UserCharacterWorker".to_string(), + tasks: vec![ + task("hourly_tasks", 60 * 60, now_secs), + task("hourly_pregnancies", 60 * 60, now_secs), + task("daily_marriage_fertility", 24 * 60 * 60, now_secs), + ], + }, + WorkerSchedule { + worker: "HouseWorker".to_string(), + tasks: vec![task("daily_house_updates", 24 * 60 * 60, now_secs)], + }, + ] +} + +fn build_worker_schedule_detailed( + schedules: Vec, + runtime: Vec, +) -> Vec { + let mut by_worker: HashMap = HashMap::new(); + for r in runtime { + by_worker.insert(r.worker.clone(), r); + } + + schedules + .into_iter() + .map(|ws| { + let rt = by_worker.get(&ws.worker); + serde_json::json!({ + "worker": ws.worker, + "running_worker": rt.map(|r| r.running_worker).unwrap_or(false), + "running_watchdog": rt.map(|r| r.running_watchdog).unwrap_or(false), + "current_step": rt.map(|r| r.current_step.clone()).unwrap_or_else(|| "unknown".to_string()), + "last_step_change_ts": rt.map(|r| r.last_step_change_ts).unwrap_or(0), + "tasks": ws.tasks + }) + }) + .collect() +} + diff --git a/src/worker/base.rs b/src/worker/base.rs index a32ba82..7615adf 100644 --- a/src/worker/base.rs +++ b/src/worker/base.rs @@ -2,9 +2,12 @@ use crate::db::{ConnectionPool, DbError}; use crate::worker::sql::{QUERY_UPDATE_MONEY, QUERY_GET_MONEY}; use crate::message_broker::MessageBroker; use std::sync::atomic::{AtomicBool, Ordering}; +use std::collections::HashMap; +use std::sync::OnceLock; use std::sync::{Arc, Mutex}; use std::thread; use std::time::Duration; +use std::time::{SystemTime, UNIX_EPOCH}; pub trait Worker: Send { fn start_worker_thread(&mut self); @@ -12,6 +15,66 @@ pub trait Worker: Send { fn enable_watchdog(&mut self); } +#[derive(Debug, Clone)] +pub struct WorkerRuntimeSnapshot { + pub worker: String, + pub current_step: String, + pub last_step_change_ts: u64, + pub running_worker: bool, + pub running_watchdog: bool, +} + +#[derive(Debug, Clone)] +struct WorkerRuntimeEntry { + current_step: String, + last_step_change_ts: u64, + running_worker: bool, + running_watchdog: bool, +} + +static WORKER_RUNTIME_REGISTRY: OnceLock>> = OnceLock::new(); + +fn runtime_registry() -> &'static Mutex> { + WORKER_RUNTIME_REGISTRY.get_or_init(|| Mutex::new(HashMap::new())) +} + +fn now_unix_secs() -> u64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or_default() + .as_secs() +} + +fn upsert_runtime_entry(name: &str, f: impl FnOnce(&mut WorkerRuntimeEntry)) { + if let Ok(mut reg) = runtime_registry().lock() { + let entry = reg.entry(name.to_string()).or_insert_with(|| WorkerRuntimeEntry { + current_step: format!("{name}: idle"), + last_step_change_ts: now_unix_secs(), + running_worker: false, + running_watchdog: false, + }); + f(entry); + } +} + +pub fn get_worker_runtime_snapshot() -> Vec { + let Ok(reg) = runtime_registry().lock() else { + return Vec::new(); + }; + let mut out: Vec = reg + .iter() + .map(|(worker, e)| WorkerRuntimeSnapshot { + worker: worker.clone(), + current_step: e.current_step.clone(), + last_step_change_ts: e.last_step_change_ts, + running_worker: e.running_worker, + running_watchdog: e.running_watchdog, + }) + .collect(); + out.sort_by(|a, b| a.worker.cmp(&b.worker)); + out +} + pub(crate) struct WorkerState { pub(crate) running_worker: AtomicBool, pub(crate) running_watchdog: AtomicBool, @@ -44,6 +107,10 @@ pub struct BaseWorker { impl BaseWorker { pub fn new(name: &str, pool: ConnectionPool, broker: MessageBroker) -> Self { + upsert_runtime_entry(name, |entry| { + entry.current_step = format!("{name}: idle"); + entry.last_step_change_ts = now_unix_secs(); + }); Self { name: name.to_string(), pool, @@ -55,9 +122,15 @@ impl BaseWorker { } pub fn set_current_step>(&self, step: S) { + let new_step = step.into(); if let Ok(mut guard) = self.state.current_step.lock() { - *guard = step.into(); + *guard = new_step.clone(); } + let worker_name = self.name.clone(); + upsert_runtime_entry(&worker_name, |entry| { + entry.current_step = new_step; + entry.last_step_change_ts = now_unix_secs(); + }); } pub(crate) fn start_worker_with_loop(&mut self, loop_fn: F) @@ -68,6 +141,11 @@ impl BaseWorker { eprintln!("[{}] Worker thread already running, skipping start.", self.name); return; } + let worker_name = self.name.clone(); + upsert_runtime_entry(&worker_name, |entry| { + entry.running_worker = true; + entry.last_step_change_ts = now_unix_secs(); + }); let state = Arc::clone(&self.state); @@ -80,6 +158,11 @@ impl BaseWorker { // Erst den Worker stoppen, dann auch den Watchdog beenden, damit keine // Hintergrund-Threads weiterlaufen. self.state.running_worker.store(false, Ordering::Relaxed); + let worker_name = self.name.clone(); + upsert_runtime_entry(&worker_name, |entry| { + entry.running_worker = false; + entry.last_step_change_ts = now_unix_secs(); + }); self.stop_watchdog(); if let Some(handle) = self.worker_thread.take() { let _ = handle.join(); @@ -95,6 +178,11 @@ impl BaseWorker { eprintln!("[{}] Watchdog already enabled, skipping.", self.name); return; } + let worker_name = self.name.clone(); + upsert_runtime_entry(&worker_name, |entry| { + entry.running_watchdog = true; + entry.last_step_change_ts = now_unix_secs(); + }); let state = Arc::clone(&self.state); @@ -124,6 +212,11 @@ impl BaseWorker { pub(crate) fn stop_watchdog(&mut self) { self.state.running_watchdog.store(false, Ordering::Relaxed); + let worker_name = self.name.clone(); + upsert_runtime_entry(&worker_name, |entry| { + entry.running_watchdog = false; + entry.last_step_change_ts = now_unix_secs(); + }); if let Some(handle) = self.watchdog_thread.take() { let _ = handle.join(); } diff --git a/src/worker/mod.rs b/src/worker/mod.rs index d4155f1..d88d439 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -22,6 +22,7 @@ mod notify; mod sql; pub use base::Worker; +pub use base::{get_worker_runtime_snapshot, WorkerRuntimeSnapshot}; pub use crate::db::ConnectionPool; pub use character_creation::CharacterCreationWorker; pub use director::DirectorWorker;