620 lines
18 KiB
Rust
620 lines
18 KiB
Rust
use crate::db::{ConnectionPool, DbError, Rows};
|
||
use crate::message_broker::MessageBroker;
|
||
use rand::distributions::{Distribution, Uniform};
|
||
use rand::rngs::StdRng;
|
||
use rand::{thread_rng, Rng, SeedableRng};
|
||
use std::collections::{HashMap, HashSet};
|
||
use std::sync::atomic::{AtomicBool, Ordering};
|
||
use std::sync::Arc;
|
||
use std::thread;
|
||
use std::time::Duration;
|
||
|
||
use super::base::{BaseWorker, Worker, WorkerState};
|
||
|
||
pub struct CharacterCreationWorker {
|
||
pub(crate) base: BaseWorker,
|
||
rng: StdRng,
|
||
dist: Uniform<i32>,
|
||
first_name_cache: HashMap<String, HashSet<i32>>,
|
||
last_name_cache: HashSet<i32>,
|
||
death_check_running: Arc<AtomicBool>,
|
||
death_thread: Option<thread::JoinHandle<()>>,
|
||
}
|
||
|
||
// SQL queries, mirroring the C++ implementation.

/// Selects the newest NPC row (`user_id IS NULL`) that was created today.
///
/// NOTE: despite "previous day" in the name, the filter compares
/// `created_at::date` against `CURRENT_DATE`.
const QUERY_IS_PREVIOUS_DAY_CHARACTER_CREATED: &str = r#"
    SELECT created_at
    FROM falukant_data."character"
    WHERE user_id IS NULL
      AND created_at::date = CURRENT_DATE
    ORDER BY created_at DESC
    LIMIT 1;
"#;
|
||
|
||
/// Selects the ids of all regions whose region type is labelled `'city'`.
const QUERY_GET_TOWN_REGION_IDS: &str = r#"
    SELECT fdr.id
    FROM falukant_data.region fdr
    JOIN falukant_type.region ftr ON fdr.region_type_id = ftr.id
    WHERE ftr.label_tr = 'city';
"#;
|
||
|
||
/// Loads every predefined first name as `(id, gender)`.
const QUERY_LOAD_FIRST_NAMES: &str = r#"
    SELECT id, gender
    FROM falukant_predefine.firstname;
"#;
|
||
|
||
/// Loads the id of every predefined last name.
const QUERY_LOAD_LAST_NAMES: &str = r#"
    SELECT id
    FROM falukant_predefine.lastname;
"#;
|
||
|
||
/// Inserts a new NPC character (`user_id` is always `NULL`).
///
/// Parameters: `$1` region id, `$2` first-name id, `$3` last-name id,
/// `$4` gender, `$5` title of nobility. Birthdate and both timestamps are
/// set to `NOW()`.
const QUERY_INSERT_CHARACTER: &str = r#"
    INSERT INTO falukant_data.character(
        user_id,
        region_id,
        first_name,
        last_name,
        birthdate,
        gender,
        created_at,
        updated_at,
        title_of_nobility
    ) VALUES (
        NULL,
        $1,
        $2,
        $3,
        NOW(),
        $4,
        NOW(),
        NOW(),
        $5
    );
"#;
|
||
|
||
/// Selects NPCs (`user_id IS NULL`) that are candidates for dying.
///
/// `age` is `current_date - birthdate::date`. Every NPC with `age > 85` is
/// returned, plus up to 10 randomly chosen NPCs with `60 < age <= 85`.
/// NOTE(review): subtracting two dates in PostgreSQL yields days, so `age`
/// is presumably "days alive" used as game years — confirm.
const QUERY_GET_ELIGIBLE_NPC_FOR_DEATH: &str = r#"
    WITH aged AS (
        SELECT
            c.id,
            (current_date - c.birthdate::date) AS age,
            c.user_id
        FROM
            falukant_data.character c
        WHERE
            c.user_id IS NULL
            AND (current_date - c.birthdate::date) > 60
    ),
    always_sel AS (
        SELECT *
        FROM aged
        WHERE age > 85
    ),
    random_sel AS (
        SELECT *
        FROM aged
        WHERE age <= 85
        ORDER BY random()
        LIMIT 10
    )
    SELECT *
    FROM always_sel
    UNION ALL
    SELECT *
    FROM random_sel;
