from datetime import datetime
import hashlib
import logging
import os
import time
import json
import re

from pypos.core.utils.device_utils import get_active_device_info, get_device_id
from pypos.core.utils.http_json_utils import sanitize_response_text
from pypos.modules.sinkronisasi.config import (
    get_export_tables,
    get_export_flux_empty_cleanup_limit,
    get_export_flux_empty_retention_days,
    get_export_requeue_failed_transient_limit,
    get_export_requeue_failed_transient_max_age_hours,
    get_export_upload_batch_limit,
    get_export_upload_compile_check_enabled,
    get_export_upload_compile_pending_stuck_age_minutes,
    get_export_upload_compile_pending_stuck_attempt_threshold,
    get_export_upload_compile_poll_interval_sec,
    get_export_upload_compile_poll_max_attempt,
    get_export_upload_compile_required_flags,
    get_export_file_retention_days,
    is_export_requeue_failed_transient_on_startup_enabled,
    get_export_upload_retryable_reasons,
    get_export_upload_retry_backoff_base_sec,
    get_export_upload_retry_backoff_factor,
    get_export_upload_retry_backoff_max_sec,
    get_export_upload_retry_max_attempt,
    get_export_upload_timeout_sec,
    is_export_upload_enabled,
    is_settlement_direct_only_mode,
)
from pypos.modules.sinkronisasi.models.export_upload_model import ExportUploadModel
from pypos.modules.sinkronisasi.services.export_upload_api_service import ExportUploadApiService

LOGGER = logging.getLogger(__name__)
_IDEMPOTENCY_RE = re.compile(r'"idempotency_key"\s*:\s*"([^"]+)"', re.IGNORECASE)
_SETTLEMENT_EXPORT_TABLES = {"settlement_history", "transaksi_settlement"}

# edited by glg


