import os
import sqlite3
import threading
import time
from datetime import datetime, timedelta
from PySide6.QtCore import QTimer, Qt

from pypos.core.base_controller import BaseController
from pypos.core.utils.path_utils import get_db_path
from pypos.core.utils.device_utils import get_device_id, get_active_device_info
from pypos.core.utils.config_utils import read_config
from pypos.core.utils.db_helper import connect_sqlite_read_fast
from pypos.modules.sinkronisasi.services.sync_service import SyncService
from pypos.core.utils.app_state_utils import get_bool
from pypos.modules.sinkronisasi.services.transaction_export_service import TransactionExportService
from pypos.modules.sinkronisasi.services.export_upload_service import ExportUploadService
from pypos.modules.sinkronisasi.services.export_cycle_service import ExportCycleService
from pypos.modules.sinkronisasi.services.settlement_direct_service import SettlementDirectService
from pypos.modules.sinkronisasi.services.network_orchestrator_service import NetworkOrchestratorService
from pypos.modules.sinkronisasi.services.event_ingestion_backpressure_service import (
    EventIngestionBackpressureService,
)
from pypos.modules.platform_ops.services.fleet_rollout_runtime_guard_service import (
    FleetRolloutRuntimeGuardService,
)
from pypos.modules.dashboard.controllers.dashboard_event_runtime_mixin import (
    DashboardEventRuntimeMixin,
)
from pypos.modules.dashboard.controllers.dashboard_export_batch_mixin import (
    DashboardExportBatchMixin,
)
from pypos.modules.dashboard.services.dashboard_config_service import DashboardConfigService
from pypos.modules.dashboard.services.dashboard_asset_service import DashboardAssetService
from pypos.modules.dashboard.services.dashboard_value_utils import DashboardValueUtils
from pypos.modules.dashboard.services.export_runtime_metrics_service import (
    ExportRuntimeMetricsService,
)
from pypos.core.utils.worker_pool_utils import submit_ui_periodic_task_keyed


