# edited by glg
import ast
import json
import re
import sys
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Sequence, Tuple


REPO_ROOT = Path(__file__).resolve().parents[2]
RUBRIC_DIR = REPO_ROOT / "docs" / "quality" / "modules_v90"

if str(REPO_ROOT) not in sys.path:
    sys.path.insert(0, str(REPO_ROOT))


GENERIC_EXCEPTION_PATTERN = re.compile(r"except\s+Exception\b")
BARE_EXCEPTION_PATTERN = re.compile(r"(?m)^\s*except\s*:\s*$")


@dataclass(frozen=True)
class KpiResult:
    kpi_id: str
    name: str
    weight: int
    score: int
    issues: Tuple[str, ...]


def _read_text(path: Path) -> str:
    return path.read_text(encoding="utf-8-sig")


def _read_json(path: Path) -> Dict[str, object]:
    return json.loads(_read_text(path))


def _compute_weighted_score(weight: int, passed_count: int, total_count: int) -> int:
    if total_count <= 0:
        return int(weight)
    if passed_count >= total_count:
        return int(weight)
    ratio = float(passed_count) / float(total_count)
    return int(round(float(weight) * ratio))


def _parse_python(path: Path) -> ast.AST:
    return ast.parse(_read_text(path))


def _resolve_function_node(abs_path: Path, class_name: str, function_name: str) -> Optional[ast.FunctionDef]:
    tree = _parse_python(abs_path)
    for node in ast.walk(tree):
        if not isinstance(node, ast.ClassDef) or node.name != class_name:
            continue
        for child in node.body:
            if isinstance(child, ast.FunctionDef) and child.name == function_name:
                return child
    return None


def _resolve_function_length(abs_path: Path, class_name: str, function_name: str) -> int:
    node = _resolve_function_node(abs_path, class_name, function_name)
    if node is None:
        return -1
    end_lineno = int(getattr(node, "end_lineno", node.lineno))
    return int(end_lineno - node.lineno + 1)


def _exception_type_contains_exception_name(exc_type: ast.AST) -> bool:
    if isinstance(exc_type, ast.Name):
        return str(exc_type.id) == "Exception"
    if isinstance(exc_type, ast.Tuple):
        return any(_exception_type_contains_exception_name(item) for item in list(exc_type.elts or []))
    return False


def _contains_generic_catch(function_node: ast.FunctionDef) -> bool:
    for node in ast.walk(function_node):
        if not isinstance(node, ast.ExceptHandler):
            continue
        if node.type is None:
            return True
        if _exception_type_contains_exception_name(node.type):
            return True
    return False


def _iter_python_files(module_root: Path) -> Iterable[Path]:
    for path in sorted(module_root.rglob("*.py")):
        if "__pycache__" in path.parts:
            continue
        yield path


def _count_file_lines(path: Path) -> int:
    return len(_read_text(path).splitlines())


def _scan_structural_health(module_root: Path) -> Dict[str, int]:
    max_file_loc = 0
    files_over_1500 = 0
    service_to_controller_imports = 0
    model_to_upper_layer_imports = 0
    view_to_upper_layer_imports = 0

    for path in _iter_python_files(module_root):
        rel = path.relative_to(module_root)
        source_layer = rel.parts[0] if len(rel.parts) > 1 else "(root)"
        loc = _count_file_lines(path)
        max_file_loc = max(max_file_loc, int(loc))
        if int(loc) > 1500:
            files_over_1500 += 1

        try:
            tree = _parse_python(path)
        except SyntaxError:
            continue

        imports: List[str] = []
        for node in ast.walk(tree):
            if isinstance(node, ast.ImportFrom):
                imports.append(str(node.module or ""))
            elif isinstance(node, ast.Import):
                imports.extend(str(item.name or "") for item in list(node.names or []))

        for module in imports:
            if not module.startswith("pypos.modules."):
                continue
            parts = module.split(".")
            if len(parts) < 4:
                continue
            target_layer = str(parts[3] or "").strip()
            if source_layer == "services" and target_layer in {"controllers", "views", "widgets"}:
                service_to_controller_imports += 1
            if source_layer == "models" and target_layer in {"services", "controllers", "views", "widgets"}:
                model_to_upper_layer_imports += 1
            if source_layer == "views" and target_layer in {"controllers", "services", "models"}:
                view_to_upper_layer_imports += 1

    return {
        "max_file_loc": int(max_file_loc),
        "files_over_1500": int(files_over_1500),
        "service_to_controller_imports": int(service_to_controller_imports),
        "model_to_upper_layer_imports": int(model_to_upper_layer_imports),
        "view_to_upper_layer_imports": int(view_to_upper_layer_imports),
    }


