# edited by glg
import hashlib
import json
import sqlite3
from datetime import datetime, timedelta
from typing import Any, Dict

from pypos.core.utils.db_helper import connect_sqlite
from pypos.core.utils.path_utils import get_db_path
from pypos.modules.penjualan.services.free_produk_sync_outbox_service import (
    FreeProdukSyncOutboxService,
)


class TransaksiEnterpriseControlService:
    """
    Enterprise controls for the transaction path:
    - Idempotency key lock
    - Immutable audit trail
    - Approval trail
    - Reconciliation job
    """

    _TRIGGER_PERSIST_AUDIT_UPDATE = "trg_transaksi_persist_audit_immutable_update"
    _TRIGGER_PERSIST_AUDIT_DELETE = "trg_transaksi_persist_audit_immutable_delete"
    _TRIGGER_APPROVAL_UPDATE = "trg_transaksi_approval_trail_immutable_update"
    _TRIGGER_APPROVAL_DELETE = "trg_transaksi_approval_trail_immutable_delete"

    _TRIGGER_PERSIST_AUDIT_UPDATE_SQL = """
        CREATE TRIGGER IF NOT EXISTS trg_transaksi_persist_audit_immutable_update
        BEFORE UPDATE ON transaksi_persist_audit
        BEGIN
            SELECT RAISE(FAIL, 'immutable_audit_log');
        END;
    """
    _TRIGGER_PERSIST_AUDIT_DELETE_SQL = """
        CREATE TRIGGER IF NOT EXISTS trg_transaksi_persist_audit_immutable_delete
        BEFORE DELETE ON transaksi_persist_audit
        BEGIN
            SELECT RAISE(FAIL, 'immutable_audit_log');
        END;
    """
    _TRIGGER_APPROVAL_UPDATE_SQL = """
        CREATE TRIGGER IF NOT EXISTS trg_transaksi_approval_trail_immutable_update
        BEFORE UPDATE ON transaksi_approval_trail
        BEGIN
            SELECT RAISE(FAIL, 'immutable_approval_log');
        END;
    """
    _TRIGGER_APPROVAL_DELETE_SQL = """
        CREATE TRIGGER IF NOT EXISTS trg_transaksi_approval_trail_immutable_delete
        BEFORE DELETE ON transaksi_approval_trail
        BEGIN
            SELECT RAISE(FAIL, 'immutable_approval_log');
        END;
    """

    def __init__(self, db_path: str = None, *, stale_inprogress_seconds: int = 300):
        self.db_path = str(db_path or get_db_path())
        self.stale_inprogress_seconds = max(60, int(stale_inprogress_seconds or 60))
        self.ensure_schema()

    def _connect(self):
        """Open a connection to ``self.db_path`` that yields ``sqlite3.Row`` rows."""
        connection = connect_sqlite(self.db_path)
        # Row objects allow column access by name (e.g. row["status"]).
        connection.row_factory = sqlite3.Row
        return connection

    @staticmethod
    def _now_text():
        return datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    @staticmethod
    def _safe_text(value: Any, default: str = "") -> str:
        if value is None:
            return str(default or "")
        text = str(value).strip()
        return text if text else str(default or "")

    @classmethod
    def _safe_json(cls, value: Any) -> str:
        """Serialize ``value`` as compact JSON with sorted keys.

        Non-ASCII characters are kept as-is. If ``json.dumps`` raises
        ``TypeError`` or ``ValueError`` the result of ``_safe_text(value, "")``
        is returned instead.
        """
        try:
            encoded = json.dumps(
                value,
                sort_keys=True,
                ensure_ascii=False,
                separators=(",", ":"),
            )
        except (TypeError, ValueError):
            encoded = cls._safe_text(value, "")
        return encoded

    @staticmethod
    def _sha256(text: str) -> str:
        return hashlib.sha256(str(text or "").encode("utf-8")).hexdigest()

    @classmethod
    def build_payload_hash(
        cls,
        *,
        transaksi_data,
        detail_data,
        transaksi_data_dict,
        audit_message: str = "",
    ) -> str:
        payload = {
            "transaksi_data": transaksi_data,
            "detail_data": detail_data,
            "transaksi_data_dict": transaksi_data_dict,
            "audit_message": str(audit_message or "").strip(),
        }
        return cls._sha256(cls._safe_json(payload))

    def ensure_schema(self):
        conn = self._connect()
        try:
            cur = conn.cursor()
            cur.execute(
                """
                CREATE TABLE IF NOT EXISTS transaksi_idempotency (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    idempotency_key TEXT NOT NULL UNIQUE,
                    payload_hash TEXT NOT NULL,
                    status TEXT NOT NULL DEFAULT 'IN_PROGRESS',
                    transaksi_id INTEGER NOT NULL DEFAULT 0,
                    trace_id TEXT,
                    error_code TEXT,
                    error_reason TEXT,
                    created_at TEXT NOT NULL,
                    updated_at TEXT NOT NULL
                )
                """
            )
            cur.execute(
                """
                CREATE INDEX IF NOT EXISTS ix_transaksi_idempotency_status_updated
                ON transaksi_idempotency(status, updated_at, id)
                """
            )

            cur.execute(
                """
                CREATE TABLE IF NOT EXISTS transaksi_persist_audit (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    event_dtime TEXT NOT NULL,
                    trace_id TEXT,
                    idempotency_key TEXT NOT NULL,
                    event_type TEXT NOT NULL,
                    transaksi_id INTEGER NOT NULL DEFAULT 0,
                    status TEXT NOT NULL,
                    error_code TEXT,
                    reason TEXT,
                    payload_json TEXT,
                    prev_hash TEXT,
                    row_hash TEXT NOT NULL
                )
                """
            )
            cur.execute(
                """
                CREATE INDEX IF NOT EXISTS ix_transaksi_persist_audit_dtime
                ON transaksi_persist_audit(event_dtime, id)
                """
            )
            cur.execute(
                self._TRIGGER_PERSIST_AUDIT_UPDATE_SQL
            )
            cur.execute(
                self._TRIGGER_PERSIST_AUDIT_DELETE_SQL
            )

            cur.execute(
                """
                CREATE TABLE IF NOT EXISTS transaksi_approval_trail (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    event_dtime TEXT NOT NULL,
                    action_name TEXT NOT NULL,
                    actor_name TEXT,
                    approval_name TEXT,
                    approval_status TEXT NOT NULL,
                    trace_id TEXT,
                    payload_json TEXT,
                    prev_hash TEXT,
                    row_hash TEXT NOT NULL
                )
                """
            )
            cur.execute(
                """
                CREATE INDEX IF NOT EXISTS ix_transaksi_approval_trail_dtime
                ON transaksi_approval_trail(event_dtime, id)
                """
            )
            cur.execute(
                self._TRIGGER_APPROVAL_UPDATE_SQL
            )
            cur.execute(
                self._TRIGGER_APPROVAL_DELETE_SQL
            )
            conn.commit()
        finally:
            conn.close()

    # edited by glg
    @staticmethod
    def _cutoff_text(days: int) -> str:
        retention_days = max(1, int(days or 1))
        cutoff = datetime.now() - timedelta(days=retention_days)
        return cutoff.strftime("%Y-%m-%d %H:%M:%S")

    # edited by glg
    @staticmethod
    def _drop_trigger_if_exists(cursor, trigger_name: str) -> None:
        cursor.execute(f"DROP TRIGGER IF EXISTS {str(trigger_name or '').strip()}")

    # edited by glg
    def _recreate_immutable_triggers(self, cursor, table_name: str) -> None:
        if table_name == "transaksi_persist_audit":
            cursor.execute(self._TRIGGER_PERSIST_AUDIT_UPDATE_SQL)
            cursor.execute(self._TRIGGER_PERSIST_AUDIT_DELETE_SQL)
            return
        cursor.execute(self._TRIGGER_APPROVAL_UPDATE_SQL)
        cursor.execute(self._TRIGGER_APPROVAL_DELETE_SQL)

    # edited by glg
    def _purge_rows_before(
        self,
        cursor,
        *,
        table_name: str,
        time_column: str,
        cutoff_text: str,
        limit: int,
    ) -> int:
        cursor.execute(
            f"""
            DELETE FROM {table_name}
            WHERE id IN (
                SELECT id
                FROM {table_name}
                WHERE COALESCE({time_column}, '') <> ''
                  AND COALESCE({time_column}, '') <= ?
                ORDER BY id ASC
                LIMIT ?
            )
            """,
            (str(cutoff_text or "").strip(), max(1, int(limit or 1))),
        )
        return int(cursor.rowcount or 0)

    # edited by glg
    def _purge_idempotency(self, cursor, *, cutoff_text: str, limit: int) -> int:
        cursor.execute(
            """
            DELETE FROM transaksi_idempotency
            WHERE id IN (
                SELECT id
                FROM transaksi_idempotency
                WHERE status IN ('SUCCESS', 'FAILED')
                  AND COALESCE(updated_at, created_at, '') <> ''
                  AND COALESCE(updated_at, created_at, '') <= ?
                ORDER BY id ASC
                LIMIT ?
            )
            """,
            (str(cutoff_text or "").strip(), max(1, int(limit or 1))),
        )
        return int(cursor.rowcount or 0)

    # edited by glg
    def _purge_immutable_table(
        self,
        cursor,
        *,
        table_name: str,
        time_column: str,
        cutoff_text: str,
        limit: int,
    ) -> int:
        if table_name == "transaksi_persist_audit":
            trigger_update = self._TRIGGER_PERSIST_AUDIT_UPDATE
            trigger_delete = self._TRIGGER_PERSIST_AUDIT_DELETE
        else:
            trigger_update = self._TRIGGER_APPROVAL_UPDATE
            trigger_delete = self._TRIGGER_APPROVAL_DELETE
        self._drop_trigger_if_exists(cursor, trigger_update)
        self._drop_trigger_if_exists(cursor, trigger_delete)
        purged = self._purge_rows_before(
            cursor,
            table_name=table_name,
            time_column=time_column,
            cutoff_text=cutoff_text,
            limit=limit,
        )
        self._recreate_immutable_triggers(cursor, table_name)
        return int(purged)

    # edited by glg
    def purge_retention(
        self,
        *,
        idempotency_days: int = 30,
        audit_days: int = 90,
        approval_days: int = 90,
        outbox_days: int = 30,
        purge_limit: int = 500,
    ) -> Dict[str, Any]:
        """
        Controlled retention for the enterprise control tables so the DB stays lean.

        The idempotency, persist-audit and approval-trail purges run inside one
        ``BEGIN IMMEDIATE`` transaction, each capped at ``purge_limit`` rows
        (minimum 1). Afterwards the outbox service purges its terminal rows and
        a ``RETENTION_PURGE`` audit row carrying the summary is appended.

        Returns:
            A dict with the per-table purge counts and the effective limit.

        Raises:
            sqlite3.Error: Re-raised after a rollback attempt when the
                transactional purge fails.
        """
        row_cap = max(1, int(purge_limit or 1))
        summary = {
            "idempotency_purged": 0,
            "persist_audit_purged": 0,
            "approval_trail_purged": 0,
            "outbox_purged_sent": 0,
            "outbox_purged_dead": 0,
            "purge_limit": int(row_cap),
        }
        # (summary key, table name, retention days) for the trigger-protected tables.
        immutable_targets = (
            ("persist_audit_purged", "transaksi_persist_audit", audit_days),
            ("approval_trail_purged", "transaksi_approval_trail", approval_days),
        )

        conn = self._connect()
        try:
            cursor = conn.cursor()
            cursor.execute("BEGIN IMMEDIATE")
            summary["idempotency_purged"] = self._purge_idempotency(
                cursor,
                cutoff_text=self._cutoff_text(idempotency_days),
                limit=row_cap,
            )
            for summary_key, target_table, keep_days in immutable_targets:
                summary[summary_key] = self._purge_immutable_table(
                    cursor,
                    table_name=target_table,
                    time_column="event_dtime",
                    cutoff_text=self._cutoff_text(keep_days),
                    limit=row_cap,
                )
            conn.commit()
        except sqlite3.Error:
            try:
                conn.rollback()
            except sqlite3.Error:
                # Best effort: the original error is the one worth surfacing.
                pass
            raise
        finally:
            conn.close()

        outbox = FreeProdukSyncOutboxService(db_path=self.db_path)
        outbox_result = outbox.purge_terminal(
            retention_days=max(1, int(outbox_days or 1)),
            limit=row_cap,
        )
        for summary_key, result_key in (
            ("outbox_purged_sent", "purged_sent"),
            ("outbox_purged_dead", "purged_dead"),
        ):
            summary[summary_key] = int(outbox_result.get(result_key) or 0)
        self.append_persist_audit(
            event_type="RETENTION_PURGE",
            status="ok",
            idempotency_key="retention-job",
            trace_id="retention-job",
            payload=summary,
        )
        return summary

    def _is_stale(self, updated_at_text: str) -> bool:
        text = self._safe_text(updated_at_text, "")
        if not text:
            return True
        try:
            updated_at = datetime.strptime(text, "%Y-%m-%d %H:%M:%S")
        except ValueError:
            return True
        threshold = datetime.now() - timedelta(seconds=int(self.stale_inprogress_seconds))
        return updated_at <= threshold

    def acquire_idempotency_lock(self, *, idempotency_key: str, payload_hash: str, trace_id: str = "") -> Dict[str, Any]:
        key = self._safe_text(idempotency_key, "")
        hash_value = self._safe_text(payload_hash, "")
        if not key or not hash_value:
            return {
                "proceed": False,
                "state": "INVALID",
                "reason": "missing_idempotency_payload",
                "error_code": "TRX_IDEMPOTENCY_INVALID",
            }

        now_text = self._now_text()
        conn = self._connect()
        try:
            cur = conn.cursor()
            cur.execute("BEGIN IMMEDIATE")
            cur.execute(
                """
                SELECT payload_hash, status, transaksi_id, updated_at
                FROM transaksi_idempotency
                WHERE idempotency_key = ?
                LIMIT 1
                """,
                (key,),
            )
            row = cur.fetchone()
            if row is None:
                cur.execute(
                    """
                    INSERT INTO transaksi_idempotency (
                        idempotency_key, payload_hash, status, transaksi_id,
                        trace_id, error_code, error_reason, created_at, updated_at
                    )
                    VALUES (?, ?, 'IN_PROGRESS', 0, ?, NULL, NULL, ?, ?)
                    """,
                    (key, hash_value, self._safe_text(trace_id, ""), now_text, now_text),
                )
                conn.commit()
                return {"proceed": True, "state": "STARTED"}

            db_hash = self._safe_text(row["payload_hash"], "")
            if db_hash != hash_value:
                conn.commit()
                return {
                    "proceed": False,
                    "state": "CONFLICT",
                    "reason": "idempotency_key_conflict",
                    "error_code": "TRX_IDEMPOTENCY_CONFLICT",
                }

            status = self._safe_text(row["status"], "").upper()
            transaksi_id = int(row["transaksi_id"] or 0)
            if status == "SUCCESS" and transaksi_id > 0:
                conn.commit()
                return {
                    "proceed": False,
                    "state": "DUPLICATE_SUCCESS",
                    "transaksi_id": transaksi_id,
                }

            if status == "IN_PROGRESS" and not self._is_stale(row["updated_at"]):
                conn.commit()
                return {
                    "proceed": False,
                    "state": "IN_PROGRESS",
                    "reason": "idempotency_in_progress",
                    "error_code": "TRX_IDEMPOTENCY_IN_PROGRESS",
                }

            cur.execute(
                """
                UPDATE transaksi_idempotency
                SET status = 'IN_PROGRESS',
                    transaksi_id = 0,
                    trace_id = ?,
                    error_code = NULL,
                    error_reason = NULL,
                    updated_at = ?
                WHERE idempotency_key = ?
                """,
                (self._safe_text(trace_id, ""), now_text, key),
            )
            conn.commit()
            return {"proceed": True, "state": "RESUMED"}
        except sqlite3.Error:
            try:
                conn.rollback()
            except sqlite3.Error:
                pass
            raise
        finally:
            conn.close()

    def mark_idempotency_success(self, *, idempotency_key: str, transaksi_id: int, trace_id: str = "") -> None:
        now_text = self._now_text()
        conn = self._connect()
        try:
            cur = conn.cursor()
            cur.execute(
                """
                UPDATE transaksi_idempotency
                SET status = 'SUCCESS',
                    transaksi_id = ?,
                    trace_id = ?,
                    error_code = NULL,
                    error_reason = NULL,
                    updated_at = ?
                WHERE idempotency_key = ?
                """,
                (
                    max(0, int(transaksi_id or 0)),
                    self._safe_text(trace_id, ""),
                    now_text,
                    self._safe_text(idempotency_key, ""),
                ),
            )
            conn.commit()
        finally:
            conn.close()

    def mark_idempotency_failed(
        self,
        *,
        idempotency_key: str,
        error_code: str = "",
        reason: str = "",
        trace_id: str = "",
    ) -> None:
        now_text = self._now_text()
        conn = self._connect()
        try:
            cur = conn.cursor()
            cur.execute(
                """
                UPDATE transaksi_idempotency
                SET status = 'FAILED',
                    trace_id = ?,
                    error_code = ?,
                    error_reason = ?,
                    updated_at = ?
                WHERE idempotency_key = ?
                """,
                (
                    self._safe_text(trace_id, ""),
                    self._safe_text(error_code, "")[:120],
                    self._safe_text(reason, "")[:300],
                    now_text,
                    self._safe_text(idempotency_key, ""),
                ),
            )
            conn.commit()
        finally:
            conn.close()

    def _append_immutable_row(
        self,
        *,
        table_name: str,
        payload: Dict[str, Any],
        row_hash_source: str,
    ) -> int:
        conn = self._connect()
        try:
            cur = conn.cursor()
            cur.execute(f"SELECT row_hash FROM {table_name} ORDER BY id DESC LIMIT 1")
            row = cur.fetchone()
            prev_hash = self._safe_text((row["row_hash"] if row else ""), "")
            row_hash = self._sha256(f"{prev_hash}|{row_hash_source}")
            payload_json = self._safe_json(payload)
            if table_name == "transaksi_persist_audit":
                cur.execute(
                    """
                    INSERT INTO transaksi_persist_audit (
                        event_dtime, trace_id, idempotency_key, event_type,
                        transaksi_id, status, error_code, reason, payload_json, prev_hash, row_hash
                    )
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        payload.get("event_dtime"),
                        payload.get("trace_id"),
                        payload.get("idempotency_key"),
                        payload.get("event_type"),
                        int(payload.get("transaksi_id") or 0),
                        payload.get("status"),
                        payload.get("error_code"),
                        payload.get("reason"),
                        payload_json,
                        prev_hash,
                        row_hash,
                    ),
                )
            else:
                cur.execute(
                    """
                    INSERT INTO transaksi_approval_trail (
                        event_dtime, action_name, actor_name, approval_name,
                        approval_status, trace_id, payload_json, prev_hash, row_hash
                    )
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        payload.get("event_dtime"),
                        payload.get("action_name"),
                        payload.get("actor_name"),
                        payload.get("approval_name"),
                        payload.get("approval_status"),
                        payload.get("trace_id"),
                        payload_json,
                        prev_hash,
                        row_hash,
                    ),
                )
            conn.commit()
            return int(cur.lastrowid or 0)
        finally:
            conn.close()

    def append_persist_audit(
        self,
        *,
        event_type: str,
        status: str,
        idempotency_key: str,
        trace_id: str = "",
        transaksi_id: int = 0,
        error_code: str = "",
        reason: str = "",
        payload: Dict[str, Any] = None,
    ) -> int:
        event_dtime = self._now_text()
        row_payload = {
            "event_dtime": event_dtime,
            "trace_id": self._safe_text(trace_id, ""),
            "idempotency_key": self._safe_text(idempotency_key, ""),
            "event_type": self._safe_text(event_type, "").upper(),
            "transaksi_id": int(transaksi_id or 0),
            "status": self._safe_text(status, ""),
            "error_code": self._safe_text(error_code, ""),
            "reason": self._safe_text(reason, ""),
            "payload": payload if isinstance(payload, dict) else {},
        }
        hash_source = self._safe_json(row_payload)
        return self._append_immutable_row(
            table_name="transaksi_persist_audit",
            payload=row_payload,
            row_hash_source=hash_source,
        )

    def record_approval_trail(
        self,
        *,
        action_name: str,
        actor_name: str = "",
        approval_name: str = "",
        approval_status: str = "",
        trace_id: str = "",
        payload: Dict[str, Any] = None,
    ) -> int:
        event_dtime = self._now_text()
        row_payload = {
            "event_dtime": event_dtime,
            "action_name": self._safe_text(action_name, ""),
            "actor_name": self._safe_text(actor_name, ""),
            "approval_name": self._safe_text(approval_name, ""),
            "approval_status": self._safe_text(approval_status, ""),
            "trace_id": self._safe_text(trace_id, ""),
            "payload": payload if isinstance(payload, dict) else {},
        }
        hash_source = self._safe_json(row_payload)
        return self._append_immutable_row(
            table_name="transaksi_approval_trail",
            payload=row_payload,
            row_hash_source=hash_source,
        )

    def _reconcile_stale_inprogress(self) -> int:
        cutoff = datetime.now() - timedelta(seconds=int(self.stale_inprogress_seconds))
        cutoff_text = cutoff.strftime("%Y-%m-%d %H:%M:%S")
        now_text = self._now_text()
        conn = self._connect()
        try:
            cur = conn.cursor()
            cur.execute(
                """
                UPDATE transaksi_idempotency
                SET status = 'FAILED',
                    error_code = 'TRX_IDEMPOTENCY_STALE',
                    error_reason = 'stale_in_progress',
                    updated_at = ?
                WHERE status = 'IN_PROGRESS'
                  AND updated_at <= ?
                """,
                (now_text, cutoff_text),
            )
            conn.commit()
            return int(cur.rowcount or 0)
        finally:
            conn.close()

    def run_reconciliation(self, *, retention_policy: Dict[str, Any] = None) -> Dict[str, Any]:
        """Run the reconciliation job and return its summary.

        Steps, in order: fail stale ``IN_PROGRESS`` idempotency rows, ask the
        outbox service to recover stale in-flight leases, collect the outbox
        counters, optionally run ``purge_retention`` and finally append a
        ``RECONCILIATION`` audit row carrying the summary.

        Args:
            retention_policy: Optional dict. Retention only runs when its
                ``enabled`` entry is truthy. ``idempotency_days``,
                ``audit_days``, ``approval_days``, ``outbox_days`` and
                ``limit`` fall back to 30, 90, 90, 30 and 500 when missing or
                falsy. Anything that is not a dict is ignored.

        Returns:
            The summary dict; it contains a ``retention`` entry only when
            retention ran.
        """
        stale_count = self._reconcile_stale_inprogress()
        outbox = FreeProdukSyncOutboxService(db_path=self.db_path)
        recovered_leases = int(outbox.recover_stale_inflight_leases())
        summary = {
            "reconcile_dtime": self._now_text(),
            "stale_idempotency_failed": int(stale_count),
            "outbox_stale_recovered": int(recovered_leases),
            "outbox_pending": int(outbox.count_pending() or 0),
            "outbox_inflight": int(outbox.count_inflight() or 0),
            "outbox_dead": int(outbox.count_dead() or 0),
        }

        policy = retention_policy if isinstance(retention_policy, dict) else {}
        if policy.get("enabled"):

            def _setting(key: str, fallback: int) -> int:
                # Missing or falsy settings use the documented default.
                return int(policy.get(key) or fallback)

            summary["retention"] = self.purge_retention(
                idempotency_days=_setting("idempotency_days", 30),
                audit_days=_setting("audit_days", 90),
                approval_days=_setting("approval_days", 90),
                outbox_days=_setting("outbox_days", 30),
                purge_limit=_setting("limit", 500),
            )

        self.append_persist_audit(
            event_type="RECONCILIATION",
            status="ok",
            idempotency_key="reconciliation-job",
            trace_id="reconcile-job",
            payload=summary,
        )
        return summary
