Files
yourpart-daemon/src/worker/underground.rs

987 lines
31 KiB
Rust

use crate::db::{ConnectionPool, DbError, Row, Rows};
use crate::message_broker::MessageBroker;
use rand::distributions::{Distribution, Uniform};
use rand::seq::SliceRandom;
use rand::Rng;
use serde_json::json;
use serde_json::Value as Json;
use std::cmp::{max, min};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use super::base::{BaseWorker, Worker, WorkerState};
pub struct UndergroundWorker {
base: BaseWorker,
}
#[derive(Debug, Clone)]
struct HouseConditions {
id: i32,
roof: i32,
floor: i32,
wall: i32,
windowc: i32,
}
// Query-Konstanten (1:1 aus der C++-Version übernommen)
const Q_SELECT_BY_PERFORMER: &str = r#"
SELECT u.id,
t.tr AS underground_type,
u.performer_id,
u.victim_id,
to_char(u.created_at,'YYYY-MM-DD\"T\"HH24:MI:SS\"Z\"') AS created_at,
COALESCE(u.parameters::text,'{}') AS parameters,
COALESCE(u.result::text,'null') AS result_text
FROM falukant_data.underground u
JOIN falukant_type.underground t ON t.tr = u.underground_type_id
WHERE u.performer_id = $1
ORDER BY u.created_at DESC;
"#;
const Q_SELECT_PENDING: &str = r#"
SELECT u.id,
t.tr AS underground_type,
u.performer_id,
u.victim_id,
COALESCE(u.parameters::text,'{}') AS parameters
FROM falukant_data.underground u
JOIN falukant_type.underground t ON t.tr = u.underground_type_id
WHERE u.result IS NULL
AND u.created_at <= NOW() - INTERVAL '1 day'
ORDER BY u.created_at ASC
LIMIT 200;
"#;
const Q_UPDATE_RESULT: &str = r#"
UPDATE falukant_data.underground
SET result = $2::jsonb,
updated_at = NOW()
WHERE id = $1;
"#;
const Q_SELECT_CHAR_USER: &str = r#"
SELECT user_id
FROM falukant_data.character
WHERE id = $1;
"#;
const Q_SELECT_HOUSE_BY_USER: &str = r#"
SELECT id, roof_condition, floor_condition, wall_condition, window_condition
FROM falukant_data.user_house
WHERE user_id = $1
LIMIT 1;
"#;
const Q_UPDATE_HOUSE: &str = r#"
UPDATE falukant_data.user_house
SET roof_condition = $2,
floor_condition = $3,
wall_condition = $4,
window_condition = $5
WHERE id = $1;
"#;
const Q_SELECT_STOCK_BY_BRANCH: &str = r#"
SELECT id, stock_type_id, quantity
FROM falukant_data.stock
WHERE branch_id = $1
ORDER BY quantity DESC;
"#;
const Q_UPDATE_STOCK_QTY: &str = r#"
UPDATE falukant_data.stock
SET quantity = $2
WHERE id = $1;
"#;
const Q_SELECT_CHAR_HEALTH: &str = r#"
SELECT health
FROM falukant_data.character
WHERE id = $1;
"#;
const Q_UPDATE_CHAR_HEALTH: &str = r#"
UPDATE falukant_data.character
SET health = $2,
updated_at = NOW()
WHERE id = $1;
"#;
const Q_SELECT_FALUKANT_USER: &str = r#"
SELECT id,
money,
COALESCE(main_branch_region_id, 0) AS main_branch_region_id
FROM falukant_data.falukant_user
WHERE user_id = $1
LIMIT 1;
"#;
// Query für Geldänderungen (lokale Variante von BaseWorker::change_falukant_user_money)
const QUERY_UPDATE_MONEY: &str = r#"
SELECT falukant_data.update_money($1, $2, $3);
"#;
impl UndergroundWorker {
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
Self {
base: BaseWorker::new("UndergroundWorker", pool, broker),
}
}
fn run_loop(pool: ConnectionPool, broker: MessageBroker, state: Arc<WorkerState>) {
while state.running_worker.load(Ordering::Relaxed) {
if let Err(err) = Self::tick(&pool, &broker) {
eprintln!("[UndergroundWorker] Fehler in tick: {err}");
}
// Entspricht ~60-Sekunden-Loop mit 1-Sekunden-Schritten
for _ in 0..60 {
if !state.running_worker.load(Ordering::Relaxed) {
break;
}
std::thread::sleep(Duration::from_secs(1));
}
}
}
fn tick(pool: &ConnectionPool, broker: &MessageBroker) -> Result<(), DbError> {
let rows = Self::fetch_pending(pool)?;
for row in rows {
let id = match row.get("id").and_then(|v| v.parse::<i32>().ok()) {
Some(id) => id,
None => continue,
};
match Self::execute_row(pool, &row) {
Ok(res) => {
Self::update_result(pool, id, &res)?;
let event = json!({
"event": "underground_processed",
"id": id,
"type": row.get("underground_type").cloned().unwrap_or_default()
});
broker.publish(event.to_string());
}
Err(err) => {
let error_res = json!({
"status": "error",
"message": err.to_string()
});
let _ = Self::update_result(pool, id, &error_res);
}
}
}
Ok(())
}
fn fetch_pending(pool: &ConnectionPool) -> Result<Rows, DbError> {
let mut conn = pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("ug_select_pending", Q_SELECT_PENDING)?;
conn.execute("ug_select_pending", &[])
}
fn execute_row(pool: &ConnectionPool, r: &Row) -> Result<Json, DbError> {
let performer_id = parse_i32(r, "performer_id", -1);
let victim_id = parse_i32(r, "victim_id", -1);
let task_type = r.get("underground_type").cloned().unwrap_or_default();
let params = r.get("parameters").cloned().unwrap_or_else(|| "{}".into());
Ok(Self::handle_task(pool, &task_type, performer_id, victim_id, &params)?)
}
fn handle_task(
pool: &ConnectionPool,
task_type: &str,
performer_id: i32,
victim_id: i32,
params_json: &str,
) -> Result<Json, DbError> {
let p: Json = serde_json::from_str(params_json).unwrap_or_else(|_| json!({}));
match task_type {
"spyin" => Self::spy_in(pool, performer_id, victim_id, &p),
"assassin" => Self::assassin(pool, performer_id, victim_id, &p),
"sabotage" => Self::sabotage(pool, performer_id, victim_id, &p),
"corrupt_politician" => Ok(Self::corrupt_politician(performer_id, victim_id, &p)),
"rob" => Self::rob(pool, performer_id, victim_id, &p),
_ => Ok(json!({
"status": "unknown_type",
"type": task_type
})),
}
}
fn spy_in(
pool: &ConnectionPool,
performer_id: i32,
victim_id: i32,
p: &Json,
) -> Result<Json, DbError> {
let mut conn = pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("ug_select_by_performer", Q_SELECT_BY_PERFORMER)?;
let rows = conn.execute("ug_select_by_performer", &[&victim_id])?;
let mut activities = Vec::new();
for r in rows {
let params: Json = r
.get("parameters")
.and_then(|s| serde_json::from_str(s).ok())
.unwrap_or_else(|| json!({}));
let result_text = r.get("result_text").cloned().unwrap_or_else(|| "null".into());
let result: Json = serde_json::from_str(&result_text).unwrap_or(Json::Null);
let mut status = "pending".to_string();
if let Json::Object(obj) = &result {
if let Some(Json::String(s)) = obj.get("status") {
status = s.clone();
} else {
status = "done".to_string();
}
}
let activity = json!({
"id": parse_i32(&r, "id", -1),
"type": r.get("underground_type").cloned().unwrap_or_default(),
"performed_by": parse_i32(&r, "performer_id", -1),
"victim_id": parse_i32(&r, "victim_id", -1),
"created_at": r.get("created_at").cloned().unwrap_or_default(),
"parameters": params,
"result": result,
"status": status
});
activities.push(activity);
}
Ok(json!({
"status": "success",
"action": "spyin",
"performer_id": performer_id,
"victim_id": victim_id,
"details": p,
"victim_illegal_activity_count": activities.len(),
"victim_illegal_activities": activities
}))
}
fn assassin(
pool: &ConnectionPool,
performer_id: i32,
victim_id: i32,
p: &Json,
) -> Result<Json, DbError> {
let mut conn = pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("ug_select_char_health", Q_SELECT_CHAR_HEALTH)?;
conn.prepare("ug_update_char_health", Q_UPDATE_CHAR_HEALTH)?;
let rows = conn.execute("ug_select_char_health", &[&victim_id])?;
if rows.is_empty() {
return Ok(json!({
"status": "error",
"action": "assassin",
"performer_id": performer_id,
"victim_id": victim_id,
"message": "victim_not_found",
"details": p
}));
}
let current = parse_i32(&rows[0], "health", 0);
let mut rng = rand::thread_rng();
let dist = Uniform::from(0..=current.max(0));
let new_health = dist.sample(&mut rng);
conn.execute("ug_update_char_health", &[&victim_id, &new_health])?;
Ok(json!({
"status": "success",
"action": "assassin",
"performer_id": performer_id,
"victim_id": victim_id,
"details": p,
"previous_health": current,
"new_health": new_health,
"reduced_by": current - new_health
}))
}
fn sabotage(
pool: &ConnectionPool,
performer_id: i32,
victim_id: i32,
p: &Json,
) -> Result<Json, DbError> {
let target = p
.get("target")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
match target.as_str() {
"house" => Self::sabotage_house(pool, performer_id, victim_id, p),
"storage" => Self::sabotage_storage(pool, performer_id, victim_id, p),
_ => Ok(json!({
"status": "error",
"action": "sabotage",
"message": "unknown_target",
"performer_id": performer_id,
"victim_id": victim_id,
"details": p
})),
}
}
fn get_user_id_for_character(pool: &ConnectionPool, character_id: i32) -> Result<i32, DbError> {
let mut conn = pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("ug_select_char_user", Q_SELECT_CHAR_USER)?;
let rows = conn.execute("ug_select_char_user", &[&character_id])?;
Ok(rows
.get(0)
.and_then(|r| r.get("user_id"))
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(-1))
}
fn get_house_by_user(pool: &ConnectionPool, user_id: i32) -> Result<Option<HouseConditions>, DbError> {
let mut conn = pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("ug_select_house_by_user", Q_SELECT_HOUSE_BY_USER)?;
let rows = conn.execute("ug_select_house_by_user", &[&user_id])?;
if rows.is_empty() {
return Ok(None);
}
let r = &rows[0];
Ok(Some(HouseConditions {
id: parse_i32(r, "id", -1),
roof: parse_i32(r, "roof_condition", 0),
floor: parse_i32(r, "floor_condition", 0),
wall: parse_i32(r, "wall_condition", 0),
windowc: parse_i32(r, "window_condition", 0),
}))
}
fn update_house(pool: &ConnectionPool, h: &HouseConditions) -> Result<(), DbError> {
let mut conn = pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("ug_update_house", Q_UPDATE_HOUSE)?;
let roof = h.roof.clamp(0, 100);
let floor = h.floor.clamp(0, 100);
let wall = h.wall.clamp(0, 100);
let windowc = h.windowc.clamp(0, 100);
conn.execute("ug_update_house", &[&h.id, &roof, &floor, &wall, &windowc])?;
Ok(())
}
fn sabotage_house(
pool: &ConnectionPool,
performer_id: i32,
victim_id: i32,
p: &Json,
) -> Result<Json, DbError> {
let user_id = Self::get_user_id_for_character(pool, victim_id)?;
if user_id < 0 {
return Ok(json!({
"status": "error",
"action": "sabotage",
"target": "house",
"message": "victim_not_found",
"performer_id": performer_id,
"victim_id": victim_id,
"details": p
}));
}
let mut house = match Self::get_house_by_user(pool, user_id)? {
Some(h) => h,
None => {
return Ok(json!({
"status": "error",
"action": "sabotage",
"target": "house",
"message": "house_not_found",
"performer_id": performer_id,
"victim_id": victim_id,
"details": p
}))
}
};
// Erlaubte Felder aus Params
let mut allow: Vec<String> = Vec::new();
if let Some(conds) = p.get("conditions").and_then(|v| v.as_array()) {
for s in conds {
if let Some(name) = s.as_str() {
allow.push(name.to_string());
}
}
}
// Statt Referenzen auf Felder zu speichern, arbeiten wir über Indizes,
// um Borrowing-Probleme zu vermeiden.
let all_fields = ["roof_condition", "floor_condition", "wall_condition", "window_condition"];
let candidate_indices: Vec<usize> = (0..all_fields.len())
.filter(|&idx| {
allow.is_empty()
|| allow
.iter()
.any(|name| name == all_fields[idx])
})
.collect();
if candidate_indices.is_empty() {
return Ok(json!({
"status": "error",
"action": "sabotage",
"target": "house",
"message": "no_conditions_selected",
"performer_id": performer_id,
"victim_id": victim_id,
"details": p
}));
}
let k = random_int(1, candidate_indices.len() as i32) as usize;
let picks = random_indices(candidate_indices.len(), k);
let mut changed = Vec::new();
for i in picks {
let idx = candidate_indices[i];
let (name, value_ref) = match idx {
0 => ("roof_condition", &mut house.roof),
1 => ("floor_condition", &mut house.floor),
2 => ("wall_condition", &mut house.wall),
3 => ("window_condition", &mut house.windowc),
_ => continue,
};
if *value_ref > 0 {
let red = random_int(1, *value_ref);
*value_ref = (*value_ref - red).clamp(0, 100);
}
changed.push(name.to_string());
}
Self::update_house(pool, &house)?;
Ok(json!({
"status": "success",
"action": "sabotage",
"target": "house",
"performer_id": performer_id,
"victim_id": victim_id,
"details": p,
"changed_conditions": changed,
"new_conditions": {
"roof_condition": house.roof,
"floor_condition": house.floor,
"wall_condition": house.wall,
"window_condition": house.windowc
}
}))
}
fn select_stock_by_branch(pool: &ConnectionPool, branch_id: i32) -> Result<Rows, DbError> {
let mut conn = pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("ug_select_stock_by_branch", Q_SELECT_STOCK_BY_BRANCH)?;
conn.execute("ug_select_stock_by_branch", &[&branch_id])
}
fn filter_by_stock_types(rows: &Rows, allowed: &[i32]) -> Rows {
if allowed.is_empty() {
return rows.clone();
}
let mut out = Vec::new();
for r in rows {
if let Some(t) = r.get("stock_type_id").and_then(|v| v.parse::<i32>().ok()) {
if allowed.contains(&t) {
out.push(r.clone());
}
}
}
out
}
fn update_stock_qty(pool: &ConnectionPool, id: i32, qty: i64) -> Result<(), DbError> {
let mut conn = pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("ug_update_stock_qty", Q_UPDATE_STOCK_QTY)?;
// beide Parameter explizit als ToSql-Traitobjekte typisieren, um Mischtypen zu erlauben
use postgres::types::ToSql;
let p1: &(dyn ToSql + Sync) = &id;
let p2: &(dyn ToSql + Sync) = &qty;
conn.execute("ug_update_stock_qty", &[p1, p2])?;
Ok(())
}
fn sabotage_storage(
pool: &ConnectionPool,
performer_id: i32,
victim_id: i32,
p: &Json,
) -> Result<Json, DbError> {
let branch_id = match p.get("branch_id").and_then(|v| v.as_i64()) {
Some(id) => id as i32,
None => {
return Ok(json!({
"status": "error",
"action": "sabotage",
"target": "storage",
"message": "branch_id_required",
"performer_id": performer_id,
"victim_id": victim_id,
"details": p
}))
}
};
let mut allowed = Vec::new();
if let Some(arr) = p.get("stock_type_ids").and_then(|v| v.as_array()) {
for v in arr {
if let Some(id) = v.as_i64() {
allowed.push(id as i32);
}
}
}
let rows_all = Self::select_stock_by_branch(pool, branch_id)?;
let mut rows = Self::filter_by_stock_types(&rows_all, &allowed);
if rows.is_empty() {
return Ok(json!({
"status": "success",
"action": "sabotage",
"target": "storage",
"performer_id": performer_id,
"victim_id": victim_id,
"details": p,
"removed_total": 0,
"affected_rows": []
}));
}
let mut total: i64 = 0;
for r in &rows {
if let Some(q) = r.get("quantity").and_then(|v| v.parse::<i64>().ok()) {
total += q;
}
}
if total <= 0 {
return Ok(json!({
"status": "success",
"action": "sabotage",
"target": "storage",
"performer_id": performer_id,
"victim_id": victim_id,
"details": p,
"removed_total": 0,
"affected_rows": []
}));
}
let cap = total / 4;
if cap <= 0 {
return Ok(json!({
"status": "success",
"action": "sabotage",
"target": "storage",
"performer_id": performer_id,
"victim_id": victim_id,
"details": p,
"removed_total": 0,
"affected_rows": []
}));
}
let mut rng = rand::thread_rng();
let mut to_remove = random_ll(1, cap);
rows.shuffle(&mut rng);
let mut affected = Vec::new();
for r in rows {
if to_remove == 0 {
break;
}
let id = parse_i32(&r, "id", -1);
let q = r
.get("quantity")
.and_then(|v| v.parse::<i64>().ok())
.unwrap_or(0);
if q <= 0 {
continue;
}
let take = random_ll(1, min(q, to_remove));
let newq = q - take;
Self::update_stock_qty(pool, id, newq)?;
to_remove -= take;
let entry = json!({
"id": id,
"stock_type_id": parse_i32(&r, "stock_type_id", -1),
"previous_quantity": q,
"new_quantity": newq,
"removed": take
});
affected.push(entry);
}
let removed_total: i64 = affected
.iter()
.filter_map(|a| a.get("removed").and_then(|v| v.as_i64()))
.sum();
Ok(json!({
"status": "success",
"action": "sabotage",
"target": "storage",
"performer_id": performer_id,
"victim_id": victim_id,
"details": p,
"removed_total": removed_total,
"affected_rows": affected
}))
}
fn corrupt_politician(
performer_id: i32,
victim_id: i32,
p: &Json,
) -> Json {
json!({
"status": "success",
"action": "corrupt_politician",
"performer_id": performer_id,
"victim_id": victim_id,
"details": p
})
}
fn rob(
pool: &ConnectionPool,
performer_id: i32,
victim_id: i32,
p: &Json,
) -> Result<Json, DbError> {
let user_id = Self::get_user_id_for_character(pool, victim_id)?;
if user_id < 0 {
return Ok(json!({
"status": "error",
"action": "rob",
"message": "victim_not_found",
"performer_id": performer_id,
"victim_id": victim_id,
"details": p
}));
}
let mut conn = pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("ug_select_falukant_user", Q_SELECT_FALUKANT_USER)?;
let fu = conn.execute("ug_select_falukant_user", &[&user_id])?;
if fu.is_empty() {
return Ok(json!({
"status": "error",
"action": "rob",
"message": "falukant_user_not_found",
"performer_id": performer_id,
"victim_id": victim_id,
"details": p
}));
}
let falukant_user_id = parse_i32(&fu[0], "id", -1);
let money = fu[0]
.get("money")
.and_then(|v| v.parse::<f64>().ok())
.unwrap_or(0.0);
let default_branch = parse_i32(&fu[0], "main_branch_region_id", 0);
let steal_goods = random_int(0, 1) == 1;
if steal_goods {
let branch_id = p
.get("branch_id")
.and_then(|v| v.as_i64())
.map(|v| v as i32)
.unwrap_or(default_branch);
if branch_id <= 0 {
return Ok(json!({
"status": "success",
"action": "rob",
"mode": "goods",
"performer_id": performer_id,
"victim_id": victim_id,
"details": p,
"removed_total": 0,
"affected_rows": []
}));
}
let rows_all = Self::select_stock_by_branch(pool, branch_id)?;
let mut rows = rows_all;
if rows.is_empty() {
return Ok(json!({
"status": "success",
"action": "rob",
"mode": "goods",
"performer_id": performer_id,
"victim_id": victim_id,
"details": p,
"removed_total": 0,
"affected_rows": []
}));
}
let mut total: i64 = 0;
for r in &rows {
if let Some(q) = r.get("quantity").and_then(|v| v.parse::<i64>().ok()) {
total += q;
}
}
if total <= 0 {
return Ok(json!({
"status": "success",
"action": "rob",
"mode": "goods",
"performer_id": performer_id,
"victim_id": victim_id,
"details": p,
"removed_total": 0,
"affected_rows": []
}));
}
let cap = max(1_i64, total / 2);
let mut to_remove = random_ll(1, cap);
let mut rng = rand::thread_rng();
rows.shuffle(&mut rng);
let mut affected = Vec::new();
for r in rows {
if to_remove == 0 {
break;
}
let id = parse_i32(&r, "id", -1);
let q = r
.get("quantity")
.and_then(|v| v.parse::<i64>().ok())
.unwrap_or(0);
if q <= 0 {
continue;
}
let take = random_ll(1, min(q, to_remove));
let newq = q - take;
Self::update_stock_qty(pool, id, newq)?;
to_remove -= take;
affected.push(json!({
"id": id,
"stock_type_id": parse_i32(&r, "stock_type_id", -1),
"previous_quantity": q,
"new_quantity": newq,
"removed": take
}));
}
let removed: i64 = affected
.iter()
.filter_map(|a| a.get("removed").and_then(|v| v.as_i64()))
.sum();
Ok(json!({
"status": "success",
"action": "rob",
"mode": "goods",
"performer_id": performer_id,
"victim_id": victim_id,
"details": p,
"removed_total": removed,
"affected_rows": affected
}))
} else {
if money <= 0.0 {
return Ok(json!({
"status": "success",
"action": "rob",
"mode": "money",
"performer_id": performer_id,
"victim_id": victim_id,
"details": p,
"stolen": 0.0,
"balance_before": 0.0,
"balance_after": 0.0
}));
}
let rate = random_double(0.0, 0.18);
let mut amount = (money * rate * 100.0).round() / 100.0;
if amount < 0.01 {
amount = 0.01;
}
if amount > money {
amount = money;
}
let _msg = json!({
"event": "money_changed",
"reason": "robbery",
"delta": -amount,
"performer_id": performer_id,
"victim_id": victim_id
});
if let Err(err) =
change_falukant_user_money(pool, falukant_user_id, -amount, "robbery")
{
eprintln!(
"[UndergroundWorker] Fehler bei change_falukant_user_money: {err}"
);
}
// Event manuell publizieren
// (BaseWorker kümmert sich aktuell nur um die DB-Änderung)
// Hinweis: Wir haben keinen direkten Zugriff auf broker hier, daher wird das
// Event nur im Rückgabe-JSON signalisiert.
let after = ((money - amount) * 100.0).round() / 100.0;
Ok(json!({
"status": "success",
"action": "rob",
"mode": "money",
"performer_id": performer_id,
"victim_id": victim_id,
"details": p,
"stolen": amount,
"rate": rate,
"balance_before": money,
"balance_after": after
}))
}
}
fn update_result(pool: &ConnectionPool, id: i32, result: &Json) -> Result<(), DbError> {
let mut conn = pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("ug_update_result", Q_UPDATE_RESULT)?;
let result_text = result.to_string();
conn.execute("ug_update_result", &[&id, &result_text])?;
Ok(())
}
}
impl Worker for UndergroundWorker {
fn start_worker_thread(&mut self) {
let pool = self.base.pool.clone();
let broker = self.base.broker.clone();
self.base
.start_worker_with_loop(move |state: Arc<WorkerState>| {
UndergroundWorker::run_loop(pool.clone(), broker.clone(), state);
});
}
fn stop_worker_thread(&mut self) {
self.base.stop_worker();
}
fn enable_watchdog(&mut self) {
self.base.start_watchdog();
}
}
// Hilfsfunktionen für Zufall und Parsing
fn random_int(lo: i32, hi: i32) -> i32 {
let mut rng = rand::thread_rng();
rng.gen_range(lo..=hi)
}
fn random_ll(lo: i64, hi: i64) -> i64 {
let mut rng = rand::thread_rng();
rng.gen_range(lo..=hi)
}
fn random_indices(n: usize, k: usize) -> Vec<usize> {
let mut idx: Vec<usize> = (0..n).collect();
let mut rng = rand::thread_rng();
idx.shuffle(&mut rng);
if k < idx.len() {
idx.truncate(k);
}
idx
}
fn random_double(lo: f64, hi: f64) -> f64 {
let mut rng = rand::thread_rng();
rng.gen_range(lo..hi)
}
fn parse_i32(row: &Row, key: &str, default: i32) -> i32 {
row.get(key)
.and_then(|v| v.parse::<i32>().ok())
.unwrap_or(default)
}
fn change_falukant_user_money(
pool: &ConnectionPool,
falukant_user_id: i32,
money_change: f64,
action: &str,
) -> Result<(), DbError> {
use postgres::types::ToSql;
let mut conn = pool
.get()
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
conn.prepare("ug_update_money", QUERY_UPDATE_MONEY)?;
let p1: &(dyn ToSql + Sync) = &falukant_user_id;
let p2: &(dyn ToSql + Sync) = &money_change;
let p3: &(dyn ToSql + Sync) = &action;
conn.execute("ug_update_money", &[p1, p2, p3])?;
// Best-effort insert into money_history for UI visibility
let money_str = format!("{:.2}", money_change);
fn escape_sql_literal(s: &str) -> String { s.replace('\'', "''") }
let escaped_action = escape_sql_literal(action);
let history_sql = format!(
"INSERT INTO falukant_log.money_history (user_id, change, action, created_at) VALUES ({uid}, {money}::numeric, '{act}', NOW());",
uid = falukant_user_id,
money = money_str,
act = escaped_action
);
if let Err(err) = conn.query(&history_sql) {
eprintln!(
"[UndergroundWorker] Warning: inserting money_history failed for user {}: {}",
falukant_user_id, err
);
}
Ok(())
}