"""PRISK Monte-Carlo Simulation – vektorisiert mit NumPy/SciPy. Keine Row-Instanzen in der Schleife; Berechnung ueber NumPy-Arrays. """ from __future__ import annotations import os import sqlite3 import tempfile from collections import defaultdict from dataclasses import dataclass, field import numpy as np from ..row.row import Row try: from scipy.stats import beta as beta_dist except ImportError: beta_dist = None _HEADER = ["SimulationNr", "AssetNr", "RiskNr", "RiskRandom", "RiskValue"] @dataclass class PriskSimulation(Row): """Row-Metadaten (Instanzen werden im Normalbetrieb nicht erzeugt).""" inputColumns = ("SimulationNr", "AssetNr", "RiskNr") computedColumns = ("RiskRandom", "RiskValue") SimulationNr: int = 1 AssetNr: str = None RiskNr: str = None RiskRandom: float = None RiskValue: float = None _prisk_row: object = field(default=None, init=False, repr=False) class PriskSimulationTable: """Monte-Carlo: ITERATIONS × PRISK-Zeilen (vektorisiert).""" rowClass = PriskSimulation def __init__(self) -> None: self._iterations = 0 self._by_asset: dict[str, dict] = {} self._asset_order: list[str] = [] self._total = 0 @classmethod def run(cls) -> PriskSimulationTable: sim = cls() settings = Row._settings() iterations = int(settings.get("ITERATIONS", 1000)) sim._iterations = iterations if beta_dist is None: raise ValueError("scipy fehlt – pip install scipy") prisk_table = settings["PRISK"] for prisk_row in prisk_table.rows: prisk_row.compute() risks_by_asset: dict[str, list] = defaultdict(list) for prisk_row in prisk_table.rows: risks_by_asset[prisk_row.AssetNr].append(prisk_row) for asset_nr, risks in risks_by_asset.items(): risk_nrs: list[str] = [] randoms_list: list[np.ndarray] = [] values_list: list[np.ndarray] = [] base_vals_list: list[np.ndarray] = [] for prisk_row in risks: u = np.random.randint(1, 10000, size=iterations) / 10000.0 a, b = prisk_row.Alpha, prisk_row.Beta if a is not None and b is not None and a > 0 and b > 0: vr = prisk_row._value_range() v = prisk_row.Min + vr * beta_dist.ppf(u, a, b) else: v = np.zeros(iterations) mod_val = float(prisk_row.Mod) if prisk_row.Mod is not None else 0.0 base_vals_list.append(np.full(iterations, mod_val)) risk_nrs.append(prisk_row.RiskNr) randoms_list.append(u) values_list.append(v) sim._by_asset[asset_nr] = { "risk_nrs": risk_nrs, "randoms": np.array(randoms_list), "values": np.array(values_list), "base_vals": np.array(base_vals_list), } sim._asset_order.append(asset_nr) sim._total = sum( len(d["risk_nrs"]) * iterations for d in sim._by_asset.values() ) return sim def getData(self, limit: int = 0) -> dict: cap = limit if limit > 0 else self._total footer: list = [] if self._total > 0: all_r = np.concatenate([d["randoms"].ravel() for d in self._by_asset.values()]) all_v = np.concatenate([d["values"].ravel() for d in self._by_asset.values()]) footer = [self._total, "", "", float(all_r.mean()), float(all_v.mean())] rows: list[list] = [] for j in range(self._iterations): for asset_nr in self._asset_order: d = self._by_asset[asset_nr] for i, risk_nr in enumerate(d["risk_nrs"]): rows.append([j + 1, asset_nr, risk_nr, float(d["randoms"][i][j]), float(d["values"][i][j])]) if len(rows) >= cap: return {"header": list(_HEADER), "rows": rows, "total": self._total, "footer": footer} return {"header": list(_HEADER), "rows": rows, "total": self._total, "footer": footer} def toSqlite(self, table_name: str = "prisk_simulation") -> bytes: if self._total == 0: from helpers import to_sqlite_bytes return to_sqlite_bytes([], [], table_name) fd, tmp = tempfile.mkstemp(suffix=".sqlite") os.close(fd) try: conn = sqlite3.connect(tmp) conn.execute( f'CREATE TABLE "{table_name}" ' '("SimulationNr" INTEGER, "AssetNr" TEXT, "RiskNr" TEXT, ' '"RiskRandom" REAL, "RiskValue" REAL)' ) sims = list(range(1, self._iterations + 1)) for asset_nr in self._asset_order: d = self._by_asset[asset_nr] assets = [asset_nr] * self._iterations for i, risk_nr in enumerate(d["risk_nrs"]): risks = [risk_nr] * self._iterations conn.executemany( f'INSERT INTO "{table_name}" VALUES (?,?,?,?,?)', zip(sims, assets, risks, d["randoms"][i].tolist(), d["values"][i].tolist()), ) conn.commit() conn.close() with open(tmp, "rb") as f: return f.read() finally: try: os.unlink(tmp) except OSError: pass