# edited by glg
import hashlib
import logging
import time
from typing import Any, Callable, Dict, List

from pypos.modules.sinkronisasi.services.event_ingestion_gateway_service import (
    EventIngestionGatewayService,
)
from pypos.modules.sinkronisasi.services.event_outbox_service import EventOutboxService

LOGGER = logging.getLogger(__name__)


class EventFirstExportRuntimeService:
    TOPIC_EXPORT_CYCLE_TRIGGER = "export_cycle.trigger.v1"
    _NON_FATAL_EXCEPTIONS = (
        TypeError,
        ValueError,
        KeyError,
        AttributeError,
        RuntimeError,
        OSError,
        LookupError,
        ArithmeticError,
        ImportError,
    )

    def __init__(self, gateway_service=None, outbox_service=None, logger=None, now_fn=None):
        self.gateway_service = gateway_service or EventIngestionGatewayService()
        self.outbox_service = outbox_service or EventOutboxService()
        self.logger = logger or LOGGER
        self.now_fn = now_fn or time.time

    @staticmethod
    def _to_int(value: Any, default: int = 0, minimum: int = 0) -> int:
        try:
            parsed = int(value)
        except (TypeError, ValueError):
            parsed = int(default)
        return max(int(minimum), int(parsed))

    @staticmethod
    def _to_bool(value: Any, default: bool = False) -> bool:
        if isinstance(value, bool):
            return bool(value)
        if value is None:
            return bool(default)
        if isinstance(value, (int, float)):
            return float(value) > 0.0
        text = str(value).strip().lower()
        if text in {"1", "true", "yes", "on"}:
            return True
        if text in {"0", "false", "no", "off"}:
            return False
        return bool(default)

    @staticmethod
    def _to_text(value: Any, default: str = "") -> str:
        text = str(value or "").strip()
        return text or str(default or "").strip()

    def _now_bucket(self, window_seconds: int) -> int:
        window = max(1, int(window_seconds or 1))
        try:
            now_ts = float(self.now_fn())
        except self._NON_FATAL_EXCEPTIONS:
            now_ts = float(time.time())
        return int(now_ts // float(window))

    @staticmethod
    def _empty_cycle_result(skipped: str = "") -> Dict[str, Any]:
        return {
            "ran": False,
            "skipped": str(skipped or "").strip(),
            "guard_source": "",
            "guard_started_at": 0.0,
            "exported_rows": 0,
            "uploaded": 0,
            "failed": 0,
            "retried": 0,
            "cleaned_files": 0,
            "cleaned_empty_flux": 0,
            "retry_items": [],
            "failed_items": [],
            "requeued_failed_transient": 0,
            "skipped_upload": "",
            "suppressed_direct_only": 0,
            "suppressed_table_toggle": 0,
        }

    @staticmethod
    def _merge_cycle_results(base: Dict[str, Any], current: Dict[str, Any]) -> Dict[str, Any]:
        merged = dict(base or {})
        payload = current if isinstance(current, dict) else {}
        merged["ran"] = bool(merged.get("ran")) or bool(payload.get("ran"))

        for key in (
            "exported_rows",
            "uploaded",
            "failed",
            "retried",
            "cleaned_files",
            "cleaned_empty_flux",
            "requeued_failed_transient",
            "suppressed_direct_only",
            "suppressed_table_toggle",
        ):
            merged[key] = int(merged.get(key) or 0) + int(payload.get(key) or 0)

        for list_key in ("retry_items", "failed_items"):
            merged_list = list(merged.get(list_key) or [])
            current_list = list(payload.get(list_key) or [])
            for item in current_list:
                if len(merged_list) >= 20:
                    break
                merged_list.append(item)
            merged[list_key] = merged_list

        skipped = str(payload.get("skipped") or "").strip()
        if skipped and not str(merged.get("skipped") or "").strip():
            merged["skipped"] = skipped

        skipped_upload = str(payload.get("skipped_upload") or "").strip()
        if skipped_upload and not str(merged.get("skipped_upload") or "").strip():
            merged["skipped_upload"] = skipped_upload

        guard_source = str(payload.get("guard_source") or "").strip()
        if guard_source:
            merged["guard_source"] = guard_source
        guard_started_at = payload.get("guard_started_at")
        if guard_started_at not in (None, ""):
            try:
                guard_started_at_numeric = guard_started_at if isinstance(guard_started_at, (int, float, str)) else 0.0
                merged["guard_started_at"] = float(guard_started_at_numeric)
            except (TypeError, ValueError):
                merged["guard_started_at"] = float(merged.get("guard_started_at") or 0.0)
        return merged

    def _build_dedup_key(
        self,
        *,
        source: str,
        dedup_window_seconds: int,
        skip_upload: bool,
        upload_limit_override: Any,
        requeue_failed_transient: bool,
        upload_timeout_override: Any,
    ) -> str:
        bucket = self._now_bucket(dedup_window_seconds)
        source_norm = self._to_text(source, "timer").lower()
        raw = (
            f"{self.TOPIC_EXPORT_CYCLE_TRIGGER}|{source_norm}|{bucket}|"
            f"{int(bool(skip_upload))}|{self._to_text(upload_limit_override)}|"
            f"{int(bool(requeue_failed_transient))}|{self._to_text(upload_timeout_override)}"
        )
        return hashlib.sha256(raw.encode("utf-8")).hexdigest()

    def enqueue_trigger(
        self,
        *,
        source: str,
        source_id: str,
        metrics: Dict[str, Any],
        skip_upload: bool,
        upload_limit_override: Any,
        requeue_failed_transient: bool,
        upload_timeout_override: Any,
        dedup_window_seconds: int = 5,
    ) -> Dict[str, Any]:
        dedup_key = self._build_dedup_key(
            source=source,
            dedup_window_seconds=dedup_window_seconds,
            skip_upload=skip_upload,
            upload_limit_override=upload_limit_override,
            requeue_failed_transient=requeue_failed_transient,
            upload_timeout_override=upload_timeout_override,
        )
        event_payload = {
            "source": self._to_text(source, "timer"),
            "skip_upload": bool(skip_upload),
            "upload_limit_override": upload_limit_override,
            "requeue_failed_transient": bool(requeue_failed_transient),
            "upload_timeout_override": upload_timeout_override,
            "requested_at_unix": int(time.time()),
            "dedup_key": dedup_key,
        }
        try:
            result = self.gateway_service.ingest_events(
                topic=self.TOPIC_EXPORT_CYCLE_TRIGGER,
                events=[event_payload],
                source_id=self._to_text(source_id, ""),
                metrics=metrics if isinstance(metrics, dict) else {},
            )
            payload = result if isinstance(result, dict) else {}
            payload["dedup_key"] = dedup_key
            return payload
        except self._NON_FATAL_EXCEPTIONS as exc:
            return {
                "accepted": False,
                "reason": "ingestion_runtime_error",
                "mode": "error",
                "ingested_count": 0,
                "duplicate_count": 0,
                "invalid_count": 1,
                "dropped_count": 1,
                "total_events": 1,
                "recommended_batch_size": 1,
                "error": str(exc or ""),
                "dedup_key": dedup_key,
            }

    def _invoke_cycle_handler(self, cycle_handler: Callable, payload: Dict[str, Any]) -> Dict[str, Any]:
        if not callable(cycle_handler):
            raise ValueError("cycle_handler wajib callable.")
        event_payload = payload if isinstance(payload, dict) else {}
        source = self._to_text(event_payload.get("source"), "timer")
        skip_upload = self._to_bool(event_payload.get("skip_upload"), False)
        requeue_failed_transient = self._to_bool(event_payload.get("requeue_failed_transient"), False)
        upload_limit_override = event_payload.get("upload_limit_override")
        upload_timeout_override = event_payload.get("upload_timeout_override")

        try:
            result = cycle_handler(
                source=source,
                skip_upload=skip_upload,
                upload_limit_override=upload_limit_override,
                requeue_failed_transient=requeue_failed_transient,
                upload_timeout_override=upload_timeout_override,
            )
        except TypeError:
            result = cycle_handler(event_payload)
        if isinstance(result, dict):
            return dict(result)
        return self._empty_cycle_result(skipped="invalid_cycle_handler_result")

    def drain_triggers(
        self,
        *,
        cycle_handler: Callable,
        claim_limit: int = 100,
        lease_seconds: int = 60,
        max_inflight: int = 1000,
        retry_delay_seconds: int = 30,
        purge_sent_days: int = 7,
        purge_limit: int = 2000,
        release_expired_each_cycle: bool = True,
    ) -> Dict[str, Any]:
        released_expired = 0
        claimed = []
        ack_ids: List[int] = []
        fail_ids: List[int] = []
        aggregate = self._empty_cycle_result(skipped="")

        if bool(release_expired_each_cycle):
            try:
                released_expired = int(self.outbox_service.release_expired_leases() or 0)
            except self._NON_FATAL_EXCEPTIONS as exc:
                self.logger.warning("[EVENT_FIRST] release_expired_leases gagal: %s", exc)

        try:
            claimed = self.outbox_service.claim_batch(
                limit=max(1, int(claim_limit or 1)),
                lease_seconds=max(10, int(lease_seconds or 10)),
                max_inflight=max(1, int(max_inflight or 1)),
            )
            claimed = claimed if isinstance(claimed, list) else []
        except self._NON_FATAL_EXCEPTIONS as exc:
            self.logger.warning("[EVENT_FIRST] claim_batch gagal: %s", exc)
            return {
                "released_expired": int(released_expired),
                "claimed": 0,
                "processed": 0,
                "acked": 0,
                "failed": 0,
                "failed_unsupported_topic": 0,
                "failed_runtime_error": 0,
                "purged_sent": 0,
                "cycle_result": self._empty_cycle_result(skipped="claim_batch_error"),
            }

        failed_unsupported_topic = 0
        failed_runtime_error = 0
        processed = 0
        for item in claimed:
            row = item if isinstance(item, dict) else {}
            row_id = self._to_int(row.get("id"), default=0, minimum=0)
            if row_id <= 0:
                continue
            topic = self._to_text(row.get("topic"), "")
            if topic != self.TOPIC_EXPORT_CYCLE_TRIGGER:
                fail_ids.append(row_id)
                failed_unsupported_topic += 1
                continue
            try:
                payload_obj = row.get("payload")
                result = self._invoke_cycle_handler(
                    cycle_handler,
                    payload_obj if isinstance(payload_obj, dict) else {},
                )
                aggregate = self._merge_cycle_results(aggregate, result)
                ack_ids.append(row_id)
                processed += 1
            except self._NON_FATAL_EXCEPTIONS as exc:
                fail_ids.append(row_id)
                failed_runtime_error += 1
                self.logger.warning("[EVENT_FIRST] proses event trigger gagal (row_id=%s): %s", row_id, exc)

        acked = 0
        if ack_ids:
            try:
                acked = int(self.outbox_service.ack_events(ack_ids) or 0)
            except self._NON_FATAL_EXCEPTIONS as exc:
                self.logger.warning("[EVENT_FIRST] ack_events gagal: %s", exc)
                acked = 0

        failed = 0
        if fail_ids:
            try:
                failed = int(
                    self.outbox_service.fail_events(
                        fail_ids,
                        error_code="EVENT_FIRST_RUNTIME_FAIL",
                        error_message="event_first_export_runtime_failed",
                        retry_delay_seconds=max(5, int(retry_delay_seconds or 5)),
                    )
                    or 0
                )
            except self._NON_FATAL_EXCEPTIONS as exc:
                self.logger.warning("[EVENT_FIRST] fail_events gagal: %s", exc)
                failed = 0

        purged_sent = 0
        try:
            purged_sent = int(
                self.outbox_service.purge_sent(
                    retention_days=max(1, int(purge_sent_days or 1)),
                    limit=max(1, int(purge_limit or 1)),
                )
                or 0
            )
        except self._NON_FATAL_EXCEPTIONS as exc:
            self.logger.warning("[EVENT_FIRST] purge_sent gagal: %s", exc)
            purged_sent = 0

        return {
            "released_expired": int(released_expired),
            "claimed": int(len(claimed)),
            "processed": int(processed),
            "acked": int(acked),
            "failed": int(failed),
            "failed_unsupported_topic": int(failed_unsupported_topic),
            "failed_runtime_error": int(failed_runtime_error),
            "purged_sent": int(purged_sent),
            "cycle_result": aggregate,
        }

    def run_trigger_cycle(
        self,
        *,
        source: str,
        source_id: str,
        cycle_handler: Callable,
        metrics: Dict[str, Any] = None,
        skip_upload: bool = False,
        upload_limit_override: Any = None,
        requeue_failed_transient: bool = False,
        upload_timeout_override: Any = None,
        dedup_window_seconds: int = 5,
        claim_limit: int = 100,
        lease_seconds: int = 60,
        max_inflight: int = 1000,
        retry_delay_seconds: int = 30,
        purge_sent_days: int = 7,
        purge_limit: int = 2000,
        release_expired_each_cycle: bool = True,
    ) -> Dict[str, Any]:
        enqueue = self.enqueue_trigger(
            source=source,
            source_id=source_id,
            metrics=metrics if isinstance(metrics, dict) else {},
            skip_upload=bool(skip_upload),
            upload_limit_override=upload_limit_override,
            requeue_failed_transient=bool(requeue_failed_transient),
            upload_timeout_override=upload_timeout_override,
            dedup_window_seconds=max(1, int(dedup_window_seconds or 1)),
        )
        drain = self.drain_triggers(
            cycle_handler=cycle_handler,
            claim_limit=max(1, int(claim_limit or 1)),
            lease_seconds=max(10, int(lease_seconds or 10)),
            max_inflight=max(1, int(max_inflight or 1)),
            retry_delay_seconds=max(5, int(retry_delay_seconds or 5)),
            purge_sent_days=max(1, int(purge_sent_days or 1)),
            purge_limit=max(1, int(purge_limit or 1)),
            release_expired_each_cycle=bool(release_expired_each_cycle),
        )
        cycle_result = dict(drain.get("cycle_result") or self._empty_cycle_result(skipped=""))

        if not bool(cycle_result.get("ran")) and int(drain.get("processed") or 0) <= 0:
            if not bool(enqueue.get("accepted")):
                cycle_result["skipped"] = self._to_text(
                    enqueue.get("reason") or enqueue.get("mode"),
                    "event_ingestion_blocked",
                )
            elif int(enqueue.get("ingested_count") or 0) <= 0:
                cycle_result["skipped"] = "event_dedup_duplicate"
            else:
                cycle_result["skipped"] = "event_enqueued_not_claimed"

        return {
            "enqueue": enqueue,
            "drain": drain,
            "cycle_result": cycle_result,
        }
