# edited by glg
import threading
import time
import unittest

from pypos.core.utils import worker_pool_utils
from pypos.core.utils.worker_pool_utils import (
    submit_ui_periodic_task,
    submit_ui_periodic_task_keyed,
    submit_ui_query_task,
    submit_ui_query_task_keyed,
)
from pypos.modules.auth.controllers.login_controller import LoginController
from pypos.modules.dashboard.controllers.dashboard_controller import DashboardController


class WorkerPoolAndDashboardAsyncTests(unittest.TestCase):
    def test_submit_ui_query_task_keyed_skips_stale_task(self):
        blockers_done = threading.Event()
        release_blockers = threading.Event()
        results = []
        result_lock = threading.Lock()

        def _blocker():
            release_blockers.wait(timeout=2.0)
            blockers_done.set()
            return True

        def _record(value):
            with result_lock:
                results.append(str(value))
            return value

        # Isi semua worker agar task keyed masuk antrean.
        blockers = [submit_ui_query_task(_blocker) for _ in range(4)]
        # Dua task dengan key sama: task lama harus skip, task terbaru yang jalan.
        stale_future = submit_ui_query_task_keyed("demo-key", _record, "stale")
        latest_future = submit_ui_query_task_keyed("demo-key", _record, "latest")

        release_blockers.set()
        for future in blockers:
            future.result(timeout=3.0)
        _ = blockers_done.is_set()
        stale_future.result(timeout=3.0)
        latest_future.result(timeout=3.0)

        self.assertEqual(results, ["latest"])

    def test_settlement_direct_trigger_is_non_blocking_for_caller_thread(self):
        controller = DashboardController.__new__(DashboardController)
        controller.config_service = type(
            "_Cfg",
            (),
            {"get_settlement_delivery_mode": staticmethod(lambda: "dual")},
        )()
        controller._settlement_delivery_lock = threading.Lock()
        controller._settlement_direct_inflight = False
        controller.log_info = lambda *_args, **_kwargs: None
        controller.log_warning = lambda *_args, **_kwargs: None
        controller._normalize_settlement_counter = lambda payload=None: str((payload or {}).get("counter") or "")
        controller._mark_settlement_unsent = lambda **_kwargs: True
        controller._clear_settlement_unsent = lambda *_args, **_kwargs: True

        finished = threading.Event()

        class _Service:
            @staticmethod
            def enabled_getter():
                return True

            @staticmethod
            def send_settlement(_payload):
                finished.set()
                return {
                    "attempted": True,
                    "ok": True,
                    "status_code": 200,
                    "idempotency_key": "idem-test",
                }

        controller._get_settlement_direct_service = lambda: _Service()

        # Simulasikan probe jaringan lambat; harus berjalan di worker thread.
        def _slow_gate(*_args, **_kwargs):
            time.sleep(0.25)
            return {"allow": True, "state": "ONLINE_STABLE", "reason": "online_stable"}

        controller._can_run_network_operation = _slow_gate

        started_at = time.perf_counter()
        triggered = controller._trigger_settlement_direct_async({"counter": "ST-TEST-001"})
        elapsed_ms = (time.perf_counter() - started_at) * 1000.0

        self.assertTrue(triggered)
        # Jika masih blocking di caller thread, nilai ini akan > 250ms.
        self.assertLess(elapsed_ms, 140.0)
        self.assertTrue(finished.wait(timeout=2.0))

    def test_submit_ui_periodic_task_keyed_skips_stale_task(self):
        blockers_done = threading.Event()
        release_blockers = threading.Event()
        results = []
        result_lock = threading.Lock()

        def _blocker():
            release_blockers.wait(timeout=2.0)
            blockers_done.set()
            return True

        def _record(value):
            with result_lock:
                results.append(str(value))
            return value

        blockers = [submit_ui_periodic_task(_blocker) for _ in range(2)]
        stale_future = submit_ui_periodic_task_keyed("demo-periodic", _record, "stale")
        latest_future = submit_ui_periodic_task_keyed("demo-periodic", _record, "latest")

        release_blockers.set()
        for future in blockers:
            future.result(timeout=3.0)
        _ = blockers_done.is_set()
        stale_future.result(timeout=3.0)
        latest_future.result(timeout=3.0)

        self.assertEqual(results, ["latest"])

    def test_submit_ui_task_drop_saat_queue_penuh(self):
        original_sem = worker_pool_utils._UI_POOL_PENDING_SEMAPHORES.get("periodic")
        try:
            worker_pool_utils._UI_POOL_PENDING_SEMAPHORES["periodic"] = threading.Semaphore(0)
            future = submit_ui_periodic_task(lambda: "should_not_run")
            self.assertIsNone(future.result(timeout=1.0))
        finally:
            if original_sem is not None:
                worker_pool_utils._UI_POOL_PENDING_SEMAPHORES["periodic"] = original_sem

    def test_login_controller_probe_inflight_rilis_saat_queue_periodic_penuh(self):
        original_sem = worker_pool_utils._UI_POOL_PENDING_SEMAPHORES.get("periodic")
        try:
            worker_pool_utils._UI_POOL_PENDING_SEMAPHORES["periodic"] = threading.Semaphore(0)
            controller = LoginController.__new__(LoginController)
            controller._network_probe_lock = threading.Lock()
            controller._network_probe_inflight = False
            controller.login_view = None
            controller._last_network_gate = {}
            controller.login_network_policy_service = type(
                "_Policy",
                (),
                {"evaluate": staticmethod(lambda: {})},
            )()

            triggered = controller._dispatch_network_policy_refresh_async(
                source="unit-test",
                force_probe=True,
            )
            self.assertTrue(triggered)
            self.assertFalse(controller._network_probe_inflight)
        finally:
            if original_sem is not None:
                worker_pool_utils._UI_POOL_PENDING_SEMAPHORES["periodic"] = original_sem

    def test_dashboard_startup_probe_inflight_rilis_saat_queue_periodic_penuh(self):
        original_sem = worker_pool_utils._UI_POOL_PENDING_SEMAPHORES.get("periodic")
        try:
            worker_pool_utils._UI_POOL_PENDING_SEMAPHORES["periodic"] = threading.Semaphore(0)
            controller = DashboardController.__new__(DashboardController)
            controller._export_dispatch_lock = threading.Lock()
            controller._export_startup_gate_probe_inflight = False

            triggered = controller._dispatch_startup_export_gate_probe_async()
            self.assertTrue(triggered)
            self.assertFalse(controller._export_startup_gate_probe_inflight)
        finally:
            if original_sem is not None:
                worker_pool_utils._UI_POOL_PENDING_SEMAPHORES["periodic"] = original_sem


if __name__ == "__main__":
    unittest.main()