def _count_generic_exceptions(module_root: Path) -> int:
    total = 0
    for path in _iter_python_files(module_root):
        text = _read_text(path)
        total += len(GENERIC_EXCEPTION_PATTERN.findall(text))
        total += len(BARE_EXCEPTION_PATTERN.findall(text))
    return int(total)


def _collect_test_names(test_files: Sequence[Path], name_filters: Sequence[str]) -> List[str]:
    filters = [str(item or "").strip().lower() for item in list(name_filters or []) if str(item or "").strip()]
    names: List[str] = []
    for file_path in list(test_files or []):
        if not file_path.exists():
            continue
        try:
            tree = _parse_python(file_path)
        except SyntaxError:
            continue
        for node in ast.walk(tree):
            if not isinstance(node, ast.FunctionDef):
                continue
            name = str(node.name or "").strip()
            if not name.startswith("test_"):
                continue
            if filters and not any(part in name.lower() for part in filters):
                continue
            names.append(name)
    return names


def _check_kpi_function_budget(rubric: Dict[str, object]) -> KpiResult:
    entry = next((item for item in list(rubric.get("kpis") or []) if item.get("id") == "KPI-01"), {})
    weight = int(entry.get("weight") or 0)
    items = list(rubric.get("critical_functions") or [])
    passed = 0
    issues: List[str] = []
    for item in items:
        file_path = str(item.get("file_path") or "").strip()
        class_name = str(item.get("class_name") or "").strip()
        function_name = str(item.get("function_name") or "").strip()
        max_loc = int(item.get("max_loc") or 0)
        abs_path = REPO_ROOT / file_path
        if not abs_path.exists():
            issues.append(f"KPI-01: file tidak ditemukan ({file_path})")
            continue
        loc = _resolve_function_length(abs_path, class_name, function_name)
        if loc < 0:
            issues.append(f"KPI-01: fungsi tidak ditemukan ({class_name}.{function_name})")
            continue
        if max_loc > 0 and loc > max_loc:
            issues.append(f"KPI-01: {class_name}.{function_name}={loc} melebihi max_loc={max_loc}")
            continue
        passed += 1
    return KpiResult(
        kpi_id="KPI-01",
        name=str(entry.get("name") or "Function Budget"),
        weight=weight,
        score=_compute_weighted_score(weight, passed, len(items)),
        issues=tuple(issues),
    )


def _check_kpi_exception_hygiene(rubric: Dict[str, object]) -> KpiResult:
    entry = next((item for item in list(rubric.get("kpis") or []) if item.get("id") == "KPI-02"), {})
    weight = int(entry.get("weight") or 0)
    cfg = dict(rubric.get("exception_hygiene") or {})
    module_root = REPO_ROOT / str(rubric.get("module_root") or "").strip()
    current = _count_generic_exceptions(module_root)
    baseline = max(0, int(cfg.get("baseline_generic_exception") or 0))
    max_allowed = max(0, int(cfg.get("max_allowed_generic_exception") or baseline))
    min_reduction_pct = float(cfg.get("min_reduction_pct") or 0.0)
    reduction_pct = 100.0 if baseline <= 0 else ((float(baseline - current) / float(baseline)) * 100.0)

    checks: List[bool] = []
    issues: List[str] = []

    allowed_ok = int(current) <= int(max_allowed)
    checks.append(allowed_ok)
    if not allowed_ok:
        issues.append(
            f"KPI-02: generic exception terlalu tinggi current={current} max_allowed={max_allowed}"
        )

    if baseline > 0 and min_reduction_pct > 0:
        reduction_ok = reduction_pct >= min_reduction_pct
        checks.append(reduction_ok)
        if not reduction_ok:
            issues.append(
                f"KPI-02: penurunan generic exception belum cukup reduction={reduction_pct:.2f}% "
                f"target={min_reduction_pct:.2f}%"
            )

    passed = sum(1 for item in checks if item)
    return KpiResult(
        kpi_id="KPI-02",
        name=str(entry.get("name") or "Exception Hygiene"),
        weight=weight,
        score=_compute_weighted_score(weight, passed, len(checks)),
        issues=tuple(issues),
    )


def _check_kpi_error_contract(rubric: Dict[str, object]) -> KpiResult:
    entry = next((item for item in list(rubric.get("kpis") or []) if item.get("id") == "KPI-03"), {})
    weight = int(entry.get("weight") or 0)
    targets = list(rubric.get("error_contract_targets") or [])
    passed = 0
    issues: List[str] = []
    for target in targets:
        file_path = str(target.get("file_path") or "").strip()
        required_tokens = [
            str(tok or "").strip()
            for tok in list(target.get("required_tokens") or ["error_code", "reason", "trace_id"])
            if str(tok or "").strip()
        ]
        abs_path = REPO_ROOT / file_path
        if not abs_path.exists():
            issues.append(f"KPI-03: file target error contract tidak ditemukan ({file_path})")
            continue
        text = _read_text(abs_path)
        missing = [token for token in required_tokens if token not in text]
        if missing:
            issues.append(f"KPI-03: token error contract hilang di {file_path}: {','.join(missing)}")
            continue
        passed += 1
    return KpiResult(
        kpi_id="KPI-03",
        name=str(entry.get("name") or "Error Contract"),
        weight=weight,
        score=_compute_weighted_score(weight, passed, len(targets)),
        issues=tuple(issues),
    )