class DashboardController(DashboardExportBatchMixin, DashboardEventRuntimeMixin, BaseController):
    def __init__(self, view, app_controller):
        """Store the view/app controller, create the services, and reset runtime flags.

        Args:
            view: Dashboard view object; kept as ``self.view``.
            app_controller: Application-level controller; kept as
                ``self.app_controller``.
        """
        super().__init__()
        self.view = view
        self.app_controller = app_controller
        self.config_service = DashboardConfigService()
        self.asset_service = DashboardAssetService()
        self._pending_manual_sync = False
        # Pending-export flags; all start cleared.
        self._pending_export_after_sync = False
        self._pending_export_after_settlement = False
        self._pending_export_startup_replay = False
        self._export_retry_scheduled = False
        self._export_startup_gate_probe_inflight = False
        # Guards the async export dispatch so that batch execution does not
        # block the UI thread.
        self._export_dispatch_lock = threading.Lock()
        self._export_worker_inflight = False
        # The export timer may be deferred briefly while the user is active on a
        # heavy transaction page, so click/hover interaction does not compete
        # for resources with the export cycle.
        self._periodic_export_defer_scheduled = False
        self._periodic_export_defer_attempt = 0
        # The direct settlement push runs asynchronously so the settlement UI
        # does not stall.
        self._settlement_direct_service = None
        self._settlement_direct_inflight = False
        # Single network orchestrator for the sync / export / direct-settlement flows.
        self.network_orchestrator = NetworkOrchestratorService()
        # P2 scale groundwork:
        # - outbox metrics to observe the backlog
        # - backpressure policy to adapt the export recheck rhythm.
        self._event_outbox_service = None
        # NOTE: _build_event_backpressure_policy() is not defined in this part of
        # the file (presumably provided by a mixin); it runs here, mid-constructor.
        self._event_backpressure_service = EventIngestionBackpressureService(
            self._build_event_backpressure_policy()
        )
        self._event_first_export_runtime_service = None
        self._fleet_rollout_runtime_guard_service = FleetRolloutRuntimeGuardService()
        # Export runtime metrics (error rate + p95 latency) used as input for
        # adaptive backpressure.
        self._export_runtime_metrics = ExportRuntimeMetricsService()
        # Direct settlement delivery guard (close-app safety):
        # keep pending/failed state so the application is not closed while
        # something is still unsent.
        self._settlement_delivery_lock = threading.Lock()
        self._settlement_unsent_map = {}
        # Queue of direct settlements that must wait for the settlement-triggered
        # export to finish.
        self._pending_settlement_direct_after_export = {}
        # Autosync "notify_only" mode:
        # - a lightweight probe timer (default 60 seconds) only checks for updates
        #   (no apply)
        # - the sync is applied only after user confirmation.
        self._auto_sync_probe_lock = threading.Lock()
        self._auto_sync_probe_inflight = False
        self._auto_sync_update_required = False
        self._auto_sync_update_signature = ""
        self._auto_sync_update_snoozed_signature = ""
        # Update-notification snooze (TTL) so the dialog does not spam:
        # - clicking "Nanti" (later) => default pause of 10 minutes
        # - once the TTL expires, the notification may appear again for the
        #   same signature.
        self._auto_sync_update_snoozed_until = None
        # Keep the last update payload so the "Sinkron Sekarang" button can show
        # the same confirmation dialog while the update is still pending.
        self._auto_sync_update_last_payload = {}
        self._auto_sync_update_prompt_open = False
        self._pending_auto_sync_apply = False
        self._auto_sync_apply_recheck_scheduled = False

    def get_ui_layout_config(self):
        return self.config_service.get_ui_layout_config()

    # edited by glg
    def get_online_poll_interval_ms(self):
        return self.config_service.get_online_poll_interval_ms()

    # edited by glg
    def get_online_probe_timeout_sec(self):
        return self.config_service.get_online_probe_timeout_sec()

    # edited by glg
    def _can_run_network_operation(self, process_name, force_probe=False, log_block=True):
        process = str(process_name or "").strip().lower()
        if not process:
            process = NetworkOrchestratorService.PROCESS_SYNC
        try:
            decision = self.network_orchestrator.can_run(
                process=process,
                force_probe=bool(force_probe),
            )
        except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError) as exc:
            self.log_warning(f"[NetworkOrchestrator] fallback allow (error={exc})")
            return {
                "allow": True,
                "state": "UNKNOWN",
                "reason": "orchestrator_error",
                "server_online": True,
                "internet_online": True,
            }
        if log_block and not bool(decision.get("allow")):
            self.log_info(
                "[NetworkOrchestrator] BLOCK "
                f"process={process} state={decision.get('state')} reason={decision.get('reason')} "
                f"server_online={int(bool(decision.get('server_online')))} "
                f"internet_online={int(bool(decision.get('internet_online')))} "
                f"fail_streak={int(decision.get('fail_streak') or 0)}"
            )
        return decision

    # edited by glg
    def get_auto_sync_probe_interval_seconds(self):
        return self.config_service.get_auto_sync_probe_interval_seconds()

    # edited by glg
    def check_online_status_for_ui(self):
        # edited by glg
        # Catat latensi probe online agar bottleneck jaringan pada UI bisa dilacak dari log.
        started_at = time.perf_counter()
        probe_source = "fallback_false"
        timeout_sec = self.get_online_probe_timeout_sec()
        try:
            if self.app_controller and hasattr(self.app_controller, "is_online_for_ui"):
                probe_source = "is_online_for_ui"
                return bool(self.app_controller.is_online_for_ui(timeout=timeout_sec))
            if self.app_controller and hasattr(self.app_controller, "is_online_fast"):
                probe_source = "is_online_fast"
                return bool(self.app_controller.is_online_fast(timeout=timeout_sec))
            if self.app_controller and hasattr(self.app_controller, "is_online"):
                probe_source = "is_online"
                return bool(self.app_controller.is_online())
            return False
        finally:
            elapsed_ms = (time.perf_counter() - started_at) * 1000.0
            if elapsed_ms >= 150.0:
                self.log_info(
                    f"[PERF] online_probe source={probe_source} "
                    f"timeout_sec={float(timeout_sec):.2f} elapsed_ms={elapsed_ms:.1f}"
                )

    def create_dashboard_info_controller(self):
        """Build a ``DashboardInfoController`` for the path from ``get_db_path()``."""
        # Imported inside the method, as in the original.
        from pypos.modules.dashboard.controllers.dashboard_info_controller import (
            DashboardInfoController,
        )

        db_path = get_db_path()
        return DashboardInfoController(db_path)

    def create_transaksi_penjualan_controller(self, user_info, parent_window, scanner_controller=None):
        """Build a ``TransaksiPenjualanController`` for the sales transaction page.

        The customer list is loaded through a fresh ``CustomerController`` and
        passed in together with ``user_info`` and the path from ``get_db_path()``.

        Args:
            user_info: Forwarded positionally to the controller.
            parent_window: Forwarded as ``parent_window``.
            scanner_controller: Forwarded as ``scanner_controller`` (default ``None``).
        """
        from pypos.modules.customer.controllers.customer_controller import CustomerController
        from pypos.modules.penjualan.controllers.transaksi_penjualan_controller import (
            TransaksiPenjualanController,
        )

        customers = CustomerController().load_all_customers()
        db_path = get_db_path()
        controller = TransaksiPenjualanController(
            customers,
            user_info,
            db_path,
            parent_window=parent_window,
            scanner_controller=scanner_controller,
        )
        return controller

    def create_history_transaksi_controller(
        self,
        parent_window,
        user_id,
        tanggal,
        on_close_requested,
    ):
        """Build a ``HistoryTransaksiController`` in page mode (``as_page=True``).

        Args:
            parent_window: Forwarded positionally to the controller.
            user_id: Forwarded positionally to the controller.
            tanggal: Forwarded as ``tanggal``.
            on_close_requested: Forwarded as ``on_close_requested``.
        """
        from pypos.modules.penjualan.controllers.history_transaksi_controller import (
            HistoryTransaksiController,
        )

        db_path = get_db_path()
        controller = HistoryTransaksiController(
            parent_window,
            user_id,
            db_path,
            tanggal=tanggal,
            as_page=True,
            on_close_requested=on_close_requested,
        )
        return controller

    def create_settlement_controller(self, user_info, parent_window, on_close_requested):
        """Build a ``SettlementController`` in page mode (``as_page=True``).

        ``self.request_export_after_settlement`` is registered as the
        ``on_settlement_completed`` callback.
        """
        from pypos.modules.penjualan.controllers.settlement_controller import SettlementController

        settlement_kwargs = {
            "db_path": get_db_path(),
            "user_info": user_info,
            "parent_window": parent_window,
            "as_page": True,
            "on_close_requested": on_close_requested,
            "on_settlement_completed": self.request_export_after_settlement,
        }
        return SettlementController(**settlement_kwargs)

    def load_header_logo(self, force_refresh=False):
        """Show the cached header logo (if any) and start a background refresh.

        With ``force_refresh`` an existing cache file is deleted instead of
        being applied to the view. In every case a daemon thread running
        ``_fetch_header_logo`` is started.
        """
        cache_path = self.view._get_logo_cache_path() if self.view else None
        cache_exists = bool(cache_path) and os.path.isfile(cache_path)
        if cache_exists:
            if force_refresh:
                try:
                    os.remove(cache_path)
                except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError) as e:
                    self.log_warning(f"Gagal hapus cache logo: {e}")
            else:
                self.view._apply_header_logo_path(cache_path)
        fetcher = threading.Thread(target=self._fetch_header_logo, daemon=True)
        fetcher.start()

    def _delete_header_logo_cache(self):
        cache_path = self.view._get_logo_cache_path() if self.view else None
        if cache_path and os.path.isfile(cache_path):
            try:
                os.remove(cache_path)
            except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError) as e:
                self.log_warning(f"Gagal hapus cache logo (fetch gagal): {e}")

    def _fetch_header_logo(self):
        """Download the header logo and hand its content to the view.

        Steps: read the API base URL, fetch the UI-assets payload, take its
        ``logo_header_url``, turn a relative URL into an absolute one by
        prefixing the base URL, download the content and emit it through
        ``view.logo_loaded``. Whenever a step yields nothing, or one of the
        handled exceptions is raised, the cached logo file is deleted.
        """
        try:
            api_root = self.config_service.get_api_base_url()
            if not api_root:
                self._delete_header_logo_cache()
                return
            assets_endpoint = self.config_service.get_ui_assets_endpoint()
            request_timeout = self.config_service.get_request_timeout()
            assets = self.asset_service.fetch_ui_assets_payload(
                base_url=api_root,
                endpoint=assets_endpoint,
                machine_id=get_device_id(),
                timeout=request_timeout,
            )
            logo_ref = ""
            if assets:
                logo_ref = assets.get("logo_header_url") if isinstance(assets, dict) else ""
            if not logo_ref:
                # Covers both an empty payload and a payload without a logo URL.
                self._delete_header_logo_cache()
                return
            logo_target = str(logo_ref)
            if not logo_target.lower().startswith(("http://", "https://", "data:")):
                # Relative reference: make sure it starts with "/" and prefix the base URL.
                if not logo_target.startswith("/"):
                    logo_target = "/" + logo_target
                logo_target = api_root + logo_target
            content = self.asset_service.download_logo_content(logo_target, timeout=request_timeout)
            if content:
                self.view.logo_loaded.emit(content)
            else:
                self._delete_header_logo_cache()
        except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError) as e:
            self._delete_header_logo_cache()
            self.log_warning(f"Gagal fetch header logo: {e}")

    def init_auto_sync_master(self):
        """Create the auto-sync service and timers on the view (notify-only mode).

        Sets ``view.auto_sync_tables``, ``view.auto_sync_service`` and
        ``view.auto_sync_timer``, starts the periodic probe timer (never faster
        than every 15 seconds), schedules one delayed first run, and connects
        the service's ``sync_completed`` signal to ``_on_auto_sync_completed``
        with a queued connection.
        """
        # Auto-sync must focus on the cashier master data (watch tables), not
        # the full sync_tables list, so non-master tables such as fifo_avg are
        # not included.
        default_watch_tables = [
            "produk",
            "price",
            "price_per_area",
            "diskon",
            "diskon_customer",
            "per_customers",
            "setting_struk",
        ]
        self.view.auto_sync_tables = (
            self.config_service.get_sync_watch_tables() or default_watch_tables
        )
        self.view.auto_sync_service = SyncService()
        self.view.auto_sync_timer = QTimer(self.view)
        self.view.auto_sync_timer.timeout.connect(self._auto_sync_master)
        # Auto-sync is locked to notification mode:
        # - the periodic timer only performs a lightweight update check
        # - applying the sync still goes through user confirmation.
        interval_sec = self.get_auto_sync_probe_interval_seconds()
        interval_ms = max(15000, int(interval_sec * 1000))
        self.log_info(
            "[AutoSync] mode=notify_only_locked "
            f"probe_interval_sec={int(max(1, interval_sec))}"
        )
        self.view.auto_sync_timer.start(interval_ms)
        delay_sec = self.config_service.get_auto_sync_start_delay_seconds()
        # QTimer.singleShot expects a non-negative integer millisecond count;
        # coerce like interval_ms above so a float, None or negative config
        # value cannot break the first run.
        delay_ms = max(0, int((delay_sec or 0) * 1000))
        QTimer.singleShot(delay_ms, self._auto_sync_master)
        self.view.auto_sync_service.sync_completed.connect(
            self._on_auto_sync_completed,
            Qt.QueuedConnection
        )

    # edited by glg
    @staticmethod
    def _safe_int(value, default=0):
        """Static shortcut for ``DashboardValueUtils.safe_int(value, default)``."""
        coerced = DashboardValueUtils.safe_int(value, default)
        return coerced

    # edited by glg
    @staticmethod
    def _format_int_local(value):
        """Static shortcut for ``DashboardValueUtils.format_int_local(value)``."""
        formatted = DashboardValueUtils.format_int_local(value)
        return formatted

    # edited by glg
    @staticmethod
    def _safe_float(value, default=None):
        """Static shortcut for ``DashboardValueUtils.safe_float(value, default)``."""
        coerced = DashboardValueUtils.safe_float(value, default)
        return coerced

    # edited by glg
    def _format_rupiah_local(self, value):
        """Shortcut for ``DashboardValueUtils.format_rupiah_local(value)``."""
        formatted = DashboardValueUtils.format_rupiah_local(value)
        return formatted

    # edited by glg
    @staticmethod
    def _friendly_sync_table_name(table_name):
        """Static shortcut for ``DashboardValueUtils.friendly_sync_table_name(table_name)``."""
        friendly_name = DashboardValueUtils.friendly_sync_table_name(table_name)
        return friendly_name

    # edited by glg
    def _resolve_sync_device_context(self):
        """Resolve the ``(machine_id, cabang_id)`` pair used for sync requests.

        The machine id comes from the active device info, then the config, then
        the raw device id. The branch id comes from the active device info and
        falls back to the config when it is ``None``, ``""`` or ``"None"``.

        Returns:
            ``(machine_id, cabang_id)`` when the machine id is non-empty and the
            branch id is positive; otherwise ``("", 0)``.
        """
        cfg = read_config() or {}
        raw_device_id = str(get_device_id() or "").strip()
        active_info = get_active_device_info(raw_device_id) or {}

        machine_source = (
            active_info.get("machine_id")
            or cfg.get("machine_id")
            or raw_device_id
            or ""
        )
        machine_id = str(machine_source).strip()

        branch_raw = active_info.get("cabang_id")
        if branch_raw in (None, "", "None"):
            branch_raw = cfg.get("cabang_id", 0)
        branch_id = self._safe_int(branch_raw, 0)

        if machine_id and branch_id > 0:
            return machine_id, branch_id
        return "", 0

    # edited by glg
    @staticmethod
    def _extract_check_update_samples(raw_value):
        """Static shortcut for ``DashboardValueUtils.extract_check_update_samples(raw_value)``."""
        samples = DashboardValueUtils.extract_check_update_samples(raw_value)
        return samples

    # edited by glg
    def _extract_changed_tables_from_check(self, check_response, requested_tables=None):
        """Shortcut for ``DashboardValueUtils.extract_changed_tables_from_check``.

        ``requested_tables`` is forwarded by keyword.
        """
        changed = DashboardValueUtils.extract_changed_tables_from_check(
            check_response, requested_tables=requested_tables
        )
        return changed

    # edited by glg
    def _extract_row_total_from_check(self, check_response, changed_tables):
        """Shortcut for ``DashboardValueUtils.extract_row_total_from_check``."""
        row_total = DashboardValueUtils.extract_row_total_from_check(
            check_response,
            changed_tables,
        )
        return row_total

    # edited by glg
    def _read_last_sync_marker(self, model, table_name):
        info = {}
        try:
            info = model.get_last_sync_info() if model else {}
        except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError):
            info = {}
        row = info.get(str(table_name or "").strip()) if isinstance(info, dict) else {}
        if not isinstance(row, dict):
            row = {}
        dtime = str(row.get("last_update") or "2000-01-01 00:00:00").strip()
        if not dtime:
            dtime = "2000-01-01 00:00:00"
        last_id = self._safe_int(row.get("last_id"), 0)
        return dtime, max(0, int(last_id))

    # edited by glg
    @staticmethod
    def _extract_sync_new_rows(sync_response, table_name):
        if not isinstance(sync_response, dict):
            return []
        data = sync_response.get("data")
        if not isinstance(data, dict):
            return []
        table_payload = data.get(str(table_name or "").strip())
        # edited by glg
        # Kompatibilitas payload:
        # - format baru: data[table] = {"new": [...]}
        # - format legacy: data[table] = [...]
        if isinstance(table_payload, list):
            return [row for row in table_payload if isinstance(row, dict)]
        if not isinstance(table_payload, dict):
            return []
        for key in ("new", "rows", "data", "items"):
            rows = table_payload.get(key)
            if isinstance(rows, list):
                return [row for row in rows if isinstance(row, dict)]
        return []

    # edited by glg
    def _lookup_local_product_price_snapshot(self, produk_id, cabang_id, limit=8):
        """Read a product's name and its recent list prices from the local DB.

        This is a best-effort lookup: any failure yields ``("", [])``.

        Args:
            produk_id: Product id; values that do not parse to a positive int
                short-circuit to ``("", [])``.
            cabang_id: Branch id used to prefer branch-specific price rows.
            limit: Maximum number of price rows to read (at least 1).

        Returns:
            ``(nama, values)`` where ``nama`` is the stripped ``produk.nama``
            (or ``""``) and ``values`` is the list of ``price.nilai`` values
            that parse as floats, in query order.
        """
        parsed_produk_id = self._safe_int(produk_id, 0)
        parsed_cabang_id = self._safe_int(cabang_id, 0)
        if parsed_produk_id <= 0:
            return "", []
        conn = None
        try:
            conn = connect_sqlite_read_fast(
                db_path=get_db_path(),
                timeout=3,
                busy_timeout_ms=250,
            )
            conn.row_factory = sqlite3.Row
            cur = conn.cursor()
            cur.execute(
                "SELECT nama FROM produk WHERE id = ? LIMIT 1",
                (parsed_produk_id,),
            )
            row_nama = cur.fetchone()
            nama = str((row_nama["nama"] if row_nama else "") or "").strip()

            values = []
            safe_limit = max(1, int(limit or 1))
            try:
                # Preferred query: active, non-trashed rows, branch-specific
                # rows first, then the global ones (-1 / 0 / NULL).
                cur.execute(
                    """
                    SELECT nilai
                    FROM price
                    WHERE produk_id = ?
                      AND jenis_value = 'harga_list'
                      AND COALESCE(status, 1) = 1
                      AND COALESCE(trash, 0) = 0
                      AND (cabang_id = ? OR cabang_id IN (-1, 0) OR cabang_id IS NULL)
                    ORDER BY
                      CASE
                        WHEN cabang_id = ? THEN 0
                        WHEN cabang_id IN (-1, 0) OR cabang_id IS NULL THEN 1
                        ELSE 2
                      END,
                      id DESC
                    LIMIT ?
                    """,
                    (parsed_produk_id, parsed_cabang_id, parsed_cabang_id, safe_limit),
                )
            except sqlite3.OperationalError:
                # Fallback query that only needs produk_id / jenis_value / id.
                cur.execute(
                    """
                    SELECT nilai
                    FROM price
                    WHERE produk_id = ?
                      AND jenis_value = 'harga_list'
                    ORDER BY id DESC
                    LIMIT ?
                    """,
                    (parsed_produk_id, safe_limit),
                )
            for row_nilai in cur.fetchall():
                try:
                    nilai = row_nilai["nilai"]
                except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError):
                    try:
                        nilai = row_nilai[0]
                    except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError):
                        nilai = None
                parsed = self._safe_float(nilai, default=None)
                if parsed is None:
                    continue
                values.append(parsed)
            return nama, values
        except (sqlite3.Error, TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError):
            # sqlite3.Error derives directly from Exception, so it has to be
            # listed explicitly; otherwise a locked database or a missing table
            # would escape this best-effort lookup and propagate to the caller.
            return "", []
        finally:
            if conn:
                try:
                    conn.close()
                except (sqlite3.Error, TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError):
                    pass

    # edited by glg
    @staticmethod
    def _resolve_before_price(local_values, after_value):
        if not isinstance(local_values, list) or not local_values:
            return None
        try:
            after_float = float(after_value) if after_value is not None else None
        except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError):
            after_float = None
        if after_float is None:
            return local_values[0]
        for raw in local_values:
            try:
                candidate = float(raw)
            except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError):
                continue
            if abs(candidate - after_float) > 0.0000001:
                return candidate
        return None

    # edited by glg
    def _build_price_delta_samples_from_rows(self, rows, cabang_id, max_items=3):
        limit = max(1, int(max_items or 1))
        results = []
        seen_produk_ids = set()
        for row in list(rows or []):
            if not isinstance(row, dict):
                continue
            jenis_value = str(row.get("jenis_value") or "").strip().lower()
            if jenis_value and jenis_value != "harga_list":
                continue
            produk_id = self._safe_int(row.get("produk_id"), 0)
            if produk_id <= 0 or produk_id in seen_produk_ids:
                continue
            seen_produk_ids.add(produk_id)
            harga_baru = self._safe_float(
                row.get("nilai"),
                default=self._safe_float(row.get("value"), default=None),
            )
            nama_lokal, harga_versions = self._lookup_local_product_price_snapshot(
                produk_id=produk_id,
                cabang_id=cabang_id,
                limit=8,
            )
            harga_lama = self._resolve_before_price(harga_versions, harga_baru)
            entity_name = (
                nama_lokal
                or str(row.get("produk_nama") or row.get("nama") or "").strip()
            )
            if not entity_name:
                # edited by glg
                # Hindari fallback "Produk <id>" pada popup update karena
                # pengguna mengharapkan nama produk yang human-readable.
                continue
            results.append(
                {
                    "entity_name": entity_name,
                    "entity_id": str(produk_id),
                    "changed_field": "harga_jual",
                    "before_value": harga_lama,
                    "after_value": harga_baru,
                }
            )
            if len(results) >= limit:
                break
        return results

    # edited by glg
    def _collect_price_delta_samples(self, model, machine_id, cabang_id, max_items=3):
        if model is None:
            return []
        try:
            dtime, last_id = self._read_last_sync_marker(model, "price")
            preview = model.sync_data_server_table(
                machine_id=machine_id,
                cabang_id=int(cabang_id),
                table_name="price",
                dtime=dtime,
                last_id=last_id,
                partial=0,
                page_size=max(10, int(max_items or 3) * 8),
            )
            rows = self._extract_sync_new_rows(preview, "price")
            return self._build_price_delta_samples_from_rows(
                rows=rows,
                cabang_id=int(cabang_id),
                max_items=max_items,
            )
        except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError) as exc:
            self.log_warning(f"[AutoSync] fallback sampel harga gagal: {exc}")
            return []

    # edited by glg
    @staticmethod
    def _table_has_readable_price_samples(table_item):
        samples = list((table_item or {}).get("samples") or [])
        for sample in samples:
            if not isinstance(sample, dict):
                continue
            entity_name = str(sample.get("entity_name") or "").strip()
            before_value = sample.get("before_value")
            after_value = sample.get("after_value")
            if entity_name and (before_value is not None or after_value is not None):
                return True
        return False

    # edited by glg
    def _enrich_changed_tables_with_price_delta(self, changed_tables, model, machine_id, cabang_id):
        if not isinstance(changed_tables, list):
            return []
        enriched = []
        price_samples_cache = None
        for item in changed_tables:
            row = dict(item) if isinstance(item, dict) else {}
            table_name = str(row.get("table") or "").strip().lower()
            if table_name != "price":
                enriched.append(row)
                continue
            if self._table_has_readable_price_samples(row):
                enriched.append(row)
                continue
            if price_samples_cache is None:
                price_samples_cache = self._collect_price_delta_samples(
                    model=model,
                    machine_id=machine_id,
                    cabang_id=cabang_id,
                    max_items=3,
                )
            if price_samples_cache:
                row["samples"] = [dict(sample) for sample in price_samples_cache]
            enriched.append(row)
        return enriched

    # edited by glg
    def _build_auto_sync_update_signature(self, row_total, changed_tables):
        """Shortcut for ``DashboardValueUtils.build_auto_sync_update_signature``."""
        signature = DashboardValueUtils.build_auto_sync_update_signature(
            row_total,
            changed_tables,
        )
        return signature

    # edited by glg
    def _build_auto_sync_prompt_message(self, row_total, changed_tables):
        """Shortcut for ``DashboardValueUtils.build_auto_sync_prompt_message``."""
        message = DashboardValueUtils.build_auto_sync_prompt_message(
            row_total,
            changed_tables,
        )
        return message

    # edited by glg
    def _emit_auto_sync_probe_result(self, payload):
        if not hasattr(self.view, "auto_sync_update_detected"):
            return
        try:
            self.view.auto_sync_update_detected.emit(payload)
        except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError):
            pass

    # edited by glg
    def _run_auto_sync_probe_async(self):
        """Start a background update probe unless one is already running.

        The worker thread asks the network orchestrator for permission,
        resolves the device context, calls ``check_update_server`` for the
        view's ``auto_sync_tables``, enriches the changed tables with price
        samples, and finally emits a payload dict through
        ``_emit_auto_sync_probe_result``. The payload always has the keys
        ``has_update``, ``row_total``, ``changed_tables``, ``signature``,
        ``error`` and ``skip_update_state``; when the orchestrator blocks the
        probe it additionally carries ``gate_blocked``, ``gate_state`` and
        ``gate_reason``.

        Returns:
            ``True`` when a worker thread was started, ``False`` when a probe
            was already in flight.
        """
        with self._auto_sync_probe_lock:
            if self._auto_sync_probe_inflight:
                return False
            self._auto_sync_probe_inflight = True

        tables = list(getattr(self.view, "auto_sync_tables", []) or [])

        def _worker():
            payload = {
                "has_update": False,
                "row_total": 0,
                "changed_tables": [],
                "signature": "",
                "error": "",
                "skip_update_state": False,
            }
            model = None
            try:
                # The network orchestrator probe runs in the worker so the UI
                # thread is not hit by blocking I/O.
                gate = self._can_run_network_operation(
                    NetworkOrchestratorService.PROCESS_SYNC,
                    force_probe=True,
                    log_block=False,
                )
                if not bool(gate.get("allow")):
                    payload.update(
                        {
                            "skip_update_state": True,
                            "gate_blocked": True,
                            "gate_state": str(gate.get("state") or ""),
                            "gate_reason": str(gate.get("reason") or ""),
                        }
                    )
                    return
                machine_id, cabang_id = self._resolve_sync_device_context()
                if not machine_id or int(cabang_id or 0) <= 0:
                    payload["error"] = "device_or_cabang_not_ready"
                    return
                from pypos.modules.sinkronisasi.models.sinkron_model import SinkronModel

                model = SinkronModel()
                check = model.check_update_server(
                    machine_id=machine_id,
                    cabang_id=int(cabang_id),
                    tables=tables,
                    force_full_tables=[],
                )
                changed_tables = self._extract_changed_tables_from_check(
                    check_response=check or {},
                    requested_tables=tables,
                )
                changed_tables = self._enrich_changed_tables_with_price_delta(
                    changed_tables=changed_tables,
                    model=model,
                    machine_id=machine_id,
                    cabang_id=int(cabang_id),
                )
                row_total = self._extract_row_total_from_check(
                    check_response=check or {},
                    changed_tables=changed_tables,
                )
                signature = self._build_auto_sync_update_signature(row_total, changed_tables)
                payload.update(
                    {
                        "has_update": bool(row_total > 0),
                        "row_total": int(row_total),
                        "changed_tables": changed_tables,
                        "signature": signature,
                    }
                )
            except (sqlite3.Error, TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError) as exc:
                # sqlite3.Error derives directly from Exception; without it a
                # database error would escape the thread and the payload would
                # be emitted as "no update, no error".
                payload["error"] = str(exc or "unknown_error")
            finally:
                try:
                    if model:
                        model.close_connections()
                except (sqlite3.Error, TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError):
                    # A failure while closing must not skip the in-flight reset
                    # below, or the probe would stay disabled from then on.
                    pass
                with self._auto_sync_probe_lock:
                    self._auto_sync_probe_inflight = False
                self._emit_auto_sync_probe_result(payload)

        threading.Thread(
            target=_worker,
            daemon=True,
            name="autosync-update-probe",
        ).start()
        return True

    # edited by glg
    def _start_auto_sync_apply(self, source="manual"):
        if not hasattr(self.view, "auto_sync_service") or not self.view.auto_sync_service:
            self.log_warning("[AutoSync] start apply gagal: auto_sync_service belum siap")
            return False
        tables = list(getattr(self.view, "auto_sync_tables", []) or [])
        if not tables:
            self.log_warning("[AutoSync] start apply gagal: auto_sync_tables kosong")
            return False
        if self.view.auto_sync_service.is_running():
            return False
        self.view.auto_sync_service.start_sync(tables=tables)
        self.log_info(f"[AutoSync] apply dimulai source={source} tables={len(tables)}")
        return True

    # edited by glg
    def _schedule_auto_sync_apply_recheck(self, delay_ms=500):
        if self._auto_sync_apply_recheck_scheduled:
            return
        self._auto_sync_apply_recheck_scheduled = True

        def _run():
            self._auto_sync_apply_recheck_scheduled = False
            self._run_pending_auto_sync_apply_if_ready()

        QTimer.singleShot(max(50, int(delay_ms or 0)), _run)

    # edited by glg
    def _run_pending_auto_sync_apply_if_ready(self):
        if not self._pending_auto_sync_apply:
            return
        if self.is_any_sync_running():
            self._schedule_auto_sync_apply_recheck(500)
            return
        self._pending_auto_sync_apply = False
        self._start_auto_sync_apply(source="queued_after_busy")

    # edited by glg
    def _request_auto_sync_apply_from_prompt(self):
        if self.is_any_sync_running():
            self._pending_auto_sync_apply = True
            self._schedule_auto_sync_apply_recheck(500)
            if hasattr(self.view, "show_toast"):
                self.view.show_toast(
                    "Update dijadwalkan. Proses akan berjalan setelah aktivitas saat ini selesai.",
                    duration_ms=3200,
                    level="information",
                )
            return False
        return self._start_auto_sync_apply(source="notify_only_prompt")

    # edited by glg
    def _clear_auto_sync_update_snooze(self):
        self._auto_sync_update_snoozed_signature = ""
        self._auto_sync_update_snoozed_until = None

    # edited by glg
    def _set_auto_sync_update_snooze(self, signature):
        self._auto_sync_update_snoozed_signature = str(signature or "").strip()
        reminder_min = self.config_service.get_auto_sync_notify_reminder_minutes()
        self._auto_sync_update_snoozed_until = datetime.now() + timedelta(
            minutes=max(1, int(reminder_min or 10))
        )

    # edited by glg
    def _is_auto_sync_update_snoozed(self, signature):
        signed = str(signature or "").strip()
        if not signed:
            return False
        if signed != str(self._auto_sync_update_snoozed_signature or "").strip():
            return False
        until = self._auto_sync_update_snoozed_until
        if not isinstance(until, datetime):
            return False
        return datetime.now() < until

    # edited by glg
    def _build_auto_sync_prompt_payload(self, payload):
        data = payload if isinstance(payload, dict) else {}
        changed_tables = data.get("changed_tables")
        return {
            "has_update": bool(data.get("has_update")),
            "row_total": int(data.get("row_total") or 0),
            "changed_tables": changed_tables if isinstance(changed_tables, list) else [],
            "signature": str(data.get("signature") or "").strip(),
        }

    # edited by glg
    def _show_auto_sync_update_prompt(self, payload, force=False):
        prompt_data = self._build_auto_sync_prompt_payload(payload)
        if not bool(prompt_data.get("has_update")):
            return False
        signature = str(prompt_data.get("signature") or "").strip()
        if self._auto_sync_update_prompt_open:
            return False
        if not bool(force) and self._is_auto_sync_update_snoozed(signature):
            return False

        message = self._build_auto_sync_prompt_message(
            prompt_data.get("row_total"),
            prompt_data.get("changed_tables"),
        )
        self._auto_sync_update_prompt_open = True
        try:
            proceed = self.ask_confirm(
                "Pembaruan Data Tersedia",
                message,
                view=self.view,
                yes_label="Update Sekarang",
                no_label="Nanti",
                default_no=True,
            )
        finally:
            self._auto_sync_update_prompt_open = False

        if proceed:
            self._clear_auto_sync_update_snooze()
            self._request_auto_sync_apply_from_prompt()
            return True

        self._set_auto_sync_update_snooze(signature)
        if hasattr(self.view, "show_toast"):
            self.view.show_toast(
                "Pembaruan ditunda. Klik 'Sinkron Sekarang' kapan saja untuk menerapkan update.",
                duration_ms=3400,
                level="warning",
            )
        return True

    # edited by glg
    def handle_sync_now_request(self):
        """Re-open the update prompt when a detected update is still pending.

        Returns:
            False when no update is currently required. True after the prompt
            was requested with ``force=True``, which bypasses any snooze.
        """
        if not self._auto_sync_update_required:
            return False

        cached = self._auto_sync_update_last_payload
        if not cached:
            # No stored probe payload: build a minimal one from the signature.
            cached = {
                "has_update": True,
                "row_total": 0,
                "changed_tables": [],
                "signature": self._auto_sync_update_signature,
            }
        prompt = self._build_auto_sync_prompt_payload(cached)
        prompt["has_update"] = True
        if not prompt.get("signature"):
            prompt["signature"] = str(self._auto_sync_update_signature or "").strip()
        self._show_auto_sync_update_prompt(prompt, force=True)
        return True

    # edited by glg
    def on_auto_sync_update_detected(self, payload):
        """Handle the result of an auto-sync update probe.

        A payload flagged ``skip_update_state`` is ignored. A payload carrying
        an ``error`` is only logged. Otherwise the "update required" state is
        refreshed, the view's sync status is updated when it offers
        ``update_sync_status``, and the prompt is shown when an update exists.

        Args:
            payload: Probe result; anything that is not a dict counts as empty.
        """
        incoming = payload if isinstance(payload, dict) else {}
        if incoming.get("skip_update_state"):
            return

        failure = str(incoming.get("error") or "").strip()
        if failure:
            self.log_warning(f"[AutoSync] probe update gagal: {failure}")
            return

        normalized = self._build_auto_sync_prompt_payload(incoming)
        if normalized.get("has_update"):
            self._auto_sync_update_required = True
            self._auto_sync_update_signature = str(normalized.get("signature") or "").strip()
            # Keep a copy so a later "sync now" request can re-open the prompt.
            self._auto_sync_update_last_payload = dict(normalized)
            if hasattr(self.view, "update_sync_status"):
                self.view.update_sync_status(True)
            self._show_auto_sync_update_prompt(normalized, force=False)
            return

        # Nothing to update: reset all remembered update state.
        self._auto_sync_update_required = False
        self._auto_sync_update_signature = ""
        self._auto_sync_update_last_payload = {}
        self._clear_auto_sync_update_snooze()
        if hasattr(self.view, "update_sync_status"):
            self.view.update_sync_status(False)

    # edited by glg
    def _evaluate_export_rollout_runtime_guard(self, source="timer"):
        service = getattr(self, "_fleet_rollout_runtime_guard_service", None)
        if service is None:
            return {"allow": True, "reason": "runtime_guard_service_missing"}
        _, cabang_id = self._resolve_sync_device_context()
        metrics_payload = self._build_event_runtime_metrics_payload(source=source)
        return service.evaluate_export_guard(
            branch_id=cabang_id,
            metrics_payload={
                "error_rate_pct": float(metrics_payload.get("error_rate_pct", 0.0) or 0.0),
                "p95_latency_ms": float(metrics_payload.get("avg_latency_ms", 0.0) or 0.0),
                "sample_count": int(metrics_payload.get("sample_count", 0) or 0),
                "queue_pending": int(metrics_payload.get("pending", 0) or 0),
                "queue_inflight": int(metrics_payload.get("inflight", 0) or 0),
                "source": str(source or "").strip(),
            },
        )

    # edited by glg
    def _resolve_export_delay_with_backpressure(self, requested_delay_ms):
        base_delay = max(300, int(requested_delay_ms or 3000))
        if not self._is_event_backpressure_enabled():
            return int(base_delay)
        service = self._get_event_outbox_service()
        if service is None:
            return int(base_delay)
        try:
            queue_metrics = service.get_queue_metrics() or {}
            runtime_metrics = {}
            runtime_service = getattr(self, "_export_runtime_metrics", None)
            if runtime_service is not None and hasattr(runtime_service, "snapshot"):
                runtime_metrics = runtime_service.snapshot() or {}
            decision = self._event_backpressure_service.evaluate(
                {
                    "pending": int(queue_metrics.get("pending", 0) or 0),
                    "inflight": int(queue_metrics.get("inflight", 0) or 0),
                    "error_rate_pct": float(runtime_metrics.get("error_rate_pct", 0.0) or 0.0),
                    "avg_latency_ms": float(runtime_metrics.get("p95_latency_ms", 0.0) or 0.0),
                }
            )
            recommended_delay = int(decision.get("recommended_poll_delay_ms", base_delay) or base_delay)
            resolved_delay = max(int(base_delay), int(recommended_delay))
            mode = str(decision.get("mode") or "").strip()
            if mode in {"throttle", "critical"}:
                self.log_info(
                    "[EVENT_BACKPRESSURE] "
                    f"mode={mode} pending={int(queue_metrics.get('pending', 0) or 0)} "
                    f"inflight={int(queue_metrics.get('inflight', 0) or 0)} "
                    f"error_rate_pct={float(runtime_metrics.get('error_rate_pct', 0.0) or 0.0):.2f} "
                    f"p95_latency_ms={float(runtime_metrics.get('p95_latency_ms', 0.0) or 0.0):.1f} "
                    f"delay_ms={int(resolved_delay)}"
                )
            return int(resolved_delay)
        except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError) as exc:
            self.log_warning(f"[EVENT_BACKPRESSURE] fallback delay default: {exc}")
            return int(base_delay)

    # edited by glg
    def _is_high_interaction_view_active(self):
        view = getattr(self, "view", None)
        content_area = getattr(view, "content_area", None)
        if content_area is None or not hasattr(content_area, "currentWidget"):
            return False
        try:
            current_widget = content_area.currentWidget()
        except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError):
            return False
        if current_widget is None:
            return False
        heavy_candidates = [
            getattr(view, "transaksi_view", None),
            getattr(view, "pembatalan_container", None),
            getattr(view, "pembatalan_view", None),
            getattr(view, "history_container", None),
            getattr(view, "history_view", None),
            getattr(view, "settlement_container", None),
            getattr(view, "settlement_view", None),
        ]
        for candidate in heavy_candidates:
            if candidate is not None and current_widget is candidate:
                return True
        return False

    # edited by glg
    def _get_periodic_export_defer_delay_ms(self):
        """Return the retry delay used while a timer export is deferred.

        The value comes from the ``export_timer_defer_delay_ms`` config key
        (default 8000) and is never below 1500 milliseconds.
        """
        settings = read_config() or {}
        configured = settings.get("export_timer_defer_delay_ms", 8000)
        normalized = self._to_non_negative_int(configured, 8000)
        return max(1500, int(normalized))

    # edited by glg
    def _schedule_periodic_export_retry(self):
        if bool(self._periodic_export_defer_scheduled):
            return
        delay_ms = self._get_periodic_export_defer_delay_ms()
        self._periodic_export_defer_scheduled = True

        def _retry():
            self._periodic_export_defer_scheduled = False
            self._request_periodic_export_async()

        QTimer.singleShot(delay_ms, _retry)

    # edited by glg
    def _should_defer_periodic_export_for_ui_activity(self):
        """Decide whether the timer-driven export should be postponed.

        Reads ``export_timer_defer_when_ui_active`` (default 1) and
        ``export_timer_defer_max_attempt`` (default 2) from ``read_config()``.
        The export is deferred while a high-interaction view is active, but
        only for a limited number of consecutive attempts.

        Returns:
            True when this attempt should be postponed. False when deferral
            is disabled, no high-interaction view is active, or the attempt
            limit was exceeded; in all three cases the attempt counter is
            reset to 0.
        """
        cfg = read_config() or {}
        raw_enabled = cfg.get("export_timer_defer_when_ui_active", 1)
        # Only a missing or blank value falls back to the default (enabled).
        # The previous ``value or 1`` form also turned an explicit integer 0
        # or False into 1, so the feature could not be disabled that way.
        if raw_enabled is None or raw_enabled == "":
            raw_enabled = 1
        defer_enabled = int(raw_enabled) == 1
        if not defer_enabled:
            self._periodic_export_defer_attempt = 0
            return False
        if not self._is_high_interaction_view_active():
            self._periodic_export_defer_attempt = 0
            return False
        max_attempt = self._to_non_negative_int(cfg.get("export_timer_defer_max_attempt", 2), 2)
        self._periodic_export_defer_attempt += 1
        if int(self._periodic_export_defer_attempt) > int(max_attempt):
            # Run the export anyway once the attempt limit is exceeded so it
            # is not starved while the user stays on a busy view.
            self._periodic_export_defer_attempt = 0
            return False
        return True

    def init_export_json_batch(self):
        """Create the export services and start the periodic export timer.

        The export, upload and cycle services plus the timer are attached to
        the view. The timer fires ``_request_periodic_export_async`` at the
        interval (in minutes) supplied by the config service. Finally a
        startup replay of pending exports is requested.
        """
        minutes = self.config_service.get_export_json_interval_minutes()
        view = self.view

        exporter = TransactionExportService()
        view.export_service = exporter
        uploader = ExportUploadService()
        view.export_upload_service = uploader
        view.export_cycle_service = ExportCycleService(
            export_service=exporter,
            upload_service=uploader,
        )

        periodic_timer = QTimer(view)
        view.export_json_timer = periodic_timer
        periodic_timer.timeout.connect(self._request_periodic_export_async)
        # QTimer intervals are expressed in milliseconds.
        periodic_timer.start(minutes * 60 * 1000)

        self.request_export_replay_on_startup()

    # edited by glg
    def _request_periodic_export_async(self):
        if self._should_defer_periodic_export_for_ui_activity():
            if int(self._periodic_export_defer_attempt or 0) == 1:
                self.log_info("[EXPORT] Timer export ditunda sementara karena user aktif di halaman transaksi.")
            self._schedule_periodic_export_retry()
            return False
        self._periodic_export_defer_attempt = 0
        with self._export_dispatch_lock:
            if self._export_worker_inflight:
                return False
            self._export_worker_inflight = True
        worker_context = {
            "source": "timer",
            "upload_limit_override": None,
            "requeue_failed_transient": False,
            "pending_dispatch": False,
            "needs_sync": False,
            "needs_settlement": False,
            "needs_startup": False,
        }
        if not self._start_export_worker_thread(worker_context):
            self._schedule_export_recheck(3000)
            return False
        return True

    # edited by glg
    def _start_export_worker_thread(self, worker_context):
        context = dict(worker_context or {})
        source = str(context.get("source") or "timer")
        upload_limit_override = context.get("upload_limit_override")
        requeue_failed_transient = bool(context.get("requeue_failed_transient"))

        def _worker():
            result = self._export_json_batch(
                source=source,
                upload_limit_override=upload_limit_override,
                requeue_failed_transient=requeue_failed_transient,
            )
            self._on_export_worker_finished(context, result)

        try:
            threading.Thread(
                target=_worker,
                daemon=True,
                name=f"export-cycle-{source}",
            ).start()
            return True
        except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError) as exc:
            with self._export_dispatch_lock:
                self._export_worker_inflight = False
            self.log_warning(f"[EXPORT] Gagal start worker async: {exc}")
            return False

    # edited by glg
    def _on_export_worker_finished(self, worker_context, result):
        """Completion hook for an export worker thread.

        Releases the ``_export_worker_inflight`` flag, re-queues pending
        triggers when a pending dispatch did not actually run, optionally
        flushes queued direct settlements, and finally re-evaluates the
        pending-export queue.

        Args:
            worker_context: Dispatch context built by the caller; the keys
                ``needs_sync``, ``needs_settlement``, ``needs_startup`` and
                ``pending_dispatch`` are read here.
            result: Export result; anything that is not a dict is treated as
                an empty result.
        """
        context = dict(worker_context or {})
        export_result = result if isinstance(result, dict) else {}
        with self._export_dispatch_lock:
            self._export_worker_inflight = False

        needs_sync = bool(context.get("needs_sync"))
        needs_settlement = bool(context.get("needs_settlement"))
        needs_startup = bool(context.get("needs_startup"))
        is_pending_dispatch = bool(context.get("pending_dispatch"))

        if is_pending_dispatch:
            # The export was skipped because something else was running:
            # restore the pending flags this dispatch consumed and retry later.
            if not export_result.get("ran") and export_result.get("skipped") in {"already_running", "sync_running"}:
                with self._export_dispatch_lock:
                    if needs_sync:
                        self._pending_export_after_sync = True
                    if needs_settlement:
                        self._pending_export_after_settlement = True
                    if needs_startup:
                        self._pending_export_startup_replay = True
                self._schedule_export_recheck(3000)
                return
            if needs_settlement and not bool(export_result.get("ran")):
                # edited by glg
                # Preserve ordering: the transaction export must be retried
                # before the direct settlement is allowed to continue.
                with self._export_dispatch_lock:
                    self._pending_export_after_settlement = True
                self.log_warning(
                    "[EXPORT][SETTLEMENT] Export belum berjalan sukses; direct settlement tetap ditahan."
                )
                self._schedule_export_recheck(5000)
                return
            # edited by glg
            # Settlement delivery sequencing:
            # direct settlement is processed after the settlement-triggered export has run.
            if needs_settlement and self._has_pending_settlement_direct_after_export():
                self._flush_pending_settlement_direct_after_export(trigger_reason="after_export")

        # edited by glg
        # Re-evaluate the queue once the worker is done so triggers that
        # arrived while it was in flight are not lost.
        self._run_pending_export_if_ready()

    # edited by glg
    def _schedule_export_recheck(self, delay_ms=3000):
        resolved_delay_ms = self._resolve_export_delay_with_backpressure(delay_ms)
        try:
            delay_sec = max(0.05, float(resolved_delay_ms or 0) / 1000.0)
        except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError):
            delay_sec = 3.0
        with self._export_dispatch_lock:
            if self._export_retry_scheduled:
                return
            self._export_retry_scheduled = True

        def _run():
            with self._export_dispatch_lock:
                self._export_retry_scheduled = False
            self._run_pending_export_if_ready()

        timer = threading.Timer(delay_sec, _run)
        timer.daemon = True
        timer.start()

    # edited by glg
    def _dispatch_startup_export_gate_probe_async(self):
        """Run the startup-replay network gate probe off the calling thread.

        The probe is submitted through ``submit_ui_periodic_task_keyed``. When
        the gate allows the export, the pending queue is re-evaluated from
        the worker; otherwise a recheck is scheduled. Only one probe may be
        in flight at a time.

        Returns:
            True when the probe task was submitted. False when a probe is
            already in flight or the submission returned ``None``.
        """
        with self._export_dispatch_lock:
            if self._export_startup_gate_probe_inflight:
                return False
            self._export_startup_gate_probe_inflight = True
        # Set by the worker as its first action; lets the fallbacks below tell
        # "worker ran" apart from "worker never started".
        started_event = threading.Event()

        def _worker():
            started_event.set()
            try:
                gate = self._can_run_network_operation(
                    NetworkOrchestratorService.PROCESS_EXPORT,
                    force_probe=True,
                    log_block=False,
                )
                if not bool(gate.get("allow")):
                    self.log_info(
                        "[EXPORT] Replay startup ditunda: "
                        f"state={gate.get('state')} reason={gate.get('reason')}"
                    )
                    self._schedule_export_recheck(5000)
                    return
                self._run_pending_export_if_ready()
            finally:
                # Always release the inflight flag once the worker has run.
                with self._export_dispatch_lock:
                    self._export_startup_gate_probe_inflight = False

        key = f"dashboard-export-startup-gate:{id(self)}"
        future = submit_ui_periodic_task_keyed(key, _worker)
        if future is None:
            # Nothing was submitted, so the worker will never release the flag.
            with self._export_dispatch_lock:
                self._export_startup_gate_probe_inflight = False
            return False

        # edited by glg
        # Fail-safe for a full queue: the task may be dropped before the worker runs.
        # Release the inflight flag so the export replay can be retriggered.
        def _on_done(_):
            if started_event.is_set():
                return
            with self._export_dispatch_lock:
                self._export_startup_gate_probe_inflight = False

        try:
            future.add_done_callback(_on_done)
        except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError):
            # The callback could not be attached; release the flag here
            # unless the worker has already started and will do so itself.
            if not started_event.is_set():
                with self._export_dispatch_lock:
                    self._export_startup_gate_probe_inflight = False
        return True

    # edited by glg
    def _run_pending_export_if_ready(self):
        """Dispatch one export worker for the currently pending triggers.

        Pending triggers are the after-sync, after-settlement and
        startup-replay flags. Nothing is dispatched while a worker is in
        flight or any sync is running (a recheck is scheduled in the latter
        case). When no trigger is pending but direct settlements are queued,
        those are flushed instead. If the worker thread cannot be started,
        the consumed flags are restored and a recheck is scheduled.
        """
        # Snapshot the flags under the lock; they are consumed further below.
        with self._export_dispatch_lock:
            needs_sync = bool(self._pending_export_after_sync)
            needs_settlement = bool(self._pending_export_after_settlement)
            needs_startup = bool(self._pending_export_startup_replay)
            worker_inflight = bool(self._export_worker_inflight)
        if worker_inflight:
            return
        has_pending_settlement_direct = self._has_pending_settlement_direct_after_export()
        if not needs_sync and not needs_settlement and not needs_startup:
            if has_pending_settlement_direct:
                self._flush_pending_settlement_direct_after_export(trigger_reason="queue_only")
            return
        # edited by glg
        # Avoid a blocking network probe on the UI thread.
        if needs_startup and threading.current_thread() is threading.main_thread():
            if self.config_service.is_export_replay_on_startup_require_online():
                self._dispatch_startup_export_gate_probe_async()
                return
        # Off the main thread the gate is probed synchronously.
        if needs_startup and self.config_service.is_export_replay_on_startup_require_online():
            gate = self._can_run_network_operation(
                NetworkOrchestratorService.PROCESS_EXPORT,
                force_probe=True,
                log_block=False,
            )
            if not bool(gate.get("allow")):
                self.log_info(
                    "[EXPORT] Replay startup ditunda: "
                    f"state={gate.get('state')} reason={gate.get('reason')}"
                )
                self._schedule_export_recheck(5000)
                return
        if self.is_any_sync_running():
            self._schedule_export_recheck(3000)
            return
        # Source label priority: startup replay, then settlement, then after-sync.
        source = "startup_replay" if needs_startup else ("settlement" if needs_settlement else "after_sync")
        upload_limit_override = None
        requeue_failed_transient = False
        if needs_startup:
            upload_limit_override = self.config_service.get_export_replay_on_startup_upload_batch_limit()
            requeue_failed_transient = self.config_service.is_export_requeue_failed_transient_on_startup_enabled()
        worker_context = {
            "source": source,
            "upload_limit_override": upload_limit_override,
            "requeue_failed_transient": bool(requeue_failed_transient),
            "pending_dispatch": True,
            "needs_sync": bool(needs_sync),
            "needs_settlement": bool(needs_settlement),
            "needs_startup": bool(needs_startup),
        }
        # Re-check the inflight flag and consume the pending flags in a single
        # locked step, since the first snapshot was taken without holding the lock.
        with self._export_dispatch_lock:
            if self._export_worker_inflight:
                return
            self._pending_export_after_sync = False
            self._pending_export_after_settlement = False
            self._pending_export_startup_replay = False
            self._export_worker_inflight = True
        if not self._start_export_worker_thread(worker_context):
            # Thread start failed: restore the consumed flags and retry later.
            with self._export_dispatch_lock:
                self._export_worker_inflight = False
                if needs_sync:
                    self._pending_export_after_sync = True
                if needs_settlement:
                    self._pending_export_after_settlement = True
                if needs_startup:
                    self._pending_export_startup_replay = True
            self._schedule_export_recheck(3000)
        return

    # edited by glg
    def request_export_after_settlement(self, settlement_result=None):
        """React to a finished settlement: direct delivery and export trigger.

        Unless the delivery mode is ``export_only``, a direct settlement push
        is attempted immediately; when that attempt reports failure the
        payload is queued to be sent after the export. Then, if export on
        settlement is enabled, the pending-export flag is raised, the queue
        is evaluated right away and a follow-up recheck is scheduled.

        Args:
            settlement_result: Settlement payload forwarded to the direct
                trigger and queue helpers.

        Returns:
            True when the export trigger was accepted; False when export on
            settlement is disabled in the configuration.
        """
        try:
            mode = str(
                self.config_service.get_settlement_delivery_mode()
            ).strip().lower()
        except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError):
            mode = "dual"

        wants_direct = mode != "export_only"
        counter = self._normalize_settlement_counter(settlement_result)
        sent_direct = False
        if not wants_direct:
            self.log_info("[SettlementDirect] Trigger dilewati: settlement_delivery_mode=export_only")
        else:
            # Compatibility with existing behaviour: the direct trigger is
            # tried right away and the queue is the fallback when it fails.
            sent_direct = bool(self._trigger_settlement_direct_async(settlement_result))
            if sent_direct:
                direct_message = (
                    f"[SettlementDirect] Trigger langsung berhasil: counter={counter}"
                    if counter
                    else "[SettlementDirect] Trigger langsung berhasil."
                )
            else:
                self._enqueue_settlement_direct_after_export(settlement_result)
                direct_message = (
                    f"[SettlementDirect] Queued menunggu export terlebih dahulu: counter={counter}"
                    if counter
                    else "[SettlementDirect] Queued menunggu export terlebih dahulu."
                )
            self.log_info(direct_message)

        if not self.config_service.is_export_on_settlement_enabled():
            self.log_info("[EXPORT][SETTLEMENT] Trigger dilewati: config export_on_settlement=0")
            if wants_direct and not sent_direct:
                self._flush_pending_settlement_direct_after_export(trigger_reason="export_disabled")
            return False

        with self._export_dispatch_lock:
            self._pending_export_after_settlement = True

        if self.config_service.is_settlement_direct_only_mode():
            if counter:
                accepted_message = (
                    "[EXPORT][SETTLEMENT] Trigger diterima (direct_only) "
                    f"counter={counter}; settlement table di-skip, tabel lain tetap diexport."
                )
            else:
                accepted_message = (
                    "[EXPORT][SETTLEMENT] Trigger diterima (direct_only); "
                    "settlement table di-skip, tabel lain tetap diexport."
                )
        elif counter:
            accepted_message = f"[EXPORT][SETTLEMENT] Trigger diterima untuk counter={counter}"
        else:
            accepted_message = "[EXPORT][SETTLEMENT] Trigger diterima"
        self.log_info(accepted_message)

        # Trigger immediately to minimise the delay after the settlement.
        try:
            self._run_pending_export_if_ready()
        except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError) as exc:
            self.log_warning(f"[EXPORT][SETTLEMENT] Trigger langsung gagal: {exc}")
        # Schedule an async fallback so a retry still happens while the
        # guard or the export is busy.
        self._schedule_export_recheck(10)
        return True

    # edited by glg
    def _normalize_settlement_counter(self, settlement_result=None):
        if not isinstance(settlement_result, dict):
            return ""
        return str(settlement_result.get("counter") or "").strip()

    # edited by glg
    def _snapshot_settlement_result(self, settlement_result=None):
        result = settlement_result if isinstance(settlement_result, dict) else {}
        transaksi_ids = result.get("transaksi_ids")
        if not isinstance(transaksi_ids, list):
            transaksi_ids = []
        normalized_ids = []
        seen = set()
        for raw_id in transaksi_ids:
            try:
                parsed = int(raw_id)
            except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError):
                continue
            if parsed <= 0 or parsed in seen:
                continue
            seen.add(parsed)
            normalized_ids.append(parsed)
        return {
            "counter": str(result.get("counter") or "").strip(),
            "transaksi_ids": normalized_ids,
        }

    # edited by glg
    # Simpan payload settlement ke antrean agar direct push dieksekusi setelah export settlement-trigger.
    def _enqueue_settlement_direct_after_export(self, settlement_result=None):
        result_payload = settlement_result if isinstance(settlement_result, dict) else {}
        counter = self._normalize_settlement_counter(result_payload)
        if not counter:
            return False
        with self._settlement_delivery_lock:
            self._pending_settlement_direct_after_export[counter] = {
                "settlement_result": dict(result_payload),
                "updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            }
        # edited by glg
        # Tandai unsent dari awal agar close-guard tetap melindungi settlement yang belum terkirim.
        self._mark_settlement_unsent(
            settlement_result=result_payload,
            error="queued_after_export",
            status_code=0,
            reason_code="queued_after_export",
            retryable=True,
        )
        return True

    # edited by glg
    def _has_pending_settlement_direct_after_export(self):
        with self._settlement_delivery_lock:
            return bool(self._pending_settlement_direct_after_export)

    # edited by glg
    def _peek_pending_settlement_direct_after_export(self):
        with self._settlement_delivery_lock:
            if not self._pending_settlement_direct_after_export:
                return "", {}
            ordered = sorted(
                self._pending_settlement_direct_after_export.items(),
                key=lambda item: str((item[1] or {}).get("updated_at") or ""),
            )
            counter, payload = ordered[0]
            result_payload = {}
            if isinstance(payload, dict):
                result_payload = payload.get("settlement_result")
            if not isinstance(result_payload, dict):
                result_payload = {}
            return str(counter or "").strip(), dict(result_payload)

    # edited by glg
    def _remove_pending_settlement_direct_after_export(self, counter):
        key = str(counter or "").strip()
        if not key:
            return False
        with self._settlement_delivery_lock:
            return self._pending_settlement_direct_after_export.pop(key, None) is not None

    # edited by glg
    def _is_pending_settlement_direct_after_export_counter(self, counter):
        key = str(counter or "").strip()
        if not key:
            return False
        with self._settlement_delivery_lock:
            return key in self._pending_settlement_direct_after_export

    # edited by glg
    # Flush antrean settlement direct setelah export. Tetap menggunakan gate orchestrator yang sama.
    def _flush_pending_settlement_direct_after_export(self, trigger_reason=""):
        try:
            mode = str(self.config_service.get_settlement_delivery_mode()).strip().lower()
        except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError):
            mode = "dual"

        if mode == "export_only":
            with self._settlement_delivery_lock:
                counters = list(self._pending_settlement_direct_after_export.keys())
                self._pending_settlement_direct_after_export.clear()
            for counter in counters:
                self._clear_settlement_unsent(counter)
            if counters:
                self.log_info(
                    "[SettlementDirect] Queue dibersihkan karena mode export_only "
                    f"(count={len(counters)})."
                )
            return {
                "triggered": 0,
                "remaining": 0,
                "reason": "export_only",
            }

        if self._settlement_direct_inflight:
            remaining_inflight = 0
            with self._settlement_delivery_lock:
                remaining_inflight = int(len(self._pending_settlement_direct_after_export))
            self._schedule_export_recheck(1500)
            return {
                "triggered": 0,
                "remaining": remaining_inflight,
                "reason": "inflight",
            }

        counter, payload = self._peek_pending_settlement_direct_after_export()
        if not counter:
            return {
                "triggered": 0,
                "remaining": 0,
                "reason": "empty",
            }

        triggered = bool(self._trigger_settlement_direct_async(payload))
        if triggered:
            self._remove_pending_settlement_direct_after_export(counter)
        remaining = 0
        with self._settlement_delivery_lock:
            remaining = int(len(self._pending_settlement_direct_after_export))

        if remaining > 0:
            self._schedule_export_recheck(2000 if triggered else 7000)

        self.log_info(
            "[SettlementDirect] Flush queue "
            f"reason={str(trigger_reason or '-')} counter={counter} triggered={int(triggered)} "
            f"remaining={remaining}"
        )
        return {
            "triggered": int(triggered),
            "remaining": remaining,
            "reason": str(trigger_reason or "-"),
        }

    # edited by glg
    def _mark_settlement_unsent(self, settlement_result=None, error="", status_code=0, reason_code="", retryable=True):
        snapshot = self._snapshot_settlement_result(settlement_result)
        counter = str(snapshot.get("counter") or "").strip()
        if not counter:
            return False
        payload = {
            "settlement_result": snapshot,
            "error": str(error or "").strip(),
            "status_code": int(status_code or 0),
            "reason_code": str(reason_code or "").strip(),
            "retryable": bool(retryable),
            "updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        }
        with self._settlement_delivery_lock:
            self._settlement_unsent_map[counter] = payload
        return True

    # edited by glg
    def _clear_settlement_unsent(self, counter):
        key = str(counter or "").strip()
        if not key:
            return False
        with self._settlement_delivery_lock:
            return self._settlement_unsent_map.pop(key, None) is not None

    # edited by glg
    def get_settlement_delivery_guard_state(self):
        """Summarise settlement deliveries that are still outstanding.

        Returns:
            dict with ``inflight`` (a direct push is running), ``unsent_count``,
            ``unsent_items`` (shallow copies of the unsent entries, newest
            ``updated_at`` first) and ``blocking`` (True when a push is running
            or any unsent entry exists).
        """
        with self._settlement_delivery_lock:
            copies = list(map(dict, self._settlement_unsent_map.values()))

        newest_first = sorted(
            copies,
            key=lambda entry: str(entry.get("updated_at") or ""),
            reverse=True,
        )
        inflight = bool(self._settlement_direct_inflight)
        return {
            "inflight": inflight,
            "unsent_count": len(newest_first),
            "unsent_items": newest_first,
            "blocking": inflight or bool(newest_first),
        }

    # edited by glg
    def retry_pending_settlement_direct_delivery(self, max_items=3):
        """Synchronously retry direct delivery of settlements still marked unsent.

        Candidates are visited in the order returned by
        ``get_settlement_delivery_guard_state``. Entries that cannot be sent
        right now (no counter, or still queued behind an export) are skipped
        and do not consume the ``max_items`` budget, so sendable entries listed
        after them are still retried.

        Args:
            max_items: Maximum number of send attempts for this call. Falsy
                values and values below 1 are treated as 1.

        Returns:
            dict with ``attempted``, ``success``, ``failed``, ``inflight`` and
            ``remaining`` (unsent count once the call is done). When the
            service cannot be created or is disabled, ``failed`` reports the
            whole unsent count even though nothing was attempted.
        """
        state_before = self.get_settlement_delivery_guard_state()

        def _not_attempted(remaining, failed=0, inflight=False):
            return {
                "attempted": 0,
                "success": 0,
                "failed": int(failed),
                "inflight": bool(inflight),
                "remaining": int(remaining),
            }

        if not state_before.get("blocking"):
            return _not_attempted(0)

        unsent_before = int(state_before.get("unsent_count") or 0)
        if self._settlement_direct_inflight:
            # A background push is running; do not send in parallel with it.
            return _not_attempted(unsent_before, inflight=True)

        gate = self._can_run_network_operation(
            NetworkOrchestratorService.PROCESS_SETTLEMENT_DIRECT,
            force_probe=True,
            log_block=False,
        )
        if not bool(gate.get("allow")):
            return _not_attempted(unsent_before)

        try:
            service = self._get_settlement_direct_service()
        except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError) as exc:
            self.log_warning(f"[SettlementDirect] Retry close-guard: service init gagal: {exc}")
            return _not_attempted(unsent_before, failed=unsent_before)

        if not service.enabled_getter():
            return _not_attempted(unsent_before, failed=unsent_before)

        limit = max(1, int(max_items or 1))
        attempted = 0
        success = 0
        failed = 0

        for item in state_before.get("unsent_items") or []:
            # The budget counts real send attempts only; skipped entries below
            # must not starve sendable entries that come later in the list.
            if attempted >= limit:
                break
            if not isinstance(item, dict):
                continue
            payload = item.get("settlement_result")
            payload = payload if isinstance(payload, dict) else {}
            counter = str(payload.get("counter") or "").strip()
            if not counter:
                continue
            # Preserve delivery order: a settlement that is still queued to go
            # out after its export must not be forced through the direct path
            # by a close-guard retry.
            reason_code = str(item.get("reason_code") or "").strip().lower()
            if reason_code == "queued_after_export" and self._is_pending_settlement_direct_after_export_counter(counter):
                continue

            attempted += 1
            try:
                result = service.send_settlement(payload)
            except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError) as exc:
                failed += 1
                self._mark_settlement_unsent(
                    settlement_result=payload,
                    error=str(exc),
                    status_code=0,
                    reason_code="exception",
                    retryable=True,
                )
                continue

            if not isinstance(result, dict):
                failed += 1
                self._mark_settlement_unsent(
                    settlement_result=payload,
                    error="invalid_response",
                    status_code=0,
                    reason_code="invalid_response",
                    retryable=True,
                )
                continue

            if bool(result.get("ok")):
                success += 1
                self._clear_settlement_unsent(counter)
                continue

            # The send was rejected: keep the entry with the latest failure details.
            failed += 1
            self._mark_settlement_unsent(
                settlement_result=payload,
                error=str(result.get("error") or "unknown_error"),
                status_code=int(result.get("status_code") or 0),
                reason_code=str(result.get("reason_code") or ""),
                retryable=bool(result.get("retryable")),
            )

        state_after = self.get_settlement_delivery_guard_state()
        return {
            "attempted": int(attempted),
            "success": int(success),
            "failed": int(failed),
            "inflight": bool(state_after.get("inflight")),
            "remaining": int(state_after.get("unsent_count") or 0),
        }

    # edited by glg
    def _get_settlement_direct_service(self):
        service = getattr(self, "_settlement_direct_service", None)
        if service is not None:
            return service
        self._settlement_direct_service = SettlementDirectService()
        return self._settlement_direct_service

    # edited by glg
    def _trigger_settlement_direct_async(self, settlement_result=None):
        """Start a background thread that pushes one settlement via the direct service.

        Nothing is started (and False is returned) when the delivery mode is
        ``export_only``, the direct service cannot be created, the service
        reports itself disabled, another push is already in flight, or the
        worker thread fails to start. A delivery mode that cannot be read is
        treated as ``dual``.

        The settlement is recorded as unsent (``pending_send``) before the
        worker starts. The worker clears that entry only when the service
        result has ``ok`` set; every other outcome re-marks the entry with the
        failure details.

        Args:
            settlement_result: Settlement payload handed to
                ``service.send_settlement`` (an empty dict is sent when falsy).

        Returns:
            True once the worker thread has been started, otherwise False.
        """
        try:
            mode = str(self.config_service.get_settlement_delivery_mode()).strip().lower()
        except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError):
            mode = "dual"
        if mode == "export_only":
            self.log_info("[SettlementDirect] Skip trigger: settlement_delivery_mode=export_only")
            return False

        try:
            service = self._get_settlement_direct_service()
        except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError) as exc:
            self.log_warning(f"[SettlementDirect] Gagal inisialisasi service: {exc}")
            return False

        if not service.enabled_getter():
            self.log_info("[SettlementDirect] Skip trigger: settlement_direct_enabled=0")
            return False

        # Claim the single in-flight slot under the lock; a second trigger
        # while a push is running is rejected.
        with self._settlement_delivery_lock:
            if self._settlement_direct_inflight:
                self.log_info("[SettlementDirect] Skip trigger: push sebelumnya masih berjalan.")
                return False
            self._settlement_direct_inflight = True

        counter = self._normalize_settlement_counter(settlement_result)
        # edited by glg
        # Record the settlement as an unsent candidate first; the entry is only
        # cleared once the server reports success.
        self._mark_settlement_unsent(
            settlement_result=settlement_result,
            error="pending_send",
            status_code=0,
            reason_code="pending_send",
            retryable=True,
        )

        def _worker():
            """Probe the network gate, send the settlement and record the outcome."""
            try:
                # edited by glg
                # The network probe runs inside the worker so the settlement
                # callback does not block the UI.
                gate = self._can_run_network_operation(
                    NetworkOrchestratorService.PROCESS_SETTLEMENT_DIRECT,
                    force_probe=True,
                    log_block=False,
                )
                if not bool(gate.get("allow")):
                    error_msg = (
                        "network_blocked:"
                        f"{gate.get('state')}:{gate.get('reason')}"
                    )
                    self._mark_settlement_unsent(
                        settlement_result=settlement_result,
                        error=error_msg,
                        status_code=0,
                        reason_code="network_blocked",
                        retryable=True,
                    )
                    self.log_info(
                        "[SettlementDirect] Ditunda oleh orchestrator "
                        f"counter={counter or '-'} state={gate.get('state')} reason={gate.get('reason')}"
                    )
                    return
                result = service.send_settlement(settlement_result or {})
                if not isinstance(result, dict):
                    self.log_warning("[SettlementDirect] Response service tidak valid.")
                    self._mark_settlement_unsent(
                        settlement_result=settlement_result,
                        error="invalid_response",
                        status_code=0,
                        reason_code="invalid_response",
                        retryable=True,
                    )
                    return
                # The service reported that it did not attempt the send; keep
                # the entry retryable and store the skip reason.
                if not result.get("attempted"):
                    skip_reason = str(result.get("skipped") or "unknown")
                    err = str(result.get("error") or "").strip()
                    self.log_info(
                        f"[SettlementDirect] Tidak dikirim (reason={skip_reason})"
                        + (f" detail={err}" if err else "")
                    )
                    self._mark_settlement_unsent(
                        settlement_result=settlement_result,
                        error=err or skip_reason,
                        status_code=int(result.get("status_code") or 0),
                        reason_code=skip_reason,
                        retryable=True,
                    )
                    return
                status_code = int(result.get("status_code") or 0)
                idem = str(result.get("idempotency_key") or "").strip()
                if result.get("ok"):
                    self.log_info(
                        "[SettlementDirect] BERHASIL "
                        f"counter={counter or '-'} status={status_code} idem={idem}"
                    )
                    self._clear_settlement_unsent(counter)
                    return
                # Attempted but not ok: keep the entry with the reported
                # error, status and retryable flag.
                err = str(result.get("error") or "unknown_error").strip()
                retryable = 1 if bool(result.get("retryable")) else 0
                self.log_warning(
                    "[SettlementDirect] GAGAL "
                    f"counter={counter or '-'} status={status_code} retryable={retryable} "
                    f"idem={idem} error={err}"
                )
                self._mark_settlement_unsent(
                    settlement_result=settlement_result,
                    error=err,
                    status_code=status_code,
                    reason_code=str(result.get("reason_code") or ""),
                    retryable=bool(result.get("retryable")),
                )
            except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError) as exc:
                self.log_warning(f"[SettlementDirect] Exception saat push async: {exc}")
                self._mark_settlement_unsent(
                    settlement_result=settlement_result,
                    error=str(exc),
                    status_code=0,
                    reason_code="exception",
                    retryable=True,
                )
            finally:
                # Always release the in-flight slot, whatever the outcome.
                with self._settlement_delivery_lock:
                    self._settlement_direct_inflight = False

        try:
            threading.Thread(target=_worker, daemon=True, name="settlement-direct-push").start()
        except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError) as exc:
            # The worker never ran, so its finally block cannot release the
            # slot; release it here.
            with self._settlement_delivery_lock:
                self._settlement_direct_inflight = False
            self.log_warning(f"[SettlementDirect] Gagal start worker async: {exc}")
            self._mark_settlement_unsent(
                settlement_result=settlement_result,
                error=str(exc),
                status_code=0,
                reason_code="worker_start_failed",
                retryable=True,
            )
            return False
        self.log_info(f"[SettlementDirect] Trigger async diterima untuk counter={counter or '-'}")
        return True

    # edited by glg
    def request_export_replay_on_startup(self):
        """Flag a startup export replay and schedule the export recheck.

        Returns:
            True when the replay was requested, False when the feature is
            disabled in the configuration.
        """
        config = self.config_service
        if config.is_export_replay_on_startup_enabled():
            with self._export_dispatch_lock:
                self._pending_export_startup_replay = True
            self._schedule_export_recheck(config.get_export_replay_on_startup_delay_ms())
            return True
        return False

    def is_any_sync_running(self):
        """Report whether any known sync runner is currently active.

        Checked in order: ``view.auto_sync_service``,
        ``view.sync_flow_controller``, ``app_controller.sync_flow_bg_controller``
        and the ``thread`` of ``app_controller.sinkron_controller``.

        Returns:
            True when one of them reports running. Also True when the check
            itself fails with a handled exception, so callers do not start
            new work on top of an unknown state.
        """
        try:
            auto_sync = getattr(self.view, "auto_sync_service", None)
            if auto_sync and auto_sync.is_running():
                return True

            flow = getattr(self.view, "sync_flow_controller", None)
            if flow and flow.is_running():
                return True

            background_flow = getattr(self.app_controller, "sync_flow_bg_controller", None)
            if background_flow and background_flow.is_running():
                return True

            sinkron = getattr(self.app_controller, "sinkron_controller", None)
            if sinkron:
                worker = getattr(sinkron, "thread", None)
                if worker and worker.isRunning():
                    return True
        except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError) as e:
            self.log_warning(f"is_any_sync_running error: {e}")
            return True
        return False

    # edited by glg
    def stop_all_sync(self, force=False, wait_timeout_ms=None):
        """Ask every known sync runner to stop.

        Stops, when present: ``view.auto_sync_service``,
        ``view.sync_flow_controller`` and
        ``app_controller.sync_flow_bg_controller``. A failure while stopping
        one runner is logged and does not prevent the others from being
        stopped.

        Args:
            force: Passed through to each runner's stop call.
            wait_timeout_ms: How long each runner may wait, in milliseconds.
                When None, ``sync_stop_wait_timeout_ms`` from the configuration
                is used, falling back to 2000 when it is missing, invalid or
                the configuration cannot be read.
        """
        if wait_timeout_ms is None:
            # The configuration is only consulted when the caller did not pass
            # a timeout, and a failing read falls back to the default instead
            # of aborting the stop sequence.
            try:
                cfg = read_config() or {}
                wait_timeout_ms = int(cfg.get("sync_stop_wait_timeout_ms") or 2000)
            except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError):
                wait_timeout_ms = 2000

        if hasattr(self.view, "auto_sync_service") and self.view.auto_sync_service:
            try:
                self.view.auto_sync_service.stop_sync(force=force, wait_timeout_ms=wait_timeout_ms)
            except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError) as e:
                self.log_warning(f"Gagal stop auto_sync_service: {e}")

        if hasattr(self.view, "sync_flow_controller") and self.view.sync_flow_controller:
            try:
                self.view.sync_flow_controller.stop(force=force, wait_timeout_ms=wait_timeout_ms)
            except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError) as e:
                self.log_warning(f"Gagal stop sync_flow_controller (manual): {e}")

        if hasattr(self.app_controller, "sync_flow_bg_controller") and self.app_controller.sync_flow_bg_controller:
            try:
                self.app_controller.sync_flow_bg_controller.stop(force=force, wait_timeout_ms=wait_timeout_ms)
            except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError) as e:
                self.log_warning(f"Gagal stop sync_flow_bg_controller: {e}")

    def request_manual_sync(self):
        """Decide whether a manual sync may start now.

        Returns:
            ``"start"`` when no sync is running, otherwise ``"queued"`` after
            setting ``_pending_manual_sync``.
        """
        if not self.is_any_sync_running():
            return "start"
        self._pending_manual_sync = True
        return "queued"

    def _auto_sync_master(self):
        try:
            if ExportCycleService.is_running():
                return
            if self._pending_manual_sync or get_bool("first_sync_pending", False):
                return
            if self.is_any_sync_running():
                return
            if not hasattr(self.view, "auto_sync_tables") or not self.view.auto_sync_tables:
                return
            self._run_auto_sync_probe_async()
        except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError) as e:
            self.log_warning(f"Auto-sync master gagal: {e}")
            return

    def _on_auto_sync_completed(self, total_rows):
        try:
            self._auto_sync_update_required = False
            self._auto_sync_update_signature = ""
            self._auto_sync_update_last_payload = {}
            self._clear_auto_sync_update_snooze()
            if hasattr(self.view, "update_sync_status"):
                self.view.update_sync_status(False)
            if hasattr(self.view, "transaksi_controller") and self.view.transaksi_controller:
                self.view.transaksi_controller.refresh_master_data()
            self.load_header_logo(force_refresh=True)
            if hasattr(self.view, "printer_settings_controller") and self.view.printer_settings_controller:
                self.view.printer_settings_controller.refresh_company_logo_assets(force_refresh=True)
            # edited by glg
            # Jalankan apply yang sebelumnya ditunda karena ada aktivitas sinkronisasi lain.
            self._schedule_auto_sync_apply_recheck(200)
            if self._pending_manual_sync and self.app_controller and self.app_controller.dashboard_window:
                self._pending_manual_sync = False
                QTimer.singleShot(0, self.app_controller.dashboard_window.buka_menu_sinkron_data)
            # edited by glg
            self._run_pending_export_if_ready()
        except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError) as e:
            self.log_warning(f"Gagal refresh master data: {e}")
            return

    def export_on_close(self, source="shutdown"):
        """Run one export cycle on close when that is enabled in the configuration.

        Args:
            source: Label passed to ``ExportCycleService.run_cycle``.

        The outcome is only logged: either the exported row count and upload
        count, or the guard source that caused the cycle to be skipped.
        Handled exceptions are logged as a warning.
        """
        try:
            if not self.config_service.is_export_on_close_enabled():
                return
            outcome = ExportCycleService().run_cycle(source=source)
            if outcome.get("ran"):
                exported = int(outcome.get("exported_rows") or 0)
                uploaded = int(outcome.get("uploaded") or 0)
                self.log_info(
                    "Export JSON on close: "
                    f"{exported} rows | "
                    f"upload={uploaded}"
                )
            else:
                # Another export run holds the guard, so this cycle did nothing.
                guard_source = outcome.get("guard_source") or "unknown"
                self.log_info(
                    "Export JSON on close dilewati: "
                    f"guard aktif dari sumber {guard_source}"
                )
        except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError, LookupError, ArithmeticError, ImportError) as e:
            self.log_warning(f"Export JSON on close gagal: {e}")