"#;
|
||
|
||
/// Deletes the director entry of character `$1` and returns the employer's
/// user id for each deleted row.
const QUERY_DELETE_DIRECTOR: &str = r#"
    DELETE FROM falukant_data.director
    WHERE director_character_id = $1
    RETURNING employer_user_id;
"#;
|
||
|
||
/// Deletes every relationship that involves character `$1` and returns the
/// `user_id` of the character on the other side of each deleted relationship.
const QUERY_DELETE_RELATIONSHIP: &str = r#"
    WITH deleted AS (
        DELETE FROM falukant_data.relationship
        WHERE character1_id = $1
           OR character2_id = $1
        RETURNING
            CASE
                WHEN character1_id = $1 THEN character2_id
                ELSE character1_id
            END AS related_character_id,
            relationship_type_id
    )
    SELECT
        c.user_id AS related_user_id
    FROM deleted d
    JOIN falukant_data.character c
        ON c.id = d.related_character_id;
"#;
|
||
|
||
/// Deletes the child relation of child character `$1` and returns the user
/// ids of the father and mother characters.
///
/// Both parents are resolved with inner joins, so a deleted row only shows up
/// in the result when both parent characters still exist.
const QUERY_DELETE_CHILD_RELATION: &str = r#"
    WITH deleted AS (
        DELETE FROM falukant_data.child_relation
        WHERE child_character_id = $1
        RETURNING
            father_character_id,
            mother_character_id
    )
    SELECT
        cf.user_id AS father_user_id,
        cm.user_id AS mother_user_id
    FROM deleted d
    JOIN falukant_data.character cf
        ON cf.id = d.father_character_id
    JOIN falukant_data.character cm
        ON cm.id = d.mother_character_id;
"#;
|
||
|
||
/// Inserts an unshown notification for user `$1`.
///
/// NOTE(review): the `tr` value is hard-coded to `'director_death'`, so every
/// notification written through this statement carries that key regardless of
/// the event that triggered it — confirm whether that is intended.
const QUERY_INSERT_NOTIFICATION: &str = r#"
    INSERT INTO falukant_log.notification (
        user_id,
        tr,
        shown,
        created_at,
        updated_at
    ) VALUES ($1, 'director_death', FALSE, NOW(), NOW());
"#;
|
||
|
||
/// Removes character `$1`.
///
/// NOTE: despite the "mark deceased" name this is a hard `DELETE` of the row.
const QUERY_MARK_CHARACTER_DECEASED: &str = r#"
    DELETE FROM falukant_data.character
    WHERE id = $1;
