Implement worker schedule access control and runtime tracking: Added a new user right worker_schedule_read to manage access to the getWorkerSchedules WebSocket endpoint. Enhanced the WebSocket server to handle requests for worker schedules, including detailed responses. Introduced runtime tracking for workers, allowing for monitoring of their current state and activity. Updated relevant SQL and Rust structures to support these features.
All checks were successful
Deploy yourpart (blue-green) / deploy (push) Successful in 1m36s
All checks were successful
Deploy yourpart (blue-green) / deploy (push) Successful in 1m36s
This commit is contained in:
@@ -1,6 +1,6 @@
|
||||
use crate::db::ConnectionPool;
|
||||
use crate::message_broker::MessageBroker;
|
||||
use crate::worker::Worker;
|
||||
use crate::worker::{get_worker_runtime_snapshot, Worker, WorkerRuntimeSnapshot};
|
||||
use futures_util::{FutureExt, SinkExt, StreamExt};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value as Json;
|
||||
@@ -60,12 +60,29 @@ struct WebSocketLogEntry {
|
||||
event: Option<String>, // event-Feld aus der Nachricht (falls JSON)
|
||||
}
|
||||
|
||||
/// Schedule information for one recurring task of a background worker.
/// Serialized into the `getWorkerSchedules` WebSocket response.
#[derive(Debug, Clone, Serialize)]
struct WorkerTaskSchedule {
    // Task identifier, e.g. "salary_payout".
    task: String,
    // Interval between runs, in seconds.
    cadence_seconds: u64,
    // Human-readable cadence, formatted as "<seconds>s interval".
    cadence_label: String,
    // Unix timestamp (seconds) of the latest possible next run
    // (computed as "now" + cadence by the overview builder).
    next_run_latest_ts: u64,
    // Seconds from "now" until the latest possible next run.
    next_run_latest_in_seconds: u64,
}
|
||||
|
||||
/// All scheduled tasks of a single background worker, grouped under the
/// worker's name. Serialized into the `getWorkerSchedules` response.
#[derive(Debug, Clone, Serialize)]
struct WorkerSchedule {
    // Worker name, e.g. "DirectorWorker".
    worker: String,
    // The worker's recurring tasks with their cadences.
    tasks: Vec<WorkerTaskSchedule>,
}
|
||||
|
||||
/// In-memory collection of WebSocket log entries.
/// NOTE(review): presumably capped at WS_LOG_MAX_ENTRIES by append_ws_log —
/// confirm, the appender is not fully visible here.
#[derive(Default)]
struct WebSocketLog {
    entries: Vec<WebSocketLogEntry>,
}