def _check_kpi_test_maturity(rubric: Dict[str, object]) -> KpiResult:
    entry = next((item for item in list(rubric.get("kpis") or []) if item.get("id") == "KPI-04"), {})
    weight = int(entry.get("weight") or 0)
    matrix = dict(rubric.get("test_matrix") or {})
    checks: List[bool] = []
    issues: List[str] = []
    for name, rule in matrix.items():
        if not isinstance(rule, dict):
            checks.append(False)
            issues.append(f"KPI-04: rule test matrix tidak valid ({name})")
            continue
        files = [
            REPO_ROOT / str(path or "").strip()
            for path in list(rule.get("files") or [])
            if str(path or "").strip()
        ]
        min_cases = max(0, int(rule.get("min_cases") or 0))
        name_contains_any = list(rule.get("name_contains_any") or [])
        names = _collect_test_names(files, name_contains_any)
        ok = len(names) >= min_cases
        checks.append(ok)
        if not ok:
            issues.append(f"KPI-04: {name} kurang test cases count={len(names)} min={min_cases}")
    passed = sum(1 for item in checks if item)
    return KpiResult(
        kpi_id="KPI-04",
        name=str(entry.get("name") or "Test Maturity"),
        weight=weight,
        score=_compute_weighted_score(weight, passed, len(checks)),
        issues=tuple(issues),
    )


def _check_kpi_governance(rubric: Dict[str, object]) -> KpiResult:
    entry = next((item for item in list(rubric.get("kpis") or []) if item.get("id") == "KPI-05"), {})
    weight = int(entry.get("weight") or 0)
    checks = list(rubric.get("governance_checks") or [])
    passed = 0
    issues: List[str] = []
    for check in checks:
        file_path = str(check.get("file_path") or "").strip()
        tokens = [str(tok or "").strip() for tok in list(check.get("required_tokens") or []) if str(tok or "").strip()]
        abs_path = REPO_ROOT / file_path
        if not abs_path.exists():
            issues.append(f"KPI-05: file governance tidak ditemukan ({file_path})")
            continue
        text = _read_text(abs_path)
        missing = [token for token in tokens if token not in text]
        if missing:
            issues.append(f"KPI-05: token governance hilang di {file_path}: {','.join(missing)}")
            continue
        passed += 1
    return KpiResult(
        kpi_id="KPI-05",
        name=str(entry.get("name") or "Governance Controls"),
        weight=weight,
        score=_compute_weighted_score(weight, passed, len(checks)),
        issues=tuple(issues),
    )