"#;
|
||
|
||
impl CharacterCreationWorker {
|
||
pub fn new(pool: ConnectionPool, broker: MessageBroker) -> Self {
|
||
Self::new_internal(pool, broker, true)
|
||
}
|
||
|
||
/// Interner Konstruktor, der optional den NPC-Todes-Monitor startet.
|
||
fn new_internal(pool: ConnectionPool, broker: MessageBroker, start_death_thread: bool) -> Self {
|
||
let base = BaseWorker::new("CharacterCreationWorker", pool.clone(), broker.clone());
|
||
let rng = StdRng::from_entropy();
|
||
let dist = Uniform::from(2..=3);
|
||
let death_check_running = Arc::new(AtomicBool::new(start_death_thread));
|
||
|
||
let death_thread = if start_death_thread {
|
||
let death_flag = Arc::clone(&death_check_running);
|
||
let pool_clone = pool;
|
||
let broker_clone = broker;
|
||
Some(thread::spawn(move || {
|
||
while death_flag.load(Ordering::Relaxed) {
|
||
if let Err(err) =
|
||
CharacterCreationWorker::monitor_character_deaths(&pool_clone, &broker_clone)
|
||
{
|
||
eprintln!(
|
||
"[CharacterCreationWorker] Fehler beim Überprüfen von NPC-Todesfällen: {err}"
|
||
);
|
||
}
|
||
|
||
// Warte 1 Stunde, aber mit frühem Abbruch, wenn death_flag false wird
|
||
for _ in 0..3600 {
|
||
if !death_flag.load(Ordering::Relaxed) {
|
||
break;
|
||
}
|
||
thread::sleep(Duration::from_secs(1));
|
||
}
|
||
}
|
||
}))
|
||
} else {
|
||
None
|
||
};
|
||
|
||
Self {
|
||
base,
|
||
rng,
|
||
dist,
|
||
first_name_cache: HashMap::new(),
|
||
last_name_cache: HashSet::new(),
|
||
death_check_running,
|
||
death_thread,
|
||
}
|
||
}
|
||
|
||
/// Variante ohne separaten Todes-Monitor-Thread – wird nur in der Worker-Loop benutzt.
|
||
fn new_for_loop(pool: ConnectionPool, broker: MessageBroker) -> Self {
|
||
Self::new_internal(pool, broker, false)
|
||
}
|
||
|
||
fn is_today_character_created(&self) -> bool {
|
||
match self.fetch_today_characters() {
|
||
Ok(rows) => !rows.is_empty(),
|
||
Err(err) => {
|
||
eprintln!(
|
||
"[CharacterCreationWorker] Fehler in is_today_character_created: {err}"
|
||
);
|
||
false
|
||
}
|
||
}
|
||
}
|
||
|
||
fn fetch_today_characters(&self) -> Result<Rows, crate::db::DbError> {
|
||
const STMT_NAME: &str = "is_previous_day_character_created";
|
||
|
||
let mut conn = self
|
||
.base
|
||
.pool
|
||
.get()
|
||
.map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
conn.prepare(STMT_NAME, QUERY_IS_PREVIOUS_DAY_CHARACTER_CREATED)?;
|
||
conn.execute(STMT_NAME, &[])
|
||
}
|
||
|
||
fn create_characters_for_today(&mut self) {
|
||
self.load_names();
|
||
if self.first_name_cache.is_empty() || self.last_name_cache.is_empty() {
|
||
eprintln!(
|
||
"[CharacterCreationWorker] Fehler: Namen konnten nicht geladen werden (Stub-Implementierung)."
|
||
);
|
||
return;
|
||
}
|
||
|
||
let town_ids = self.get_town_region_ids();
|
||
for region_id in town_ids {
|
||
self.create_characters_for_region(region_id);
|
||
}
|
||
}
|
||
|
||
fn create_characters_for_region(&mut self, region_id: i32) {
|
||
let nobility_stands = [1, 2, 3];
|
||
let genders = ["male", "female"];
|
||
|
||
for &nobility in &nobility_stands {
|
||
for &gender in &genders {
|
||
let num_chars = self.rng.sample(self.dist);
|
||
for _ in 0..num_chars {
|
||
self.create_character(region_id, gender, nobility);
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
fn create_character(&mut self, region_id: i32, gender: &str, title_of_nobility: i32) {
|
||
let first_set = self
|
||
.first_name_cache
|
||
.get(gender)
|
||
.cloned()
|
||
.unwrap_or_default();
|
||
let first_name_id = Self::get_random_from_set(&first_set);
|
||
if first_name_id == -1 {
|
||
eprintln!("[CharacterCreationWorker] Fehler: Kein passender Vorname gefunden.");
|
||
return;
|
||
}
|
||
|
||
let last_name_id = Self::get_random_from_set(&self.last_name_cache);
|
||
if last_name_id == -1 {
|
||
eprintln!("[CharacterCreationWorker] Fehler: Kein passender Nachname gefunden.");
|
||
return;
|
||
}
|
||
|
||
if let Err(err) = Self::insert_character(
|
||
&self.base.pool,
|
||
region_id,
|
||
first_name_id,
|
||
last_name_id,
|
||
gender,
|
||
title_of_nobility,
|
||
) {
|
||
eprintln!("[CharacterCreationWorker] Fehler in createCharacter: {err}");
|
||
}
|
||
}
|
||
|
||
fn get_town_region_ids(&self) -> Vec<i32> {
|
||
match self.load_town_region_ids() {
|
||
Ok(rows) => rows
|
||
.into_iter()
|
||
.filter_map(|row| row.get("id")?.parse::<i32>().ok())
|
||
.collect(),
|
||
Err(err) => {
|
||
eprintln!(
|
||
"[CharacterCreationWorker] Fehler in getTownRegionIds: {err}"
|
||
);
|
||
Vec::new()
|
||
}
|
||
}
|
||
}
|
||
|
||
fn load_town_region_ids(&self) -> Result<Rows, crate::db::DbError> {
|
||
const STMT_NAME: &str = "get_town_region_ids";
|
||
|
||
let mut conn = self
|
||
.base
|
||
.pool
|
||
.get()
|
||
.map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
conn.prepare(STMT_NAME, QUERY_GET_TOWN_REGION_IDS)?;
|
||
conn.execute(STMT_NAME, &[])
|
||
}
|
||
|
||
fn load_names(&mut self) {
|
||
if self.first_name_cache.is_empty() || self.last_name_cache.is_empty() {
|
||
if let Err(err) = self.load_first_and_last_names() {
|
||
eprintln!("[CharacterCreationWorker] Fehler in loadNames: {err}");
|
||
}
|
||
}
|
||
}
|
||
|
||
fn load_first_and_last_names(&mut self) -> Result<(), crate::db::DbError> {
|
||
let mut conn = self
|
||
.base
|
||
.pool
|
||
.get()
|
||
.map_err(|e| crate::db::DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
// Vornamen
|
||
conn.prepare("load_first_names", QUERY_LOAD_FIRST_NAMES)?;
|
||
let first_rows = conn.execute("load_first_names", &[])?;
|
||
for row in first_rows {
|
||
let id = match row.get("id").and_then(|v| v.parse::<i32>().ok()) {
|
||
Some(id) => id,
|
||
None => continue,
|
||
};
|
||
let gender = row.get("gender").cloned().unwrap_or_default();
|
||
self.first_name_cache.entry(gender).or_default().insert(id);
|
||
}
|
||
|
||
// Nachnamen
|
||
conn.prepare("load_last_names", QUERY_LOAD_LAST_NAMES)?;
|
||
let last_rows = conn.execute("load_last_names", &[])?;
|
||
for row in last_rows {
|
||
if let Some(id) = row.get("id").and_then(|v| v.parse::<i32>().ok()) {
|
||
self.last_name_cache.insert(id);
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
fn get_random_from_set(set: &HashSet<i32>) -> i32 {
|
||
if set.is_empty() {
|
||
return -1;
|
||
}
|
||
let mut rng = thread_rng();
|
||
let idx = rng.gen_range(0..set.len());
|
||
*set.iter().nth(idx).unwrap_or(&-1)
|
||
}
|
||
|
||
/// One pass of the worker loop: create today's characters if none exist yet,
/// then sleep for up to a minute.
fn run_iteration(&mut self, state: &WorkerState) {
    self.base
        .set_current_step("Check if previous day character was created");

    let already_created = self.is_today_character_created();
    if !already_created {
        self.base.set_current_step("Create characters for today");
        self.create_characters_for_today();
    }

    self.sleep_one_minute(state);
}
|
||
|
||
fn sleep_one_minute(&self, state: &WorkerState) {
|
||
self.base
|
||
.set_current_step("Sleep for 60 seconds");
|
||
|
||
for _ in 0..60 {
|
||
if !state.running_worker.load(Ordering::Relaxed) {
|
||
break;
|
||
}
|
||
thread::sleep(Duration::from_secs(1));
|
||
}
|
||
|
||
self.base.set_current_step("Loop done");
|
||
}
|
||
|
||
}
|
||
|
||
impl Worker for CharacterCreationWorker {
|
||
fn start_worker_thread(&mut self) {
|
||
let pool = self.base.pool.clone();
|
||
let broker = self.base.broker.clone();
|
||
|
||
self.base
|
||
.start_worker_with_loop(move |state: Arc<WorkerState>| {
|
||
let mut worker = CharacterCreationWorker::new_for_loop(pool.clone(), broker.clone());
|
||
while state.running_worker.load(Ordering::Relaxed) {
|
||
worker.run_iteration(&state);
|
||
}
|
||
});
|
||
}
|
||
|
||
fn stop_worker_thread(&mut self) {
|
||
self.base.stop_worker();
|
||
}
|
||
|
||
fn enable_watchdog(&mut self) {
|
||
self.base.start_watchdog();
|
||
}
|
||
}
|
||
|
||
impl Drop for CharacterCreationWorker {
|
||
fn drop(&mut self) {
|
||
self.death_check_running
|
||
.store(false, Ordering::Relaxed);
|
||
if let Some(handle) = self.death_thread.take() {
|
||
let _ = handle.join();
|
||
}
|
||
}
|
||
}
|
||
|
||
// Additional logic: monitoring and processing NPC deaths.
|
||
|
||
impl CharacterCreationWorker {
|
||
fn insert_character(
|
||
pool: &ConnectionPool,
|
||
region_id: i32,
|
||
first_name_id: i32,
|
||
last_name_id: i32,
|
||
gender: &str,
|
||
title_of_nobility: i32,
|
||
) -> Result<(), DbError> {
|
||
let mut conn = pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
conn.prepare("insert_character", QUERY_INSERT_CHARACTER)?;
|
||
conn.execute(
|
||
"insert_character",
|
||
&[
|
||
®ion_id,
|
||
&first_name_id,
|
||
&last_name_id,
|
||
&gender,
|
||
&title_of_nobility,
|
||
],
|
||
)?;
|
||
Ok(())
|
||
}
|
||
|
||
fn monitor_character_deaths(
|
||
pool: &ConnectionPool,
|
||
broker: &MessageBroker,
|
||
) -> Result<(), DbError> {
|
||
let mut conn = pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
conn.prepare(
|
||
"get_eligible_npc_for_death",
|
||
QUERY_GET_ELIGIBLE_NPC_FOR_DEATH,
|
||
)?;
|
||
let rows = conn.execute("get_eligible_npc_for_death", &[])?;
|
||
|
||
for row in rows {
|
||
let character_id = row
|
||
.get("id")
|
||
.and_then(|v| v.parse::<i32>().ok())
|
||
.unwrap_or(-1);
|
||
let age = row
|
||
.get("age")
|
||
.and_then(|v| v.parse::<i32>().ok())
|
||
.unwrap_or(0);
|
||
|
||
if character_id > 0 && Self::calculate_death_probability(age) {
|
||
if let Err(err) = Self::handle_character_death(pool, broker, character_id) {
|
||
eprintln!(
|
||
"[CharacterCreationWorker] Fehler beim Bearbeiten des NPC-Todes (id={character_id}): {err}"
|
||
);
|
||
}
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
fn calculate_death_probability(age: i32) -> bool {
|
||
if age < 60 {
|
||
return false;
|
||
}
|
||
|
||
let base_probability = 0.01_f64;
|
||
let increase_per_year = 0.01_f64;
|
||
let death_probability =
|
||
base_probability + increase_per_year * (age.saturating_sub(60) as f64);
|
||
|
||
let mut rng = thread_rng();
|
||
let dist = Uniform::from(0.0..1.0);
|
||
let roll: f64 = dist.sample(&mut rng);
|
||
roll < death_probability
|
||
}
|
||
|
||
fn handle_character_death(
|
||
pool: &ConnectionPool,
|
||
broker: &MessageBroker,
|
||
character_id: i32,
|
||
) -> Result<(), DbError> {
|
||
let mut conn = pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
// 1) Director löschen und User benachrichtigen
|
||
conn.prepare("delete_director", QUERY_DELETE_DIRECTOR)?;
|
||
let dir_result = conn.execute("delete_director", &[&character_id])?;
|
||
if let Some(row) = dir_result.get(0) {
|
||
if let Some(user_id) = row
|
||
.get("employer_user_id")
|
||
.and_then(|v| v.parse::<i32>().ok())
|
||
{
|
||
Self::notify_user(pool, broker, user_id, "director_death")?;
|
||
}
|
||
}
|
||
|
||
// 2) Relationships löschen und betroffene User benachrichtigen
|
||
conn.prepare("delete_relationship", QUERY_DELETE_RELATIONSHIP)?;
|
||
let rel_result = conn.execute("delete_relationship", &[&character_id])?;
|
||
for row in rel_result {
|
||
if let Some(related_user_id) = row
|
||
.get("related_user_id")
|
||
.and_then(|v| v.parse::<i32>().ok())
|
||
{
|
||
Self::notify_user(pool, broker, related_user_id, "relationship_death")?;
|
||
}
|
||
}
|
||
|
||
// 3) Child-Relations löschen und Eltern benachrichtigen
|
||
conn.prepare("delete_child_relation", QUERY_DELETE_CHILD_RELATION)?;
|
||
let child_result = conn.execute("delete_child_relation", &[&character_id])?;
|
||
for row in child_result {
|
||
if let Some(father_user_id) = row
|
||
.get("father_user_id")
|
||
.and_then(|v| v.parse::<i32>().ok())
|
||
{
|
||
Self::notify_user(pool, broker, father_user_id, "child_death")?;
|
||
}
|
||
if let Some(mother_user_id) = row
|
||
.get("mother_user_id")
|
||
.and_then(|v| v.parse::<i32>().ok())
|
||
{
|
||
Self::notify_user(pool, broker, mother_user_id, "child_death")?;
|
||
}
|
||
}
|
||
|
||
// 4) Charakter als verstorben markieren
|
||
Self::mark_character_as_deceased(pool, character_id)?;
|
||
|
||
Ok(())
|
||
}
|
||
|
||
fn notify_user(
|
||
pool: &ConnectionPool,
|
||
broker: &MessageBroker,
|
||
user_id: i32,
|
||
event_type: &str,
|
||
) -> Result<(), DbError> {
|
||
let mut conn = pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
conn.prepare("insert_notification", QUERY_INSERT_NOTIFICATION)?;
|
||
conn.execute("insert_notification", &[&user_id])?;
|
||
|
||
// falukantUpdateStatus
|
||
let update_message =
|
||
format!(r#"{{"event":"falukantUpdateStatus","user_id":{}}}"#, user_id);
|
||
broker.publish(update_message);
|
||
|
||
// ursprüngliche Benachrichtigung
|
||
let message =
|
||
format!(r#"{{"event":"{event_type}","user_id":{}}}"#, user_id);
|
||
broker.publish(message);
|
||
|
||
Ok(())
|
||
}
|
||
|
||
fn mark_character_as_deceased(
|
||
pool: &ConnectionPool,
|
||
character_id: i32,
|
||
) -> Result<(), DbError> {
|
||
let mut conn = pool
|
||
.get()
|
||
.map_err(|e| DbError::new(format!("DB-Verbindung fehlgeschlagen: {e}")))?;
|
||
|
||
conn.prepare("mark_character_deceased", QUERY_MARK_CHARACTER_DECEASED)?;
|
||
conn.execute("mark_character_deceased", &[&character_id])?;
|
||
Ok(())
|
||
}
|
||
}
|
||
|
||
|