import sqlite3
import re
import logging
import os
import sys
import time
import threading
from typing import Iterable
from pypos.core.utils.path_utils import get_db_path
from pypos.core.utils.path_utils import get_install_base_dir, get_app_resource_dir
from pypos.core.database.schema_migrator import run_schema_migrations_once
from pypos.core.utils.config_utils import read_app_settings
from pypos.core.utils.sql_query_builder import render_sql_template

# Module-level logger named after this module.
LOGGER = logging.getLogger(__name__)
# Keys of driver-fallback warnings that were already logged, so that
# _resolve_sqlite_driver emits each distinct warning only once per process.
_SQLITE_DRIVER_FALLBACK_WARNED = set()
# Exceptions treated as "value is not numeric" by numeric parsing helpers in this module.
_NUMERIC_PARSE_EXCEPTIONS = (TypeError, ValueError)
# Exceptions that the database helpers in this module catch and log as operation failures.
_SQLITE_OPERATION_EXCEPTIONS = (sqlite3.Error, OSError, RuntimeError, ValueError, TypeError)
# Database keys for which _ensure_default_indexes has already completed.
_INDEX_DONE_FOR_DB = set()
# Guards reads and writes of _INDEX_DONE_FOR_DB.
_INDEX_DONE_LOCK = threading.RLock()


# edited by glg
def _to_int(value, default=0):
    try:
        return int(value)
    except _NUMERIC_PARSE_EXCEPTIONS:
        return int(default)


# edited by glg
def _to_float(value, default=0.0):
    try:
        return float(value)
    except _NUMERIC_PARSE_EXCEPTIONS:
        return float(default)


# edited by glg
def _to_bool(value, default=False):
    if isinstance(value, bool):
        return bool(value)
    if value is None:
        return bool(default)
    text = str(value).strip().lower()
    if text in {"1", "true", "yes", "on"}:
        return True
    if text in {"0", "false", "no", "off"}:
        return False
    return bool(default)


# edited by glg
def _is_destructive_maintenance_allowed(strict=False):
    """Report whether the app settings permit destructive database maintenance.

    In strict mode only ``maintenance_destructive_db_ops_enabled`` has to be
    set; in non-strict mode ``maintenance_destructive_db_ops_allow_non_strict``
    must be set as well.
    """
    settings = read_app_settings() or {}
    # edited by glg
    # Fail closed: destructive operations stay OFF while a key is not configured.
    master_switch = _to_bool(settings.get("maintenance_destructive_db_ops_enabled"), default=False)
    non_strict_switch = _to_bool(
        settings.get("maintenance_destructive_db_ops_allow_non_strict"),
        default=False,
    )
    return master_switch if strict else (master_switch and non_strict_switch)


# edited by glg
def _get_destructive_maintenance_reason():
    """Return the configured ``maintenance_destructive_db_ops_reason``, stripped.

    An unset or falsy setting yields an empty string.
    """
    settings = read_app_settings() or {}
    raw_reason = settings.get("maintenance_destructive_db_ops_reason")
    if not raw_reason:
        return ""
    return str(raw_reason).strip()


# edited by glg
def _enforce_destructive_policy(action_name, strict=False):
    """Gate a destructive maintenance action behind the configured policy.

    Args:
        action_name: Label of the action, used in the log lines and error text.
        strict: When truthy, a blocked action raises instead of returning False.

    Returns:
        True when the action may proceed (the decision is logged at INFO),
        False when it is blocked in non-strict mode (logged at WARNING).

    Raises:
        PermissionError: When the action is blocked and ``strict`` is truthy.
    """
    is_strict = bool(strict)
    permitted = _is_destructive_maintenance_allowed(strict=is_strict)
    audit_reason = _get_destructive_maintenance_reason() or "-"

    if permitted:
        LOGGER.info(
            "[DB_MAINTENANCE] action=%s strict=%s reason=%s",
            str(action_name or "-"),
            int(is_strict),
            audit_reason,
        )
        return True

    blocked_message = (
        f"{action_name} diblokir oleh policy maintenance. "
        "Aktifkan maintenance_destructive_db_ops_enabled "
        "(dan maintenance_destructive_db_ops_allow_non_strict untuk mode non-strict)."
    )
    if is_strict:
        raise PermissionError(blocked_message)
    LOGGER.warning("[DB_MAINTENANCE] %s", blocked_message)
    return False

