import hashlib
import json
import os
import sqlite3
import uuid
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List

from pypos.core.utils.db_helper import connect_sqlite
from pypos.core.utils.path_utils import get_db_path
from pypos.core.utils.sql_query_builder import build_sql_with_identifier_in_clause

# edited by glg


class EventOutboxService:
    def __init__(self, db_path: str = None):
        self.db_path = str(db_path or get_db_path())
        self.ensure_schema()

    def _connect(self):
        conn = connect_sqlite(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    @staticmethod
    def _utc_now():
        return datetime.now(timezone.utc)

    def _utc_now_text(self) -> str:
        return self._utc_now().strftime("%Y-%m-%d %H:%M:%S")

    @staticmethod
    def _dump_json(payload: Any) -> str:
        return json.dumps(payload, ensure_ascii=False, sort_keys=True, separators=(",", ":"))

    @staticmethod
    def _build_id_scoped_sql(sql_prefix: str, ids: List[int], sql_suffix: str = ""):
        return build_sql_with_identifier_in_clause(
            sql_prefix=sql_prefix,
            sql_suffix=sql_suffix,
            identifier="id",
            values=ids,
            cast_int=True,
            positive_only=True,
            unique=True,
        )

    def ensure_schema(self) -> None:
        os.makedirs(os.path.dirname(os.path.abspath(self.db_path)), exist_ok=True)
        conn = self._connect()
        try:
            cur = conn.cursor()
            cur.execute(
                """
                CREATE TABLE IF NOT EXISTS event_outbox (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    event_id TEXT NOT NULL UNIQUE,
                    dedup_key TEXT NOT NULL,
                    topic TEXT NOT NULL,
                    payload_json TEXT NOT NULL,
                    headers_json TEXT,
                    status TEXT NOT NULL DEFAULT 'PENDING',
                    attempt_count INTEGER NOT NULL DEFAULT 0,
                    available_at TEXT NOT NULL,
                    lease_token TEXT,
                    lease_until TEXT,
                    error_code TEXT,
                    error_message TEXT,
                    source_id TEXT,
                    created_at TEXT NOT NULL,
                    updated_at TEXT NOT NULL
                )
                """
            )
            cur.execute(
                """
                CREATE UNIQUE INDEX IF NOT EXISTS ux_event_outbox_dedup
                ON event_outbox(dedup_key)
                """
            )
            cur.execute(
                """
                CREATE INDEX IF NOT EXISTS ix_event_outbox_status_available
                ON event_outbox(status, available_at, id)
                """
            )
            cur.execute(
                """
                CREATE INDEX IF NOT EXISTS ix_event_outbox_lease_until
                ON event_outbox(lease_until)
                """
            )
            conn.commit()
        finally:
            conn.close()

    def build_dedup_key(self, topic: str, payload: Dict, source_id: str = "") -> str:
        normalized_topic = str(topic or "").strip().lower()
        normalized_source = str(source_id or "").strip().lower()
        body = self._dump_json(payload if isinstance(payload, dict) else {"value": payload})
        raw = f"{normalized_topic}|{normalized_source}|{body}"
        return hashlib.sha256(raw.encode("utf-8")).hexdigest()

    def enqueue_event(
        self,
        *,
        topic: str,
        payload: Dict,
        source_id: str = "",
        dedup_key: str = "",
        headers: Dict = None,
    ) -> Dict[str, Any]:
        normalized_topic = str(topic or "").strip()
        if not normalized_topic:
            raise ValueError("topic wajib diisi")

        payload_json = self._dump_json(payload if isinstance(payload, dict) else {"value": payload})
        headers_json = self._dump_json(headers or {})
        dedup = str(dedup_key or "").strip() or self.build_dedup_key(
            normalized_topic,
            payload if isinstance(payload, dict) else {"value": payload},
            source_id=source_id,
        )
        now_text = self._utc_now_text()
        event_id = str(uuid.uuid4())

        conn = self._connect()
        try:
            cur = conn.cursor()
            try:
                cur.execute(
                    """
                    INSERT INTO event_outbox (
                        event_id, dedup_key, topic, payload_json, headers_json,
                        status, attempt_count, available_at, lease_token, lease_until,
                        error_code, error_message, source_id, created_at, updated_at
                    ) VALUES (?, ?, ?, ?, ?, 'PENDING', 0, ?, NULL, NULL, NULL, NULL, ?, ?, ?)
                    """,
                    (
                        event_id,
                        dedup,
                        normalized_topic,
                        payload_json,
                        headers_json,
                        now_text,
                        str(source_id or "").strip(),
                        now_text,
                        now_text,
                    ),
                )
                conn.commit()
                return {"inserted": True, "event_id": event_id, "dedup_key": dedup}
            except sqlite3.IntegrityError:
                cur.execute(
                    """
                    SELECT event_id, dedup_key, status, attempt_count
                    FROM event_outbox
                    WHERE dedup_key = ?
                    LIMIT 1
                    """,
                    (dedup,),
                )
                row = cur.fetchone()
                if not row:
                    return {"inserted": False, "event_id": "", "dedup_key": dedup}
                return {
                    "inserted": False,
                    "event_id": str(row["event_id"] or ""),
                    "dedup_key": str(row["dedup_key"] or dedup),
                    "status": str(row["status"] or ""),
                    "attempt_count": int(row["attempt_count"] or 0),
                }
        finally:
            conn.close()

    def get_queue_metrics(self) -> Dict[str, int]:
        conn = self._connect()
        try:
            cur = conn.cursor()
            cur.execute(
                """
                SELECT status, COUNT(1) AS total
                FROM event_outbox
                GROUP BY status
                """
            )
            grouped = {str(row["status"] or ""): int(row["total"] or 0) for row in (cur.fetchall() or [])}
            return {
                "pending": int(grouped.get("PENDING", 0) + grouped.get("FAILED", 0)),
                "inflight": int(grouped.get("INFLIGHT", 0)),
                "sent": int(grouped.get("SENT", 0)),
            }
        finally:
            conn.close()

    def release_expired_leases(self) -> int:
        now_text = self._utc_now_text()
        conn = self._connect()
        try:
            cur = conn.cursor()
            cur.execute(
                """
                UPDATE event_outbox
                SET status = 'FAILED',
                    lease_token = NULL,
                    error_code = COALESCE(error_code, 'LEASE_EXPIRED'),
                    error_message = COALESCE(error_message, 'lease timeout'),
                    updated_at = ?
                WHERE status = 'INFLIGHT'
                  AND lease_until IS NOT NULL
                  AND lease_until <= ?
                """,
                (now_text, now_text),
            )
            affected = int(cur.rowcount or 0)
            conn.commit()
            return affected
        finally:
            conn.close()

    def claim_batch(self, limit: int = 100, lease_seconds: int = 60, max_inflight: int = 1000) -> List[Dict]:
        batch_limit = max(1, int(limit or 1))
        lease_sec = max(10, int(lease_seconds or 60))
        inflight_cap = max(1, int(max_inflight or 1000))
        now_text = self._utc_now_text()
        lease_until = (self._utc_now() + timedelta(seconds=lease_sec)).strftime("%Y-%m-%d %H:%M:%S")
        lease_token = str(uuid.uuid4())

        conn = self._connect()
        try:
            cur = conn.cursor()
            cur.execute("BEGIN IMMEDIATE")
            cur.execute("SELECT COUNT(1) AS total FROM event_outbox WHERE status='INFLIGHT'")
            inflight = int((cur.fetchone() or {"total": 0})["total"] or 0)
            if inflight >= inflight_cap:
                conn.rollback()
                return []

            effective_limit = min(batch_limit, max(0, inflight_cap - inflight))
            if effective_limit <= 0:
                conn.rollback()
                return []

            cur.execute(
                """
                SELECT id
                FROM event_outbox
                WHERE status IN ('PENDING', 'FAILED')
                  AND available_at <= ?
                ORDER BY id ASC
                LIMIT ?
                """,
                (now_text, effective_limit),
            )
            ids = [int(row["id"]) for row in (cur.fetchall() or [])]
            if not ids:
                conn.rollback()
                return []

            update_sql, update_params = self._build_id_scoped_sql(
                """
                UPDATE event_outbox
                SET status='INFLIGHT',
                    lease_token=?,
                    lease_until=?,
                    attempt_count=attempt_count+1,
                    updated_at=?
                WHERE
                """,
                ids,
            )
            cur.execute(
                update_sql,
                (lease_token, lease_until, now_text, *update_params),
            )
            select_sql, select_params = self._build_id_scoped_sql(
                """
                SELECT id, event_id, dedup_key, topic, payload_json, headers_json, status,
                       attempt_count, source_id, created_at, updated_at
                FROM event_outbox
                WHERE
                """,
                ids,
                sql_suffix="ORDER BY id ASC",
            )
            cur.execute(
                select_sql,
                select_params,
            )
            rows = cur.fetchall() or []
            conn.commit()

            out = []
            for row in rows:
                try:
                    payload = json.loads(str(row["payload_json"] or "{}"))
                except Exception:
                    payload = {}
                try:
                    headers = json.loads(str(row["headers_json"] or "{}"))
                except Exception:
                    headers = {}
                out.append(
                    {
                        "id": int(row["id"]),
                        "event_id": str(row["event_id"] or ""),
                        "dedup_key": str(row["dedup_key"] or ""),
                        "topic": str(row["topic"] or ""),
                        "payload": payload,
                        "headers": headers,
                        "status": str(row["status"] or ""),
                        "attempt_count": int(row["attempt_count"] or 0),
                        "source_id": str(row["source_id"] or ""),
                        "lease_token": lease_token,
                        "lease_until": lease_until,
                    }
                )
            return out
        finally:
            conn.close()

    def ack_events(self, event_ids: List[int]) -> int:
        ids = [int(x) for x in (event_ids or []) if int(x) > 0]
        if not ids:
            return 0
        now_text = self._utc_now_text()
        query, params = self._build_id_scoped_sql(
            """
            UPDATE event_outbox
            SET status='SENT',
                lease_token=NULL,
                lease_until=NULL,
                error_code=NULL,
                error_message=NULL,
                updated_at=?
            WHERE
            """,
            ids,
        )
        conn = self._connect()
        try:
            cur = conn.cursor()
            cur.execute(
                query,
                (now_text, *params),
            )
            affected = int(cur.rowcount or 0)
            conn.commit()
            return affected
        finally:
            conn.close()

    def fail_events(self, event_ids: List[int], error_code: str = "", error_message: str = "", retry_delay_seconds: int = 60) -> int:
        ids = [int(x) for x in (event_ids or []) if int(x) > 0]
        if not ids:
            return 0
        delay = max(5, int(retry_delay_seconds or 60))
        now_dt = self._utc_now()
        now_text = now_dt.strftime("%Y-%m-%d %H:%M:%S")
        next_text = (now_dt + timedelta(seconds=delay)).strftime("%Y-%m-%d %H:%M:%S")
        query, params = self._build_id_scoped_sql(
            """
            UPDATE event_outbox
            SET status='FAILED',
                lease_token=NULL,
                lease_until=NULL,
                available_at=?,
                error_code=?,
                error_message=?,
                updated_at=?
            WHERE
            """,
            ids,
        )
        conn = self._connect()
        try:
            cur = conn.cursor()
            cur.execute(
                query,
                (
                    next_text,
                    str(error_code or "").strip(),
                    str(error_message or "").strip(),
                    now_text,
                    *params,
                ),
            )
            affected = int(cur.rowcount or 0)
            conn.commit()
            return affected
        finally:
            conn.close()

    def purge_sent(self, retention_days: int = 7, limit: int = 2000) -> int:
        keep_days = max(1, int(retention_days or 7))
        purge_limit = max(1, int(limit or 2000))
        cutoff_text = (self._utc_now() - timedelta(days=keep_days)).strftime("%Y-%m-%d %H:%M:%S")
        conn = self._connect()
        try:
            cur = conn.cursor()
            cur.execute(
                """
                SELECT id
                FROM event_outbox
                WHERE status='SENT'
                  AND updated_at <= ?
                ORDER BY id ASC
                LIMIT ?
                """,
                (cutoff_text, purge_limit),
            )
            ids = [int(row["id"]) for row in (cur.fetchall() or [])]
            if not ids:
                return 0
            query, params = self._build_id_scoped_sql(
                "DELETE FROM event_outbox WHERE",
                ids,
            )
            cur.execute(
                query,
                params,
            )
            affected = int(cur.rowcount or 0)
            conn.commit()
            return affected
        finally:
            conn.close()