class ExportUploadService:
    def __init__(
        self,
        model=None,
        api_service=None,
        retryable_reason_getter=get_export_upload_retryable_reasons,
        device_id_getter=get_device_id,
        active_device_getter=get_active_device_info,
        enabled_getter=is_export_upload_enabled,
        batch_limit_getter=get_export_upload_batch_limit,
        timeout_getter=get_export_upload_timeout_sec,
        max_attempt_getter=get_export_upload_retry_max_attempt,
        backoff_base_getter=get_export_upload_retry_backoff_base_sec,
        backoff_factor_getter=get_export_upload_retry_backoff_factor,
        backoff_max_getter=get_export_upload_retry_backoff_max_sec,
        retention_days_getter=get_export_file_retention_days,
        empty_flux_retention_days_getter=get_export_flux_empty_retention_days,
        empty_flux_cleanup_limit_getter=get_export_flux_empty_cleanup_limit,
        compile_check_enabled_getter=get_export_upload_compile_check_enabled,
        compile_poll_max_attempt_getter=get_export_upload_compile_poll_max_attempt,
        compile_poll_interval_getter=get_export_upload_compile_poll_interval_sec,
        compile_pending_stuck_attempt_threshold_getter=get_export_upload_compile_pending_stuck_attempt_threshold,
        compile_pending_stuck_age_minutes_getter=get_export_upload_compile_pending_stuck_age_minutes,
        compile_required_flags_getter=get_export_upload_compile_required_flags,
        requeue_failed_transient_enabled_getter=is_export_requeue_failed_transient_on_startup_enabled,
        requeue_failed_transient_limit_getter=get_export_requeue_failed_transient_limit,
        requeue_failed_transient_max_age_hours_getter=get_export_requeue_failed_transient_max_age_hours,
        settlement_direct_only_mode_getter=is_settlement_direct_only_mode,
        allowed_export_tables_getter=get_export_tables,
    ):
        """Store the collaborators and configuration getters used by the service.

        Args:
            model: Persistence object for upload rows. A default
                ``ExportUploadModel`` is created only when this is ``None``.
            api_service: Client used to upload files and poll compile status.
                A default ``ExportUploadApiService`` is created only when this
                is ``None``.
            *_getter: Callables that return one configuration value each.
                They are stored as-is and invoked by the other methods of this
                class whenever the value is needed; ``active_device_getter`` is
                the only one called with an argument (the device id).
        """
        # Compare against None explicitly: an injected collaborator that happens
        # to evaluate as falsy (e.g. it defines __len__ or __bool__) must not be
        # silently replaced by a freshly constructed default.
        self.model = ExportUploadModel() if model is None else model
        self.api_service = ExportUploadApiService() if api_service is None else api_service

        # Device identity.
        self.device_id_getter = device_id_getter
        self.active_device_getter = active_device_getter

        # Upload switches and limits.
        self.enabled_getter = enabled_getter
        self.batch_limit_getter = batch_limit_getter
        self.timeout_getter = timeout_getter

        # Retry policy.
        self.retryable_reason_getter = retryable_reason_getter
        self.max_attempt_getter = max_attempt_getter
        self.backoff_base_getter = backoff_base_getter
        self.backoff_factor_getter = backoff_factor_getter
        self.backoff_max_getter = backoff_max_getter

        # Housekeeping.
        self.retention_days_getter = retention_days_getter
        self.empty_flux_retention_days_getter = empty_flux_retention_days_getter
        self.empty_flux_cleanup_limit_getter = empty_flux_cleanup_limit_getter

        # Compile-status polling.
        self.compile_check_enabled_getter = compile_check_enabled_getter
        self.compile_poll_max_attempt_getter = compile_poll_max_attempt_getter
        self.compile_poll_interval_getter = compile_poll_interval_getter
        self.compile_pending_stuck_attempt_threshold_getter = compile_pending_stuck_attempt_threshold_getter
        self.compile_pending_stuck_age_minutes_getter = compile_pending_stuck_age_minutes_getter
        self.compile_required_flags_getter = compile_required_flags_getter

        # Re-queueing of previously failed rows.
        self.requeue_failed_transient_enabled_getter = requeue_failed_transient_enabled_getter
        self.requeue_failed_transient_limit_getter = requeue_failed_transient_limit_getter
        self.requeue_failed_transient_max_age_hours_getter = requeue_failed_transient_max_age_hours_getter

        # Backlog suppression rules.
        self.settlement_direct_only_mode_getter = settlement_direct_only_mode_getter
        self.allowed_export_tables_getter = allowed_export_tables_getter

    # edited by glg
    def _is_settlement_export_row(self, row):
        """Return True when ``row`` targets one of the settlement export tables.

        The row's ``table_name`` is compared after trimming whitespace and
        lower-casing; a missing row or table name never matches.
        """
        source = row if row else {}
        normalized = str(source.get("table_name") or "")
        normalized = normalized.strip().lower()
        return normalized in _SETTLEMENT_EXPORT_TABLES

    # edited by glg
    # When direct-only mode is active, backlog settlement rows in export_flux are
    # suppressed so they are not sent twice (direct send + file upload).
    def _suppress_settlement_rows_for_direct_only(self, rows):
        if not bool(self.settlement_direct_only_mode_getter()):
            return list(rows or []), 0
        filtered = []
        suppressed = 0
        for row in rows or []:
            if not self._is_settlement_export_row(row):
                filtered.append(row)
                continue
            row_id = int((row or {}).get("id") or 0)
            if row_id <= 0:
                continue
            table_name = str((row or {}).get("table_name") or "").strip()
            self.model.mark_uploaded(row_id, "skipped_by_settlement_direct_only_mode")
            LOGGER.info(
                "[EXPORT] Suppress backlog settlement row karena direct_only: row_id=%s table=%s",
                row_id,
                table_name or "-",
            )
            suppressed += 1
        return filtered, suppressed

    # edited by glg
    # Keep the export_flux upload backlog in sync with the export tables that are currently enabled.
    # If a table has been switched OFF, its backlog rows are marked uploaded (skipped) so they are not sent.
    def _suppress_rows_by_export_table_toggle(self, rows):
        try:
            allowed_tables_raw = (
                self.allowed_export_tables_getter() if callable(self.allowed_export_tables_getter) else []
            )
        except (RuntimeError, TypeError, ValueError):
            allowed_tables_raw = []
        allowed_tables = {
            str(name or "").strip().lower()
            for name in (allowed_tables_raw or [])
            if str(name or "").strip()
        }
        if not allowed_tables:
            return list(rows or []), 0

        filtered = []
        suppressed = 0
        for row in rows or []:
            table_name = str((row or {}).get("table_name") or "").strip().lower()
            if not table_name or table_name in allowed_tables:
                filtered.append(row)
                continue
            row_id = int((row or {}).get("id") or 0)
            if row_id <= 0:
                continue
            self.model.mark_uploaded(row_id, "skipped_by_export_table_toggle")
            LOGGER.info(
                "[EXPORT] Suppress backlog row karena tabel OFF: row_id=%s table=%s",
                row_id,
                table_name or "-",
            )
            suppressed += 1
        return filtered, suppressed

    def _resolve_machine_id(self):
        device_id = str(self.device_id_getter() or "").strip()
        active = self.active_device_getter(device_id) or {}
        machine_id = str(active.get("machine_id") or device_id or "").strip()
        return machine_id or "UNKNOWN"

    def _extract_last_id(self, batch_start):
        text = str(batch_start or "").strip()
        if not text:
            return "0"
        parts = text.split("-")
        if len(parts) < 2:
            return "0"
        try:
            return str(int(parts[1]))
        except (TypeError, ValueError):
            return "0"

    def _build_metadata(self, row, machine_id):
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        return {
            "time_sent": now,
            "file_size": int(row.get("file_size") or 0),
            "row": int(row.get("row_count") or 0),
            "lastid": self._extract_last_id(row.get("batch_start")),
            "time_create": str(row.get("batch_end") or now),
            "id_machine": machine_id,
            "db_table": str(row.get("table_name") or ""),
            "file_hash": str(row.get("file_hash") or ""),
            "server_hash": str(row.get("server_hash") or ""),
            "file_seq": self._resolve_file_seq(row),
        }

    def _resolve_file_seq(self, row):
        raw_seq = str(row.get("file_seq") or "").strip()
        if raw_seq.isdigit():
            return raw_seq
        raw_id = str(row.get("id") or "").strip()
        if raw_id.isdigit():
            return raw_id
        return ""

    def _hash_file(self, file_path):
        digest = hashlib.sha256()
        with open(file_path, "rb") as handle:
            while True:
                chunk = handle.read(8192)
                if not chunk:
                    break
                digest.update(chunk)
        return digest.hexdigest()

    def _resolve_file_hash(self, row):
        raw_hash = str(row.get("file_hash") or "").strip().lower()
        if len(raw_hash) == 64 and all(ch in "0123456789abcdef" for ch in raw_hash):
            return raw_hash
        file_path = str(row.get("file_path") or "").strip()
        if not file_path or not os.path.isfile(file_path):
            return ""
        try:
            return self._hash_file(file_path)
        except (OSError, IOError):
            return ""

    def _validate_upload_metadata(self, metadata):
        required = ("id_machine", "file_hash", "file_seq")
        missing = [key for key in required if not str(metadata.get(key) or "").strip()]
        if missing:
            return f"metadata_missing:{','.join(missing)}"
        return ""

    def _mark_retry_or_failed(self, row_id, error_message, retryable, response_text=""):
        if not retryable:
            self.model.mark_failed(row_id, error_message, response_text=response_text)
            return
        self.model.mark_retry(
            row_id,
            error_message,
            max_attempt=self.max_attempt_getter(),
            backoff_base=self.backoff_base_getter(),
            backoff_factor=self.backoff_factor_getter(),
            backoff_max=self.backoff_max_getter(),
            response_text=response_text,
        )

    def _run_housekeeping(self):
        cleaned_files = self.model.cleanup_uploaded_files(self.retention_days_getter())
        cleaned_empty = self.model.cleanup_empty_flux_rows(
            self.empty_flux_retention_days_getter(),
            limit=self.empty_flux_cleanup_limit_getter(),
        )
        return int(cleaned_files or 0), int(cleaned_empty or 0)

    def _safe_load_json(self, raw_text):
        """Parse ``raw_text`` as a JSON object, returning {} on any failure.

        The text is first passed through ``sanitize_response_text`` and
        trimmed. Blank input, invalid JSON, and valid JSON that is not an
        object all produce an empty dict.
        """
        cleaned = sanitize_response_text(raw_text).strip()
        if cleaned:
            try:
                parsed = json.loads(cleaned)
            except (json.JSONDecodeError, TypeError, ValueError):
                parsed = None
            if isinstance(parsed, dict):
                return parsed
        return {}

    def _extract_idempotency_key(self, upload_response_text):
        """Return the idempotency key found in an upload response, or "".

        Lookup order:
          1. ``idempotency_key`` at the top level of the parsed JSON object;
          2. ``idempotency_key`` inside a nested ``response`` object;
          3. the first ``"idempotency_key": "..."`` pair matched by regex in
             the raw text.

        Each step is tried only when the previous one produced no non-blank
        key, so the regex fallback still runs when a ``response`` object exists
        but does not carry the key itself.
        """
        payload = self._safe_load_json(upload_response_text)
        candidates = [payload]
        nested = payload.get("response")
        if isinstance(nested, dict):
            candidates.append(nested)

        for node in candidates:
            key = str(node.get("idempotency_key") or "").strip()
            if key:
                return key

        # Last resort: scan the raw text, which also covers responses that are
        # not valid JSON or that carry the key at a deeper nesting level.
        matched = _IDEMPOTENCY_RE.search(str(upload_response_text or ""))
        if matched:
            return str(matched.group(1) or "").strip()
        return ""

    def _truncate(self, raw, limit=600):
        return str(raw or "")[: max(20, int(limit or 600))]

    def _extract_compile_snapshot(self, compile_result):
        payload = compile_result.get("response_json") if isinstance(compile_result, dict) else {}
        payload = payload if isinstance(payload, dict) else {}
        idempotency = payload.get("idempotency")
        idempotency = idempotency if isinstance(idempotency, dict) else {}
        per_key = idempotency.get("per_key")
        per_key = per_key if isinstance(per_key, dict) else {}
        return {
            "status_code": compile_result.get("status_code") if isinstance(compile_result, dict) else None,
            "idempotency_key_filter": self._truncate(payload.get("idempotency_key_filter"), 128),
            "pending_compile": int(idempotency.get("pending_compile") or 0) if isinstance(idempotency, dict) else 0,
            "processed": int(idempotency.get("processed") or 0) if isinstance(idempotency, dict) else 0,
            "failed": int(idempotency.get("failed") or 0) if isinstance(idempotency, dict) else 0,
            "found": int(per_key.get("found") or 0) if isinstance(per_key, dict) else 0,
            "compile_state": self._truncate(per_key.get("compile_state"), 40),
            "status": self._truncate(per_key.get("status"), 40),
            "required_stages": per_key.get("required_stages") if isinstance(per_key.get("required_stages"), (list, tuple)) else [],
            "required_flags": per_key.get("required_flags") if isinstance(per_key.get("required_flags"), dict) else {},
            "compiled_transaksi": per_key.get("compiled_transaksi"),
            "compiled_data": per_key.get("compiled_data"),
            "compiled_registry": per_key.get("compiled_registry"),
        }

    def _build_issue_detail(self, row, phase, retryable, error_message, idempotency_key="", status_code=None, compile_snapshot=None):
        detail = {
            "row_id": int(row.get("id") or 0),
            "table_name": self._truncate(row.get("table_name"), 80),
            "file_seq": self._truncate(row.get("file_seq"), 40),
            "phase": self._truncate(phase, 40),
            "retryable": bool(retryable),
            "error": self._truncate(error_message, 180),
            "status_code": status_code,
            "idempotency_key": self._truncate(idempotency_key, 128),
        }
        if isinstance(compile_snapshot, dict) and compile_snapshot:
            detail["compile"] = compile_snapshot
        return detail

    def _log_issue_detail(self, detail):
        """Log ``detail`` at warning level, as JSON when it can be serialised.

        When ``json.dumps`` rejects the value (TypeError or ValueError) the
        object itself is logged instead.
        """
        try:
            encoded = json.dumps(detail, ensure_ascii=False)
        except (TypeError, ValueError):
            LOGGER.warning("[EXPORT][OBS] %s", detail)
        else:
            LOGGER.warning("[EXPORT][OBS] %s", encoded)

    def _is_compile_completed(self, payload):
        if not isinstance(payload, dict):
            return False
        def _is_truthy(value):
            return value in {1, "1", True, "true", "ok", "success", "done", "completed"} or str(value).strip().lower() in {"1", "true", "ok", "success", "done", "completed"}

        idempotency = payload.get("idempotency")
        if isinstance(idempotency, dict):
            per_key = idempotency.get("per_key")
            if isinstance(per_key, dict) and int(per_key.get("found") or 0) == 1:
                if _is_truthy(per_key.get("is_compiled_full")):
                    return True
                compile_state = str(per_key.get("compile_state") or "").strip().upper()
                if compile_state in {"DONE", "COMPILED", "COMPLETE", "COMPLETED"}:
                    return True
                status_key = str(per_key.get("status") or "").strip().upper()
                compiled_transaksi = per_key.get("compiled_transaksi")
                compiled_data = per_key.get("compiled_data")
                compiled_registry = per_key.get("compiled_registry")
                required_flags = per_key.get("required_flags")
                if isinstance(required_flags, dict) and required_flags:
                    values = [required_flags.get(k) for k in required_flags.keys()]
                    if values and all(_is_truthy(v) for v in values):
                        return True
                required_stages = per_key.get("required_stages")
                if isinstance(required_stages, (list, tuple, set)) and required_stages:
                    stage_values = {
                        "transaksi": compiled_transaksi,
                        "data": compiled_data,
                        "registry": compiled_registry,
                    }
                    normalized_stages = []
                    for stage in required_stages:
                        stage_name = str(stage or "").strip().lower()
                        if stage_name and stage_name not in normalized_stages:
                            normalized_stages.append(stage_name)
                    if normalized_stages:
                        if all(_is_truthy(stage_values.get(stage_name)) for stage_name in normalized_stages):
                            return True
                if status_key == "PROCESSED":
                    flags = [v for v in (compiled_transaksi, compiled_data, compiled_registry) if v is not None]
                    if flags and all(_is_truthy(v) for v in flags):
                        return True
                return False

            try:
                pending_compile = int(idempotency.get("pending_compile") or 0)
                failed = int(idempotency.get("failed") or 0)
                processed = int(idempotency.get("processed") or 0)
                if pending_compile == 0 and failed == 0 and processed > 0:
                    return True
            except (TypeError, ValueError):
                pass

        for key in ("compiled_complete", "compile_complete", "compiled", "completed"):
            value = payload.get(key)
            if _is_truthy(value):
                return True

        required_flags = [str(v).strip() for v in (self.compile_required_flags_getter() or []) if str(v).strip()]
        if not required_flags:
            return False

        candidate_nodes = [payload]
        data_node = payload.get("data")
        if isinstance(data_node, dict):
            candidate_nodes.append(data_node)

        for node in candidate_nodes:
            if all(flag in node for flag in required_flags):
                all_done = True
                for flag in required_flags:
                    val = node.get(flag)
                    if not _is_truthy(val):
                        all_done = False
                        break
                if all_done:
                    return True
        return False

    def _parse_datetime(self, raw_value):
        text = str(raw_value or "").strip()
        if not text:
            return None
        try:
            return datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
        except (TypeError, ValueError):
            return None

    def _classify_compile_pending(self, row):
        attempt_threshold = max(1, int(self.compile_pending_stuck_attempt_threshold_getter() or 1))
        age_threshold_minutes = max(1, int(self.compile_pending_stuck_age_minutes_getter() or 1))

        attempt_count = int(row.get("upload_attempt_count") or 0)
        next_attempt = attempt_count + 1
        created_at = self._parse_datetime(row.get("created_at"))
        updated_at = self._parse_datetime(row.get("updated_at"))
        baseline = created_at or updated_at
        age_minutes = 0
        if baseline:
            age_minutes = max(0, int((datetime.now() - baseline).total_seconds() // 60))

        is_stuck = next_attempt >= attempt_threshold or age_minutes >= age_threshold_minutes
        if is_stuck:
            return (
                f"compile_status_worker_suspected_stuck(attempt={next_attempt},age_min={age_minutes})",
                True,
            )
        return (
            f"compile_status_pending_normal(attempt={next_attempt},age_min={age_minutes})",
            False,
        )

    def _check_compile_status(self, upload_response_text, timeout):
        if not self.compile_check_enabled_getter():
            return {"ok": True, "retryable": False, "error": "", "response_text": "", "response_json": {}, "status_code": None}

        idempotency_key = self._extract_idempotency_key(upload_response_text)
        if not idempotency_key:
            return {
                "ok": False,
                "retryable": False,
                "error": "compile_status_missing_idempotency_key",
                "response_text": "",
                "response_json": {},
                "status_code": None,
            }

        max_attempt = max(1, int(self.compile_poll_max_attempt_getter() or 1))
        poll_interval = max(0.0, float(self.compile_poll_interval_getter() or 0))
        last_error = "compile_status_pending"
        last_response_text = ""
        last_response_json = {}
        last_status_code = None
        retryable = True

        for attempt in range(1, max_attempt + 1):
            result = self.api_service.fetch_compile_status(idempotency_key, timeout)
            last_response_text = str(result.get("response_text") or "")
            last_response_json = result.get("response_json") if isinstance(result.get("response_json"), dict) else {}
            last_status_code = result.get("status_code")
            if not result.get("ok"):
                last_error = str(result.get("error") or "compile_status_failed")
                retryable = bool(result.get("retryable", True))
                if retryable and attempt < max_attempt and poll_interval > 0:
                    time.sleep(poll_interval)
                    continue
                return {
                    "ok": False,
                    "retryable": retryable,
                    "error": last_error,
                    "response_text": last_response_text,
                    "response_json": last_response_json,
                    "status_code": last_status_code,
                }

            payload = result.get("response_json") or {}
            if self._is_compile_completed(payload):
                return {
                    "ok": True,
                    "retryable": False,
                    "error": "",
                    "response_text": last_response_text,
                    "response_json": payload if isinstance(payload, dict) else {},
                    "status_code": result.get("status_code"),
                }

            if attempt < max_attempt and poll_interval > 0:
                time.sleep(poll_interval)

        return {
            "ok": False,
            "retryable": True,
            "error": last_error,
            "response_text": last_response_text,
            "response_json": last_response_json,
            "status_code": last_status_code,
        }

    # edited by glg
    def _emit_progress(self, callback, percent):
        if not callable(callback):
            return
        try:
            value = int(percent)
        except (TypeError, ValueError):
            value = 0
        value = max(0, min(100, value))
        try:
            callback(value)
        except (RuntimeError, TypeError, ValueError):
            LOGGER.debug("[EXPORT] progress callback gagal")

    def process_pending_uploads(
        self,
        progress_callback=None,
        limit_override=None,
        requeue_failed_transient=False,
        timeout_override=None,
    ):
        self._emit_progress(progress_callback, 0)
        requeued_failed_transient = 0
        suppressed_direct_only = 0
        suppressed_table_toggle = 0
        if bool(requeue_failed_transient) and self.requeue_failed_transient_enabled_getter():
            try:
                requeued_failed_transient = int(
                    self.model.recover_failed_transient_rows(
                        retryable_terms=self.retryable_reason_getter(),
                        limit=self.requeue_failed_transient_limit_getter(),
                        max_age_hours=self.requeue_failed_transient_max_age_hours_getter(),
                    )
                    or 0
                )
            except (RuntimeError, TypeError, ValueError) as exc:
                LOGGER.warning("[EXPORT] recover_failed_transient_rows error: %s", exc)
                requeued_failed_transient = 0
            if requeued_failed_transient > 0:
                LOGGER.info(
                    "[EXPORT] Requeue FAILED transient rows: %s",
                    requeued_failed_transient,
                )

        if not self.enabled_getter():
            cleaned_files, cleaned_empty = self._run_housekeeping()
            self._emit_progress(progress_callback, 100)
            return {
                "uploaded": 0,
                "failed": 0,
                "retried": 0,
                "cleaned_files": cleaned_files,
                "cleaned_empty_flux": cleaned_empty,
                "retry_items": [],
                "failed_items": [],
                "requeued_failed_transient": requeued_failed_transient,
                "suppressed_direct_only": 0,
                "suppressed_table_toggle": 0,
            }

        batch_limit = max(1, int(self.batch_limit_getter() or 1))
        if limit_override is not None:
            try:
                override_val = int(limit_override)
            except (TypeError, ValueError):
                override_val = 0
            if override_val > 0:
                batch_limit = min(batch_limit, override_val)

        rows = self.model.fetch_pending_uploads(batch_limit)
        rows, suppressed_direct_only = self._suppress_settlement_rows_for_direct_only(rows)
        rows, suppressed_table_toggle = self._suppress_rows_by_export_table_toggle(rows)
        if not rows:
            cleaned_files, cleaned_empty = self._run_housekeeping()
            self._emit_progress(progress_callback, 100)
            return {
                "uploaded": 0,
                "failed": 0,
                "retried": 0,
                "cleaned_files": cleaned_files,
                "cleaned_empty_flux": cleaned_empty,
                "retry_items": [],
                "failed_items": [],
                "requeued_failed_transient": requeued_failed_transient,
                "suppressed_direct_only": int(suppressed_direct_only or 0),
                "suppressed_table_toggle": int(suppressed_table_toggle or 0),
            }

        machine_id = self._resolve_machine_id()
        timeout = self.timeout_getter()
        if timeout_override is not None:
            try:
                timeout = max(1, int(timeout_override))
            except (TypeError, ValueError):
                timeout = self.timeout_getter()
        uploaded = 0
        failed = 0
        retried = 0
        retry_items = []
        failed_items = []
        issue_item_limit = 10
        total_rows = max(1, int(len(rows) or 0))
        processed_rows = 0

        for row in rows:
            try:
                row_id = int(row.get("id") or 0)
                if row_id <= 0:
                    continue
                metadata = self._build_metadata(row, machine_id)
                metadata["file_hash"] = self._resolve_file_hash(row)
                validation_error = self._validate_upload_metadata(metadata)
                if validation_error:
                    self.model.mark_failed(row_id, validation_error, response_text="")
                    detail = self._build_issue_detail(
                        row=row,
                        phase="metadata_validation",
                        retryable=False,
                        error_message=validation_error,
                    )
                    self._log_issue_detail(detail)
                    if len(failed_items) < issue_item_limit:
                        failed_items.append(detail)
                    failed += 1
                    continue
                result = self.api_service.upload_file(
                    file_path=str(row.get("file_path") or "").strip(),
                    metadata=metadata,
                    timeout=timeout,
                )
                if result.get("ok"):
                    # edited by glg
                    # Batas sukses POS: upload stream berhasil (status=1) -> langsung UPLOADED.
                    # Compile status backend tidak lagi menjadi syarat sukses POS.
                    upload_response = str(result.get("response_text", "") or "")
                    combined_response = upload_response
                    if self.compile_check_enabled_getter():
                        compile_result = self._check_compile_status(upload_response, timeout)
                        compile_response = str(compile_result.get("response_text", "") or "")
                        if compile_response:
                            combined_response = f"{combined_response}\n{compile_response}".strip()
                        if not compile_result.get("ok"):
                            retryable = bool(compile_result.get("retryable", True))
                            error_message = str(compile_result.get("error") or "compile_status_pending")
                            if error_message == "compile_status_pending":
                                classified_error, _ = self._classify_compile_pending(row)
                                error_message = classified_error
                            idempotency_key = self._extract_idempotency_key(upload_response)
                            if not idempotency_key:
                                snapshot_tmp = self._extract_compile_snapshot(compile_result)
                                idempotency_key = str(snapshot_tmp.get("idempotency_key_filter") or "").strip()
                            compile_snapshot = self._extract_compile_snapshot(compile_result)
                            detail = self._build_issue_detail(
                                row=row,
                                phase="compile_status_non_blocking",
                                retryable=retryable,
                                error_message=error_message,
                                idempotency_key=idempotency_key,
                                status_code=compile_result.get("status_code"),
                                compile_snapshot=compile_snapshot,
                            )
                            self._log_issue_detail(detail)
                    self.model.mark_uploaded(row_id, combined_response)
                    uploaded += 1
                    continue

                retryable = bool(result.get("retryable", True))
                error_message = str(result.get("error") or "upload_failed")
                upload_response = str(result.get("response_text", "") or "")
                idempotency_key = self._extract_idempotency_key(upload_response)
                detail = self._build_issue_detail(
                    row=row,
                    phase="upload_stream",
                    retryable=retryable,
                    error_message=error_message,
                    idempotency_key=idempotency_key,
                    status_code=result.get("status_code"),
                )
                self._log_issue_detail(detail)
                self._mark_retry_or_failed(
                    row_id,
                    error_message,
                    retryable=retryable,
                    response_text=upload_response,
                )
                if retryable:
                    retried += 1
                    if len(retry_items) < issue_item_limit:
                        retry_items.append(detail)
                else:
                    failed += 1
                    if len(failed_items) < issue_item_limit:
                        failed_items.append(detail)
            finally:
                processed_rows += 1
                self._emit_progress(
                    progress_callback,
                    int((processed_rows * 100) / total_rows),
                )

        cleaned_files, cleaned_empty = self._run_housekeeping()
        self._emit_progress(progress_callback, 100)
        return {
            "uploaded": uploaded,
            "failed": failed,
            "retried": retried,
            "cleaned_files": cleaned_files,
            "cleaned_empty_flux": cleaned_empty,
            "retry_items": retry_items,
            "failed_items": failed_items,
            "requeued_failed_transient": requeued_failed_transient,
            "suppressed_direct_only": int(suppressed_direct_only or 0),
            "suppressed_table_toggle": int(suppressed_table_toggle or 0),
        }