def get_receive_on_account_debit():
    """Return the distinct ``bank.nama`` values that contain ``EDC``.

    NOTE(review): the query is identical to the one in
    get_receive_on_account_names; confirm whether a different filter was
    intended for the debit variant.

    Returns:
        list: The first column of every matching row.

    Raises:
        sqlite3.Error: If the query fails; the connection is closed first.
    """
    conn = connect_sqlite(get_db_path())
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT DISTINCT nama
            FROM bank
            WHERE nama LIKE '%EDC%'
        """)
        return [row[0] for row in cursor.fetchall()]
    finally:
        # Always release the connection, including when the query raises.
        conn.close()

def get_debit_card_names():
    """Return the distinct ``bank.nama`` values that denote debit cards.

    A name qualifies when it contains ``debit`` and contains neither ``EDC``
    nor ``Tunai``.

    Returns:
        list: The first column of every matching row.

    Raises:
        sqlite3.Error: If the query fails; the connection is closed first.
    """
    conn = connect_sqlite(get_db_path())
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT DISTINCT nama
            FROM bank
            WHERE nama NOT LIKE '%EDC%'
              AND nama NOT LIKE '%Tunai%'
              AND nama LIKE '%debit%'
        """)
        return [row[0] for row in cursor.fetchall()]
    finally:
        # Always release the connection, including when the query raises.
        conn.close()

