"""Gemeinsame Hilfsfunktionen: Zahlenformat, URL, HTTP, CSV, SQLite-Export.""" from __future__ import annotations import csv import io import os import re import sqlite3 import ssl import tempfile from urllib.error import HTTPError, URLError from urllib.request import Request, urlopen _PERCENT_TAIL = re.compile(r"\s*%+\s*$") HTTP_TIMEOUT_SEC = 45 # --------------------------------------------------------------------------- # Zahlen-Parser (de-DE Zelltext) # --------------------------------------------------------------------------- def parse_float(s: str, *, default: float = 0.0) -> float: """ Zelltext (Google Sheets, de-DE) -> float. Tausenderpunkt, Dezimalkomma, Prozent. """ t = (s or "").strip() if not t: return default t = t.replace("\u00a0", " ").replace("\u202f", " ") t = _PERCENT_TAIL.sub("", t).strip() if not t: return default if "," in t: left, right = t.rsplit(",", 1) left = left.replace(".", "") right = right.replace(".", "").strip() t = (left + "." + right) if right else left else: nd = t.count(".") if nd > 1: t = t.replace(".", "") elif nd == 1: head, tail = t.split(".", 1) if head.isdigit() and tail.isdigit() and len(tail) == 3 and 1 <= len(head) <= 3: t = head + tail return float(t) def parse_int(s: str) -> int: """Zelltext -> int (Tausenderpunkte werden entfernt).""" t = (s or "").strip().replace("\u00a0", "").replace("\u202f", "").replace(" ", "").replace(".", "") if not t: raise ValueError("leer") return int(t) # --------------------------------------------------------------------------- # URL-Normalisierung (Google Sheets) # --------------------------------------------------------------------------- def normalize_csv_source_url(url: str) -> str: """Google-Sheet-URL (/edit?gid=...) in CSV-Export-URL umwandeln.""" url = url.strip() if not url: return url if re.search(r"/export\?format=csv", url): return url m = re.match( r"^https://docs\.google\.com/spreadsheets/d/([a-zA-Z0-9_-]+)/edit", url, ) if m: sid = m.group(1) gid = "0" gm = re.search(r"[?&]gid=(\d+)", url) if gm: gid = gm.group(1) return f"https://docs.google.com/spreadsheets/d/{sid}/export?format=csv&gid={gid}" return url def normalize_https_csv_url_or_error(raw: str) -> tuple[str | None, str | None]: """(url, None) bei Erfolg, (None, Fehlermeldung) bei Fehler.""" u = normalize_csv_source_url((raw or "").strip()) if not u.startswith("https://"): return None, "URL ungueltig (https erforderlich)." return u, None # --------------------------------------------------------------------------- # HTTP + CSV # --------------------------------------------------------------------------- def http_get_body(url: str) -> str | None: """HTTPS-GET, Body als String oder None bei Fehler.""" if not url.startswith("https://"): return None req = Request(url, headers={"User-Agent": "montecarlo-import/1.0"}) ctx = ssl.create_default_context() try: with urlopen(req, timeout=HTTP_TIMEOUT_SEC, context=ctx) as r: # noqa: S310 if r.status < 200 or r.status >= 300: return None raw = r.read() except (HTTPError, URLError, TimeoutError, OSError): return None return raw.decode("utf-8", errors="replace") def read_csv_string(contents: str) -> dict: """CSV-Text mit Header -> {header: [...], rows: [[...]]}.""" if not contents.strip(): return {"header": [], "rows": []} f = io.StringIO(contents) rows = list(csv.reader(f)) if not rows: return {"header": [], "rows": []} return {"header": rows[0], "rows": rows[1:]} def read_csv_string_no_header(contents: str) -> dict: """CSV-Text ohne Header -> generierte col_1..col_N Spaltennamen.""" if not contents.strip(): return {"header": [], "rows": []} f = io.StringIO(contents) rows = list(csv.reader(f)) if not rows: return {"header": [], "rows": []} max_cols = max((len(r) for r in rows), default=0) if max_cols < 1: return {"header": [], "rows": []} header = [f"col_{c}" for c in range(1, max_cols + 1)] return {"header": header, "rows": rows} # --------------------------------------------------------------------------- # SQLite-Export # --------------------------------------------------------------------------- _SAFE_NAME_RE = re.compile(r"[^a-zA-Z0-9_]") def _sqlite_col_type(val) -> str: if isinstance(val, bool): return "INTEGER" if isinstance(val, int): return "INTEGER" if isinstance(val, float): return "REAL" return "TEXT" def to_sqlite_bytes(header: list[str], rows: list[list], table_name: str = "data") -> bytes: """header + rows → komplette SQLite-Datei als bytes.""" safe_name = _SAFE_NAME_RE.sub("_", table_name) or "data" safe_cols = [_SAFE_NAME_RE.sub("_", h) or f"col_{i}" for i, h in enumerate(header)] col_types = ["TEXT"] * len(safe_cols) if rows: for i, val in enumerate(rows[0]): if i < len(col_types): col_types[i] = _sqlite_col_type(val) fd, tmp = tempfile.mkstemp(suffix=".sqlite") os.close(fd) try: conn = sqlite3.connect(tmp) col_defs = ", ".join(f'"{c}" {t}' for c, t in zip(safe_cols, col_types)) conn.execute(f'CREATE TABLE "{safe_name}" ({col_defs})') placeholders = ", ".join("?" * len(safe_cols)) n = len(safe_cols) conn.executemany( f'INSERT INTO "{safe_name}" VALUES ({placeholders})', [r[:n] for r in rows], ) conn.commit() conn.close() with open(tmp, "rb") as f: return f.read() finally: try: os.unlink(tmp) except OSError: pass