// Upper bound on retained WebSocket log entries.
const WS_LOG_MAX_ENTRIES: usize = 50_000;
// Right title that grants unrestricted access.
const RIGHT_ADMIN: &str = "admin";
// Right title that grants read access to the worker-schedule endpoints.
const RIGHT_WORKER_SCHEDULE: &str = "worker_schedule_read";
|
||||
|
||||
async fn append_ws_log(
|
||||
log: &Arc<Mutex<WebSocketLog>>,
|
||||
@@ -281,7 +298,7 @@ async fn run_accept_loop(
|
||||
addr: String,
|
||||
running: Arc<AtomicBool>,
|
||||
tx: broadcast::Sender<String>,
|
||||
_pool: ConnectionPool,
|
||||
pool: ConnectionPool,
|
||||
registry: Arc<Mutex<ConnectionRegistry>>,
|
||||
ws_log: Arc<Mutex<WebSocketLog>>,
|
||||
tls_acceptor: Option<TlsAcceptor>,
|
||||
@@ -312,11 +329,19 @@ async fn run_accept_loop(
|
||||
let ws_log_clone = ws_log.clone();
|
||||
let tls_acceptor_clone = tls_acceptor.clone();
|
||||
|
||||
let pool_clone = pool.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Some(acc) = tls_acceptor_clone {
|
||||
match acc.accept(stream).await {
|
||||
Ok(tls_stream) => {
|
||||
handle_connection(tls_stream, peer_addr, rx, registry_clone, ws_log_clone)
|
||||
handle_connection(
|
||||
tls_stream,
|
||||
peer_addr,
|
||||
rx,
|
||||
pool_clone,
|
||||
registry_clone,
|
||||
ws_log_clone,
|
||||
)
|
||||
.await
|
||||
}
|
||||
Err(err) => {
|
||||
@@ -326,7 +351,8 @@ async fn run_accept_loop(
|
||||
}
|
||||
}
|
||||
} else {
|
||||
handle_connection(stream, peer_addr, rx, registry_clone, ws_log_clone).await;
|
||||
handle_connection(stream, peer_addr, rx, pool_clone, registry_clone, ws_log_clone)
|
||||
.await;
|
||||
}
|
||||
});
|
||||
}
|
||||
@@ -336,6 +362,7 @@ async fn handle_connection<S>(
|
||||
stream: S,
|
||||
peer_addr: SocketAddr,
|
||||
mut broker_rx: broadcast::Receiver<String>,
|
||||
pool: ConnectionPool,
|
||||
registry: Arc<Mutex<ConnectionRegistry>>,
|
||||
ws_log: Arc<Mutex<WebSocketLog>>,
|
||||
) where
|
||||
@@ -367,6 +394,7 @@ async fn handle_connection<S>(
|
||||
let user_id = Arc::new(tokio::sync::Mutex::new(Option::<String>::None));
|
||||
let user_id_for_incoming = user_id.clone();
|
||||
let user_id_for_broker = user_id.clone();
|
||||
let pool_for_incoming = pool.clone();
|
||||
let registry_for_incoming = registry.clone();
|
||||
let client_tx_incoming = client_tx.clone();
|
||||
let ws_log_for_incoming = ws_log.clone();
|
||||
@@ -446,6 +474,77 @@ async fn handle_connection<S>(
|
||||
.to_string();
|
||||
let _ = client_tx_incoming.send(payload).await;
|
||||
}
|
||||
"getWorkerSchedules" => {
|
||||
let uid_opt = {
|
||||
let guard = user_id_for_incoming.lock().await;
|
||||
guard.clone()
|
||||
};
|
||||
let allowed = uid_opt
|
||||
.as_deref()
|
||||
.map(|uid| user_can_read_worker_schedules(&pool_for_incoming, uid))
|
||||
.unwrap_or(false);
|
||||
if !allowed {
|
||||
let payload = serde_json::json!({
|
||||
"event": "getWorkerSchedulesResponse",
|
||||
"ok": false,
|
||||
"error": "forbidden"
|
||||
})
|
||||
.to_string();
|
||||
let _ = client_tx_incoming.send(payload).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
let now_secs = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs();
|
||||
let schedules = build_worker_schedule_overview(now_secs);
|
||||
let payload = serde_json::json!({
|
||||
"event": "getWorkerSchedulesResponse",
|
||||
"ok": true,
|
||||
"generated_at": now_secs,
|
||||
"schedules": schedules
|
||||
})
|
||||
.to_string();
|
||||
let _ = client_tx_incoming.send(payload).await;
|
||||
}
|
||||
"getWorkerSchedulesDetailed" => {
|
||||
let uid_opt = {
|
||||
let guard = user_id_for_incoming.lock().await;
|
||||
guard.clone()
|
||||
};
|
||||
let allowed = uid_opt
|
||||
.as_deref()
|
||||
.map(|uid| user_can_read_worker_schedules(&pool_for_incoming, uid))
|
||||
.unwrap_or(false);
|
||||
if !allowed {
|
||||
let payload = serde_json::json!({
|
||||
"event": "getWorkerSchedulesDetailedResponse",
|
||||
"ok": false,
|
||||
"error": "forbidden"
|
||||
})
|
||||
.to_string();
|
||||
let _ = client_tx_incoming.send(payload).await;
|
||||
continue;
|
||||
}
|
||||
|
||||
let now_secs = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs();
|
||||
let schedules = build_worker_schedule_overview(now_secs);
|
||||
let runtime = get_worker_runtime_snapshot();
|
||||
let detailed = build_worker_schedule_detailed(schedules, runtime);
|
||||
|
||||
let payload = serde_json::json!({
|
||||
"event": "getWorkerSchedulesDetailedResponse",
|
||||
"ok": true,
|
||||
"generated_at": now_secs,
|
||||
"schedules": detailed
|
||||
})
|
||||
.to_string();
|
||||
let _ = client_tx_incoming.send(payload).await;
|
||||
}
|
||||
_ => {
|
||||
// Unbekannte Events ignorieren
|
||||
}
|
||||
@@ -604,3 +703,141 @@ async fn handle_connection<S>(
|
||||
println!("[WebSocketServer] Verbindung geschlossen: {}", peer_addr);
|
||||
}
|
||||
|
||||
fn user_can_read_worker_schedules(pool: &ConnectionPool, user_id_raw: &str) -> bool {
|
||||
let uid = match user_id_raw.parse::<i32>() {
|
||||
Ok(v) if v > 0 => v,
|
||||
_ => return false,
|
||||
};
|
||||
|
||||
let mut conn = match pool.get() {
|
||||
Ok(c) => c,
|
||||
Err(_) => return false,
|
||||
};
|
||||
|
||||
let sql = r#"
|
||||
SELECT EXISTS (
|
||||
SELECT 1
|
||||
FROM community.user_right ur
|
||||
JOIN "type".user_right tr ON tr.id = ur.right_type_id
|
||||
WHERE ur.user_id = $1::int
|
||||
AND LOWER(COALESCE(tr.title, '')) IN ($2::text, $3::text)
|
||||
) AS allowed_direct,
|
||||
EXISTS (
|
||||
SELECT 1
|
||||
FROM falukant_data.falukant_user fu
|
||||
JOIN community.user_right ur ON ur.user_id = fu.user_id
|
||||
JOIN "type".user_right tr ON tr.id = ur.right_type_id
|
||||
WHERE fu.id = $1::int
|
||||
AND LOWER(COALESCE(tr.title, '')) IN ($2::text, $3::text)
|
||||
) AS allowed_falukant;
|
||||
"#;
|
||||
|
||||
if conn.prepare("ws_can_read_worker_schedules", sql).is_err() {
|
||||
return false;
|
||||
}
|
||||
let admin = RIGHT_ADMIN.to_string();
|
||||
let allowed_right = RIGHT_WORKER_SCHEDULE.to_string();
|
||||
let rows = match conn.execute(
|
||||
"ws_can_read_worker_schedules",
|
||||
&[&uid, &admin, &allowed_right],
|
||||
) {
|
||||
Ok(r) => r,
|
||||
Err(_) => return false,
|
||||
};
|
||||
let Some(row) = rows.first() else {
|
||||
return false;
|
||||
};
|
||||
let direct = row
|
||||
.get("allowed_direct")
|
||||
.map(|v| v == "true" || v == "t" || v == "1")
|
||||
.unwrap_or(false);
|
||||
let falukant = row
|
||||
.get("allowed_falukant")
|
||||
.map(|v| v == "true" || v == "t" || v == "1")
|
||||
.unwrap_or(false);
|
||||
direct || falukant
|
||||
}
|
||||
|
||||
fn build_worker_schedule_overview(now_secs: u64) -> Vec<WorkerSchedule> {
|
||||
fn task(name: &str, cadence_seconds: u64, now_secs: u64) -> WorkerTaskSchedule {
|
||||
WorkerTaskSchedule {
|
||||
task: name.to_string(),
|
||||
cadence_seconds,
|
||||
cadence_label: format!("{}s interval", cadence_seconds),
|
||||
next_run_latest_ts: now_secs.saturating_add(cadence_seconds),
|
||||
next_run_latest_in_seconds: cadence_seconds,
|
||||
}
|
||||
}
|
||||
|
||||
vec![
|
||||
WorkerSchedule {
|
||||
worker: "DirectorWorker".to_string(),
|
||||
tasks: vec![
|
||||
task("salary_payout", 24 * 60 * 60, now_secs),
|
||||
task("satisfaction_or_auto_income_adjust", 24 * 60 * 60, now_secs),
|
||||
task("resignation_check", 24 * 60 * 60, now_secs),
|
||||
],
|
||||
},
|
||||
WorkerSchedule {
|
||||
worker: "ValueRecalculationWorker".to_string(),
|
||||
tasks: vec![
|
||||
task("calculate_product_knowledge", 24 * 60 * 60, now_secs),
|
||||
task("calculate_regional_sell_price", 24 * 60 * 60, now_secs),
|
||||
task("calculate_hourly_price_recalculation", 60 * 60, now_secs),
|
||||
],
|
||||
},
|
||||
WorkerSchedule {
|
||||
worker: "FalukantFamilyWorker".to_string(),
|
||||
tasks: vec![
|
||||
task("family_daily", 24 * 60 * 60, now_secs),
|
||||
task("family_monthly", 24 * 60 * 60, now_secs),
|
||||
task("lover_installment_and_servants_month_slice", 2 * 60 * 60, now_secs),
|
||||
],
|
||||
},
|
||||
WorkerSchedule {
|
||||
worker: "PoliticsWorker".to_string(),
|
||||
tasks: vec![
|
||||
task("daily_politics", 24 * 60 * 60, now_secs),
|
||||
task("auto_approve_church_applications", 60 * 60, now_secs),
|
||||
],
|
||||
},
|
||||
WorkerSchedule {
|
||||
worker: "UserCharacterWorker".to_string(),
|
||||
tasks: vec![
|
||||
task("hourly_tasks", 60 * 60, now_secs),
|
||||
task("hourly_pregnancies", 60 * 60, now_secs),
|
||||
task("daily_marriage_fertility", 24 * 60 * 60, now_secs),
|
||||
],
|
||||
},
|
||||
WorkerSchedule {
|
||||
worker: "HouseWorker".to_string(),
|
||||
tasks: vec![task("daily_house_updates", 24 * 60 * 60, now_secs)],
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
fn build_worker_schedule_detailed(
|
||||
schedules: Vec<WorkerSchedule>,
|
||||
runtime: Vec<WorkerRuntimeSnapshot>,
|
||||
) -> Vec<serde_json::Value> {
|
||||
let mut by_worker: HashMap<String, WorkerRuntimeSnapshot> = HashMap::new();
|
||||
for r in runtime {
|
||||
by_worker.insert(r.worker.clone(), r);
|
||||
}
|
||||
|
||||
schedules
|
||||
.into_iter()
|
||||
.map(|ws| {
|
||||
let rt = by_worker.get(&ws.worker);
|
||||
serde_json::json!({
|
||||
"worker": ws.worker,
|
||||
"running_worker": rt.map(|r| r.running_worker).unwrap_or(false),
|
||||
"running_watchdog": rt.map(|r| r.running_watchdog).unwrap_or(false),
|
||||
"current_step": rt.map(|r| r.current_step.clone()).unwrap_or_else(|| "unknown".to_string()),
|
||||
"last_step_change_ts": rt.map(|r| r.last_step_change_ts).unwrap_or(0),
|
||||
"tasks": ws.tasks
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
|
||||
@@ -2,9 +2,12 @@ use crate::db::{ConnectionPool, DbError};
|
||||
use crate::worker::sql::{QUERY_UPDATE_MONEY, QUERY_GET_MONEY};
|
||||
use crate::message_broker::MessageBroker;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::OnceLock;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
pub trait Worker: Send {
|
||||
fn start_worker_thread(&mut self);
|
||||
@@ -12,6 +15,66 @@ pub trait Worker: Send {
|
||||
fn enable_watchdog(&mut self);
|
||||
}
|
||||
|
||||
/// Point-in-time copy of one worker's runtime state, as exposed via
/// `get_worker_runtime_snapshot` (e.g. to the WebSocket server).
#[derive(Debug, Clone)]
pub struct WorkerRuntimeSnapshot {
    // Worker name (registry key).
    pub worker: String,
    // Most recently reported step description.
    pub current_step: String,
    // Unix timestamp (seconds) of the last step/state change.
    pub last_step_change_ts: u64,
    // Whether the worker thread is currently marked as running.
    pub running_worker: bool,
    // Whether the watchdog thread is currently marked as running.
    pub running_watchdog: bool,
}
|
||||
|
||||
/// Mutable per-worker state stored in the global runtime registry.
/// Mirrors `WorkerRuntimeSnapshot` minus the name (which is the map key).
#[derive(Debug, Clone)]
struct WorkerRuntimeEntry {
    // Most recently reported step description.
    current_step: String,
    // Unix timestamp (seconds) of the last step/state change.
    last_step_change_ts: u64,
    // Whether the worker thread is currently marked as running.
    running_worker: bool,
    // Whether the watchdog thread is currently marked as running.
    running_watchdog: bool,
}
|
||||
|
||||
// Process-wide registry of per-worker runtime state, keyed by worker name.
// Lazily initialized on first access via `runtime_registry`.
static WORKER_RUNTIME_REGISTRY: OnceLock<Mutex<HashMap<String, WorkerRuntimeEntry>>> = OnceLock::new();

/// Returns the global worker runtime registry, initializing it on first use.
fn runtime_registry() -> &'static Mutex<HashMap<String, WorkerRuntimeEntry>> {
    WORKER_RUNTIME_REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
}
|
||||
|
||||
/// Current wall-clock time as whole seconds since the Unix epoch.
/// Falls back to 0 if the system clock reads earlier than the epoch.
fn now_unix_secs() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
|
||||
|
||||
fn upsert_runtime_entry(name: &str, f: impl FnOnce(&mut WorkerRuntimeEntry)) {
|
||||
if let Ok(mut reg) = runtime_registry().lock() {
|
||||
let entry = reg.entry(name.to_string()).or_insert_with(|| WorkerRuntimeEntry {
|
||||
current_step: format!("{name}: idle"),
|
||||
last_step_change_ts: now_unix_secs(),
|
||||
running_worker: false,
|
||||
running_watchdog: false,
|
||||
});
|
||||
f(entry);
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_worker_runtime_snapshot() -> Vec<WorkerRuntimeSnapshot> {
|
||||
let Ok(reg) = runtime_registry().lock() else {
|
||||
return Vec::new();
|
||||
};
|
||||
let mut out: Vec<WorkerRuntimeSnapshot> = reg
|
||||
.iter()
|
||||
.map(|(worker, e)| WorkerRuntimeSnapshot {
|
||||
worker: worker.clone(),
|
||||
current_step: e.current_step.clone(),
|
||||
last_step_change_ts: e.last_step_change_ts,
|
||||
running_worker: e.running_worker,
|
||||
running_watchdog: e.running_watchdog,
|
||||
})
|
||||
.collect();
|
||||
out.sort_by(|a, b| a.worker.cmp(&b.worker));
|
||||
out
|
||||
}
|
||||
|
||||
pub(crate) struct WorkerState {
|
||||
pub(crate) running_worker: AtomicBool,
|
||||
pub(crate) running_watchdog: AtomicBool,
|
||||
@@ -44,6 +107,10 @@ pub struct BaseWorker {
|
||||
|
||||
impl BaseWorker {
|
||||
pub fn new(name: &str, pool: ConnectionPool, broker: MessageBroker) -> Self {
|
||||
upsert_runtime_entry(name, |entry| {
|
||||
entry.current_step = format!("{name}: idle");
|
||||
entry.last_step_change_ts = now_unix_secs();
|
||||
});
|
||||
Self {
|
||||
name: name.to_string(),
|
||||
pool,
|
||||
@@ -55,9 +122,15 @@ impl BaseWorker {
|
||||
}
|
||||
|
||||
pub fn set_current_step<S: Into<String>>(&self, step: S) {
|
||||
let new_step = step.into();
|
||||
if let Ok(mut guard) = self.state.current_step.lock() {
|
||||
*guard = step.into();
|
||||
*guard = new_step.clone();
|
||||
}
|
||||
let worker_name = self.name.clone();
|
||||
upsert_runtime_entry(&worker_name, |entry| {
|
||||
entry.current_step = new_step;
|
||||
entry.last_step_change_ts = now_unix_secs();
|
||||
});
|
||||
}
|
||||
|
||||
pub(crate) fn start_worker_with_loop<F>(&mut self, loop_fn: F)
|
||||
@@ -68,6 +141,11 @@ impl BaseWorker {
|
||||
eprintln!("[{}] Worker thread already running, skipping start.", self.name);
|
||||
return;
|
||||
}
|
||||
let worker_name = self.name.clone();
|
||||
upsert_runtime_entry(&worker_name, |entry| {
|
||||
entry.running_worker = true;
|
||||
entry.last_step_change_ts = now_unix_secs();
|
||||
});
|
||||
|
||||
let state = Arc::clone(&self.state);
|
||||
|
||||
@@ -80,6 +158,11 @@ impl BaseWorker {
|
||||
// Erst den Worker stoppen, dann auch den Watchdog beenden, damit keine
|
||||
// Hintergrund-Threads weiterlaufen.
|
||||
self.state.running_worker.store(false, Ordering::Relaxed);
|
||||
let worker_name = self.name.clone();
|
||||
upsert_runtime_entry(&worker_name, |entry| {
|
||||
entry.running_worker = false;
|
||||
entry.last_step_change_ts = now_unix_secs();
|
||||
});
|
||||
self.stop_watchdog();
|
||||
if let Some(handle) = self.worker_thread.take() {
|
||||
let _ = handle.join();
|
||||
@@ -95,6 +178,11 @@ impl BaseWorker {
|
||||
eprintln!("[{}] Watchdog already enabled, skipping.", self.name);
|
||||
return;
|
||||
}
|
||||
let worker_name = self.name.clone();
|
||||
upsert_runtime_entry(&worker_name, |entry| {
|
||||
entry.running_watchdog = true;
|
||||
entry.last_step_change_ts = now_unix_secs();
|
||||
});
|
||||
|
||||
let state = Arc::clone(&self.state);
|
||||
|
||||
@@ -124,6 +212,11 @@ impl BaseWorker {
|
||||
|
||||
pub(crate) fn stop_watchdog(&mut self) {
|
||||
self.state.running_watchdog.store(false, Ordering::Relaxed);
|
||||
let worker_name = self.name.clone();
|
||||
upsert_runtime_entry(&worker_name, |entry| {
|
||||
entry.running_watchdog = false;
|
||||
entry.last_step_change_ts = now_unix_secs();
|
||||
});
|
||||
if let Some(handle) = self.watchdog_thread.take() {
|
||||
let _ = handle.join();
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ mod notify;
|
||||
mod sql;
|
||||
|
||||
pub use base::Worker;
|
||||
pub use base::{get_worker_runtime_snapshot, WorkerRuntimeSnapshot};
|
||||
pub use crate::db::ConnectionPool;
|
||||
pub use character_creation::CharacterCreationWorker;
|
||||
pub use director::DirectorWorker;
|
||||
|
||||
Reference in New Issue
Block a user