def get_receive_on_account_names():
    """Return the distinct ``bank.nama`` values that contain ``EDC``.

    Returns:
        list: The first column of every matching row.

    Raises:
        sqlite3.Error: If the query fails; the connection is closed first.
    """
    conn = connect_sqlite(get_db_path())
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT DISTINCT nama
            FROM bank
            WHERE nama LIKE '%EDC%'
        """)
        return [row[0] for row in cursor.fetchall()]
    finally:
        # Always release the connection, including when the query raises.
        conn.close()

def get_credit_card_names():
    """Return the distinct ``bank.nama`` values treated as credit cards.

    A name qualifies when it contains none of ``EDC``, ``Tunai`` or ``debit``.

    Returns:
        list: The first column of every matching row.

    Raises:
        sqlite3.Error: If the query fails; the connection is closed first.
    """
    conn = connect_sqlite(get_db_path())
    try:
        cursor = conn.cursor()
        cursor.execute("""
            SELECT DISTINCT nama
            FROM bank
            WHERE nama NOT LIKE '%EDC%'
              AND nama NOT LIKE '%Tunai%'
              AND nama NOT LIKE '%debit%'
        """)
        return [row[0] for row in cursor.fetchall()]
    finally:
        # Always release the connection, including when the query raises.
        conn.close()

def parse_int_safely(value):
    """
    Convert a value that may mix digits with symbols (%, Rp, spaces, ...) into an int.

    ``None`` yields 0. Numeric inputs are truncated with ``int()``, and
    non-finite floats (NaN, infinity) yield 0. For any other input every
    non-digit character is dropped before conversion, so a minus sign or a
    decimal separator is not preserved.

    Examples:
        "0 %"       -> 0
        "Rp 5.000"  -> 5000
        ""          -> 0
    """
    if value is None:
        return 0
    if isinstance(value, (int, float)):
        try:
            return int(value)
        except (ValueError, OverflowError):
            # int() rejects NaN (ValueError) and +/-infinity (OverflowError).
            return 0
    digits_only = re.sub(r"[^\d]", "", str(value))
    return int(digits_only) if digits_only else 0

def parse_float_safely(value, default=0.0):
    """Parse a loosely formatted number (currency text, grouped digits) into a float.

    Separator rules, applied after every character other than digits, "."
    and "," has been removed:
      * both "," and "." present: whichever appears last is the decimal
        separator and the other one is dropped as grouping;
      * only commas: a single comma is a decimal separator; several commas
        are grouping when every group after the first has three digits;
      * several dots: grouping when every group after the first has three
        digits.

    Args:
        value: ``None``, a number, or any object whose ``str()`` is parsed.
        default: Returned (as ``float``) for ``None``, blank or unparsable input.

    Returns:
        The parsed float, negated when the stripped text starts with "-".
    """
    if value is None:
        return float(default)
    if isinstance(value, (int, float)):
        return float(value)
    s = str(value).strip()
    if not s:
        return float(default)
    # Remember a leading minus sign; the cleanup below removes it.
    neg = s.startswith("-")
    # Keep only digits and the two separator characters (this also drops spaces).
    s = re.sub(r"[^0-9.,]", "", s)
    if "," in s and "." in s:
        if s.rfind(",") > s.rfind("."):
            s = s.replace(".", "").replace(",", ".")
        else:
            s = s.replace(",", "")
    elif "," in s:
        parts = s.split(",")
        if len(parts) > 2 and all(len(p) == 3 for p in parts[1:]):
            # Comma-grouped thousands such as "1,234,567".
            s = "".join(parts)
        else:
            s = s.replace(",", ".")
    elif s.count(".") > 1:
        parts = s.split(".")
        if all(len(p) == 3 for p in parts[1:]):
            # Dot-grouped thousands such as "1.234.567".
            s = "".join(parts)
    try:
        val = float(s)
    except _NUMERIC_PARSE_EXCEPTIONS:
        return float(default)
    return -val if neg else val

def parse_int_from_text(value, default=0):
    """Parse ``value`` with parse_float_safely and truncate the result to an int.

    Returns ``int(default)`` when the parse or the truncation raises
    TypeError, ValueError or OverflowError.
    """
    try:
        parsed = parse_float_safely(value, default)
        return int(parsed)
    except (TypeError, ValueError, OverflowError):
        return int(default)


# edited by glg
def build_sqlite_readonly_uri(db_path=None):
    """Build a ``file:`` URI that opens the database in read-only mode.

    The absolute path is percent-encoded so that characters with a meaning in
    a URI ("?", "#", "%", spaces, ...) remain part of the file name instead of
    truncating the path or being read as query parameters. Paths made only of
    unreserved characters produce the same URI as an unencoded path would.

    Args:
        db_path: Database file path; when falsy the path from ``get_db_path()``
            is used.

    Returns:
        str: ``file:<encoded absolute path>?mode=ro``; pass it to a connect
        call with ``uri=True``.
    """
    # Local import so the block does not depend on a new module-level import.
    from urllib.parse import quote

    db_abs = os.path.abspath(db_path or get_db_path())
    uri_path = db_abs.replace("\\", "/")
    # Keep "/" (path separator) and ":" (Windows drive letter) unescaped.
    encoded_path = quote(uri_path, safe="/:")
    return f"file:{encoded_path}?mode=ro"


# edited by glg
def _resolve_sqlite_driver(config):
    requested = str((config or {}).get("sqlite_driver", "sqlite3") or "sqlite3").strip().lower()
    normalized = requested.replace("-", "_")
    if normalized in {"", "sqlite3"}:
        return sqlite3, "sqlite3"
    if normalized in {"sqlcipher", "sqlcipher3", "pysqlcipher3"}:
        try:
            from pysqlcipher3 import dbapi2 as sqlcipher_dbapi
            return sqlcipher_dbapi, "pysqlcipher3"
        except (ImportError, ModuleNotFoundError) as exc:
            warn_key = f"{normalized}:{type(exc).__name__}:{str(exc)}"
            if warn_key not in _SQLITE_DRIVER_FALLBACK_WARNED:
                _SQLITE_DRIVER_FALLBACK_WARNED.add(warn_key)
                LOGGER.warning(
                    "SQLite driver '%s' tidak tersedia (%s). Fallback ke sqlite3.",
                    requested,
                    exc,
                )
            return sqlite3, "sqlite3"

    if normalized not in _SQLITE_DRIVER_FALLBACK_WARNED:
        _SQLITE_DRIVER_FALLBACK_WARNED.add(normalized)
        LOGGER.warning(
            "SQLite driver '%s' tidak dikenali. Fallback ke sqlite3.",
            requested,
        )
    return sqlite3, "sqlite3"


# edited by glg
def _apply_sqlcipher_connection_pragmas(conn, config):
    """Send the SQLCipher key and cipher PRAGMAs to ``conn`` when encryption is enabled.

    Nothing is executed unless ``sqlite_cipher_enabled`` equals 1. The key is
    read from the environment variable named by ``sqlite_cipher_key_env``
    (``PYPOS_SQLCIPHER_KEY`` when unset or blank).

    Raises:
        RuntimeError: When encryption is enabled but the key variable is empty.
    """
    cfg = config if isinstance(config, dict) else {}
    if _to_int(cfg.get("sqlite_cipher_enabled", 0) or 0, default=0) != 1:
        return

    env_name = str(cfg.get("sqlite_cipher_key_env", "PYPOS_SQLCIPHER_KEY") or "").strip()
    key_env = env_name or "PYPOS_SQLCIPHER_KEY"
    cipher_key = str(os.environ.get(key_env, "") or "")
    if not cipher_key:
        raise RuntimeError(f"SQLCipher aktif tetapi env key kosong: {key_env}")
    # Double any single quote so the key stays inside the SQL string literal.
    escaped_key = cipher_key.replace("'", "''")
    conn.execute(f"PRAGMA key = '{escaped_key}'")

    # (settings key, PRAGMA name, fallback); a PRAGMA is sent only for positive values.
    numeric_pragmas = (
        ("sqlite_cipher_compatibility", "cipher_compatibility", 4),
        ("sqlite_cipher_kdf_iter", "kdf_iter", 256000),
        ("sqlite_cipher_page_size", "cipher_page_size", 4096),
    )
    for setting_key, pragma_name, fallback in numeric_pragmas:
        number = _to_int(cfg.get(setting_key, fallback) or fallback, default=fallback)
        if number > 0:
            conn.execute(f"PRAGMA {pragma_name} = {number}")


# edited by glg
def open_sqlite_connection(
    db_path=None,
    timeout=None,
    uri=False,
    app_settings=None,
    apply_pragmas=False,
    ensure_indexes=False,
    run_migrations=False,
):
    """Open a database connection with the driver selected in the app settings.

    Args:
        db_path: Database file (or URI when ``uri`` is truthy); when falsy the
            path from ``get_db_path()`` is used.
        timeout: Connect timeout in seconds; when ``None`` the ``sqlite_timeout``
            setting is used (30 when it is missing or not numeric).
        uri: Passed to the driver's ``connect`` as a boolean.
        app_settings: Settings dict; any non-dict value makes the function
            load the settings itself.
        apply_pragmas: Apply the standard PRAGMAs; failures are only logged.
        ensure_indexes: Create the default indexes; failures are only logged.
        run_migrations: Run the schema migrations before connecting.

    Returns:
        The open connection object.

    Raises:
        RuntimeError: When SQLCipher is enabled but its key is not available;
            the freshly opened connection is closed before the error propagates.
    """
    config = app_settings if isinstance(app_settings, dict) else (read_app_settings() or {})
    db_file = db_path or get_db_path()
    if run_migrations:
        run_schema_migrations_once(db_file, strict=False)
    if timeout is None:
        # Tolerate a malformed setting instead of failing before the connection opens.
        timeout = _to_int(config.get("sqlite_timeout", 30), default=30)
    sqlite_driver, sqlite_driver_name = _resolve_sqlite_driver(config)
    conn = sqlite_driver.connect(db_file, timeout=float(timeout), uri=bool(uri))
    if sqlite_driver_name == "pysqlcipher3":
        try:
            _apply_sqlcipher_connection_pragmas(conn, config)
        except Exception:
            # The caller never receives this connection, so release it here.
            conn.close()
            raise
    if apply_pragmas:
        try:
            _apply_sqlite_pragmas(conn, config)
        except _SQLITE_OPERATION_EXCEPTIONS as e:
            LOGGER.warning("Gagal set PRAGMA SQLite: %s", e)
    if ensure_indexes:
        try:
            _ensure_default_indexes(conn)
        except _SQLITE_OPERATION_EXCEPTIONS as e:
            LOGGER.warning("Gagal membuat index default: %s", e)
    return conn


def connect_sqlite(db_path=None, timeout=None):
    """Open a fully prepared connection.

    Delegates to open_sqlite_connection with migrations, PRAGMAs and default
    indexes all enabled and with ``uri`` disabled.
    """
    settings = read_app_settings() or {}
    full_setup = {
        "apply_pragmas": True,
        "ensure_indexes": True,
        "run_migrations": True,
    }
    return open_sqlite_connection(
        db_path=db_path,
        timeout=timeout,
        uri=False,
        app_settings=settings,
        **full_setup,
    )


def connect_sqlite_read_fast(db_path=None, timeout=None, busy_timeout_ms=None):
    """Open a connection tuned to fail fast for reads on the UI hot path.

    Compared with connect_sqlite, the connect timeout is shorter and
    ``busy_timeout`` is lowered so a heavily locked database is reported
    quickly instead of blocking.

    Args:
        db_path: Database file; forwarded to connect_sqlite.
        timeout: Connect timeout in seconds; defaults to the
            ``sqlite_ui_read_timeout_sec`` setting (1 second) and is never
            lower than 0.2.
        busy_timeout_ms: Value for ``PRAGMA busy_timeout``; defaults to the
            ``sqlite_ui_read_busy_timeout_ms`` setting (350) and is never
            negative.

    Returns:
        The open connection. A failure to set the PRAGMA is only logged.
    """
    # edited by glg
    # Settings may be unavailable; fall back to an empty mapping like the
    # other readers in this module.
    cfg = read_app_settings() or {}
    if timeout is None:
        timeout = _to_float(cfg.get("sqlite_ui_read_timeout_sec", 1) or 1, default=1.0)
    timeout = max(0.2, float(timeout))

    started_at = time.perf_counter()
    conn = connect_sqlite(db_path=db_path, timeout=timeout)
    try:
        if busy_timeout_ms is None:
            busy_timeout_ms = _to_int(cfg.get("sqlite_ui_read_busy_timeout_ms", 350) or 350, default=350)
        busy_timeout_ms = max(0, int(busy_timeout_ms))
        conn.execute(f"PRAGMA busy_timeout = {busy_timeout_ms}")
    except _SQLITE_OPERATION_EXCEPTIONS as exc:
        LOGGER.warning("Gagal set PRAGMA read-fast SQLite: %s", exc)

    elapsed_ms = (time.perf_counter() - started_at) * 1000.0
    if elapsed_ms >= 150.0:
        LOGGER.info(
            "[PERF] connect_sqlite_read_fast elapsed_ms=%.1f timeout=%.2fs busy_timeout_ms=%s",
            elapsed_ms,
            float(timeout),
            # The value may still be the caller's unparsable input when the
            # block above failed, so coerce it defensively for the log line.
            _to_int(busy_timeout_ms or 0, default=0),
        )
    return conn


def _apply_sqlite_pragmas(conn, config):
    """Apply the standard connection PRAGMAs according to ``config``.

    Foreign keys and WAL journaling are enabled unless their settings
    (``sqlite_foreign_keys``, ``sqlite_use_wal``) differ from 1; a positive
    ``sqlite_busy_timeout_ms`` (default 5000) is applied; ``synchronous`` and
    ``temp_store`` are always set.
    """

    def _flag_on(setting_key):
        # A flag counts as enabled only when it resolves to exactly 1.
        return _to_int(config.get(setting_key, 1), default=1) == 1

    if _flag_on("sqlite_foreign_keys"):
        conn.execute("PRAGMA foreign_keys = ON")

    if _flag_on("sqlite_use_wal"):
        # The PRAGMA returns the journal mode actually in effect.
        mode_row = conn.execute("PRAGMA journal_mode=WAL").fetchone()
        journal_mode = str((mode_row or [""])[0] or "").strip().lower()
        if journal_mode != "wal":
            LOGGER.warning("SQLite journal_mode bukan WAL (actual=%s)", journal_mode or "unknown")

    busy_ms = _to_int(config.get("sqlite_busy_timeout_ms", 5000), default=5000)
    if busy_ms > 0:
        conn.execute(f"PRAGMA busy_timeout = {busy_ms}")

    for statement in ("PRAGMA synchronous=NORMAL", "PRAGMA temp_store=MEMORY"):
        conn.execute(statement)

def _ensure_default_indexes(conn):
    """Create the module's default indexes once per database per process.

    The main database file reported by ``PRAGMA database_list`` is used as the
    key in ``_INDEX_DONE_FOR_DB``; when that key is already recorded the
    function returns without touching the schema. Otherwise every index whose
    table exists, and whose columns all exist in that table, is created with
    ``CREATE INDEX IF NOT EXISTS``. A failure on one index is logged and the
    remaining indexes are still attempted. Any other ``sqlite3.Error`` is
    logged and not raised.

    Args:
        conn: Open connection; it is committed after the index pass.
    """
    try:
        cur = conn.cursor()
        cur.execute("PRAGMA database_list")
        db_file = ""
        for row in cur.fetchall():
            # row: (seq, name, file)
            if len(row) >= 3 and str(row[1]) == "main":
                db_file = str(row[2] or "")
                break
        # Fall back to the literal "main" when no file path is reported
        # (presumably an in-memory or temporary database; confirm).
        db_key = db_file or "main"
        with _INDEX_DONE_LOCK:
            if db_key in _INDEX_DONE_FOR_DB:
                cur.close()
                return

        cur.execute("SELECT name FROM sqlite_master WHERE type='table'")
        table_names = {str(row[0]) for row in cur.fetchall() if row and row[0]}

        # (table name, CREATE INDEX statement) pairs; a statement is executed
        # only when its table and all of its indexed columns exist.
        index_sql = [
            ("produk", "CREATE INDEX IF NOT EXISTS idx_produk_barcode ON produk(barcode)"),
            ("produk", "CREATE INDEX IF NOT EXISTS idx_produk_nama ON produk(nama)"),
            ("per_customers", "CREATE INDEX IF NOT EXISTS idx_per_customers_nama ON per_customers(nama)"),
            ("per_customers", "CREATE INDEX IF NOT EXISTS idx_per_customers_kode ON per_customers(kode)"),
            ("per_customers", "CREATE INDEX IF NOT EXISTS idx_per_customers_member_id ON per_customers(member_id)"),
            ("per_customers", "CREATE INDEX IF NOT EXISTS idx_per_customers_mid ON per_customers(mid)"),
            ("per_customers", "CREATE INDEX IF NOT EXISTS idx_per_customers_tlp_1 ON per_customers(tlp_1)"),
            # edited by glg
            ("per_cabang_device", "CREATE INDEX IF NOT EXISTS idx_per_cabang_device_machine_status_trash_hash ON per_cabang_device(machine_id, status, trash, server_hash)"),
            ("price", "CREATE INDEX IF NOT EXISTS idx_price_produk ON price(produk_id)"),
            ("price", "CREATE INDEX IF NOT EXISTS idx_price_cabang ON price(cabang_id)"),
            ("price", "CREATE INDEX IF NOT EXISTS idx_price_produk_cabang_status_trash ON price(produk_id, cabang_id, status, trash)"),
            ("diskon", "CREATE INDEX IF NOT EXISTS idx_diskon_produk ON diskon(produk_id)"),
            ("diskon", "CREATE INDEX IF NOT EXISTS idx_diskon_cabang ON diskon(cabang_id)"),
            ("diskon", "CREATE INDEX IF NOT EXISTS idx_diskon_filter_aktif ON diskon(status, trash, produk_id, cabang_id)"),
            ("diskon_customer", "CREATE INDEX IF NOT EXISTS idx_diskon_customer_filter ON diskon_customer(status, trash, cabang_id, customer_id, customer_level)"),
            ("diskon_customer", "CREATE INDEX IF NOT EXISTS idx_diskon_customer_periode ON diskon_customer(tanggal_start, tanggal_stop, jam_start, jam_stop)"),
            ("transaksi", "CREATE INDEX IF NOT EXISTS idx_transaksi_quota_filter ON transaksi(trash, jenis_label, dtime)"),
            ("transaksi", "CREATE INDEX IF NOT EXISTS idx_transaksi_nomer_dtime ON transaksi(nomer, dtime)"),
            ("transaksi_data", "CREATE INDEX IF NOT EXISTS idx_transaksi_data_transaksi ON transaksi_data(transaksi_id)"),
            ("transaksi_data", "CREATE INDEX IF NOT EXISTS idx_transaksi_data_transaksi_produk_trash ON transaksi_data(transaksi_id, produk_id, trash)"),
            ("return_transaksi_penjualan", "CREATE INDEX IF NOT EXISTS idx_return_transaksi_id ON return_transaksi_penjualan(transaksi_id)"),
            ("return_transaksi_penjualan", "CREATE INDEX IF NOT EXISTS idx_return_tanggal ON return_transaksi_penjualan(tanggal_return)"),
            ("detail_return_transaksi_penjualan", "CREATE INDEX IF NOT EXISTS idx_detail_return_return_id ON detail_return_transaksi_penjualan(return_id)"),
            ("detail_return_transaksi_penjualan", "CREATE INDEX IF NOT EXISTS idx_detail_return_produk ON detail_return_transaksi_penjualan(produk_id)"),
            ("detail_return_transaksi_penjualan", "CREATE INDEX IF NOT EXISTS idx_detail_return_return_produk ON detail_return_transaksi_penjualan(return_id, produk_id)"),
            ("voucher_return", "CREATE INDEX IF NOT EXISTS idx_voucher_kode ON voucher_return(kode)"),
            ("pembatalan_transaksi_history", "CREATE INDEX IF NOT EXISTS idx_pembatalan_history_transaksi ON pembatalan_transaksi_history(transaksi_id)"),
            ("pembatalan_transaksi_history", "CREATE INDEX IF NOT EXISTS idx_pembatalan_history_cancel_dtime ON pembatalan_transaksi_history(cancel_dtime)"),
        ]

        # table name -> set of column names, filled on first use per table.
        table_columns_cache = {}

        def _get_index_columns(index_sql_stmt):
            """Return the column names listed inside the statement's parentheses."""
            match = re.search(r"\((.+)\)", index_sql_stmt)
            if not match:
                return []
            raw_columns = []
            for token in str(match.group(1)).split(","):
                part = token.strip()
                if not part:
                    continue
                # Keep only the first word of each entry and drop surrounding quotes.
                raw_columns.append(part.split()[0].strip('"').strip("'"))
            return [c for c in raw_columns if c]

        for table_name, sql in index_sql:
            if table_name in table_names:
                if table_name not in table_columns_cache:
                    cur.execute(f'PRAGMA table_info("{table_name}")')
                    table_columns_cache[table_name] = {
                        str(row[1]) for row in cur.fetchall() if row and len(row) > 1 and row[1]
                    }
                required_columns = _get_index_columns(sql)
                # Skip the index when the table lacks any of the indexed columns.
                if required_columns and not set(required_columns).issubset(table_columns_cache.get(table_name, set())):
                    continue
                try:
                    cur.execute(sql)
                except sqlite3.Error as idx_err:
                    # Best effort: the warning names the table, and the loop continues.
                    LOGGER.warning("Gagal create index %s: %s", table_name, idx_err)

        conn.commit()
        cur.close()
        # The database is marked as done even when individual indexes failed above.
        with _INDEX_DONE_LOCK:
            _INDEX_DONE_FOR_DB.add(db_key)
    except sqlite3.Error as e:
        LOGGER.warning("Gagal membuat index default: %s", e)