def _check_kpi_structural_health(rubric: Dict[str, object]) -> KpiResult:
    entry = next((item for item in list(rubric.get("kpis") or []) if item.get("id") == "KPI-06"), {})
    weight = int(entry.get("weight") or 0)
    cfg = dict(rubric.get("structural_health") or {})
    module_root = REPO_ROOT / str(rubric.get("module_root") or "").strip()
    if not module_root.exists():
        return KpiResult(
            kpi_id="KPI-06",
            name=str(entry.get("name") or "Structural Health"),
            weight=weight,
            score=0,
            issues=(f"KPI-06: module_root tidak ditemukan ({module_root})",),
        )

    checks: List[bool] = []
    issues: List[str] = []
    scan = _scan_structural_health(module_root)

    max_file_loc = int(cfg.get("max_file_loc") or 0)
    if max_file_loc > 0:
        ok = int(scan.get("max_file_loc") or 0) <= max_file_loc
        checks.append(ok)
        if not ok:
            issues.append(
                f"KPI-06: max_file_loc terlalu tinggi current={scan.get('max_file_loc')} max_allowed={max_file_loc}"
            )

    max_files_over_1500 = int(cfg.get("max_files_over_1500") or 0)
    if max_files_over_1500 >= 0 and "max_files_over_1500" in cfg:
        ok = int(scan.get("files_over_1500") or 0) <= max_files_over_1500
        checks.append(ok)
        if not ok:
            issues.append(
                "KPI-06: jumlah file >1500 LOC terlalu tinggi "
                f"current={scan.get('files_over_1500')} max_allowed={max_files_over_1500}"
            )

    max_service_to_controller_imports = int(cfg.get("max_service_to_controller_imports") or 0)
    if max_service_to_controller_imports >= 0 and "max_service_to_controller_imports" in cfg:
        ok = int(scan.get("service_to_controller_imports") or 0) <= max_service_to_controller_imports
        checks.append(ok)
        if not ok:
            issues.append(
                "KPI-06: service->controller/view import melebihi batas "
                f"current={scan.get('service_to_controller_imports')} max_allowed={max_service_to_controller_imports}"
            )

    max_model_to_upper_layer_imports = int(cfg.get("max_model_to_upper_layer_imports") or 0)
    if max_model_to_upper_layer_imports >= 0 and "max_model_to_upper_layer_imports" in cfg:
        ok = int(scan.get("model_to_upper_layer_imports") or 0) <= max_model_to_upper_layer_imports
        checks.append(ok)
        if not ok:
            issues.append(
                "KPI-06: model->upper-layer import melebihi batas "
                f"current={scan.get('model_to_upper_layer_imports')} max_allowed={max_model_to_upper_layer_imports}"
            )

    max_view_to_upper_layer_imports = int(cfg.get("max_view_to_upper_layer_imports") or 0)
    if max_view_to_upper_layer_imports >= 0 and "max_view_to_upper_layer_imports" in cfg:
        ok = int(scan.get("view_to_upper_layer_imports") or 0) <= max_view_to_upper_layer_imports
        checks.append(ok)
        if not ok:
            issues.append(
                "KPI-06: view->upper-layer import melebihi batas "
                f"current={scan.get('view_to_upper_layer_imports')} max_allowed={max_view_to_upper_layer_imports}"
            )

    if not checks:
        return KpiResult(
            kpi_id="KPI-06",
            name=str(entry.get("name") or "Structural Health"),
            weight=weight,
            score=int(weight),
            issues=tuple(),
        )

    passed = sum(1 for item in checks if item)
    return KpiResult(
        kpi_id="KPI-06",
        name=str(entry.get("name") or "Structural Health"),
        weight=weight,
        score=_compute_weighted_score(weight, passed, len(checks)),
        issues=tuple(issues),
    )


def evaluate_module_kpi(rubric_path: Path) -> Dict[str, object]:
    rubric = _read_json(rubric_path)
    module_name = str(rubric.get("module_name") or rubric_path.stem).strip()
    score_cfg = dict(rubric.get("score") or {})
    max_score = int(score_cfg.get("max") or 100)
    pass_score = int(score_cfg.get("pass_score") or 90)

    results = [
        _check_kpi_function_budget(rubric),
        _check_kpi_exception_hygiene(rubric),
        _check_kpi_error_contract(rubric),
        _check_kpi_test_maturity(rubric),
        _check_kpi_governance(rubric),
        _check_kpi_structural_health(rubric),
    ]
    score = int(sum(int(item.score) for item in results))
    issues: List[str] = []
    for item in results:
        issues.extend(list(item.issues))
    if score < pass_score:
        issues.append(f"KPI total {module_name} belum lulus: score={score}, required={pass_score}")
    return {
        "module_name": module_name,
        "rubric_path": str(rubric_path.relative_to(REPO_ROOT)),
        "max_score": max_score,
        "pass_score": pass_score,
        "score": score,
        "kpi_results": [item.__dict__ for item in results],
        "issues": issues,
    }


def evaluate_all_modules() -> Dict[str, object]:
    rubric_paths = sorted(RUBRIC_DIR.glob("*_rubric_v90.json"))
    if not rubric_paths:
        return {
            "module_results": [],
            "issues": [f"Rubric modul v90 tidak ditemukan di {RUBRIC_DIR}"],
        }
    module_results = [evaluate_module_kpi(path) for path in rubric_paths]
    issues: List[str] = []
    for item in module_results:
        module_name = str(item.get("module_name") or "-")
        for issue in list(item.get("issues") or []):
            issues.append(f"{module_name}: {issue}")
    return {
        "module_results": module_results,
        "issues": issues,
    }


def run_modules_quality_guard_v90() -> List[str]:
    summary = evaluate_all_modules()
    return list(summary.get("issues") or [])


def main() -> int:
    summary = evaluate_all_modules()
    module_results = list(summary.get("module_results") or [])
    for module in module_results:
        print(
            f"{module.get('module_name')}: score="
            f"{int(module.get('score') or 0)}/{int(module.get('max_score') or 0)} "
            f"(required={int(module.get('pass_score') or 0)})"
        )
    issues = list(summary.get("issues") or [])
    if issues:
        print("Modules quality guard v90 gagal.")
        for item in issues:
            print(f"- {item}")
        return 1
    print("Modules quality guard v90 lulus.")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
