﻿# edited by glg
import json
import os
import re
import sqlite3
import tempfile
from datetime import datetime, timedelta
import hashlib
from typing import Any, Callable, Dict, List, Optional, cast
import logging
import base64
import binascii

from pypos.core.utils.config_utils import read_config, get_current_server_hash, normalize_base_url
from pypos.core.utils.device_utils import get_device_id, get_active_device_info
from pypos.core.utils.path_utils import get_app_data_dir, get_db_path
from pypos.core.utils.sql_query_builder import build_pragma_table_info_sql
from pypos.core.database.schema_migrator import run_schema_migrations_once
from pypos.modules.sinkronisasi.services.transaction_export_db_service import (
    TransactionExportDbService,
)
from pypos.modules.sinkronisasi.services.transaction_export_legacy_defaults_service import (
    TransactionExportLegacyDefaultsService,
)
from pypos.modules.sinkronisasi.services.transaction_export_settlement_payment_utils import (
    TransactionExportSettlementPaymentUtils,
)
from pypos.modules.sinkronisasi.services.transaction_export_device_context_service import (
    TransactionExportDeviceContextService,
)
from pypos.modules.sinkronisasi.services.transaction_export_payload_compression_service import (
    TransactionExportPayloadCompressionService,
)
from pypos.modules.sinkronisasi.services.transaction_export_settlement_history_payload_service import (
    TransactionExportSettlementHistoryPayloadService,
)
from pypos.modules.sinkronisasi.services.transaction_export_value_utils import TransactionExportValueUtils
from pypos.modules.sinkronisasi.services.transaction_export_hotspot_use_case_service import (
    TransactionExportHotspotUseCaseService,
)
from pypos.modules.sinkronisasi.services.transaction_export_batch_use_case_service import (
    TransactionExportBatchUseCaseService,
)
from pypos.modules.sinkronisasi.services.transaction_export_snapshot_query_service import (
    TransactionExportSnapshotQueryService,
)
from pypos.modules.sinkronisasi.services.transaction_export_legacy_payment_payload_service import (
    TransactionExportLegacyPaymentPayloadService,
)

LOGGER = logging.getLogger(__name__)
_EMPTY_PHP_ARRAY_B64 = "YTowOnt9"