def _quote_identifier(identifier: str) -> str:
    return '"' + str(identifier or "").replace('"', '""') + '"'


def _load_clearable_tables(cur, preserve_tables: Iterable[str] = None):
    preserve = {str(name).strip() for name in (preserve_tables or []) if str(name).strip()}
    cur.execute("SELECT name FROM sqlite_master WHERE type='table' AND name NOT LIKE 'sqlite_%'")
    tables = []
    for row in cur.fetchall():
        table_name = str((row or [""])[0] or "").strip()
        if not table_name:
            continue
        if table_name in preserve:
            continue
        tables.append(table_name)
    return tables


def _table_has_rows(cur, table_name: str) -> bool:
    """Tell whether ``table_name`` still holds at least one row.

    A ``sqlite3.Error`` while probing is reported as True, so a table that
    cannot be inspected is treated as not empty.
    """
    try:
        cur.execute(
            render_sql_template(
                "SELECT 1 FROM {table_name} LIMIT 1",
                table_name=_quote_identifier(table_name),
            )
        )
        first_row = cur.fetchone()
    except sqlite3.Error:
        return True
    return first_row is not None


def _load_non_empty_tables(cur, table_names):
    """Return ``(table_name, row_count)`` for every listed table that still has rows.

    Tables are counted in the given order; database errors are not caught here.
    """

    def _count_rows(name):
        cur.execute(
            render_sql_template(
                "SELECT COUNT(1) FROM {table_name}",
                table_name=_quote_identifier(name),
            )
        )
        return int(cur.fetchone()[0] or 0)

    # Lazy pairs: each table is counted exactly once, as the list is built.
    counted = ((name, _count_rows(name)) for name in table_names)
    return [(name, total) for name, total in counted if total > 0]


def clear_all_tables(db_path=None, preserve_tables=None, strict=False):
    """Delete every row from all clearable tables of one database.

    The call is gated by the destructive-maintenance policy. Deletion runs
    with foreign keys disabled and is committed only after every target table
    has been verified empty; otherwise the transaction is rolled back.

    Args:
        db_path: Database file; forwarded to connect_sqlite.
        preserve_tables: Table names that must not be emptied.
        strict: When truthy, a policy block or any handled failure raises
            instead of returning 0.

    Returns:
        int: Number of tables targeted on success; 0 when the policy blocks
        the call, when there is nothing to clear, or when a handled failure
        occurs in non-strict mode.

    Raises:
        PermissionError: In strict mode when the policy blocks the action.
        RuntimeError: In strict mode when tables could not be deleted or rows
            remain afterwards. Other handled errors are re-raised in strict
            mode as well.
    """
    if not _enforce_destructive_policy("clear_all_tables", strict=bool(strict)):
        return 0
    # Opened outside the try block, so a connection failure propagates
    # regardless of ``strict``.
    conn = connect_sqlite(db_path)
    failed_tables = []
    try:
        conn.execute("PRAGMA foreign_keys = OFF")
        cur = conn.cursor()
        tables = _load_clearable_tables(cur, preserve_tables=preserve_tables)
        # Keep the full target list; ``tables`` shrinks to the retry set below.
        target_tables = list(tables)
        if not tables:
            conn.commit()
            return 0

        # At most two passes: the second pass retries only the tables whose
        # DELETE failed and that still appear to contain rows.
        for _ in range(2):
            failed_tables = []
            for table in tables:
                try:
                    query = render_sql_template(
                        "DELETE FROM {table_name}",
                        table_name=_quote_identifier(table),
                    )
                    cur.execute(query)
                except sqlite3.Error as e:
                    failed_tables.append((table, str(e)))
            if not failed_tables:
                break
            remaining = []
            for table_name, _ in failed_tables:
                if _table_has_rows(cur, table_name):
                    remaining.append(table_name)
            tables = remaining
            if not tables:
                # Every failed table turned out to be empty, so nothing is left to do.
                failed_tables = []
                break

        if failed_tables:
            conn.rollback()
            details = ", ".join([f"{name}: {reason}" for name, reason in failed_tables])
            raise RuntimeError(f"Gagal hapus seluruh tabel: {details}")

        try:
            # Best effort; presumably resets AUTOINCREMENT counters and fails
            # harmlessly when the table does not exist (confirm).
            cur.execute("DELETE FROM sqlite_sequence")
        except sqlite3.Error:
            pass

        # Final verification over the full target list before committing.
        non_empty_tables = _load_non_empty_tables(cur, target_tables)
        if non_empty_tables:
            conn.rollback()
            details = ", ".join([f"{name}: {rows}" for name, rows in non_empty_tables])
            raise RuntimeError(f"Masih ada data tersisa setelah reset: {details}")

        conn.commit()
        return len(target_tables)
    except _SQLITE_OPERATION_EXCEPTIONS as e:
        # Includes the RuntimeError raised above: non-strict callers get 0.
        LOGGER.warning("Gagal kosongkan database: %s", e)
        if strict:
            raise
        return 0
    finally:
        # Re-enable foreign keys (best effort) and always close the connection.
        try:
            conn.execute("PRAGMA foreign_keys = ON")
        except sqlite3.Error:
            pass
        conn.close()