class TransactionExportService:
    # edited by glg
    def __init__(self, db_path: Optional[str] = None) -> None:
        """Create the export service and wire up its collaborator services.

        Args:
            db_path: Path of the SQLite database to export from. When falsy,
                the value returned by ``get_db_path()`` is used instead.
        """
        self.db_path = str(db_path or get_db_path())
        self._db_service = TransactionExportDbService(self.db_path)
        self.transaction_export_legacy_defaults_service = TransactionExportLegacyDefaultsService()
        self.transaction_export_settlement_payment_utils = TransactionExportSettlementPaymentUtils()
        # edited by glg
        # Monolith decomposition: device context lives in a dedicated service.
        # It receives bound callables instead of the whole export service.
        self.transaction_export_device_context_service = TransactionExportDeviceContextService(
            connect_fn=self._connect,
            as_positive_int_fn=self._as_positive_int,
            logger=LOGGER,
        )
        # edited by glg
        # Monolith decomposition: payload compression lives in a dedicated service.
        self.transaction_export_payload_compression_service = (
            TransactionExportPayloadCompressionService(logger=LOGGER)
        )
        # edited by glg
        # Monolith decomposition: legacy settlement payload lives in its own service.
        self.transaction_export_settlement_history_payload_service = (
            TransactionExportSettlementHistoryPayloadService(export_service=self)
        )
        # edited by glg
        # Monolith decomposition:
        # - export row normalisation
        # - backfill of transaction/detail rows
        self.transaction_export_hotspot_use_case_service = TransactionExportHotspotUseCaseService(
            export_service=self,
            logger=LOGGER,
        )
        # edited by glg
        # Monolith decomposition: batch export orchestration moved to a use-case service.
        self.transaction_export_batch_use_case_service = TransactionExportBatchUseCaseService(
            export_service=self,
            logger=LOGGER,
        )
        # edited by glg
        # Monolith decomposition: table snapshot queries live in a dedicated service.
        self.transaction_export_snapshot_query_service = TransactionExportSnapshotQueryService(
            export_service=self
        )
        # edited by glg
        # Monolith decomposition: legacy payment payload builder lives in its own service.
        self.transaction_export_legacy_payment_payload_service = (
            TransactionExportLegacyPaymentPayloadService(export_service=self)
        )

    # edited by glg
    def _get_legacy_defaults_service(self) -> TransactionExportLegacyDefaultsService:
        service = getattr(self, "transaction_export_legacy_defaults_service", None)
        if service is None:
            service = TransactionExportLegacyDefaultsService()
            self.transaction_export_legacy_defaults_service = service
        return service

    # edited by glg
    def _get_settlement_payment_utils(self) -> TransactionExportSettlementPaymentUtils:
        service = getattr(self, "transaction_export_settlement_payment_utils", None)
        if service is None:
            service = TransactionExportSettlementPaymentUtils()
            self.transaction_export_settlement_payment_utils = service
        return service

    # edited by glg
    def _get_settlement_history_payload_service(self) -> TransactionExportSettlementHistoryPayloadService:
        service = getattr(self, "transaction_export_settlement_history_payload_service", None)
        if service is None:
            service = TransactionExportSettlementHistoryPayloadService(export_service=self)
            self.transaction_export_settlement_history_payload_service = service
        return service

    # edited by glg
    def _get_legacy_payment_payload_service(self) -> TransactionExportLegacyPaymentPayloadService:
        service = getattr(self, "transaction_export_legacy_payment_payload_service", None)
        if service is None:
            service = TransactionExportLegacyPaymentPayloadService(export_service=self)
            self.transaction_export_legacy_payment_payload_service = service
        return service

    # edited by glg
    def _get_db_service(self) -> TransactionExportDbService:
        service = getattr(self, "_db_service", None)
        if service is None:
            service = TransactionExportDbService(self.db_path)
            self._db_service = service
        if str(service.db_path or "") != str(self.db_path or ""):
            service.db_path = str(self.db_path or "")
        return service

    # edited by glg
    def _get_hotspot_use_case_service(self) -> TransactionExportHotspotUseCaseService:
        service = getattr(self, "transaction_export_hotspot_use_case_service", None)
        if service is None:
            service = TransactionExportHotspotUseCaseService(export_service=self, logger=LOGGER)
            self.transaction_export_hotspot_use_case_service = service
        return service

    # edited by glg
    def _get_batch_use_case_service(self) -> TransactionExportBatchUseCaseService:
        service = getattr(self, "transaction_export_batch_use_case_service", None)
        if service is None:
            service = TransactionExportBatchUseCaseService(export_service=self, logger=LOGGER)
            self.transaction_export_batch_use_case_service = service
        return service

    # edited by glg
    def _get_snapshot_query_service(self) -> TransactionExportSnapshotQueryService:
        service = getattr(self, "transaction_export_snapshot_query_service", None)
        if service is None:
            service = TransactionExportSnapshotQueryService(export_service=self)
            self.transaction_export_snapshot_query_service = service
        return service

    # edited by glg
    def _as_number_or_default(self, value, default=0):
        """Delegate to ``TransactionExportValueUtils.as_number_or_default``.

        Kept as an instance method so collaborators can reach the helper
        through the export service; the conversion rules live in the utility.
        """
        converted = TransactionExportValueUtils.as_number_or_default(value, default)
        return converted

    # edited by glg
    # Normalisasi integer positif; nilai <= 0 dianggap tidak valid.
    def _as_positive_int(self, value, default=0) -> int:
        """Delegate to ``TransactionExportValueUtils.as_positive_int``.

        Per the file's own note, values <= 0 are treated as invalid; the exact
        rules are implemented by the utility class.
        """
        converted = TransactionExportValueUtils.as_positive_int(value, default)
        return converted

    # edited by glg
    # Ambil context device real dari per_cabang_device untuk isi cabang_id/toko_id export.
    def _get_export_device_context(self, machine_id_hint: str = "") -> Dict[str, Any]:
        """Return the device context used for cabang_id/toko_id in exports.

        The context comes from the device-context service, which is given a
        per-instance cache dict. It is then reconciled with
        ``get_active_device_info``: a non-empty active ``machine_id`` always
        overrides, while ``cabang_id`` / ``toko_id`` are only filled in when
        the context does not already hold a positive value.

        Args:
            machine_id_hint: Optional machine id; when empty, ``get_device_id()``
                is used for the active-device lookup.
        """
        context_cache = getattr(self, "_device_context_cache", None)
        if context_cache is None:
            context_cache = {}
            setattr(self, "_device_context_cache", context_cache)
        device_context = self.transaction_export_device_context_service.get_export_device_context(
            machine_id_hint=machine_id_hint,
            cache=context_cache,
        )
        # Test/hardening compatibility: sync the fallback context from the local helper.
        try:
            fallback_info = get_active_device_info(machine_id_hint or get_device_id()) or {}
        except (TypeError, ValueError, KeyError, AttributeError, RuntimeError, OSError):
            fallback_info = {}
        if not isinstance(fallback_info, dict):
            return device_context

        fallback_machine = str(fallback_info.get("machine_id") or "").strip()
        if fallback_machine:
            device_context["machine_id"] = fallback_machine
        for field in ("cabang_id", "toko_id"):
            candidate = self._as_positive_int(fallback_info.get(field), 0)
            if candidate > 0 and self._as_positive_int(device_context.get(field), 0) <= 0:
                device_context[field] = candidate
        return device_context

    def _resolve_export_compression(self, config: Dict[str, Any]) -> str:
        return self.transaction_export_payload_compression_service.resolve_export_compression(
            config
        )

    # edited by glg
    # Bridge konfigurasi agar patch test tetap kompatibel walau hotspot diekstrak ke use-case service.
    def _read_config(self) -> Dict[str, Any]:
        """Return the application config, or an empty dict when it is not a dict.

        Acts as a bridge over the module-level ``read_config`` so extracted
        use-case services can obtain configuration through this service.
        """
        loaded = read_config()
        if isinstance(loaded, dict):
            return loaded
        return {}

    # edited by glg
    # Bridge machine id agar patch test tetap kompatibel setelah dekomposisi hotspot.
    def _get_device_id(self) -> str:
        """Return the device id as a stripped string ("" when unavailable).

        Acts as a bridge over the module-level ``get_device_id``.
        """
        raw_id = get_device_id()
        return str(raw_id or "").strip()

    def _write_compressed_payload(self, tmp_path: str, payload: Any, codec: str, config: Dict[str, Any]):
        return self.transaction_export_payload_compression_service.write_compressed_payload(
            tmp_path=tmp_path,
            payload=payload,
            codec=codec,
            config=config,
        )

    def _is_blank_value(self, value):
        """Delegate to ``TransactionExportValueUtils.is_blank_value``.

        The definition of "blank" lives in the utility class.
        """
        verdict = TransactionExportValueUtils.is_blank_value(value)
        return verdict

    # edited by glg
    # Ambil metadata kolom SQLite untuk sanitasi payload berbasis schema lokal.
    def _get_table_schema_meta(self, table_name: str) -> Dict[str, Dict[str, Any]]:
        cache = getattr(self, "_table_schema_meta_cache", None)
        if cache is None:
            cache = {}
            setattr(self, "_table_schema_meta_cache", cache)

        key = str(table_name or "").strip()
        if not key:
            return {}
        if key in cache:
            return cache.get(key) or {}

        meta = {}
        try:
            conn = self._connect()
            try:
                cur = conn.cursor()
                cur.execute(build_pragma_table_info_sql(key))
                for row in cur.fetchall() or []:
                    # PRAGMA table_info: cid, name, type, notnull, dflt_value, pk
                    col_name = str(row[1] or "").strip()
                    if not col_name:
                        continue
                    decl_type = str(row[2] or "").strip().upper()
                    is_notnull = int(row[3] or 0) == 1
                    meta[col_name] = {
                        "decl_type": decl_type,
                        "notnull": is_notnull,
                    }
            finally:
                conn.close()
        except (ValueError, sqlite3.Error, RuntimeError):
            meta = {}

        cache[key] = meta
        return meta

    # edited by glg
    # Default value berdasarkan tipe kolom SQLite.
    def _default_by_decl_type(self, decl_type: str):
        raw = str(decl_type or "").upper()
        numeric_markers = ("INT", "REAL", "FLOA", "DOUB", "NUM", "DEC", "BOOL")
        if any(marker in raw for marker in numeric_markers):
            return 0
        return ""

    # edited by glg
    # Isi nilai null untuk kolom NOT NULL sesuai schema lokal agar payload lebih stabil.
    def _sanitize_row_by_notnull_schema(self, table_name: str, row: Dict[str, Any]) -> Dict[str, Any]:
        if not isinstance(row, dict):
            return row
        schema_meta = self._get_table_schema_meta(table_name)
        if not schema_meta:
            return row
        for col_name, meta in schema_meta.items():
            if not meta.get("notnull"):
                continue
            if col_name not in row:
                continue
            if not self._is_blank_value(row.get(col_name)):
                continue
            row[col_name] = self._default_by_decl_type(str(meta.get("decl_type") or ""))
        return row

    # edited by glg
    # Normalisasi label jenis transaksi untuk guard invoice-only export.
    def _normalize_transaksi_label(self, value) -> str:
        return str(value or "").strip().lower()

    # edited by glg
    # Invoice legacy bisa tersimpan dengan label kosong/null, tetap diperlakukan sebagai invoice.
    def _is_invoice_transaksi_label(self, value) -> bool:
        label = self._normalize_transaksi_label(value)
        return label in {"", "invoice", "penjualan", "jual", "transaksi"}

    # edited by glg
    # Rule export transaksi:
    # transaksi yang sudah dibatalkan tidak boleh lagi dikirim sebagai 582.
    # Event pembatalan harus lewat jalur return/cancellation (982).
    def _is_cancelled_transaksi_row(self, row: Dict[str, Any]) -> bool:
        if not isinstance(row, dict):
            return False
        if not self._is_blank_value(row.get("cancel_dtime")):
            return True
        if self._to_int(row.get("status_cancel"), 0) == 1:
            return True
        if self._to_int(row.get("cancel_id"), 0) > 0:
            return True
        if not self._is_blank_value(row.get("cancel_name")):
            return True
        if self._to_int(row.get("cancel_transaksi_id"), 0) > 0:
            return True
        if not self._is_blank_value(row.get("cancel_transaksi_nomer")):
            return True
        return False

    # edited by glg
    # Export transaksi hanya untuk invoice agar draft simpan_transaksi tidak ikut terkirim.
    def _filter_transaksi_invoice_rows(
        self,
        rows: List[Dict[str, Any]],
        source: str = "main",
    ) -> List[Dict[str, Any]]:
        if not rows:
            return []
        filtered: List[Dict[str, Any]] = []
        dropped = 0
        included_cancel = 0
        dropped_labels: Dict[str, int] = {}
        for row in rows:
            if not isinstance(row, dict):
                continue
            label = self._normalize_transaksi_label(row.get("jenis_label"))
            if not self._is_invoice_transaksi_label(label):
                dropped += 1
                label_key = label or "(empty)"
                dropped_labels[label_key] = dropped_labels.get(label_key, 0) + 1
                continue
            # edited by glg
            # Invoice batal tetap harus ikut export agar endpoint server bisa
            # melakukan rekonsiliasi status cancel terhadap transaksi asal.
            if self._is_cancelled_transaksi_row(row):
                included_cancel += 1
            filtered.append(row)
        if dropped > 0 or included_cancel > 0:
            LOGGER.info(
                "[DEBUG EXPORT] transaksi invoice_only filter source=%s kept=%s included_cancel=%s dropped=%s labels=%s",
                source,
                len(filtered),
                included_cancel,
                dropped,
                dropped_labels,
            )
        return filtered

    def _resolve_payload_profile(self, config: Dict[str, Any]) -> str:
        raw_profile = str((config or {}).get("export_payload_profile") or "").strip().lower()
        # edited by glg
        # legacy_strict dipetakan ke legacy_bundle agar isi file mengikuti format lama
        # (array transaksi: items/free_items/payment), sementara mode modern tetap tersedia.
        if raw_profile in {"legacy_strict", "legacy_bundle", "legacy_transaction_bundle"}:
            return "legacy_bundle"
        if raw_profile in {"modern", "legacy_table"}:
            return raw_profile
        return "legacy_bundle"

    def _normalize_rows_for_payload_profile(
        self,
        table_name: str,
        rows: List[Dict[str, Any]],
        machine_id: str,
        payload_profile: str,
    ) -> List[Dict[str, Any]]:
        normalized = []
        for raw_row in rows or []:
            if not isinstance(raw_row, dict):
                continue
            row = dict(raw_row)

            if machine_id and self._is_blank_value(row.get("machine_id")):
                row["machine_id"] = machine_id

            if table_name in {"transaksi", "transaksi_data"}:
                if self._is_blank_value(row.get("x_id")) and not self._is_blank_value(row.get("id")):
                    row["x_id"] = row.get("id")
                if payload_profile == "legacy_strict" and "id" in row:
                    row.pop("id", None)

            normalized.append(row)
        return normalized

    def _normalize_rows_for_export(self, table_name: str, rows: List[Dict[str, Any]], batch_end: str) -> List[Dict[str, Any]]:
        config = read_config() or {}
        device_context = self._get_export_device_context()
        machine_fallback = str(get_device_id() or "").strip()
        return self._get_hotspot_use_case_service().normalize_rows_for_export(
            table_name=table_name,
            rows=rows,
            batch_end=batch_end,
            config=config,
            device_context=device_context,
            machine_fallback=machine_fallback,
        )

    # edited by glg
    # Lookup nama cabang dari master per_cabang untuk backfill transaksi lama.
    def _lookup_cabang_nama_for_backfill(self, cursor, cabang_id: int) -> str:
        cabang_int = self._as_positive_int(cabang_id, 0)
        if cabang_int <= 0:
            return ""
        try:
            cursor.execute(
                """
                SELECT nama
                FROM per_cabang
                WHERE id = ?
                  AND COALESCE(status, 1) = 1
                  AND COALESCE(trash, 0) = 0
                LIMIT 1
                """,
                (cabang_int,),
            )
            row = cursor.fetchone()
            if row and row[0]:
                return str(row[0]).strip()
        except sqlite3.Error:
            return ""
        return ""

    # edited by glg
    # Fallback nama cabang untuk payload settlement saat field cabang_nama kosong.
    def _lookup_cabang_nama_by_id(self, cabang_id: int) -> str:
        cabang_int = self._as_positive_int(cabang_id, 0)
        if cabang_int <= 0:
            return ""
        cache = getattr(self, "_cabang_nama_lookup_cache", None)
        if cache is None:
            cache = {}
            setattr(self, "_cabang_nama_lookup_cache", cache)
        if cabang_int in cache:
            return str(cache.get(cabang_int) or "").strip()
        if not self._table_exists("per_cabang"):
            cache[cabang_int] = ""
            return ""
        conn = self._connect()
        try:
            cur = conn.cursor()
            cabang_nama = str(self._lookup_cabang_nama_for_backfill(cur, cabang_int) or "").strip()
            cache[cabang_int] = cabang_nama
            return cabang_nama
        except (sqlite3.Error, RuntimeError):
            cache[cabang_int] = ""
            return ""
        finally:
            conn.close()

    # edited by glg
    # Resolver ID approver settlement dari login admin untuk fallback settlement_oto_id.
    def _lookup_employee_id_by_login(self, login_name: str) -> int:
        login = str(login_name or "").strip()
        if not login:
            return 0
        cache = getattr(self, "_employee_id_lookup_cache", None)
        if cache is None:
            cache = {}
            setattr(self, "_employee_id_lookup_cache", cache)
        cache_key = login.lower()
        if cache_key in cache:
            return int(self._to_int(cache.get(cache_key), 0))
        if not self._table_exists("per_employee"):
            cache[cache_key] = 0
            return 0
        conn = self._connect()
        try:
            cur = conn.cursor()
            cur.execute(
                """
                SELECT id
                FROM per_employee
                WHERE LOWER(COALESCE(nama_login, '')) = LOWER(?)
                ORDER BY id ASC
                LIMIT 1
                """,
                (login,),
            )
            row = cur.fetchone()
            employee_id = 0
            if row:
                raw = row[0] if isinstance(row, (tuple, list)) else row["id"]
                employee_id = self._as_positive_int(raw, 0)
            cache[cache_key] = int(employee_id)
            return int(employee_id)
        except sqlite3.Error:
            cache[cache_key] = 0
            return 0
        finally:
            conn.close()

    # edited by glg
    # Resolver gudang default per cabang (single source) untuk transaksi/export POS.
    def _resolve_cabang_default_gudang_for_backfill(self, cursor, cabang_id: int):
        cabang_int = self._as_positive_int(cabang_id, 0)
        if cabang_int <= 0:
            return 0, ""

        cache = getattr(self, "_cabang_default_gudang_cache", None)
        if cache is None:
            cache = {}
            setattr(self, "_cabang_default_gudang_cache", cache)
        if cabang_int in cache:
            cached = cache.get(cabang_int) or {}
            return int(self._to_int(cached.get("gudang_id"), 0)), str(cached.get("gudang_nama") or "").strip()

        try:
            cursor.execute("PRAGMA table_info(per_cabang)")
            columns = {str(row[1]) for row in cursor.fetchall() if row and len(row) > 1 and row[1]}
            if not columns:
                cache[cabang_int] = {"gudang_id": 0, "gudang_nama": ""}
                return 0, ""

            gudang_id_candidates = (
                "gudang_id",
                "gudangID",
                "default_gudang_id",
                "id_gudang",
            )
            gudang_nama_candidates = (
                "gudang_nama",
                "gudangName",
                "default_gudang_nama",
                "nama_gudang",
            )
            gudang_id_col = next((name for name in gudang_id_candidates if name in columns), "")
            gudang_nama_col = next((name for name in gudang_nama_candidates if name in columns), "")
            if not gudang_id_col and not gudang_nama_col:
                cache[cabang_int] = {"gudang_id": 0, "gudang_nama": ""}
                return 0, ""

            selected_cols = ["id"]
            if gudang_id_col:
                selected_cols.append(gudang_id_col)
            if gudang_nama_col:
                selected_cols.append(gudang_nama_col)
            where_status = " AND COALESCE(status, 1) = 1" if "status" in columns else ""
            where_trash = " AND COALESCE(trash, 0) = 0" if "trash" in columns else ""
            cursor.execute(
                (
                    f"SELECT {', '.join(selected_cols)} "  # nosec B608
                    f"FROM per_cabang "
                    f"WHERE id = ?{where_status}{where_trash} "
                    f"LIMIT 1"
                ),
                (cabang_int,),
            )
            row = cursor.fetchone()
            if not row:
                cache[cabang_int] = {"gudang_id": 0, "gudang_nama": ""}
                return 0, ""

            description = getattr(cursor, "description", None) or []
            row_map = {}
            for idx, desc in enumerate(description):
                if idx < len(row):
                    row_map[str(desc[0])] = row[idx]
            resolved_gudang_id = int(
                self._to_int(
                    row_map.get(gudang_id_col) if gudang_id_col else 0,
                    0,
                )
            )
            resolved_gudang_nama = str(
                row_map.get(gudang_nama_col) if gudang_nama_col else ""
            ).strip()
            cache[cabang_int] = {
                "gudang_id": resolved_gudang_id,
                "gudang_nama": resolved_gudang_nama,
            }
            return resolved_gudang_id, resolved_gudang_nama
        except (sqlite3.Error, ValueError, TypeError):
            cache[cabang_int] = {"gudang_id": 0, "gudang_nama": ""}
            return 0, ""

    # edited by glg
    # Lookup nama toko dari company_profile untuk backfill transaksi lama.
    def _lookup_toko_nama_for_backfill(self, cursor, toko_id: int) -> str:
        toko_int = self._as_positive_int(toko_id, 0)
        if toko_int <= 0:
            return ""
        try:
            cursor.execute(
                """
                SELECT nama
                FROM company_profile
                WHERE toko_id = ?
                  AND COALESCE(status, 1) = 1
                  AND COALESCE(trash, 0) = 0
                ORDER BY id ASC
                LIMIT 1
                """,
                (toko_int,),
            )
            row = cursor.fetchone()
            if row and row[0]:
                return str(row[0]).strip()
        except sqlite3.Error:
            return ""
        return ""

    # edited by glg
    # Resolusi rekening pembayaran agar bank_rekening_* tidak kosong pada transaksi tersimpan.
    def _resolve_bank_rekening_for_backfill(self, cursor, bank_id: int, cabang_id: int, is_tunai: bool):
        preferred_jenis = "account_cash" if is_tunai else "account_in"
        fallback_id = 108 if is_tunai else 110
        fallback_nama = "Tunai" if is_tunai else "Non Tunai"
        cabang_int = self._as_positive_int(cabang_id, 0)
        bank_int = self._as_positive_int(bank_id, 0)
        try:
            params: List[Any] = [preferred_jenis]
            where_cabang = ""
            if cabang_int > 0:
                where_cabang = (
                    "AND (cabang_id = ? OR cabang_id = -1 OR cabang_id = 0 OR cabang_id IS NULL)"
                )
                params.append(cabang_int)
            # edited by glg
            # Bandit B608: where_cabang hanya dari fragment internal whitelist, nilai tetap via bind parameter.
            query = (
                "SELECT id, nama "  # nosec B608
                "FROM bank "
                "WHERE jenis = ? "
                "AND COALESCE(status, 1) = 1 "
                "AND COALESCE(trash, 0) = 0 "
                f"{where_cabang} "
                "ORDER BY CASE WHEN id = ? THEN 0 ELSE 1 END, id ASC "
                "LIMIT 1"
            ) 
            cursor.execute(query, tuple(params + [bank_int]))
            row = cursor.fetchone()
            if row:
                return int(row[0]), str(row[1] or fallback_nama)
        except sqlite3.Error:
            return fallback_id, fallback_nama
        return fallback_id, fallback_nama

    # edited by glg
    # Backfill transaksi/transaksi_data lama agar kolom konteks tidak kosong saat bisa diturunkan dari master.
    def _backfill_transaksi_rows_in_db(self, rows: List[Dict[str, Any]], batch_end: str) -> List[Dict[str, Any]]:
        config = read_config() or {}
        device_context = self._get_export_device_context()
        return self._get_hotspot_use_case_service().backfill_transaksi_rows_in_db(
            rows=rows,
            batch_end=batch_end,
            config=config,
            device_context=device_context,
        )

    def _build_master_index(self, transaksi_rows: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]:
        # edited by glg
        # Index master transaksi untuk validasi triplet dan sinkronisasi dtime detail.
        index = {}
        for row in transaksi_rows or []:
            if not isinstance(row, dict):
                continue
            xid = row.get("x_id")
            if self._is_blank_value(xid):
                xid = row.get("id")
            key = str(xid or "").strip()
            if not key:
                continue
            dtime = str(row.get("dtime") or "").strip()
            machine_id = str(row.get("machine_id") or "").strip()
            index[key] = {
                "dtime": dtime,
                "date": dtime[:10] if dtime else "",
                "machine_id": machine_id,
                "nomer": row.get("nomer"),
            }
        return index

    def _align_detail_and_registry_rows(
        self,
        rows: List[Dict[str, Any]],
        table_name: str,
        master_index: Dict[str, Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        """Align detail/registry rows with their master transaction rows.

        Only ``transaksi_data`` and ``transaksi_data_registry`` are processed;
        for any other table, or when ``rows`` or ``master_index`` is empty,
        ``rows`` is returned untouched. Kept rows are modified in place.

        Args:
            rows: Detail or registry rows to align.
            table_name: Name of the table the rows belong to.
            master_index: Mapping keyed by transaction id whose entries are
                read for ``machine_id`` and ``dtime`` (presumably the output
                of ``_build_master_index`` - verify against callers).

        Returns:
            A new list with the rows that have a matching master entry.
        """
        # edited by glg
        # Keep master/detail/registry consistent: transaksi_id must exist in the master index.
        # For detail rows, dtime is forced to follow the master dtime to avoid date mismatches.
        if not rows or not master_index or table_name not in {"transaksi_data", "transaksi_data_registry"}:
            return rows

        filtered_rows = []
        # Counters only feed the summary log line at the end.
        dropped_missing_master = 0
        fixed_machine = 0
        fixed_dtime = 0
        dropped_registry_blob = 0
        fixed_registry_main = 0
        fixed_registry_items = 0

        for row in rows:
            if not isinstance(row, dict):
                continue
            # Prefer transaksi_id; when blank, fall back to transaksiID (or "").
            # The conditional binds as: A if cond else (B or "").
            transaksi_id = str(
                row.get("transaksi_id")
                if not self._is_blank_value(row.get("transaksi_id"))
                else row.get("transaksiID")
                or ""
            ).strip()
            if not transaksi_id:
                dropped_missing_master += 1
                continue

            master = master_index.get(transaksi_id)
            if not master:
                dropped_missing_master += 1
                continue

            # The master's machine_id wins whenever it is known and differs.
            master_machine = str(master.get("machine_id") or "").strip()
            row_machine = str(row.get("machine_id") or "").strip()
            if master_machine and row_machine != master_machine:
                row["machine_id"] = master_machine
                fixed_machine += 1

            if table_name == "transaksi_data":
                master_dtime = str(master.get("dtime") or "").strip()
                detail_dtime = str(row.get("dtime") or "").strip()
                if master_dtime:
                    # Only the date part (first 10 characters) is compared.
                    if not detail_dtime or detail_dtime[:10] != master_dtime[:10]:
                        row["dtime"] = master_dtime
                        fixed_dtime += 1
                # x_id fallback order: own id first, then transaksi_id.
                if self._is_blank_value(row.get("x_id")) and not self._is_blank_value(row.get("id")):
                    row["x_id"] = row.get("id")
                if self._is_blank_value(row.get("x_id")) and not self._is_blank_value(row.get("transaksi_id")):
                    row["x_id"] = row.get("transaksi_id")
            else:
                # Registry rows: rebuild "main" when it lacks a dtime, default a
                # blank "items" to the empty-array constant, then drop the row
                # if "main" is still blank.
                if not self._registry_main_has_dtime(row.get("main")):
                    row["main"] = self._build_min_registry_main_blob(transaksi_id, master)
                    fixed_registry_main += 1
                if self._is_blank_value(row.get("items")):
                    row["items"] = _EMPTY_PHP_ARRAY_B64
                    fixed_registry_items += 1
                if self._is_blank_value(row.get("main")):
                    dropped_registry_blob += 1
                    continue

            filtered_rows.append(row)

        # Log only when something was dropped or repaired.
        if (
            dropped_missing_master
            or fixed_machine
            or fixed_dtime
            or dropped_registry_blob
            or fixed_registry_main
            or fixed_registry_items
        ):
            LOGGER.warning(
                "[DEBUG EXPORT] triplet align table=%s total=%s keep=%s drop_missing_master=%s drop_registry_blob=%s fix_machine=%s fix_dtime=%s fix_registry_main=%s fix_registry_items=%s",
                table_name,
                len(rows),
                len(filtered_rows),
                dropped_missing_master,
                dropped_registry_blob,
                fixed_machine,
                fixed_dtime,
                fixed_registry_main,
                fixed_registry_items,
            )
        return filtered_rows

    def _quote_identifier(self, identifier: str) -> str:
        return self._get_db_service().quote_identifier(identifier)

    # edited by glg
    # Bangun set-clause update dengan identifier aman.
    def _build_update_set_clause(self, mapping: Dict[str, Any]) -> str:
        parts = []
        for key_name in (mapping or {}).keys():
            parts.append(f"{self._quote_identifier(key_name)} = ?")
        return ", ".join(parts)

    def _table_exists(self, table_name: str) -> bool:
        return self._get_db_service().table_exists(table_name)

    def _php_serialize(self, value):
        if value is None:
            return "N;"
        if isinstance(value, bool):
            return f"b:{1 if value else 0};"
        if isinstance(value, int):
            return f"i:{value};"
        if isinstance(value, float):
            return f"d:{value};"
        if isinstance(value, str):
            encoded = value.encode("utf-8")
            return f's:{len(encoded)}:"{value}";'
        if isinstance(value, (list, tuple)):
            items = []
            for idx, val in enumerate(value):
                items.append(self._php_serialize(idx))
                items.append(self._php_serialize(val))
            return f'a:{len(value)}:{{{"".join(items)}}}'
        if isinstance(value, dict):
            items = []
            for key, val in value.items():
                if isinstance(key, int):
                    items.append(self._php_serialize(key))
                else:
                    items.append(self._php_serialize(str(key)))
                items.append(self._php_serialize(val))
            return f'a:{len(value)}:{{{"".join(items)}}}'
        return self._php_serialize(str(value))

    def _php_b64(self, value):
        serialized = self._php_serialize(value)
        return base64.b64encode(serialized.encode("utf-8")).decode("ascii")

    # edited by glg
    # Validasi minimal blob registry: harus bisa decode base64 dan memuat marker dtime.
    def _registry_main_has_dtime(self, main_blob) -> bool:
        if self._is_blank_value(main_blob):
            return False
        try:
            raw = base64.b64decode(str(main_blob))
            text = raw.decode("utf-8", errors="ignore")
            return "dtime" in text
        except (TypeError, ValueError, binascii.Error):
            return False

    # edited by glg
    # Bangun blob main minimum agar compileFilesDataRegistryV2 tidak menolak payload.
    def _build_min_registry_main_blob(self, transaksi_id: str, master: Dict[str, Any]) -> str:
        master = master or {}
        dtime_val = str(master.get("dtime") or datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        payload = {
            "transaksi_id": str(transaksi_id or ""),
            "nomer": master.get("nomer") or str(transaksi_id or ""),
            "nomer2": master.get("nomer") or str(transaksi_id or ""),
            "jenis": master.get("jenis") or 582,
            "dtime": dtime_val,
            "machine_id": master.get("machine_id") or "",
        }
        return self._php_b64(payload)

    def _build_virtual_registry_rows(
        self,
        transaksi_rows: List[Dict[str, Any]],
        transaksi_data_rows: List[Dict[str, Any]],
        batch_end: str,
        last_id: int,
        limit: int = 0,
    ) -> List[Dict[str, Any]]:
        """Build synthetic registry rows from master and detail transaction rows.

        Args:
            transaksi_rows: Master rows; only dict rows whose ``id`` is greater
                than ``last_id`` are used.
            transaksi_data_rows: Detail rows, grouped by their ``transaksi_id``.
            batch_end: Used as the ``dtime`` fallback in the ``main`` payload when
                the master row has neither ``dtime`` nor ``datetime``.
            last_id: Export cursor; master rows at or below it are skipped.
            limit: When greater than zero, stop after this many rows.

        Returns:
            One row per selected transaction, ordered by ascending id. Each row
            carries PHP-serialized, base64-encoded ``main`` and ``items`` blobs
            plus the default registry fields. Empty when no master row qualifies.
        """
        # edited by glg
        # Fallback used when the transaksi_data_registry table does not exist locally.
        # The backend worker compileFilesDataRegistryV2 still needs a non-empty registry file.
        device_context = self._get_export_device_context()
        machine_id = str(device_context.get("machine_id") or get_device_id() or "").strip()

        # Index master rows newer than the cursor by their integer id.
        transaksi_map = {}
        for row in transaksi_rows or []:
            if not isinstance(row, dict):
                continue
            tr_id = self._to_int(row.get("id"), 0)
            if tr_id <= int(last_id or 0):
                continue
            transaksi_map[tr_id] = dict(row)
        if not transaksi_map:
            return []

        # Group detail rows under their parent transaction id.
        details_by_trx = {}
        for row in transaksi_data_rows or []:
            if not isinstance(row, dict):
                continue
            tr_id = self._to_int(row.get("transaksi_id"), 0)
            if tr_id <= 0:
                continue
            details_by_trx.setdefault(tr_id, []).append(dict(row))

        # Remaining registry columns: an empty PHP array blob, or None for a few.
        default_fields = {
            "items2": _EMPTY_PHP_ARRAY_B64,
            "items2_sum": _EMPTY_PHP_ARRAY_B64,
            "itemSrc": _EMPTY_PHP_ARRAY_B64,
            "itemSrc_sum": _EMPTY_PHP_ARRAY_B64,
            "items3": _EMPTY_PHP_ARRAY_B64,
            "items3_sum": _EMPTY_PHP_ARRAY_B64,
            "items4": _EMPTY_PHP_ARRAY_B64,
            "items4_sum": _EMPTY_PHP_ARRAY_B64,
            "items_noapprove": _EMPTY_PHP_ARRAY_B64,
            "rsltItems": _EMPTY_PHP_ARRAY_B64,
            "rsltItems2": _EMPTY_PHP_ARRAY_B64,
            "rsltItems3": _EMPTY_PHP_ARRAY_B64,
            "tableIn_master": _EMPTY_PHP_ARRAY_B64,
            "tableIn_detail": _EMPTY_PHP_ARRAY_B64,
            "tableIn_detail2_sum": _EMPTY_PHP_ARRAY_B64,
            "tableIn_detail_rsltItems": _EMPTY_PHP_ARRAY_B64,
            "tableIn_detail_rsltItems2": _EMPTY_PHP_ARRAY_B64,
            "tableIn_master_values": _EMPTY_PHP_ARRAY_B64,
            "tableIn_detail_values": _EMPTY_PHP_ARRAY_B64,
            "tableIn_detail_values_rsltItems": _EMPTY_PHP_ARRAY_B64,
            "tableIn_detail_values_rsltItems2": _EMPTY_PHP_ARRAY_B64,
            "tableIn_detail_values2_sum": _EMPTY_PHP_ARRAY_B64,
            "main_add_values": _EMPTY_PHP_ARRAY_B64,
            "main_add_fields": _EMPTY_PHP_ARRAY_B64,
            "main_elements": _EMPTY_PHP_ARRAY_B64,
            "main_inputs": _EMPTY_PHP_ARRAY_B64,
            "main_inputs_orig": _EMPTY_PHP_ARRAY_B64,
            "receiptDetailFields": _EMPTY_PHP_ARRAY_B64,
            "receiptSumFields": _EMPTY_PHP_ARRAY_B64,
            "receiptDetailFields2": _EMPTY_PHP_ARRAY_B64,
            "receiptDetailSrcFields": _EMPTY_PHP_ARRAY_B64,
            "receiptSumFields2": _EMPTY_PHP_ARRAY_B64,
            "jurnal_index": _EMPTY_PHP_ARRAY_B64,
            "postProcessor": _EMPTY_PHP_ARRAY_B64,
            "preProcessor": _EMPTY_PHP_ARRAY_B64,
            "revert": _EMPTY_PHP_ARRAY_B64,
            "items_komposisi": _EMPTY_PHP_ARRAY_B64,
            "jurnalItems": _EMPTY_PHP_ARRAY_B64,
            "componentsBuilder": _EMPTY_PHP_ARRAY_B64,
            "items5_sum": _EMPTY_PHP_ARRAY_B64,
            "items6_sum": _EMPTY_PHP_ARRAY_B64,
            "items7_sum": _EMPTY_PHP_ARRAY_B64,
            "items8_sum": _EMPTY_PHP_ARRAY_B64,
            "items9_sum": _EMPTY_PHP_ARRAY_B64,
            "items10_sum": _EMPTY_PHP_ARRAY_B64,
            "rsltItems3_sub": None,
            "rsltItems_revert": None,
            "rsltItems2_revert": None,
            "requiredParam": _EMPTY_PHP_ARRAY_B64,
            "item_price": None,
            "items_elements": _EMPTY_PHP_ARRAY_B64,
            "itemPrice_sum": _EMPTY_PHP_ARRAY_B64,
            "mainOriginal": None,
            "itemsOriginal": None,
            "itemPrice": _EMPTY_PHP_ARRAY_B64,
            "coreBuilder": _EMPTY_PHP_ARRAY_B64,
            "diskon_event": _EMPTY_PHP_ARRAY_B64,
            "cashback_event": _EMPTY_PHP_ARRAY_B64,
            "free_items": _EMPTY_PHP_ARRAY_B64,
        }

        registry_rows = []
        # Emit rows in ascending transaction id order.
        trx_ids = sorted(transaksi_map.keys())
        for trx_id in trx_ids:
            tr = transaksi_map.get(trx_id) or {}
            details = details_by_trx.get(trx_id) or []
            items_payload = {}
            for d in details:
                # Key by detail id, then product id, then a running position.
                # A repeated key overwrites the earlier item.
                item_key = str(d.get("id") or d.get("produk_id") or len(items_payload) + 1)
                items_payload[item_key] = {
                    "produk_id": d.get("produk_id"),
                    "qty": d.get("valid_qty") or d.get("produk_ord_jml") or d.get("qty"),
                    "harga": d.get("harga_jual") or d.get("produk_ord_hrg") or d.get("harga"),
                    "subtotal": d.get("harga_net") or d.get("produk_ord_nilai") or d.get("subtotal"),
                }

            main_payload = {
                "transaksi_id": trx_id,
                "nomer": tr.get("nomer"),
                "nomer2": tr.get("nomer2"),
                "jenis": tr.get("jenis"),
                "dtime": tr.get("dtime") or tr.get("datetime") or str(batch_end or ""),
                "machine_id": machine_id,
                # Falls back to the "point" entry parsed from diskon_log.
                "point_transaksi": self._to_int(
                    tr.get("point_transaksi"),
                    self._to_int(self._parse_semicolon_log(tr.get("diskon_log")).get("point"), 0),
                ),
                "cash_account": tr.get("cash_account") or "",
                "cash_account__nama": tr.get("cash_account__nama") or "",
            }

            row = {
                "transaksi_id": str(trx_id),
                "x_id": str(trx_id),
                "machine_id": machine_id,
                "datetime": tr.get("dtime") or tr.get("datetime") or None,
                "main": self._php_b64(main_payload),
                "items": self._php_b64(items_payload),
            }
            row.update(default_fields)
            registry_rows.append(row)
            # Stop early once the optional row limit is reached.
            if limit and int(limit) > 0 and len(registry_rows) >= int(limit):
                break
        return registry_rows

    # edited by glg
    def _emit_progress(self, callback, percent):
        if not callable(callback):
            return
        try:
            value = int(percent)
        except (TypeError, ValueError):
            value = 0
        value = max(0, min(100, value))
        try:
            callback(value)
        except (RuntimeError, TypeError, ValueError):
            LOGGER.debug("[DEBUG EXPORT] callback progress gagal")

    # edited by glg
    def export_batch(self, progress_callback: Optional[Callable[[int], None]] = None) -> int:
        """Run one export batch through the batch use-case service.

        Args:
            progress_callback: Optional callable forwarded to the use-case service.

        Returns:
            The service result converted to ``int``; a falsy result becomes 0.
        """
        # Delegating keeps this hotspot small; the work lives in the use-case service.
        use_case = self._get_batch_use_case_service()
        exported = use_case.run_export_batch(progress_callback=progress_callback)
        return int(exported or 0)

    def _connect(self):
        return self._get_db_service().connect()

    def _ensure_tables(self):
        """Ensure the export bookkeeping schema exists and is seeded.

        Runs the schema migrations in non-strict mode, seeds the single
        ``export_file_seq`` row when it is missing, adds the extra columns on
        ``export_flux`` and ``export_cursor_table``, creates
        ``export_retry_state`` with its index, and copies legacy cursors into
        ``export_cursor_table_scoped`` for the active server when that table is
        still empty.

        The connection is always closed. When a statement fails the final
        commit is skipped and the error propagates to the caller.
        """
        create_retry_state_sql = """
            CREATE TABLE IF NOT EXISTS export_retry_state (
                table_name TEXT NOT NULL,
                server_hash TEXT NOT NULL,
                attempt_count INTEGER NOT NULL DEFAULT 0,
                next_retry_at TEXT NULL,
                error_code TEXT NULL,
                last_error TEXT NULL,
                updated_at TEXT NOT NULL,
                PRIMARY KEY (table_name, server_hash)
            )
            """
        run_schema_migrations_once(self.db_path, strict=False)
        conn = self._connect()
        try:
            cur = conn.cursor()
            cur.execute("SELECT COUNT(1) FROM export_file_seq WHERE id = 1")
            if cur.fetchone()[0] == 0:
                cur.execute(
                    "INSERT INTO export_file_seq (id, last_no) VALUES (1, ?)",
                    (self._guess_last_seq(),)
                )
            self._ensure_column(cur, "export_flux", "table_name", "TEXT")
            self._ensure_column(cur, "export_flux", "server_hash", "TEXT")
            self._ensure_column(cur, "export_flux", "file_seq", "INTEGER")
            self._ensure_column(cur, "export_flux", "file_hash", "TEXT")
            self._ensure_column(cur, "export_flux", "file_size", "INTEGER")
            self._ensure_column(cur, "export_flux", "attempt_count", "INTEGER")
            self._ensure_column(cur, "export_flux", "next_retry_at", "TEXT")
            self._ensure_column(cur, "export_flux", "error_code", "TEXT")
            self._ensure_column(cur, "export_cursor_table", "server_hash", "TEXT")
            cur.execute(create_retry_state_sql)
            cur.execute(
                "CREATE INDEX IF NOT EXISTS idx_export_retry_state_next_retry ON export_retry_state(next_retry_at)"
            )
            # Best-effort migration of legacy cursors into the scoped table
            # (active server only); failures are logged and ignored.
            try:
                server_hash = get_current_server_hash()
                cur.execute("SELECT COUNT(1) FROM export_cursor_table_scoped")
                if cur.fetchone()[0] == 0:
                    cur.execute("SELECT table_name, last_id FROM export_cursor_table")
                    rows = cur.fetchall()
                    for r in rows:
                        cur.execute(
                            "INSERT OR IGNORE INTO export_cursor_table_scoped (table_name, server_hash, last_id) VALUES (?, ?, ?)",
                            (r[0], server_hash, int(r[1] or 0)),
                        )
            except (sqlite3.Error, ValueError, TypeError) as exc:
                LOGGER.debug("[DEBUG EXPORT] migrasi cursor scoped diabaikan: %s", exc)
            conn.commit()
        finally:
            # Release the handle even when a statement above raised.
            conn.close()

    def _ensure_column(self, cursor, table_name, col_name, col_type):
        self._get_db_service().ensure_column(cursor, table_name, col_name, col_type)

    def _get_cursor(self, table_name: str) -> int:
        """Return the stored export cursor for ``table_name`` on the active server.

        Reads ``last_id`` from ``export_cursor_table_scoped`` for the current
        server hash.

        Returns:
            The stored ``last_id``, or 0 when no row exists or the value is NULL.
        """
        conn = self._connect()
        try:
            cur = conn.cursor()
            server_hash = get_current_server_hash()
            cur.execute(
                "SELECT last_id FROM export_cursor_table_scoped WHERE table_name = ? AND server_hash = ?",
                (table_name, server_hash),
            )
            row = cur.fetchone()
        finally:
            # Close the handle even when the lookup raises.
            conn.close()
        if not row:
            return 0
        # A NULL last_id is treated as "nothing exported yet" instead of crashing.
        return int(row["last_id"] or 0)

    # edited by glg
    def _get_last_success_batch_end(self, table_name: str, server_hash: str) -> str:
        conn = self._connect()
        try:
            cur = conn.cursor()
            cur.execute(
                """
                SELECT batch_end
                FROM export_flux
                WHERE table_name = ?
                  AND server_hash = ?
                  AND status = 'SUCCESS'
                ORDER BY id DESC
                LIMIT 1
                """,
                (table_name, server_hash),
            )
            row = cur.fetchone()
            if not row:
                return ""
            return str(row["batch_end"] or "").strip()
        except (sqlite3.Error, ValueError, TypeError):
            return ""
        finally:
            conn.close()

    # edited by glg
    def _fetch_transaksi_cancel_updates(
        self,
        start_batch_end: str,
        end_batch_end: str,
        limit: int = 0,
    ) -> List[Dict[str, Any]]:
        start_dt = str(start_batch_end or "").strip()
        end_dt = str(end_batch_end or "").strip()
        if not start_dt or not end_dt:
            return []
        if not self._table_exists("transaksi"):
            return []

        conn = self._connect()
        try:
            cur = conn.cursor()
            query = (
                "SELECT * FROM transaksi "
                "WHERE COALESCE(cancel_dtime, '') <> '' "
                "AND datetime(cancel_dtime) >= datetime(?) "
                "AND datetime(cancel_dtime) <= datetime(?) "
                "ORDER BY datetime(cancel_dtime) ASC, id ASC"
            )
            params: List[Any] = [start_dt, end_dt]
            if limit and int(limit) > 0:
                query = f"{query} LIMIT ?"
                params.append(int(limit))
            cur.execute(query, tuple(params))
            rows = cur.fetchall() or []
            return [dict(row) for row in rows]
        except (sqlite3.Error, ValueError, TypeError) as exc:
            LOGGER.warning("[DEBUG EXPORT] gagal fetch update pembatalan transaksi: %s", exc)
            return []
        finally:
            conn.close()

    # edited by glg
    def _merge_transaksi_rows_for_export(
        self,
        main_rows: List[Dict[str, Any]],
        extra_rows: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        merged_by_id: Dict[int, Dict[str, Any]] = {}
        ordered_ids: List[int] = []

        def _push_rows(rows: List[Dict[str, Any]]):
            for row in rows or []:
                if not isinstance(row, dict):
                    continue
                trx_id = self._to_int(row.get("id"), 0)
                if trx_id <= 0:
                    continue
                if trx_id not in merged_by_id:
                    ordered_ids.append(trx_id)
                merged_by_id[trx_id] = dict(row)

        _push_rows(main_rows)
        _push_rows(extra_rows)
        return [merged_by_id[trx_id] for trx_id in ordered_ids if trx_id in merged_by_id]

    # edited by glg
    # Jangan kirim ulang nota lama yang sudah diexport pada batch sebelumnya.
    # Re-export nota pembatalan hanya boleh jika nota asli memang termasuk
    # main_rows batch saat ini (nota asli + pembatalan dalam satu export).
    def _scope_cancel_updates_to_current_main_batch(
        self,
        main_rows: List[Dict[str, Any]],
        cancel_rows: List[Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        main_ids = set()
        for row in main_rows or []:
            if not isinstance(row, dict):
                continue
            trx_id = self._to_int(row.get("id"), 0)
            if trx_id > 0:
                main_ids.add(int(trx_id))
        if not main_ids:
            return []

        scoped_rows: List[Dict[str, Any]] = []
        seen_ids = set()
        for row in cancel_rows or []:
            if not isinstance(row, dict):
                continue
            trx_id = self._to_int(row.get("id"), 0)
            if trx_id <= 0:
                continue
            trx_id = int(trx_id)
            if trx_id not in main_ids:
                continue
            if trx_id in seen_ids:
                continue
            seen_ids.add(trx_id)
            scoped_rows.append(dict(row))
        return scoped_rows

    def _update_cursor(self, table_name: str, last_id: int):
        """Upsert the export cursor of ``table_name`` for the active server.

        Inserts into ``export_cursor_table_scoped`` or, on a
        ``(table_name, server_hash)`` conflict, overwrites ``last_id``. The
        connection is closed even when the statement fails; in that case
        nothing is committed and the error propagates.
        """
        upsert_sql = """
            INSERT INTO export_cursor_table_scoped (table_name, server_hash, last_id)
            VALUES (?, ?, ?)
            ON CONFLICT(table_name, server_hash) DO UPDATE SET last_id = excluded.last_id
            """
        conn = self._connect()
        try:
            cur = conn.cursor()
            server_hash = get_current_server_hash()
            cur.execute(upsert_sql, (table_name, server_hash, int(last_id)))
            conn.commit()
        finally:
            conn.close()

    def _create_flux(self, table_name: str, batch_end: str, last_id: int, max_id: int):
        """Insert a PENDING ``export_flux`` row for a new batch and return its id.

        ``batch_start`` is stored as ``"<last_id>-<max_id>"``; ``file_seq`` starts
        as NULL and ``row_count`` as 0. The connection is closed even when the
        insert fails; in that case nothing is committed and the error propagates.

        Returns:
            The ``lastrowid`` of the inserted row.
        """
        insert_sql = """
            INSERT INTO export_flux (batch_start, batch_end, table_name, server_hash, file_seq, row_count, status, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, 0, 'PENDING', ?, ?)
            """
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        server_hash = get_current_server_hash()
        conn = self._connect()
        try:
            cur = conn.cursor()
            cur.execute(
                insert_sql,
                (f"{last_id}-{max_id}", batch_end, table_name, server_hash, None, now, now)
            )
            flux_id = cur.lastrowid
            conn.commit()
        finally:
            conn.close()
        return flux_id

    def _mark_flux_success(self, flux_id, row_count, file_path, file_seq=None, file_hash=None, file_size=None):
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        conn = self._connect()
        cur = conn.cursor()
        cur.execute(
            """
            UPDATE export_flux
            SET status = 'SUCCESS', row_count = ?, file_path = ?, file_seq = ?, file_hash = ?, file_size = ?, updated_at = ?, error_log = NULL
            WHERE id = ?
            """,
            (row_count, file_path, file_seq, file_hash, file_size, now, flux_id)
        )
        conn.commit()
        conn.close()

    def _mark_flux_retry(self, flux_id, error_log, attempt_count=None, next_retry_at=None, error_code=None):
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        conn = self._connect()
        cur = conn.cursor()
        cur.execute(
            """
            UPDATE export_flux
            SET status = 'RETRY', error_log = ?, attempt_count = COALESCE(?, attempt_count), next_retry_at = COALESCE(?, next_retry_at), error_code = COALESCE(?, error_code), updated_at = ?
            WHERE id = ?
            """,
            (error_log, attempt_count, next_retry_at, error_code, now, flux_id)
        )
        conn.commit()
        conn.close()

    def _get_retry_state(self, table_name: str, server_hash: str):
        conn = self._connect()
        cur = conn.cursor()
        cur.execute(
            """
            SELECT attempt_count, next_retry_at, error_code, last_error
            FROM export_retry_state
            WHERE table_name = ? AND server_hash = ?
            """,
            (table_name, server_hash),
        )
        row = cur.fetchone()
        conn.close()
        if not row:
            return None
        return {
            "attempt_count": int(row["attempt_count"] or 0),
            "next_retry_at": row["next_retry_at"],
            "error_code": row["error_code"],
            "last_error": row["last_error"],
        }

    def _allow_retry_now(self, table_name: str, server_hash: str):
        state = self._get_retry_state(table_name, server_hash)
        if not state:
            return True, None
        next_retry_at = state.get("next_retry_at")
        if not next_retry_at:
            return True, state
        try:
            next_dt = datetime.strptime(str(next_retry_at), "%Y-%m-%d %H:%M:%S")
            if datetime.now() >= next_dt:
                return True, state
            return False, state
        except ValueError:
            return True, state

    def _record_retry_state(self, table_name: str, server_hash: str, error_log: str, error_code: str):
        current = self._get_retry_state(table_name, server_hash) or {}
        current_attempt = int(current.get("attempt_count") or 0)
        attempt_count = max(1, current_attempt + 1)
        backoff_sec = self._calculate_backoff_seconds(attempt_count)
        next_retry_at = (datetime.now() + timedelta(seconds=backoff_sec)).strftime("%Y-%m-%d %H:%M:%S")
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        conn = self._connect()
        cur = conn.cursor()
        cur.execute(
            """
            INSERT INTO export_retry_state (table_name, server_hash, attempt_count, next_retry_at, error_code, last_error, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(table_name, server_hash) DO UPDATE SET
                attempt_count = excluded.attempt_count,
                next_retry_at = excluded.next_retry_at,
                error_code = excluded.error_code,
                last_error = excluded.last_error,
                updated_at = excluded.updated_at
            """,
            (table_name, server_hash, attempt_count, next_retry_at, error_code, error_log, now),
        )
        conn.commit()
        conn.close()
        return {
            "attempt_count": attempt_count,
            "next_retry_at": next_retry_at,
            "error_code": error_code,
        }

    def _clear_retry_state(self, table_name: str, server_hash: str):
        conn = self._connect()
        cur = conn.cursor()
        cur.execute(
            "DELETE FROM export_retry_state WHERE table_name = ? AND server_hash = ?",
            (table_name, server_hash),
        )
        conn.commit()
        conn.close()

    def _calculate_backoff_seconds(self, attempt_count: int) -> int:
        """Return the exponential backoff delay, in whole seconds, for an attempt.

        The delay is ``base * factor ** (attempt_count - 1)`` capped at the
        configured maximum. Config keys read: ``export_retry_backoff_base_sec``
        (falling back to ``http_backoff_sec``, then 1.0),
        ``export_retry_backoff_factor`` (default 2.0),
        ``export_retry_backoff_max_sec`` (default 300.0) and
        ``export_retry_max_attempt`` (when positive, caps the attempt number
        used in the formula).

        Args:
            attempt_count: 1-based attempt number; values below 1 act like 1.

        Returns:
            The delay in seconds, never above the configured maximum.
        """
        config = read_config()
        base = max(1.0, float(config.get("export_retry_backoff_base_sec") or config.get("http_backoff_sec") or 1.0))
        factor = max(1.0, float(config.get("export_retry_backoff_factor") or 2.0))
        max_sec = max(base, float(config.get("export_retry_backoff_max_sec") or 300.0))
        retry_max_attempt = int(config.get("export_retry_max_attempt") or 0)

        if retry_max_attempt > 0:
            attempt_count = min(attempt_count, retry_max_attempt)

        try:
            raw = base * (factor ** max(0, attempt_count - 1))
        except OverflowError:
            # float ** int raises once the result exceeds the float range, which
            # happens after many uncapped attempts; the delay is capped anyway.
            raw = max_sec
        return int(min(max_sec, raw))

    def _build_error_code(self, error_obj) -> str:
        message = str(error_obj or "").lower()
        if "locked" in message:
            return "SQLITE_LOCKED"
        if "timeout" in message:
            return "TIMEOUT"
        if "cursor" in message and "max_id" in message:
            return "CURSOR_AHEAD_OF_MAX_ID"
        if "network" in message or "connection" in message:
            return "NETWORK_ERROR"
        return "EXPORT_ERROR"

    def _fetch_table_rows(self, table_name: str, last_id: int, max_id: int, limit: int = 0):
        table_sql = self._quote_identifier(table_name)
        conn = self._connect()
        try:
            cur = conn.cursor()
            query = f"SELECT * FROM {table_sql} WHERE id > ? AND id <= ? ORDER BY id ASC"  # nosec B608
            params = [int(last_id), int(max_id)]
            if limit and limit > 0:
                query += " LIMIT ?"
                params.append(int(limit))
            cur.execute(query, tuple(params))
            rows = [dict(r) for r in cur.fetchall()]
            return rows
        finally:
            conn.close()

    def _fetch_transaksi_data_by_transaksi_ids(self, transaksi_ids: List[int], limit: int = 0):
        return self._get_snapshot_query_service().fetch_transaksi_data_by_transaksi_ids(
            transaksi_ids=transaksi_ids,
            limit=limit,
        )

    def _fetch_rows_by_foreign_key(self, table_name: str, key_column: str, key_values: List[int], order_column: str = "id", limit: int = 0):
        return self._get_snapshot_query_service().fetch_rows_by_foreign_key(
            table_name=table_name,
            key_column=key_column,
            key_values=key_values,
            order_column=order_column,
            limit=limit,
        )

    def _fetch_table_rows_by_column(self, table_name: str, cursor_column: str, last_id: int, max_id: int, limit: int = 0):
        table_sql = self._quote_identifier(table_name)
        cursor_sql = self._quote_identifier(cursor_column)
        conn = self._connect()
        try:
            cur = conn.cursor()
            query = (
                f"SELECT * FROM {table_sql} "  # nosec B608
                f"WHERE {cursor_sql} > ? AND {cursor_sql} <= ? "
                f"ORDER BY {cursor_sql} ASC"
            )
            params = [int(last_id), int(max_id)]
            if limit and limit > 0:
                query += " LIMIT ?"
                params.append(int(limit))
            cur.execute(query, tuple(params))
            return [dict(r) for r in cur.fetchall()]
        finally:
            conn.close()

    def _to_int(self, value, default: int = 0) -> int:
        """Convert ``value`` via ``TransactionExportValueUtils.to_int``, passing ``default`` through."""
        convert = TransactionExportValueUtils.to_int
        return convert(value, default)

    def _to_float(self, value, default: float = 0.0) -> float:
        """Convert ``value`` via ``TransactionExportValueUtils.to_float``, passing ``default`` through."""
        convert = TransactionExportValueUtils.to_float
        return convert(value, default)

    def _parse_semicolon_log(self, raw_text: Any) -> Dict[str, str]:
        """Parse ``raw_text`` via ``TransactionExportValueUtils.parse_semicolon_log``."""
        parse = TransactionExportValueUtils.parse_semicolon_log
        return parse(raw_text)

    # edited by glg
    # Normalisasi diskon per baris detail menjadi nominal line discount (Rp).
    # Prioritas sumber: nominal eksplisit -> persen -> fallback harga_disc.
    def _resolve_detail_discount_nominal(self, detail_row: Dict[str, Any]) -> int:
        row = detail_row if isinstance(detail_row, dict) else {}
        qty = self._to_int(row.get("valid_qty"), self._to_int(row.get("produk_ord_jml"), 0))
        qty = max(0, qty)
        if qty <= 0:
            return 0

        harga_jual = max(0, self._to_int(row.get("produk_ord_hrg"), 0))
        subtotal = max(0, harga_jual * qty)
        if subtotal <= 0:
            return 0

        nominal_explicit = self._to_float(
            row.get("produk_ord_diskon_nominal"),
            self._to_float(row.get("produk_ord_diskon_khusus"), -1.0),
        )
        if nominal_explicit > 0:
            return int(max(0, min(round(nominal_explicit), subtotal)))

        persen_explicit = self._to_float(
            row.get("produk_ord_diskon_persen"),
            self._to_float(row.get("produk_ord_diskon"), 0.0),
        )
        if persen_explicit > 0:
            if persen_explicit <= 100.0:
                nominal_from_percent = float(subtotal) * (float(persen_explicit) / 100.0)
            else:
                # Legacy fallback: nilai >100 dianggap nominal.
                nominal_from_percent = float(persen_explicit)
            return int(max(0, min(round(nominal_from_percent), subtotal)))

        harga_disc = self._to_int(row.get("harga_disc"), -1)
        if harga_disc >= 0 and harga_disc < harga_jual:
            nominal_from_price = max(0, (harga_jual - harga_disc) * qty)
            return int(max(0, min(nominal_from_price, subtotal)))
        return 0

    # edited by glg
    def _sum_diskon_produk_from_detail_rows(self, detail_rows: List[Dict[str, Any]]) -> int:
        total = 0
        for detail in detail_rows or []:
            if not isinstance(detail, dict):
                continue
            tipe_produk = str(detail.get("produk_jenis") or "").strip().lower()
            if tipe_produk == "free_produk":
                continue
            total += max(0, self._resolve_detail_discount_nominal(detail))
        return int(max(0, total))

    # edited by glg
    def _build_diskon_produk_hint_map_by_transaksi(
        self,
        detail_rows: List[Dict[str, Any]],
    ) -> Dict[int, int]:
        mapped: Dict[int, int] = {}
        for detail in detail_rows or []:
            if not isinstance(detail, dict):
                continue
            trx_id = self._to_int(detail.get("transaksi_id"), 0)
            if trx_id <= 0:
                continue
            tipe_produk = str(detail.get("produk_jenis") or "").strip().lower()
            if tipe_produk == "free_produk":
                continue
            mapped[trx_id] = mapped.get(trx_id, 0) + max(0, self._resolve_detail_discount_nominal(detail))
        return {int(k): int(max(0, v)) for k, v in mapped.items() if int(k) > 0}

    # edited by glg
    def _resolve_transaksi_bruto(self, transaksi_row: Dict[str, Any]) -> int:
        row = transaksi_row if isinstance(transaksi_row, dict) else {}
        bruto = self._to_int(row.get("transaksi_bulat"), 0)
        if bruto <= 0:
            bruto = self._to_int(row.get("transaksi_nilai_ori"), 0)
        if bruto <= 0:
            bruto = self._to_int(row.get("transaksi_nilai"), 0)
        return max(0, bruto)

    # edited by glg
    def _resolve_transaksi_tagihan(self, transaksi_row: Dict[str, Any], bruto_default: int = 0) -> int:
        row = transaksi_row if isinstance(transaksi_row, dict) else {}
        return max(0, self._to_int(row.get("transaksi_nilai"), max(0, int(bruto_default or 0))))

    # edited by glg
    # Normalisasi komponen diskon agar konsisten:
    # total diskon = diskon produk + diskon member + diskon tambahan.
    def _resolve_transaksi_discount_components(
        self,
        transaksi_row: Dict[str, Any],
        *,
        diskon_produk_hint: int = 0,
        bruto_hint: int = 0,
        tagihan_hint: int = 0,
    ) -> Dict[str, int]:
        row = transaksi_row if isinstance(transaksi_row, dict) else {}
        bruto = max(0, int(bruto_hint or self._resolve_transaksi_bruto(row)))
        tagihan = max(0, int(tagihan_hint or self._resolve_transaksi_tagihan(row, bruto)))
        diskon_member = max(
            0,
            self._to_int(self._parse_semicolon_log(row.get("diskon_log")).get("diskon_customer"), 0),
        )
        diskon_produk = max(0, self._to_int(diskon_produk_hint, 0))
        diskon_total = max(0, self._to_int(row.get("diskon_nilai"), 0))
        inferred_total = max(0, bruto - tagihan)
        if diskon_total <= 0 and inferred_total > 0:
            diskon_total = inferred_total

        diskon_tambahan_override = self._to_int(row.get("_diskon_tambahan_total"), -1)
        has_tambahan_override = diskon_tambahan_override >= 0
        diskon_tambahan_master = (
            max(0, diskon_tambahan_override)
            if has_tambahan_override
            else max(
                0,
                self._to_int(
                    row.get("tambahan_nilai"),
                    self._to_int(row.get("add_disc"), 0),
                ),
            )
        )
        # edited by glg
        # Guard data lama:
        # jika diskon_total master lebih besar dari selisih bruto-tagihan, sementara
        # tidak ada additional discount eksplisit, prioritaskan nilai selisih agar
        # diskon produk tidak duplikat ke diskon tambahan.
        if (
            inferred_total > 0
            and diskon_tambahan_master <= 0
            and (diskon_produk > 0 or diskon_member > 0)
            and diskon_total > inferred_total
        ):
            diskon_total = inferred_total

        diskon_tambahan_balance = 0
        if diskon_total >= (diskon_produk + diskon_member):
            diskon_tambahan_balance = max(0, diskon_total - diskon_produk - diskon_member)

        if has_tambahan_override:
            diskon_tambahan = diskon_tambahan_master
        elif diskon_total <= 0:
            diskon_tambahan = diskon_tambahan_master
        elif diskon_produk > 0:
            # Jika diskon produk sudah terukur dari detail, gunakan residual total.
            diskon_tambahan = diskon_tambahan_balance
        else:
            diskon_tambahan = (
                diskon_tambahan_balance
                if diskon_tambahan_balance > 0
                else diskon_tambahan_master
            )

        if diskon_produk <= 0 and diskon_total > 0:
            diskon_produk = max(0, diskon_total - diskon_member - diskon_tambahan)

        return {
            "diskon_total": int(max(0, diskon_total)),
            "diskon_produk": int(max(0, diskon_produk)),
            "diskon_member": int(max(0, diskon_member)),
            "diskon_tambahan": int(max(0, diskon_tambahan)),
        }

    # edited by glg
    def _resolve_transaksi_netto_for_settlement(self, transaksi_row: Dict[str, Any]) -> int:
        row = transaksi_row if isinstance(transaksi_row, dict) else {}
        bruto = self._resolve_transaksi_bruto(row)
        tagihan = self._resolve_transaksi_tagihan(row, bruto)
        if tagihan > 0:
            return int(tagihan)
        discount = self._resolve_transaksi_discount_components(
            row,
            diskon_produk_hint=0,
            bruto_hint=bruto,
            tagihan_hint=tagihan,
        )
        netto_formula = max(
            0,
            bruto
            - int(discount.get("diskon_produk") or 0)
            - int(discount.get("diskon_member") or 0)
            - int(discount.get("diskon_tambahan") or 0),
        )
        if netto_formula <= 0 and tagihan > 0:
            return int(tagihan)
        if tagihan > 0 and abs(int(tagihan) - int(netto_formula)) <= 1:
            return int(tagihan)
        if netto_formula > 0:
            return int(netto_formula)
        return int(max(0, tagihan))

    def _normalize_dtime_value(self, raw_value: Any, fallback_value: str) -> str:
        """Delegate to ``TransactionExportValueUtils.normalize_dtime_value``.

        Both arguments are forwarded unchanged and the helper's result is
        returned as-is.
        """
        normalizer = TransactionExportValueUtils.normalize_dtime_value
        return normalizer(raw_value, fallback_value)

    # edited by glg
    def _parse_int_list(self, raw_value: Any) -> List[int]:
        """Delegate to ``TransactionExportValueUtils.parse_int_list``.

        The instance's ``_to_int`` and ``_is_blank_value`` are supplied as the
        conversion and blank-check callbacks.
        """
        parser = TransactionExportValueUtils.parse_int_list
        return parser(
            raw_value,
            blank_checker=self._is_blank_value,
            to_int_fn=self._to_int,
        )

    # edited by glg
    def _merge_unique_int_list(self, base: List[Any], extra: List[Any]) -> List[int]:
        """Delegate to ``TransactionExportValueUtils.merge_unique_int_list``.

        ``base`` and ``extra`` are forwarded in that order, with the
        instance's ``_to_int`` as the conversion callback.
        """
        merger = TransactionExportValueUtils.merge_unique_int_list
        return merger(base, extra, to_int_fn=self._to_int)

    # edited by glg
    # Hitung jumlah transaksi unik agar id yang overlap antara id_penjualan dan id_return
    # (mis. pembatalan memakai transaksi_id) tidak dihitung ganda.
    def _count_unique_settlement_transactions(self, id_penjualan: List[Any], id_return: List[Any]) -> int:
        """Delegate to ``TransactionExportValueUtils.count_unique_settlement_transactions``.

        The sale ids and return ids are forwarded in that order, with the
        instance's ``_merge_unique_int_list`` as the merge callback.
        """
        counter = TransactionExportValueUtils.count_unique_settlement_transactions
        return counter(id_penjualan, id_return, merge_fn=self._merge_unique_int_list)

    # edited by glg
    # Hitung jumlah event return/pembatalan untuk kebutuhan event-count settlement.
    def _count_settlement_return_events(self, return_rows: List[Dict[str, Any]]) -> int:
        total_event = 0
        for row in return_rows or []:
            if not isinstance(row, dict):
                continue
            transaksi_id = self._to_int(row.get("transaksi_id"), 0)
            if transaksi_id <= 0:
                continue
            total_event += 1
        return int(max(0, total_event))

    # edited by glg
    def _infer_settlement_payment_method(self, transaksi_row: Dict[str, Any], is_non_tunai: bool) -> str:
        row = transaksi_row if isinstance(transaksi_row, dict) else {}
        raw_bank = str(row.get("bank_nama") or "").strip().lower()
        raw_payment = str(row.get("pembayaran") or "").strip().lower()
        raw_sys = str(row.get("pembayaran_sys") or "").strip().lower()
        joined = " ".join([raw_bank, raw_payment, raw_sys]).strip()

        if not is_non_tunai and (joined in {"", "tunai", "cash"}):
            return "cash"
        if "credit" in joined or "kredit" in joined:
            return "credit card"
        if "debit" in joined:
            return "debit card"
        if "qris" in joined or "qr" in joined:
            return "qris"
        return "non cash" if is_non_tunai else "cash"

    # edited by glg
    def _build_settlement_payment_payload(
        self,
        transaksi_rows: List[Dict[str, Any]],
        total_disetor: int,
        total_return_penjualan: int,
        return_rows: Optional[List[Dict[str, Any]]] = None,
        transaksi_lookup_rows: Optional[List[Dict[str, Any]]] = None,
    ) -> Dict[str, Dict[str, Any]]:
        """Group the settlement figures per payment entry.

        Args:
            transaksi_rows: Sales rows of the settlement; non-dict entries
                are skipped.
            total_disetor: Cash handed over; it becomes ``nilai_settlement``
                of the cash target entry when that entry's method is cash.
            total_return_penjualan: Overall refund total; the part not
                covered by ``return_rows`` is booked on the cash target entry.
            return_rows: Optional return/cancellation events to distribute
                over the payment entries.
            transaksi_lookup_rows: Optional extra transaction rows, used only
                to find the transaction a return originates from.

        Returns:
            The grouped entries (as created by the settlement payment
            utils), ordered by the integer value of their key.
        """
        grouped: Dict[str, Dict[str, Any]] = {}
        payment_utils = self._get_settlement_payment_utils()

        def _non_cash_target(source_row: Dict[str, Any]):
            # Entry key, method label and bank field of a non-cash payment.
            # Account 110 is used when bank_rekening_id is missing or not
            # positive; only debit card entries carry the bank name.
            account_id = self._to_int(source_row.get("bank_rekening_id"), 110)
            if account_id <= 0:
                account_id = 110
            metode = self._infer_settlement_payment_method(source_row, is_non_tunai=True)
            bank_nama = str(source_row.get("bank_nama") or "").strip()
            bank_field = bank_nama if metode == "debit card" else ""
            return str(account_id), metode, bank_field

        def _book_return(account_key: Any, nominal: int, metode: str, bank_name: str = "") -> None:
            # Single place that forwards a return amount to the payment utils.
            payment_utils.allocate_return_value(
                grouped,
                account_id=account_key,
                nominal=nominal,
                metode=metode,
                bank_name=bank_name,
                to_int_fn=self._to_int,
            )

        for transaksi in transaksi_rows or []:
            if not isinstance(transaksi, dict):
                continue
            # The received value must be based on the net amount of the
            # transaction, not on the money tendered.
            netto_tagihan = max(0, self._resolve_transaksi_netto_for_settlement(transaksi))
            total_bayar_raw = max(0, self._to_int(transaksi.get("transaksi_dibayar"), 0))
            non_tunai_raw = max(0, self._to_int(transaksi.get("pembayaran_non_tunai"), 0))
            tunai, non_tunai = payment_utils.normalize_tunai_non_tunai(
                netto_tagihan=netto_tagihan,
                total_bayar_raw=total_bayar_raw,
                non_tunai_raw=non_tunai_raw,
            )

            if tunai > 0:
                entry = payment_utils.ensure_entry(grouped, "108", "cash", "")
                entry["jml_payment"] += 1
                entry["nilai_diterima"] += tunai

            if non_tunai > 0:
                account_key, metode, bank_field = _non_cash_target(transaksi)
                entry = payment_utils.ensure_entry(grouped, account_key, metode, bank_field)
                entry["jml_payment"] += 1
                entry["nilai_diterima"] += non_tunai

        if not grouped:
            payment_utils.ensure_entry(grouped, "108", "cash", "")

        # The cash target is the cash entry with the smallest numeric key;
        # without any cash entry the smallest key overall is used.
        cash_keys = [k for k, v in grouped.items() if str(v.get("metode_pembayaran") or "") == "cash"]
        cash_target = min(cash_keys or grouped.keys(), key=lambda key: self._to_int(key, 0))

        refund_value = max(0, self._to_int(total_return_penjualan, 0))
        allocated_return_value = 0

        # Distribute return values per payment method based on the
        # transaction each return originates from. Rows from
        # transaksi_lookup_rows override transaksi_rows with the same id.
        transaksi_map: Dict[int, Dict[str, Any]] = {}
        for raw in [*(transaksi_rows or []), *(transaksi_lookup_rows or [])]:
            if not isinstance(raw, dict):
                continue
            raw_id = self._to_int(raw.get("id"), 0)
            if raw_id > 0:
                transaksi_map[raw_id] = raw

        non_cash_refund_methods = {
            "voucher", "credit", "credit card", "debit", "debit card", "non cash", "non_tunai", "qris",
        }
        # Work on copies so the caller's rows are never handed to the utils.
        normalized_return_rows = [dict(row) for row in (return_rows or []) if isinstance(row, dict)]
        for return_row in normalized_return_rows:
            nominal_return = payment_utils.resolve_return_nominal(return_row, self._to_int)
            if nominal_return <= 0:
                continue
            transaksi_id = self._to_int(return_row.get("transaksi_id"), 0)
            transaksi_asal = transaksi_map.get(transaksi_id) if transaksi_id > 0 else None

            if isinstance(transaksi_asal, dict):
                total_bayar_raw = max(0, self._to_int(transaksi_asal.get("transaksi_dibayar"), 0))
                non_tunai_raw = max(0, self._to_int(transaksi_asal.get("pembayaran_non_tunai"), 0))
                if total_bayar_raw > 0 and non_tunai_raw > total_bayar_raw:
                    non_tunai_raw = total_bayar_raw
                tunai_raw = max(0, total_bayar_raw - non_tunai_raw)

                if total_bayar_raw > 0 and tunai_raw > 0 and non_tunai_raw > 0:
                    # Mixed payment: split the return by the non-cash share
                    # of the paid amount (transaksi_dibayar).
                    nominal_non_tunai = int(
                        round((float(nominal_return) * float(non_tunai_raw)) / float(total_bayar_raw))
                    )
                    nominal_non_tunai = max(0, min(nominal_non_tunai, nominal_return))
                    nominal_tunai = max(0, nominal_return - nominal_non_tunai)
                    if nominal_tunai > 0:
                        _book_return(cash_target, nominal_tunai, "cash")
                    if nominal_non_tunai > 0:
                        account_key, metode_non_tunai, bank_field = _non_cash_target(transaksi_asal)
                        _book_return(account_key, nominal_non_tunai, metode_non_tunai, bank_field)
                elif non_tunai_raw > 0:
                    account_key, metode_non_tunai, bank_field = _non_cash_target(transaksi_asal)
                    _book_return(account_key, nominal_return, metode_non_tunai, bank_field)
                else:
                    _book_return(cash_target, nominal_return, "cash")
            else:
                # Origin transaction unknown: rely on the row's refund_method.
                refund_method = str(return_row.get("refund_method") or "").strip().lower()
                if refund_method in non_cash_refund_methods:
                    if "credit" in refund_method:
                        metode_non_tunai = "credit card"
                    elif "debit" in refund_method:
                        metode_non_tunai = "debit card"
                    elif "qris" in refund_method:
                        metode_non_tunai = "qris"
                    elif refund_method == "voucher":
                        metode_non_tunai = "voucher"
                    else:
                        metode_non_tunai = "non cash"
                    _book_return("110", nominal_return, metode_non_tunai)
                else:
                    _book_return(cash_target, nominal_return, "cash")

            allocated_return_value += nominal_return

        # Whatever part of the refund total no row accounted for goes to the
        # cash target. This also covers the "nothing allocated" case, since
        # allocated_return_value never drops below zero.
        if refund_value > allocated_return_value and cash_target in grouped:
            grouped[cash_target]["return_nilai"] += int(refund_value - allocated_return_value)

        settlement_cash = max(0, self._to_int(total_disetor, 0))
        for account_id, payload in grouped.items():
            metode = str(payload.get("metode_pembayaran") or "")
            nilai_diterima = max(0, self._to_int(payload.get("nilai_diterima"), 0))
            if metode == "cash" and account_id == cash_target:
                nilai_settlement = settlement_cash
            elif metode == "cash":
                nilai_settlement = 0
            else:
                # Non-cash entries settle for exactly what was received.
                nilai_settlement = nilai_diterima
            payload["nilai_diterima"] = nilai_diterima
            payload["nilai_settlement"] = max(0, self._to_int(nilai_settlement, 0))
            payload["return_nilai"] = max(0, self._to_int(payload.get("return_nilai"), 0))
            payload["jml_payment"] = max(0, self._to_int(payload.get("jml_payment"), 0))
            payload["selisih_settlement"] = int(payload["nilai_settlement"] - payload["nilai_diterima"])
            payment_utils.apply_payment_legacy_aliases(payload, to_int_fn=self._to_int)

        # Keys that do not convert to an integer sort last (10**9).
        sorted_keys = sorted(grouped.keys(), key=lambda x: (self._to_int(x, 10**9), str(x)))
        return {key: grouped[key] for key in sorted_keys}

    # edited by glg
    def _fetch_transaksi_rows_for_ids(self, transaksi_ids: List[int]) -> List[Dict[str, Any]]:
        normalized_ids = self._parse_int_list(transaksi_ids)
        if not normalized_ids or not self._table_exists("transaksi"):
            return []
        return self._fetch_rows_by_foreign_key(
            table_name="transaksi",
            key_column="id",
            key_values=normalized_ids,
            order_column="id",
            limit=0,
        )

    # edited by glg
    def _fetch_return_summary_for_transaksi_ids(
        self,
        transaksi_ids: List[int],
        fallback_total: int = 0,
        settlement_dtime: str = "",
        settlement_window_start: str = "",
        kasir_name: str = "",
    ) -> Dict[str, Any]:
        """Summarise return and cancellation events for a settlement.

        Rows are read from ``return_transaksi_penjualan`` (filtered by the
        given transaction ids) and from ``pembatalan_transaksi_history``
        (filtered by the settlement window and cashier name where those
        columns exist).

        Args:
            transaksi_ids: Transaction ids whose returns are looked up.
            fallback_total: Total used when no positive amount is found; it
                is also the lower bound of the reported total.
            settlement_dtime: Upper bound (inclusive) for ``cancel_dtime``.
            settlement_window_start: Lower bound (exclusive) for
                ``cancel_dtime``.
            kasir_name: Cashier name matched case-insensitively against
                ``kasir_nama``.

        Returns:
            Dict with ``ids`` (collected ids passed through
            ``_parse_int_list``), ``total`` and ``rows`` (one normalised dict
            per event). On any ``sqlite3.Error`` the result is empty ids and
            rows with ``total`` set to the fallback.
        """
        normalized_ids = self._parse_int_list(transaksi_ids)
        fallback_total_val = max(0, self._to_int(fallback_total, 0))
        has_return_table = self._table_exists("return_transaksi_penjualan")
        has_cancel_table = self._table_exists("pembatalan_transaksi_history")
        # Nothing to query: no ids for the return table and no cancel table.
        if not normalized_ids and not has_cancel_table:
            return {"ids": [], "total": fallback_total_val, "rows": []}
        if not has_return_table and not has_cancel_table:
            return {"ids": [], "total": fallback_total_val, "rows": []}

        conn = self._connect()
        try:
            cur = conn.cursor()
            return_ids: List[int] = []
            total_return = 0
            return_rows: List[Dict[str, Any]] = []
            # (source, event id, transaksi_id) keys already processed.
            seen_event_keys = set()

            # edited by glg
            # Primary source of returns: the return_transaksi_penjualan table.
            if has_return_table and normalized_ids:
                # NOTE(review): one bound parameter per id; a very large id
                # list may exceed SQLite's bound-parameter limit, in which
                # case the sqlite3.Error handler below returns the fallback
                # summary - confirm expected batch sizes.
                placeholders = ",".join(["?"] * len(normalized_ids))
                try:
                    query = (
                        "SELECT id, transaksi_id, total_return, refund_amount, refund_method "  # nosec B608
                        "FROM return_transaksi_penjualan "
                        f"WHERE transaksi_id IN ({placeholders}) "
                        "ORDER BY id ASC"
                    ) 
                    cur.execute(
                        query,
                        tuple(normalized_ids),
                    )
                except sqlite3.OperationalError:
                    # Retry without refund_method (presumably for schemas
                    # that lack the column - TODO confirm).
                    query = (
                        "SELECT id, transaksi_id, total_return, refund_amount "  # nosec B608
                        "FROM return_transaksi_penjualan "
                        f"WHERE transaksi_id IN ({placeholders}) "
                        "ORDER BY id ASC"
                    ) 
                    cur.execute(
                        query,
                        tuple(normalized_ids),
                    )
                rows = cur.fetchall() or []
                for row in rows:
                    data = dict(row)
                    return_id = self._to_int(data.get("id"), 0)
                    transaksi_id = int(max(0, self._to_int(data.get("transaksi_id"), 0)))
                    if transaksi_id <= 0:
                        continue
                    # refund_amount is read first; total_return is the
                    # default handed to _to_int.
                    nominal = self._to_int(data.get("refund_amount"), self._to_int(data.get("total_return"), 0))
                    event_key = ("return", int(max(0, return_id)), transaksi_id)
                    if event_key in seen_event_keys:
                        continue
                    seen_event_keys.add(event_key)
                    id_payload = int(max(0, return_id))
                    if id_payload > 0:
                        return_ids.append(id_payload)
                    if nominal > 0:
                        total_return += nominal
                    return_rows.append(
                        {
                            "id": int(max(0, return_id)),
                            "transaksi_id": transaksi_id,
                            "refund_amount": int(max(0, nominal)),
                            "total_return": int(max(0, self._to_int(data.get("total_return"), 0))),
                            "refund_method": str(data.get("refund_method") or "cash").strip().lower() or "cash",
                            "event_source": "return",
                            "id_return_payload": id_payload,
                        }
                    )

            # edited by glg
            # Align the settlement definition of a return:
            # a full-receipt cancellation is treated as a return event.
            if has_cancel_table:
                cancel_meta = self._get_table_schema_meta("pembatalan_transaksi_history")
                cancel_cols = set(cancel_meta.keys())
                # "1=1" keeps the WHERE clause valid when no filter applies.
                cancel_clauses = ["1=1"]
                cancel_params: List[Any] = []
                window_start = str(settlement_window_start or "").strip()
                settle_time = str(settlement_dtime or "").strip()
                kasir_hint = str(kasir_name or "").strip()
                # Each clause appends its parameter right away so clause and
                # parameter order stay aligned.
                if window_start and "cancel_dtime" in cancel_cols:
                    cancel_clauses.append("datetime(COALESCE(cancel_dtime, '1970-01-01 00:00:00')) > datetime(?)")
                    cancel_params.append(window_start)
                if settle_time and "cancel_dtime" in cancel_cols:
                    cancel_clauses.append("datetime(COALESCE(cancel_dtime, '1970-01-01 00:00:00')) <= datetime(?)")
                    cancel_params.append(settle_time)
                # Without a window start, restrict to the settlement's date.
                if settle_time and not window_start and "cancel_dtime" in cancel_cols:
                    cancel_clauses.append("date(COALESCE(cancel_dtime, '1970-01-01')) = date(?)")
                    cancel_params.append(settle_time)
                if kasir_hint and "kasir_nama" in cancel_cols:
                    cancel_clauses.append("LOWER(COALESCE(kasir_nama, '')) = LOWER(?)")
                    cancel_params.append(kasir_hint)
                cancel_where = " AND ".join(cancel_clauses)
                # Select an empty literal for columns the table does not have.
                kasir_select = "kasir_nama" if "kasir_nama" in cancel_cols else "'' AS kasir_nama"
                cancel_time_select = "cancel_dtime" if "cancel_dtime" in cancel_cols else "'' AS cancel_dtime"
                query = (
                    "SELECT id, transaksi_id, transaksi_nilai, "  # nosec B608
                    f"{cancel_time_select}, {kasir_select} "
                    "FROM pembatalan_transaksi_history "
                    f"WHERE {cancel_where} "
                    "ORDER BY id ASC"
                ) 
                cur.execute(query, tuple(cancel_params))
                cancel_rows = cur.fetchall() or []
                for row in cancel_rows:
                    data = dict(row)
                    cancel_id = self._to_int(data.get("id"), 0)
                    transaksi_id = self._to_int(data.get("transaksi_id"), 0)
                    if transaksi_id <= 0:
                        continue
                    event_key = ("cancel", int(max(0, cancel_id)), int(transaksi_id))
                    if event_key in seen_event_keys:
                        continue
                    seen_event_keys.add(event_key)
                    nominal = max(0, self._to_int(data.get("transaksi_nilai"), 0))
                    if nominal > 0:
                        total_return += nominal
                    # edited by glg
                    # Final decision on the id_return mapping for cancellations:
                    # use the cancelled transaksi_id (not the internal history id)
                    # so the payload directly references the original
                    # transaction being returned.
                    id_payload = int(transaksi_id)
                    if id_payload > 0:
                        return_ids.append(id_payload)
                    return_rows.append(
                        {
                            "id": int(max(0, cancel_id)),
                            "transaksi_id": int(transaksi_id),
                            "refund_amount": int(max(0, nominal)),
                            "total_return": int(max(0, nominal)),
                            "refund_method": "cash",
                            "event_source": "cancellation",
                            "id_return_payload": id_payload,
                        }
                    )
            # The fallback replaces an empty total and otherwise acts as a
            # lower bound.
            if total_return <= 0:
                total_return = fallback_total_val
            else:
                total_return = max(total_return, fallback_total_val)
            return {
                "ids": self._parse_int_list(return_ids),
                "total": max(0, total_return),
                "rows": return_rows,
            }
        except sqlite3.Error:
            # Best effort: any database error yields the fallback summary.
            return {"ids": [], "total": fallback_total_val, "rows": []}
        finally:
            conn.close()

    # edited by glg
    # Ambil transaksi return terbaru berdasarkan jendela batch export transaksi
    # agar baris return ikut terkirim dalam file transaksi.
    def _fetch_return_rows_for_transaksi_export(
        self,
        start_batch_end: str,
        end_batch_end: str,
        limit: int = 0,
    ) -> List[Dict[str, Any]]:
        has_return_table = self._table_exists("return_transaksi_penjualan")
        has_cancel_table = self._table_exists("pembatalan_transaksi_history")
        if not has_return_table and not has_cancel_table:
            return []
        conn = self._connect()
        try:
            cur = conn.cursor()
            events: List[Dict[str, Any]] = []
            seen_keys = set()
            start_val = str(start_batch_end or "").strip()
            end_val = str(end_batch_end or "").strip()
            transaksi_meta = self._get_table_schema_meta("transaksi")
            transaksi_cols = set(transaksi_meta.keys())

            # edited by glg
            # Seleksi kolom transaksi harus dinamis agar schema outlet lama
            # yang belum punya machine_id/cpu_info/com_info tidak melempar error.
            transaksi_nomer_return_sql = (
                "t.nomer AS transaksi_nomer"
                if "nomer" in transaksi_cols
                else "'' AS transaksi_nomer"
            )
            transaksi_nomer_cancel_sql = (
                "COALESCE(t.nomer, c.nomer, '') AS transaksi_nomer"
                if "nomer" in transaksi_cols
                else "COALESCE(c.nomer, '') AS transaksi_nomer"
            )
            transaksi_cabang_id_sql = (
                "t.cabang_id AS transaksi_cabang_id"
                if "cabang_id" in transaksi_cols
                else "0 AS transaksi_cabang_id"
            )
            transaksi_cabang_nama_sql = (
                "t.cabang_nama AS transaksi_cabang_nama"
                if "cabang_nama" in transaksi_cols
                else "'' AS transaksi_cabang_nama"
            )
            transaksi_gudang_id_sql = (
                "t.gudang_id AS transaksi_gudang_id"
                if "gudang_id" in transaksi_cols
                else "0 AS transaksi_gudang_id"
            )
            transaksi_oleh_id_sql = (
                "t.oleh_id AS transaksi_oleh_id"
                if "oleh_id" in transaksi_cols
                else "0 AS transaksi_oleh_id"
            )
            transaksi_oleh_nama_return_sql = (
                "t.oleh_nama AS transaksi_oleh_nama"
                if "oleh_nama" in transaksi_cols
                else "'' AS transaksi_oleh_nama"
            )
            transaksi_oleh_nama_cancel_sql = (
                "COALESCE(t.oleh_nama, c.kasir_nama, '') AS transaksi_oleh_nama"
                if "oleh_nama" in transaksi_cols
                else "COALESCE(c.kasir_nama, '') AS transaksi_oleh_nama"
            )
            transaksi_machine_id_sql = (
                "t.machine_id AS transaksi_machine_id"
                if "machine_id" in transaksi_cols
                else "'' AS transaksi_machine_id"
            )
            transaksi_cpu_info_sql = (
                "t.cpu_info AS transaksi_cpu_info"
                if "cpu_info" in transaksi_cols
                else "'' AS transaksi_cpu_info"
            )
            transaksi_com_info_sql = (
                "t.com_info AS transaksi_com_info"
                if "com_info" in transaksi_cols
                else "'' AS transaksi_com_info"
            )

            # edited by glg
            # Samakan collector return export dengan settlement:
            # gabung return_transaksi_penjualan + pembatalan_transaksi_history.
            if has_return_table:
                return_clauses: List[str] = []
                return_params: List[Any] = []
                if start_val:
                    return_clauses.append("datetime(COALESCE(r.tanggal_return, '1970-01-01 00:00:00')) > datetime(?)")
                    return_params.append(start_val)
                if end_val:
                    return_clauses.append("datetime(COALESCE(r.tanggal_return, '1970-01-01 00:00:00')) <= datetime(?)")
                    return_params.append(end_val)
                return_where_sql = f"WHERE {' AND '.join(return_clauses)}" if return_clauses else ""
                cur.execute(
                    (
                        "SELECT r.*, "  # nosec B608
                        f"{transaksi_nomer_return_sql}, "
                        f"{transaksi_cabang_id_sql}, "
                        f"{transaksi_cabang_nama_sql}, "
                        f"{transaksi_gudang_id_sql}, "
                        f"{transaksi_oleh_id_sql}, "
                        f"{transaksi_oleh_nama_return_sql}, "
                        f"{transaksi_machine_id_sql}, "
                        f"{transaksi_cpu_info_sql}, "
                        f"{transaksi_com_info_sql}, "
                        "'return' AS event_source "
                        "FROM return_transaksi_penjualan r "
                        "LEFT JOIN transaksi t ON CAST(r.transaksi_id AS INTEGER) = t.id "
                        f"{return_where_sql} "
                        "ORDER BY datetime(COALESCE(r.tanggal_return, '1970-01-01 00:00:00')) ASC, r.id ASC"
                    ),
                    tuple(return_params),
                )
                rows = cur.fetchall() or []
                for row in rows:
                    data = dict(row)
                    event_key = (
                        "return",
                        self._to_int(data.get("id"), 0),
                        self._to_int(data.get("transaksi_id"), 0),
                    )
                    if event_key in seen_keys:
                        continue
                    seen_keys.add(event_key)
                    events.append(data)

            if has_cancel_table:
                cancel_clauses: List[str] = []
                cancel_params: List[Any] = []
                if start_val:
                    cancel_clauses.append("datetime(COALESCE(c.cancel_dtime, '1970-01-01 00:00:00')) > datetime(?)")
                    cancel_params.append(start_val)
                if end_val:
                    cancel_clauses.append("datetime(COALESCE(c.cancel_dtime, '1970-01-01 00:00:00')) <= datetime(?)")
                    cancel_params.append(end_val)
                cancel_where_sql = f"WHERE {' AND '.join(cancel_clauses)}" if cancel_clauses else ""
                cancellation_input_count = 0
                cancellation_valid_count = 0
                cur.execute(
                    (
                        "SELECT "  # nosec B608
                        "c.id AS id, "
                        "c.transaksi_id AS transaksi_id, "
                        "COALESCE(c.cancel_dtime, c.transaksi_dtime, '') AS tanggal_return, "
                        "COALESCE(c.transaksi_nilai, 0) AS total_return, "
                        "'' AS kode_voucher, "
                        "0 AS nilai_voucher, "
                        "'' AS customer_id, "
                        "COALESCE(c.nomer, '') AS keterangan, "
                        "'full' AS jenis_return, "
                        "'cash' AS refund_method, "
                        "COALESCE(c.transaksi_nilai, 0) AS refund_amount, "
                        f"{transaksi_nomer_cancel_sql}, "
                        f"{transaksi_cabang_id_sql}, "
                        f"{transaksi_cabang_nama_sql}, "
                        f"{transaksi_gudang_id_sql}, "
                        f"{transaksi_oleh_id_sql}, "
                        f"{transaksi_oleh_nama_cancel_sql}, "
                        f"{transaksi_machine_id_sql}, "
                        f"{transaksi_cpu_info_sql}, "
                        f"{transaksi_com_info_sql}, "
                        "'cancellation' AS event_source "
                        "FROM pembatalan_transaksi_history c "
                        "LEFT JOIN transaksi t ON CAST(c.transaksi_id AS INTEGER) = t.id "
                        f"{cancel_where_sql} "
                        "ORDER BY datetime(COALESCE(c.cancel_dtime, '1970-01-01 00:00:00')) ASC, c.id ASC"
                    ),
                    tuple(cancel_params),
                )
                rows = cur.fetchall() or []
                cancellation_input_count = len(rows)
                for row in rows:
                    data = dict(row)
                    event_key = (
                        "cancellation",
                        self._to_int(data.get("id"), 0),
                        self._to_int(data.get("transaksi_id"), 0),
                    )
                    if event_key in seen_keys:
                        continue
                    seen_keys.add(event_key)
                    events.append(data)
                    cancellation_valid_count += 1
                if cancellation_input_count > 0 and cancellation_valid_count <= 0:
                    LOGGER.warning(
                        "[DEBUG EXPORT] cancellation terdeteksi tetapi tidak ada event valid. window=%s..%s input=%s",
                        start_val or "-",
                        end_val or "-",
                        cancellation_input_count,
                    )

            events.sort(
                key=lambda row: (
                    str(
                        row.get("tanggal_return")
                        or row.get("cancel_dtime")
                        or ""
                    ),
                    int(self._to_int(row.get("id"), 0)),
                )
            )
            if limit and int(limit) > 0:
                events = events[: int(limit)]
            return [dict(row) for row in events if isinstance(row, dict)]
        except sqlite3.Error as exc:
            LOGGER.warning("[DEBUG EXPORT] gagal fetch return/cancellation transaksi export: %s", exc)
            return []
        finally:
            conn.close()

    # edited by glg
    def _fetch_detail_return_rows_for_return_ids(
        self,
        return_ids: List[int],
    ) -> List[Dict[str, Any]]:
        return self._get_snapshot_query_service().fetch_detail_return_rows_for_return_ids(
            return_ids=return_ids
        )

    # edited by glg
    def _build_legacy_return_items_payload(
        self,
        detail_rows: List[Dict[str, Any]],
        produk_map: Dict[str, Dict[str, Any]],
    ) -> Dict[str, Dict[str, Any]]:
        items_payload: Dict[str, Dict[str, Any]] = {}
        for detail in detail_rows or []:
            if not isinstance(detail, dict):
                continue
            produk_id = str(self._to_int(detail.get("produk_id"), 0))
            if not produk_id or produk_id == "0":
                continue
            qty = max(0, self._to_int(detail.get("jumlah"), 0))
            if qty <= 0:
                continue
            harga_jual = max(0, self._to_int(detail.get("harga"), 0))
            subtotal = max(0, self._to_int(detail.get("subtotal"), harga_jual * qty))
            produk_info = produk_map.get(produk_id) or {}
            nama_produk = str(
                detail.get("produk_nama")
                or produk_info.get("nama")
                or ""
            ).strip()
            existing = items_payload.get(produk_id)
            if existing:
                existing["jumlah"] = self._to_int(existing.get("jumlah"), 0) + qty
                existing["subtotal"] = self._to_int(existing.get("subtotal"), 0) + subtotal
                continue
            items_payload[produk_id] = {
                "jumlah": qty,
                "subtotal": subtotal,
                "satuan": str(produk_info.get("satuan") or "pcs"),
                "tipe_pajak": str(self._to_int(produk_info.get("tipe_pajak"), 1)),
                "tipe_produk": "return_item",
                "produk_id": produk_id,
                "nama": nama_produk,
                "satuan_id": str(self._to_int(produk_info.get("satuan_id"), 0)),
                "ppn": str(self._to_int(produk_info.get("ppn"), 0)),
                "ppn_produk": str(self._to_int(produk_info.get("ppn_produk"), 1)),
                "barcode": str(produk_info.get("barcode") or ""),
                "coa_code": str(produk_info.get("coa_code") or ""),
                "hpp": str(self._to_float(produk_info.get("hpp"), 0.0)),
                "discPersen": 0.0,
                "discNilai": 0,
                "harga_jual": harga_jual,
                "harga_disc": harga_jual,
                "ppnFactor": "0",
            }
        return items_payload

    # edited by glg
    # Fallback item return untuk pembatalan full nota:
    # jika detail return tidak tersedia, ambil detail dari transaksi_data transaksi asal.
    def _build_legacy_return_items_payload_from_transaksi_data(
        self,
        transaksi_data_rows: List[Dict[str, Any]],
        produk_map: Dict[str, Dict[str, Any]],
    ) -> Dict[str, Dict[str, Any]]:
        synthetic_rows: List[Dict[str, Any]] = []
        for detail in transaksi_data_rows or []:
            if not isinstance(detail, dict):
                continue
            produk_id = self._to_int(detail.get("produk_id"), 0)
            if produk_id <= 0:
                continue
            qty = max(
                0,
                self._to_int(
                    detail.get("valid_qty"),
                    self._to_int(detail.get("produk_ord_jml"), 0),
                ),
            )
            if qty <= 0:
                continue
            harga = max(0, self._to_int(detail.get("produk_ord_hrg"), 0))
            subtotal = max(0, harga * qty)
            synthetic_rows.append(
                {
                    "produk_id": int(produk_id),
                    "produk_nama": str(detail.get("produk_nama") or "").strip(),
                    "jumlah": int(qty),
                    "harga": int(harga),
                    "subtotal": int(subtotal),
                }
            )
        return self._build_legacy_return_items_payload(synthetic_rows, produk_map)

    # edited by glg
    # Ringkasan produk return supaya downstream mudah baca id/nama produk yang direturn.
    def _build_produk_return_list_from_items(
        self,
        items_payload: Dict[str, Dict[str, Any]],
    ) -> List[Dict[str, Any]]:
        out: List[Dict[str, Any]] = []
        if not isinstance(items_payload, dict):
            return out
        for key, item in items_payload.items():
            if not isinstance(item, dict):
                continue
            produk_id = str(item.get("produk_id") or key or "").strip()
            produk_nama = str(item.get("nama") or "").strip()
            if not produk_id and not produk_nama:
                continue
            out.append(
                {
                    "produk_id": produk_id,
                    "produk_nama": produk_nama,
                    "jumlah": int(max(0, self._to_int(item.get("jumlah"), 0))),
                    "subtotal": int(max(0, self._to_int(item.get("subtotal"), 0))),
                }
            )
        out.sort(
            key=lambda row: (
                str(row.get("produk_id") or ""),
                str(row.get("produk_nama") or ""),
            )
        )
        return out

    # edited by glg
    # Bangun baris return agar ikut berada pada payload file transaksi legacy.
    def _build_legacy_return_rows_payload(
        self,
        return_rows: List[Dict[str, Any]],
        batch_end: str,
    ) -> List[Dict[str, Any]]:
        return self._get_hotspot_use_case_service().build_legacy_return_rows_payload(
            return_rows=return_rows,
            batch_end=batch_end,
        )

    # edited by glg
    def _fetch_transaksi_settlement_candidates(
        self,
        settlement_dtime: str,
        cabang_id: int,
    ) -> List[Dict[str, Any]]:
        if not self._table_exists("transaksi_settlement"):
            return []

        table_meta = self._get_table_schema_meta("transaksi_settlement")
        table_cols = set(table_meta.keys())
        if not table_cols:
            return []

        clauses = []
        params: List[Any] = []
        settlement_date = str(settlement_dtime or "").strip()[:10]
        if settlement_date and "oleh_dtime" in table_cols:
            clauses.append("DATE(oleh_dtime) = DATE(?)")
            params.append(settlement_date)
        cabang_int = self._to_int(cabang_id, 0)
        if cabang_int > 0 and "cabang_id" in table_cols:
            clauses.append("cabang_id = ?")
            params.append(cabang_int)

        where_sql = f"WHERE {' AND '.join(clauses)}" if clauses else ""
        query = (
            "SELECT * FROM transaksi_settlement "  # nosec B608
            f"{where_sql} "
            "ORDER BY id DESC LIMIT 50"
        )

        conn = self._connect()
        try:
            cur = conn.cursor()
            cur.execute(query, tuple(params))
            rows = cur.fetchall() or []
            return [dict(row) for row in rows]
        except sqlite3.Error:
            return []
        finally:
            conn.close()

    # edited by glg
    def _pick_matching_transaksi_settlement_row(
        self,
        settlement_row: Dict[str, Any],
        transaksi_ids: List[int],
    ) -> Dict[str, Any]:
        row = settlement_row if isinstance(settlement_row, dict) else {}
        settlement_dtime = self._normalize_dtime_value(row.get("tanggal") or row.get("dtime"), "")
        cabang_id = self._to_int(row.get("cabang_id"), 0)
        candidates = self._fetch_transaksi_settlement_candidates(settlement_dtime, cabang_id)
        if not candidates:
            return {}

        transaksi_set = set(self._parse_int_list(transaksi_ids))
        kasir_hint = str(row.get("kasir") or "").strip().lower()
        settle_date = str(settlement_dtime)[:10]

        best = None
        best_score = -1
        for candidate in candidates:
            cand_ids = set(self._parse_int_list(candidate.get("data_transaksi_id")))
            overlap = len(transaksi_set.intersection(cand_ids))
            score = overlap * 1000
            cand_kasir = str(candidate.get("oleh_nama") or "").strip().lower()
            if kasir_hint and cand_kasir and kasir_hint == cand_kasir:
                score += 10
            cand_date = str(candidate.get("oleh_dtime") or "")[:10]
            if settle_date and cand_date and settle_date == cand_date:
                score += 2
            if score > best_score:
                best_score = score
                best = candidate

        if isinstance(best, dict):
            return best
        return candidates[0] if candidates else {}

    # edited by glg
    def _merge_settlement_payment_map(
        self,
        base_map: Dict[str, Dict[str, Any]],
        incoming_map: Dict[str, Dict[str, Any]],
    ) -> Dict[str, Dict[str, Any]]:
        # edited by glg
        # Pastikan alias payment settlement direct tetap sinkron setelah proses merge multi-row.
        def _apply_payment_legacy_aliases(target_payload: Dict[str, Any]) -> Dict[str, Any]:
            target = target_payload if isinstance(target_payload, dict) else {}
            nilai_diterima = max(0, self._to_int(target.get("nilai_diterima"), 0))
            nilai_settlement = max(0, self._to_int(target.get("nilai_settlement"), 0))
            selisih_settlement = self._to_int(
                target.get("selisih_settlement"),
                nilai_settlement - nilai_diterima,
            )
            diskon_produk = max(0, self._to_int(target.get("diskon_produk"), 0))
            diskon_tambahan = max(
                0,
                self._to_int(
                    target.get("diskon_tambahan_nilai"),
                    self._to_int(target.get("add_disc"), 0),
                ),
            )

            target["harga"] = int(nilai_settlement)
            target["harga_nett2"] = int(nilai_settlement)
            target["grand_total"] = int(nilai_settlement)
            target["tagihan"] = int(nilai_settlement)
            target["diskon_produk"] = int(diskon_produk)
            target["diskon_tambahan_nilai"] = int(diskon_tambahan)
            target["add_disc"] = int(diskon_tambahan)
            target["kas_nilai"] = int(nilai_settlement)
            target["selisih_plus"] = int(max(0, selisih_settlement))
            return target

        merged = {
            str(key): dict(value)
            for key, value in (base_map or {}).items()
            if isinstance(value, dict)
        }
        for account_id, payload in (incoming_map or {}).items():
            if not isinstance(payload, dict):
                continue
            key = str(account_id or "").strip()
            if not key:
                continue
            if key not in merged:
                merged[key] = dict(payload)
                _apply_payment_legacy_aliases(merged[key])
                continue
            current = merged[key]
            current["bank"] = str(current.get("bank") or payload.get("bank") or "")
            current["account_id"] = str(current.get("account_id") or payload.get("account_id") or key)
            current["metode_pembayaran"] = str(
                current.get("metode_pembayaran") or payload.get("metode_pembayaran") or "cash"
            )
            for field in ("return_nilai", "jml_payment", "nilai_diterima", "nilai_settlement"):
                current[field] = self._to_int(current.get(field), 0) + self._to_int(payload.get(field), 0)
            current["selisih_settlement"] = int(
                self._to_int(current.get("nilai_settlement"), 0)
                - self._to_int(current.get("nilai_diterima"), 0)
            )
            _apply_payment_legacy_aliases(current)

        for key in list(merged.keys()):
            _apply_payment_legacy_aliases(merged[key])

        sorted_keys = sorted(merged.keys(), key=lambda x: (self._to_int(x, 10**9), str(x)))
        return {key: merged[key] for key in sorted_keys}

    # edited by glg
    def _merge_settlement_section(
        self,
        base_section: Dict[str, Any],
        incoming_section: Dict[str, Any],
    ) -> Dict[str, Any]:
        """Merge ``incoming_section`` into a copy of ``base_section``.

        Steps, in order:
          1. Sum the additive counters/totals of both sections.
          2. Merge the ``payment`` maps via ``_merge_settlement_payment_map`` and,
             when the summed ``nilai_diterima`` of the merged payments is
             positive, use it as ``total_payment``.
          3. Merge ``id_penjualan`` / ``id_return`` and recompute
             ``jml_transaksi_unique``; derive ``jml_transaksi`` from those when
             the summed value is not positive.
          4. Fill identity fields that are blank in the base from the incoming
             section; keep the greater ``date_settlement`` string.
          5. Recompute the legacy alias fields (``harga``, ``tagihan``,
             ``kas_nilai``, ``selisih_plus``, ...) from the merged totals.

        Args:
            base_section: Section accumulated so far (shallow-copied).
            incoming_section: Section to merge in; a non-dict is treated as empty.

        Returns:
            The merged section as a new dict.
        """
        merged = dict(base_section or {})
        incoming = incoming_section if isinstance(incoming_section, dict) else {}

        # Additive fields: simply summed across both sections.
        for field in (
            "total_penjualan_bruto",
            "total_bayar",
            "total_tagihan",
            "total_kembalian",
            "total_diskon_produk",
            "total_diskon_member",
            "total_additional_diskon",
            "total_return_penjualan",
            "total_payment",
            "point_transaksi",
            "jml_return_event",
            "jml_transaksi",
        ):
            merged[field] = self._to_int(merged.get(field), 0) + self._to_int(incoming.get(field), 0)

        merged["payment"] = self._merge_settlement_payment_map(
            cast(Dict[str, Dict[str, Any]], merged.get("payment"))
            if isinstance(merged.get("payment"), dict)
            else {},
            cast(Dict[str, Dict[str, Any]], incoming.get("payment"))
            if isinstance(incoming.get("payment"), dict)
            else {},
        )
        # The per-account received amounts override the summed total_payment,
        # but only when they add up to something positive.
        total_payment_from_group = 0
        for pay in (merged.get("payment") or {}).values():
            if isinstance(pay, dict):
                total_payment_from_group += self._to_int(pay.get("nilai_diterima"), 0)
        if total_payment_from_group > 0:
            merged["total_payment"] = total_payment_from_group

        merged["id_penjualan"] = self._merge_unique_int_list(
            cast(List[Any], merged.get("id_penjualan")) if isinstance(merged.get("id_penjualan"), list) else [],
            cast(List[Any], incoming.get("id_penjualan")) if isinstance(incoming.get("id_penjualan"), list) else [],
        )
        merged["id_return"] = self._merge_unique_int_list(
            cast(List[Any], merged.get("id_return")) if isinstance(merged.get("id_return"), list) else [],
            cast(List[Any], incoming.get("id_return")) if isinstance(incoming.get("id_return"), list) else [],
        )
        merged["jml_transaksi_unique"] = self._count_unique_settlement_transactions(
            cast(List[Any], merged.get("id_penjualan")) if isinstance(merged.get("id_penjualan"), list) else [],
            cast(List[Any], merged.get("id_return")) if isinstance(merged.get("id_return"), list) else [],
        )
        # Fallback count: number of sale ids plus the number of return events.
        if self._to_int(merged.get("jml_transaksi"), 0) <= 0:
            merged["jml_transaksi"] = int(
                len(self._parse_int_list(merged.get("id_penjualan")))
                + max(0, self._to_int(merged.get("jml_return_event"), 0))
            )

        # Identity fields: the base value wins unless it is blank.
        for field in (
            "cabang_id",
            "machine_id",
            "kasir_id",
            "version",
            "date_sales",
            "cabang_nama",
            "cpu_info",
            "com_name",
        ):
            if self._is_blank_value(merged.get(field)):
                merged[field] = incoming.get(field)

        # Keep the greater settlement timestamp (plain string comparison).
        incoming_settle = str(incoming.get("date_settlement") or "").strip()
        merged_settle = str(merged.get("date_settlement") or "").strip()
        if incoming_settle and (not merged_settle or incoming_settle > merged_settle):
            merged["date_settlement"] = incoming_settle

        if self._to_int(merged.get("settlement_oto_id"), 0) <= 0:
            merged["settlement_oto_id"] = self._to_int(incoming.get("settlement_oto_id"), 0)
        if self._is_blank_value(merged.get("settlement_oto_nama")):
            merged["settlement_oto_nama"] = incoming.get("settlement_oto_nama")

        # edited by glg
        # Keep the direct-settlement payload aliases consistent after the merge.
        total_settlement = 0
        total_diterima = 0
        for pay in (merged.get("payment") or {}).values():
            if isinstance(pay, dict):
                total_settlement += max(0, self._to_int(pay.get("nilai_settlement"), 0))
                total_diterima += max(0, self._to_int(pay.get("nilai_diterima"), 0))
        total_tagihan = int(max(0, self._to_int(merged.get("total_tagihan"), 0)))
        total_bruto = int(max(0, self._to_int(merged.get("total_penjualan_bruto"), 0)))
        total_diskon_produk = int(max(0, self._to_int(merged.get("total_diskon_produk"), 0)))
        total_diskon_tambahan = int(max(0, self._to_int(merged.get("total_additional_diskon"), 0)))
        selisih_total = int(total_settlement - total_diterima)
        # tokoID is preferred; toko_id is the fallback source for the store id.
        resolved_toko_id = self._as_positive_int(
            merged.get("tokoID"),
            self._as_positive_int(merged.get("toko_id"), 0),
        )
        merged["harga"] = total_bruto
        merged["harga_nett2"] = total_tagihan
        merged["grand_total"] = total_tagihan
        merged["tagihan"] = total_tagihan
        merged["diskon_produk"] = total_diskon_produk
        merged["diskon_tambahan_nilai"] = total_diskon_tambahan
        merged["add_disc"] = total_diskon_tambahan
        merged["kas_nilai"] = int(max(0, total_settlement))
        # Only a positive settlement-minus-received difference is reported.
        merged["selisih_plus"] = int(max(0, selisih_total))
        if resolved_toko_id > 0:
            merged["tokoID"] = int(resolved_toko_id)
            merged["toko_id"] = int(resolved_toko_id)

        return merged

    # edited by glg
    # Ambil batas awal jendela settlement untuk menangkap event pembatalan yang relevan per batch.
    def _resolve_settlement_window_start(self, settlement_row: Dict[str, Any]) -> str:
        row = settlement_row if isinstance(settlement_row, dict) else {}
        current_id = self._to_int(row.get("id"), 0)
        if current_id <= 0 or not self._table_exists("settlement_history"):
            return ""

        table_meta = self._get_table_schema_meta("settlement_history")
        table_cols = set(table_meta.keys())
        if "id" not in table_cols:
            return ""

        clauses = ["id < ?"]
        params: List[Any] = [current_id]
        kasir_hint = str(row.get("kasir") or "").strip()
        if kasir_hint and "kasir" in table_cols:
            clauses.append("LOWER(COALESCE(kasir, '')) = LOWER(?)")
            params.append(kasir_hint)
        cabang_hint = self._to_int(row.get("cabang_id"), 0)
        if cabang_hint > 0 and "cabang_id" in table_cols:
            clauses.append("CAST(COALESCE(cabang_id, 0) AS INTEGER) = ?")
            params.append(cabang_hint)
        where_sql = " AND ".join(clauses)

        conn = self._connect()
        try:
            cur = conn.cursor()
            query = (
                "SELECT tanggal "  # nosec B608
                "FROM settlement_history "
                f"WHERE {where_sql} "
                "ORDER BY id DESC "
                "LIMIT 1"
            ) 
            cur.execute(query, tuple(params))
            row_prev = cur.fetchone()
            if not row_prev:
                return ""
            prev = row_prev[0] if isinstance(row_prev, (tuple, list)) else row_prev["tanggal"]
            return str(prev or "").strip()
        except sqlite3.Error:
            return ""
        finally:
            conn.close()

    # edited by glg
    # Payload legacy settlement: wrapper JSON berisi agregat per tanggal sales.
    def _build_legacy_settlement_history_payload(
        self,
        settlement_rows: List[Dict[str, Any]],
        batch_end: str,
    ) -> Dict[str, Any]:
        return self._get_settlement_history_payload_service().build_legacy_payload(
            settlement_rows=settlement_rows,
            batch_end=batch_end,
        )

    # edited by glg
    # Ambil 1 row settlement_history terbaik berdasarkan transaksi_ids target.
    def _pick_settlement_history_row_by_transaksi_ids(
        self,
        cursor,
        transaksi_ids: List[int],
        limit: int = 300,
    ) -> Dict[str, Any]:
        normalized_ids = self._parse_int_list(transaksi_ids)
        if not normalized_ids:
            return {}
        target_set = set(normalized_ids)
        if not target_set:
            return {}
        try:
            cursor.execute(
                """
                SELECT *
                FROM settlement_history
                ORDER BY id DESC
                LIMIT ?
                """,
                (max(1, int(limit or 300)),),
            )
            rows = cursor.fetchall() or []
        except sqlite3.Error:
            return {}

        exact_match = {}
        subset_match = {}
        subset_size = 10**9
        for raw in rows:
            row = dict(raw) if isinstance(raw, (sqlite3.Row, dict)) else {}
            trx_ids = self._parse_int_list(row.get("data_transaksi_id"))
            if not trx_ids:
                continue
            row_set = set(trx_ids)
            if not row_set:
                continue
            if row_set == target_set:
                exact_match = row
                break
            if target_set.issubset(row_set):
                if len(row_set) < subset_size:
                    subset_match = row
                    subset_size = len(row_set)
        if exact_match:
            return exact_match
        return subset_match

    # edited by glg
    def _resolve_transaksi_ids_from_settlement_counter(self, cursor, settlement_counter: str) -> List[int]:
        counter = str(settlement_counter or "").strip()
        if not counter:
            return []
        try:
            cursor.execute(
                """
                SELECT data_transaksi_id
                FROM transaksi_settlement
                WHERE counter = ?
                ORDER BY id DESC
                LIMIT 1
                """,
                (counter,),
            )
            row = cursor.fetchone()
        except sqlite3.Error:
            return []
        if not row:
            return []
        raw = row[0] if isinstance(row, (tuple, list)) else row["data_transaksi_id"]
        return self._parse_int_list(raw)

    # edited by glg
    # Bangun payload settlement direct (single batch) dari data settlement lokal terkini.
    # Dipakai oleh direct endpoint tanpa mengganti pipeline export/upload existing.
    def build_direct_settlement_payload(
        self,
        transaksi_ids: Optional[List[int]] = None,
        settlement_counter: str = "",
    ) -> Dict[str, Any]:
        """Build the direct (single batch) settlement payload from local data.

        The target ``settlement_history`` row is resolved in this order:
          1. the row matching ``transaksi_ids`` (when ids are given);
          2. the row matching the ids stored for ``settlement_counter`` in
             ``transaksi_settlement`` (when a counter is given and that table
             exists);
          3. the newest ``settlement_history`` row.

        Args:
            transaksi_ids: Transaction ids of the settlement, if known.
            settlement_counter: Settlement counter used as a secondary lookup.

        Returns:
            The payload produced by ``_build_legacy_settlement_history_payload``
            for the resolved row, or ``{}`` when ``settlement_history`` is
            missing or empty, or the builder does not return a dict.
        """
        normalized_ids = self._parse_int_list(transaksi_ids)
        counter = str(settlement_counter or "").strip()
        if not self._table_exists("settlement_history"):
            return {}

        conn = self._connect()
        try:
            # Row objects are needed so the fetched rows can be turned into dicts.
            conn.row_factory = sqlite3.Row
            cur = conn.cursor()

            target_row: Dict[str, Any] = {}
            if normalized_ids:
                target_row = self._pick_settlement_history_row_by_transaksi_ids(cur, normalized_ids)

            if not target_row and counter and self._table_exists("transaksi_settlement"):
                counter_ids = self._resolve_transaksi_ids_from_settlement_counter(cur, counter)
                if counter_ids:
                    target_row = self._pick_settlement_history_row_by_transaksi_ids(cur, counter_ids)

            if not target_row:
                # Last resort: fall back to the most recent settlement.
                cur.execute(
                    """
                    SELECT *
                    FROM settlement_history
                    ORDER BY id DESC
                    LIMIT 1
                    """
                )
                latest = cur.fetchone()
                target_row = dict(latest) if latest else {}

            if not target_row:
                return {}

            batch_end = self._normalize_dtime_value(
                target_row.get("tanggal") or target_row.get("dtime"),
                datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            )
            payload = self._build_legacy_settlement_history_payload([target_row], batch_end)
            return payload if isinstance(payload, dict) else {}
        finally:
            conn.close()

    def _map_jenis_kode(self, transaksi_row: Dict[str, Any]) -> str:
        jenis_label = str((transaksi_row or {}).get("jenis_label") or "").strip().lower()
        if jenis_label in {"invoice", "penjualan", "jual", "transaksi"}:
            return "582"
        if jenis_label in {"settlement"}:
            return "758"
        raw_jenis = self._to_int((transaksi_row or {}).get("jenis"), 0)
        if raw_jenis > 0:
            return str(raw_jenis)
        raw_transaksi_jenis = self._to_int((transaksi_row or {}).get("transaksi_jenis"), 0)
        if raw_transaksi_jenis > 0:
            return str(raw_transaksi_jenis)
        return "582"

    def _fetch_produk_snapshot(self, produk_ids: List[int]) -> Dict[str, Dict[str, Any]]:
        return self._get_snapshot_query_service().fetch_produk_snapshot(produk_ids=produk_ids)

    def _fetch_diskon_free_snapshot(self, produk_ids: List[int]) -> Dict[str, Dict[str, Any]]:
        return self._get_snapshot_query_service().fetch_diskon_free_snapshot(
            produk_ids=produk_ids
        )

    def _build_legacy_items_payload(
        self,
        detail_rows: List[Dict[str, Any]],
        produk_map: Dict[str, Dict[str, Any]],
    ) -> Dict[str, Dict[str, Any]]:
        items_payload: Dict[str, Dict[str, Any]] = {}
        for detail in detail_rows or []:
            if not isinstance(detail, dict):
                continue
            produk_id = str(self._to_int(detail.get("produk_id"), 0))
            if not produk_id or produk_id == "0":
                continue

            qty = self._to_int(detail.get("valid_qty"), self._to_int(detail.get("produk_ord_jml"), 0))
            if qty <= 0:
                continue

            produk_info = produk_map.get(produk_id) or {}
            harga_jual = self._to_int(detail.get("produk_ord_hrg"), 0)
            produk_jenis = str(detail.get("produk_jenis") or "item").strip().lower()
            # edited by glg
            # Qty jual harus murni item jual.
            # free_produk diproses khusus di free_items agar tidak menambah jumlah jual.
            if produk_jenis == "free_produk":
                continue
            subtotal = max(0, harga_jual * qty)
            disc_nilai_total = max(0, self._resolve_detail_discount_nominal(detail))

            if qty > 0:
                disc_per_unit = int(round(float(disc_nilai_total) / float(qty)))
            else:
                disc_per_unit = 0
            disc_per_unit = max(0, min(harga_jual, disc_per_unit))
            harga_disc = max(0, harga_jual - disc_per_unit)
            if subtotal > 0 and disc_nilai_total > 0:
                disc_persen = min(100.0, (float(disc_nilai_total) / float(subtotal)) * 100.0)
            else:
                disc_persen = 0.0

            nama_produk = str(
                detail.get("produk_nama")
                or produk_info.get("nama")
                or ""
            ).strip()

            existing = items_payload.get(produk_id)
            if existing:
                existing["jumlah"] = self._to_int(existing.get("jumlah"), 0) + qty
                existing["subtotal"] = self._to_int(existing.get("subtotal"), 0) + subtotal
                existing["discNilai"] = self._to_int(existing.get("discNilai"), 0) + disc_nilai_total
                continue

            items_payload[produk_id] = {
                "jumlah": qty,
                "subtotal": subtotal,
                "satuan": str(detail.get("satuan") or produk_info.get("satuan") or "pcs"),
                "tipe_pajak": str(self._to_int(produk_info.get("tipe_pajak"), 1)),
                "tipe_produk": str(detail.get("produk_jenis") or "item"),
                "produk_id": produk_id,
                "nama": nama_produk,
                "satuan_id": str(self._to_int(produk_info.get("satuan_id"), 0)),
                "ppn": str(self._to_int(detail.get("ppn"), self._to_int(produk_info.get("ppn"), 0))),
                "ppn_produk": str(self._to_int(produk_info.get("ppn_produk"), 1)),
                "barcode": str(produk_info.get("barcode") or ""),
                "coa_code": str(produk_info.get("coa_code") or ""),
                "hpp": str(
                    self._to_float(
                        detail.get("hpp"),
                        self._to_float(produk_info.get("hpp"), 0.0),
                    )
                ),
                "discPersen": float(round(disc_persen, 6)),
                "discNilai": disc_nilai_total,
                "harga_jual": harga_jual,
                "harga_disc": harga_disc,
                "ppnFactor": "0",
            }
        return items_payload

    def _build_legacy_free_items_payload(
        self,
        detail_rows: List[Dict[str, Any]],
        produk_map: Dict[str, Dict[str, Any]],
        diskon_map: Dict[str, Dict[str, Any]],
        dtime_value: str,
    ) -> Dict[str, Dict[str, Dict[str, Any]]]:
        # edited by glg
        # Parse metadata relasi free produk dari transaksi_data.ext_intext.
        def _extract_free_relasi(detail_row: Dict[str, Any]) -> Dict[str, Any]:
            if not isinstance(detail_row, dict):
                return {}
            raw_meta = detail_row.get("ext_intext")
            if self._is_blank_value(raw_meta):
                return {}
            try:
                loaded = json.loads(str(raw_meta))
            except (json.JSONDecodeError, TypeError, ValueError):
                return {}
            if not isinstance(loaded, dict):
                return {}
            rel = loaded.get("free_relasi")
            return rel if isinstance(rel, dict) else {}

        free_items: Dict[str, Dict[str, Dict[str, Any]]] = {}
        for detail in detail_rows or []:
            if not isinstance(detail, dict):
                continue
            produk_jenis = str(detail.get("produk_jenis") or "").strip().lower()
            if produk_jenis != "free_produk":
                continue

            detail_produk_id = str(self._to_int(detail.get("produk_id"), 0))
            if not detail_produk_id or detail_produk_id == "0":
                continue
            qty = self._to_int(detail.get("valid_qty"), self._to_int(detail.get("produk_ord_jml"), 0))
            if qty <= 0:
                continue

            relasi = _extract_free_relasi(detail)
            source_produk_id = str(
                self._to_int(
                    relasi.get("source_produk_id"),
                    self._to_int(detail_produk_id, 0),
                )
            )
            if not source_produk_id or source_produk_id == "0":
                continue

            promo = diskon_map.get(source_produk_id) or diskon_map.get(detail_produk_id) or {}
            produk_info = produk_map.get(source_produk_id) or produk_map.get(detail_produk_id) or {}

            source_produk_nama = str(
                relasi.get("source_produk_nama")
                or detail.get("produk_nama")
                or produk_info.get("nama")
                or ""
            ).strip()
            source_qty = self._to_int(relasi.get("source_qty"), qty)
            if source_qty <= 0:
                source_qty = qty

            free_produk_id = str(
                self._to_int(
                    relasi.get("free_produk_id"),
                    self._to_int(
                        detail_produk_id,
                        self._to_int(promo.get("free_produk_id"), 0),
                    ),
                )
            )
            free_produk_nama = str(
                relasi.get("free_produk_nama")
                or detail.get("produk_nama")
                or produk_info.get("nama")
                or promo.get("free_produk_nama")
                or ""
            ).strip()
            kelipatan = self._to_int(promo.get("kelipatan"), 1)
            minimal = self._to_int(promo.get("minim"), 0)
            quota_global = self._to_int(promo.get("quota_global"), 0)
            quota_used = self._to_int(promo.get("quota_used"), 0)
            quota_sisa = max(0, quota_global - quota_used)
            free_qty_max = quota_sisa if quota_sisa > 0 else qty
            event_id = str(self._to_int(detail.get("id"), int(datetime.now().timestamp())))

            if minimal > 0:
                note = (
                    f"**Free (1x {free_produk_nama}) minimal pembelian ({minimal}x {free_produk_nama}) "
                    f"berlaku kelipatan ({max(kelipatan, 1)})"
                )
            else:
                note = f"**Free ({qty}x {free_produk_nama})"

            free_items.setdefault(source_produk_id, {})
            free_items[source_produk_id][event_id] = {
                "ori_produk_id": source_produk_id,
                "ori_produk_nama": source_produk_nama or free_produk_nama,
                "produk_beli_jml": source_qty,
                "free_produk_id": free_produk_id,
                "free_produk_nama": free_produk_nama,
                "free_qty": qty,
                "free_qty_max": str(max(0, free_qty_max)),
                "free_produk_note": note,
                "kelipatan": str(max(kelipatan, 1)),
                "minimal": str(max(minimal, 0)),
                "quota_global": str(max(quota_global, 0)),
                "quota_used": str(max(quota_used, 0)),
                "quota_sisa": str(max(quota_sisa, 0)),
                "date": dtime_value,
            }
        return free_items

    def _build_legacy_payment_payload(
        self,
        transaksi_row: Dict[str, Any],
        id_penjualan: int,
        cabang_id: int,
    ) -> List[Dict[str, Any]]:
        service = self._get_legacy_payment_payload_service()
        return service.build_payload(
            transaksi_row=transaksi_row,
            id_penjualan=id_penjualan,
            cabang_id=cabang_id,
        )

    def _build_legacy_transaksi_rows(
        self,
        transaksi_rows: List[Dict[str, Any]],
        batch_end: str,
        extra_return_rows: List[Dict[str, Any]] = None,
    ) -> List[Dict[str, Any]]:
        return self._get_hotspot_use_case_service().build_legacy_transaksi_rows(
            transaksi_rows=transaksi_rows,
            batch_end=batch_end,
            extra_return_rows=extra_return_rows,
        )

    def _build_export_payload(
        self,
        table_name: str,
        last_id: int,
        new_last_id: int,
        batch_end: str,
        rows: List[Dict[str, Any]],
        extra_transaksi_rows: List[Dict[str, Any]] = None,
    ) -> Any:
        """Assemble the export payload for one table batch.

        The shape depends on the payload profile resolved from the config:

        * ``legacy_bundle`` + ``transaksi``: result of
          ``_build_legacy_transaksi_rows``.
        * ``legacy_bundle`` + ``settlement_history``: result of
          ``_build_legacy_settlement_history_payload``.
        * any other ``legacy_bundle`` / ``legacy_table`` case: a compact envelope
          (``table``, ``last_id``, ``new_last_id``, ``row``, ``datetime``,
          ``data``, ``machine_id``).
        * otherwise: the full envelope, which additionally carries
          ``schema_version``, ``server_hash`` and ``base_url``.

        Rows are first passed through ``_normalize_rows_for_payload_profile``.
        """
        config = read_config()
        profile = self._resolve_payload_profile(config)
        schema_version = str(config.get("export_schema_version") or "")
        server_hash = get_current_server_hash()
        base_url = normalize_base_url(config.get("api_base_url"))
        context = self._get_export_device_context()
        machine_id = str(context.get("machine_id") or get_device_id() or "").strip()
        normalized_rows = self._normalize_rows_for_payload_profile(
            table_name=table_name,
            rows=rows,
            machine_id=machine_id,
            payload_profile=profile,
        )

        if profile == "legacy_bundle":
            if table_name == "transaksi":
                # edited by glg
                # Legacy file content: array of transactions with
                # items/free_items/payment.
                return self._build_legacy_transaksi_rows(
                    normalized_rows,
                    batch_end,
                    extra_return_rows=extra_transaksi_rows,
                )
            if table_name == "settlement_history":
                # edited by glg
                # Legacy settlement content: settlement wrapper plus data
                # aggregated per sales date.
                return self._build_legacy_settlement_history_payload(normalized_rows, batch_end)

        # Fields shared by both envelope flavours, in their original key order.
        envelope: Dict[str, Any] = {
            "table": table_name,
            "last_id": str(last_id),
            "new_last_id": str(new_last_id),
            "row": len(normalized_rows),
            "datetime": batch_end,
        }
        if profile in {"legacy_bundle", "legacy_table"}:
            envelope["data"] = normalized_rows
            envelope["machine_id"] = machine_id
            return envelope

        envelope["schema_version"] = schema_version
        envelope["server_hash"] = server_hash
        envelope["base_url"] = base_url
        envelope["machine_id"] = machine_id
        envelope["data"] = normalized_rows
        return envelope

    def _write_json_file(
        self,
        batch_end: str,
        table_name: str,
        payload: Any,
        first_id_hint: Any = None,
        last_id_hint: Any = None,
    ) -> tuple[str, int, str | None, int]:
        """Write ``payload`` as a compressed export file and describe the result.

        The file is first written to ``<file>.tmp`` through
        ``_write_compressed_payload`` and then moved into place with
        ``os.replace``; a leftover temp file is removed afterwards.

        Args:
            batch_end: Batch timestamp; ``:``, ``-`` and spaces are stripped for
                the file name.
            table_name: Table name embedded in the file name.
            payload: Payload handed to ``_write_compressed_payload``.
            first_id_hint: Optional first id for the file name. When blank and
                ``payload`` is a dict, ``payload["last_id"]`` is used.
            last_id_hint: Optional last id for the file name. When blank and
                ``payload`` is a dict, ``payload["new_last_id"]`` is used.

        Returns:
            ``(file_path, file_seq, file_hash, file_size)``; hash is ``None`` and
            size is ``0`` when the final file does not exist.
        """

        def _hint_text(hint: Any) -> str:
            return "" if hint is None else str(hint).strip()

        config = read_config()
        rel_dir = config.get("export_json_dir", "exports/transactions")
        out_dir = os.path.join(get_app_data_dir(), rel_dir)
        os.makedirs(out_dir, exist_ok=True)

        context = self._get_export_device_context()
        machine_id = str(context.get("machine_id") or get_device_id() or "").strip()
        # edited by glg
        # File name for the endpoint:
        # pos_{ts}_{machine_id}_{first_id}_{last_id}_{table}_{seq}.xz
        safe_machine_id = re.sub(r"[^A-Za-z0-9._-]+", "-", machine_id).strip("-") or "UNKNOWN"
        file_seq = f"{self._next_seq():04d}"
        safe_ts = "".join(ch for ch in batch_end if ch not in ":- ")

        first_id = _hint_text(first_id_hint)
        last_id = _hint_text(last_id_hint)
        if isinstance(payload, dict):
            if not first_id:
                first_id = str(payload.get("last_id") or "0").strip()
            if not last_id:
                last_id = str(payload.get("new_last_id") or "0").strip()
        # Non-numeric ids collapse to "0" (first) / to the first id (last).
        safe_first_id = first_id if first_id.isdigit() else "0"
        safe_last_id = last_id if last_id.isdigit() else safe_first_id

        compression = self._resolve_export_compression(config)
        file_name = (
            f"pos_{safe_ts}_{safe_machine_id}_{safe_first_id}_{safe_last_id}_"
            f"{table_name}_{file_seq}.{compression}"
        )
        file_path = os.path.join(out_dir, file_name)
        tmp_path = f"{file_path}.tmp"

        try:
            self._write_compressed_payload(
                tmp_path=tmp_path,
                payload=payload,
                codec=compression,
                config=config,
            )
            os.replace(tmp_path, file_path)
        finally:
            # The temp file only survives when writing or replacing failed.
            if os.path.exists(tmp_path):
                try:
                    os.remove(tmp_path)
                except OSError as exc:
                    LOGGER.debug("[DEBUG EXPORT] gagal hapus file tmp export: %s", exc)

        if os.path.exists(file_path):
            file_size = os.path.getsize(file_path)
            file_hash = self._hash_file(file_path)
        else:
            file_size = 0
            file_hash = None
        return file_path, int(file_seq), file_hash, file_size

    def _next_seq(self) -> int:
        conn = self._connect()
        cur = conn.cursor()
        cur.execute("SELECT last_no FROM export_file_seq WHERE id = 1")
        row = cur.fetchone()
        last_no = int(row[0]) if row else 0
        next_no = last_no + 1
        cur.execute("UPDATE export_file_seq SET last_no = ? WHERE id = 1", (next_no,))
        conn.commit()
        conn.close()
        return next_no

    def _get_max_ids(self, table_names):
        conn = self._connect()
        try:
            cur = conn.cursor()
            max_ids = {}
            for name in table_names:
                try:
                    table_sql = self._quote_identifier(name)
                    if name == "transaksi_data_registry":
                        cur.execute(
                            "SELECT name FROM sqlite_master WHERE type='table' AND name = ?",
                            ("transaksi_data_registry",),
                        )
                        has_registry = cur.fetchone() is not None
                        if has_registry:
                            cur.execute(f"SELECT MAX(transaksi_id) FROM {table_sql}")  # nosec B608
                        else:
                            cur.execute("SELECT MAX(id) FROM transaksi")
                    else:
                        cur.execute(f"SELECT MAX(id) FROM {table_sql}")  # nosec B608
                    row = cur.fetchone()
                    max_ids[name] = int(row[0]) if row and row[0] is not None else 0
                except (sqlite3.Error, ValueError, TypeError) as e:
                    LOGGER.warning("[DEBUG EXPORT] gagal baca max id table=%s: %s", name, e)
                    max_ids[name] = 0
            return max_ids
        finally:
            conn.close()

    def _get_exportable_tables(self, table_names):
        normalized = []
        seen = set()
        for raw_name in table_names or []:
            table_name = str(raw_name or "").strip()
            if not table_name or table_name in seen:
                continue
            seen.add(table_name)
            normalized.append(table_name)
        if not normalized:
            return []

        conn = self._connect()
        try:
            cur = conn.cursor()
            exportable = []
            for table_name in normalized:
                try:
                    table_sql = self._quote_identifier(table_name)
                except ValueError:
                    LOGGER.warning("[DEBUG EXPORT] skip table=%s: nama tabel tidak valid", table_name)
                    continue
                cur.execute(
                    "SELECT name FROM sqlite_master WHERE type='table' AND name = ?",
                    (table_name,),
                )
                if cur.fetchone() is None:
                    if table_name == "transaksi_data_registry":
                        # edited by glg
                        # Virtual registry export tetap diperbolehkan meski tabel lokal tidak ada.
                        exportable.append(table_name)
                        continue
                    LOGGER.warning("[DEBUG EXPORT] skip table=%s: tabel tidak ditemukan", table_name)
                    continue
                cur.execute(build_pragma_table_info_sql(table_name))
                columns = {str(row[1]) for row in cur.fetchall() if row and len(row) > 1}
                if "id" not in columns:
                    if table_name == "transaksi_data_registry" and "transaksi_id" in columns:
                        exportable.append(table_name)
                        continue
                    LOGGER.warning("[DEBUG EXPORT] skip table=%s: kolom id tidak ditemukan", table_name)
                    continue
                exportable.append(table_name)
            return exportable
        finally:
            conn.close()

    def _hash_file(self, file_path: str) -> str:
        h = hashlib.sha256()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(8192), b""):
                h.update(chunk)
        return h.hexdigest()

    def _apply_column_whitelist(self, rows: List[Dict[str, Any]], table_name: str, config: Dict[str, Any]) -> List[Dict[str, Any]]:
        if not rows:
            return rows
        specific_key = f"export_columns_{table_name}"
        if table_name == "transaksi":
            key = "export_columns_transaksi"
        elif table_name == "transaksi_data":
            key = "export_columns_transaksi_data"
        else:
            key = specific_key
        cols = config.get(key)
        if cols is None and key != specific_key:
            cols = config.get(specific_key)
        cols = cols or []
        if not isinstance(cols, list) or len(cols) == 0:
            return rows
        whitelist = set(str(c) for c in cols if str(c))
        whitelist.add("id")
        if table_name == "transaksi_data_registry":
            # edited by glg
            # Jaga kontrak minimum registry walau whitelist custom aktif.
            whitelist.update({"transaksi_id", "x_id", "datetime", "machine_id", "main", "items"})
        if not whitelist:
            return rows
        filtered = []
        for r in rows:
            filtered.append({k: v for k, v in r.items() if k in whitelist})
        return filtered

    def _sanitize_rows(self, rows: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        def _sanitize_value(val):
            if isinstance(val, (bytes, bytearray)):
                try:
                    return val.decode("utf-8")
                except UnicodeDecodeError:
                    return base64.b64encode(val).decode("ascii")
            if isinstance(val, dict):
                return {k: _sanitize_value(v) for k, v in val.items()}
            if isinstance(val, list):
                return [_sanitize_value(v) for v in val]
            return val

        import base64
        sanitized = []
        for row in rows:
            sanitized.append({k: _sanitize_value(v) for k, v in row.items()})
        return sanitized

    def _now_db(self) -> str:
        try:
            conn = self._connect()
            cur = conn.cursor()
            # edited by glg
            # Sinkronkan window export dengan timestamp transaksi lokal (cancel_dtime/tanggal_return)
            # agar event return/pembatalan tidak terlewat karena mismatch UTC vs localtime.
            cur.execute("SELECT datetime('now','localtime') as now")
            row = cur.fetchone()
            conn.close()
            if row and row[0]:
                return row[0]
        except (sqlite3.Error, RuntimeError, ValueError):
            LOGGER.debug("[DEBUG EXPORT] fallback _now_db ke datetime.now()")
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    # edited by glg
    def _finalize_flux_success(
        self,
        *,
        table_name: str,
        server_hash: str,
        flux_id: int,
        row_count: int,
        file_path,
        file_seq,
        file_hash,
        file_size,
        new_last_id=None,
    ):
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        conn = self._connect()
        try:
            cur = conn.cursor()
            cur.execute(
                """
                UPDATE export_flux
                SET status = 'SUCCESS',
                    row_count = ?,
                    file_path = ?,
                    file_seq = ?,
                    file_hash = ?,
                    file_size = ?,
                    updated_at = ?,
                    error_log = NULL
                WHERE id = ?
                """,
                (int(row_count or 0), file_path, file_seq, file_hash, file_size, now, int(flux_id)),
            )

            if new_last_id is not None:
                cur.execute(
                    """
                    INSERT INTO export_cursor_table_scoped (table_name, server_hash, last_id)
                    VALUES (?, ?, ?)
                    ON CONFLICT(table_name, server_hash) DO UPDATE SET last_id = excluded.last_id
                    """,
                    (table_name, server_hash, int(new_last_id)),
                )

            cur.execute(
                "DELETE FROM export_retry_state WHERE table_name = ? AND server_hash = ?",
                (table_name, server_hash),
            )
            conn.commit()
        finally:
            conn.close()

    def _resolve_cabang_code(self, device_info: Dict[str, Any], config: Dict[str, Any]) -> str:
        source = str(config.get("export_cabang_code_source") or "cabang_id").lower()
        if source == "toko_id":
            val = self._as_positive_int(device_info.get("toko_id"), self._as_positive_int(config.get("toko_id"), 0))
        elif source == "device_row_id":
            val = self._as_positive_int(device_info.get("id"), 0)
        else:
            val = self._as_positive_int(device_info.get("cabang_id"), 0)
        return f"{val:04d}"

    def _guess_last_seq(self) -> int:
        """Guess the last used file sequence number from existing export files.

        Scans the export directory (``export_json_dir`` under the app data dir)
        for ``.gz`` / ``.xz`` files and returns the highest numeric sequence
        found, or 0 when the directory is missing or nothing matches.

        * ``f_*`` names: the sequence is the 4th ``_``-separated field, read only
          when the name has at least 5 fields.
        * ``pos_*`` names: the sequence is the last ``_``-separated field of the
          name with its final extension removed.
        """
        config = read_config()
        rel_dir = config.get("export_json_dir", "exports/transactions")
        out_dir = os.path.join(get_app_data_dir(), rel_dir)
        if not os.path.isdir(out_dir):
            return 0

        highest = 0
        for entry in os.listdir(out_dir):
            if not entry.lower().endswith((".gz", ".xz")):
                continue
            if entry.startswith("f_"):
                fields = entry.split("_")
                candidate = fields[3] if len(fields) >= 5 else ""
            elif entry.startswith("pos_"):
                candidate = entry.rpartition(".")[0].split("_")[-1]
            else:
                continue
            # An empty or non-numeric candidate is ignored.
            if candidate.isdigit():
                highest = max(highest, int(candidate))
        return highest