def _resolve_known_db_paths():
    """Collect the absolute paths of the database files known to the app.

    The active database comes first. Outside a frozen runtime the current and
    legacy database file names under the install and resource ``db`` folders
    are added. Only paths that exist and whose parent directory is writable
    are returned, without duplicates and in discovery order.
    """
    candidates = []
    active_db = get_db_path()
    if active_db:
        candidates.append(os.path.abspath(active_db))

    frozen = bool(getattr(sys, "frozen", False) or getattr(sys, "_MEIPASS", None))
    if not frozen:
        # Current file name first, then the legacy one; for each name the
        # install dir is probed before the resource dir.
        for db_name in ("beta_sb_pos_sqlite.db", "beta_sb_pos_sqlite_.db"):
            for base_dir_getter in (get_install_base_dir, get_app_resource_dir):
                candidates.append(os.path.abspath(os.path.join(base_dir_getter(), "db", db_name)))

    resolved = []
    seen = set()
    for candidate in candidates:
        normalized = os.path.abspath(candidate)
        if normalized in seen or not os.path.exists(normalized):
            continue
        parent_dir = os.path.dirname(normalized) or "."
        if os.access(parent_dir, os.W_OK):
            seen.add(normalized)
            resolved.append(normalized)
    return resolved


def clear_all_known_databases(preserve_tables=None, strict=False):
    """Empty every known database file and report the outcome per path.

    Args:
        preserve_tables: Table names to keep; forwarded to clear_all_tables.
        strict: When truthy, a policy block raises and a failure on the
            primary database raises after all paths were processed.

    Returns:
        dict: Path -> ``{"ok": True, "cleared_tables": n}`` or
        ``{"ok": False, "error": text}``; empty when the policy blocks the
        call in non-strict mode.

    Raises:
        RuntimeError: In strict mode when clearing the primary database failed.
    """
    if not _enforce_destructive_policy("clear_all_known_databases", strict=bool(strict)):
        return {}

    outcome = {}
    primary_failures = []
    paths = _resolve_known_db_paths()
    primary_db = os.path.abspath(get_db_path())
    for db_path in paths:
        try:
            # Each database is always cleared in strict mode so failures surface here.
            cleared = clear_all_tables(db_path=db_path, preserve_tables=preserve_tables, strict=True)
        except _SQLITE_OPERATION_EXCEPTIONS as e:
            error_text = str(e)
            outcome[db_path] = {"ok": False, "error": error_text}
            # Only a failure on the primary database counts towards the strict error.
            if os.path.abspath(db_path) == primary_db:
                primary_failures.append((db_path, error_text))
        else:
            outcome[db_path] = {"ok": True, "cleared_tables": cleared}

    if strict and primary_failures:
        details = "; ".join(f"{path}: {error}" for path, error in primary_failures)
        raise RuntimeError(f"Gagal reset database pada beberapa path: {details}")
    return outcome

def get_ppn_from_profile(db_path, default=11):
    """Read the PPN value from the first ``company_profile`` row.

    Args:
        db_path: Database file; forwarded to connect_sqlite.
        default: Value used when no row exists, the stored value is empty or
            not positive, or the lookup fails.

    Returns:
        str: The PPN as the text of an integer (callers pass it to
        ``setText()``).
    """
    try:
        conn = connect_sqlite(db_path)
        try:
            cursor = conn.cursor()
            cursor.execute("SELECT ppn FROM company_profile LIMIT 1")
            row = cursor.fetchone()
        finally:
            # Always release the connection, including when the query raises.
            conn.close()
        if row:
            raw_ppn = row[0]
            if raw_ppn in (None, "", "None"):
                LOGGER.warning("Nilai PPN kosong, fallback %s", int(default))
                return str(int(default))
            ppn_val = parse_int_from_text(raw_ppn, default=int(default))
            if ppn_val <= 0:
                ppn_val = int(default)
            LOGGER.debug("Nilai PPN dari company_profile: %s", ppn_val)
            return str(ppn_val)  # must be a string so it can be passed to setText()
    except _SQLITE_OPERATION_EXCEPTIONS as e:
        LOGGER.warning("Gagal ambil PPN dari company_profile: %s", e)
    return str(int(default))  # default fallback

def get_company_profile(db_path=None):
    """Fetch the first non-trashed ``company_profile`` row as a dict.

    Args:
        db_path: Database file; forwarded to connect_sqlite.

    Returns:
        dict | None: Column name -> value for the row with the lowest ``id``
        whose ``trash`` is 0 or NULL; ``None`` when no such row exists or the
        lookup fails (the failure is logged).
    """
    try:
        conn = connect_sqlite(db_path)
        try:
            conn.row_factory = sqlite3.Row
            cursor = conn.cursor()
            cursor.execute(
                """
                SELECT
                    id, nama, alias, email, tlp, tlp_2, tlp_3,
                    alamat, alamat_1, address, kelurahan, kecamatan, kabupaten,
                    propinsi, kodepos, extra_struk,
                    logo_img, file_name, file_type, file_ext, full_path
                FROM company_profile
                WHERE COALESCE(trash, 0) = 0
                ORDER BY id ASC
                LIMIT 1
                """
            )
            row = cursor.fetchone()
        finally:
            # Always release the connection, including when the query raises.
            conn.close()
        if not row:
            return None
        return dict(row)
    except _SQLITE_OPERATION_EXCEPTIONS as e:
        LOGGER.warning("Gagal ambil company_profile: %s", e)
        return None
