import threading
from types import SimpleNamespace

import pytest

from pypos.core.utils import worker_pool_utils
from pypos.modules.penjualan.controllers.transaksi_penjualan_controller import (
    TransaksiPenjualanController,
)
from pypos.modules.penjualan.services.free_produk_outbox_alert_service import (
    FreeProdukOutboxAlertService,
)

# edited by glg
pytestmark = [pytest.mark.unit]


class _PerfStub:
    @staticmethod
    def mark():
        return 1.0

    @staticmethod
    def record(*_args, **_kwargs):
        return None


def _new_controller_stub():
    ctrl = TransaksiPenjualanController.__new__(TransaksiPenjualanController)
    ctrl._autocomplete_request_lock = threading.Lock()
    ctrl._autocomplete_request_seq = 0
    ctrl._settlement_request_lock = threading.Lock()
    ctrl._settlement_request_seq = 0
    ctrl.performance_service = _PerfStub()
    ctrl.log_warning = lambda *_args, **_kwargs: None
    ctrl.barang_mapping = {}
    return ctrl


def test_autocomplete_payload_guard_hanya_terima_request_terbaru():
    ctrl = _new_controller_stub()
    calls = []
    ctrl.view = SimpleNamespace(
        set_barang_autocomplete=lambda rows: calls.append(list(rows)),
    )

    ctrl._autocomplete_request_seq = 2
    ctrl._on_autocomplete_payload_ready(
        1,
        {
            "keyword": "kopi",
            "barang_list": ["kopi - 10000"],
            "mapping": {"kopi - 10000": {"id": 1}},
            "lookup_started": 1.0,
            "ok": True,
        },
    )
    assert calls == []
    assert ctrl.barang_mapping == {}

    ctrl._on_autocomplete_payload_ready(
        2,
        {
            "keyword": "kopi",
            "barang_list": ["kopi - 10000"],
            "mapping": {"kopi - 10000": {"id": 1}},
            "lookup_started": 1.0,
            "ok": True,
        },
    )
    assert calls == [["kopi - 10000"]]
    assert "kopi - 10000" in ctrl.barang_mapping


def test_settlement_payload_guard_hanya_terima_request_terbaru():
    ctrl = _new_controller_stub()
    settlement_calls = []
    ctrl.settlement_result = lambda data, has_other_kasir=False, current_kasir=None: settlement_calls.append(
        {
            "data": list(data or []),
            "has_other_kasir": bool(has_other_kasir),
            "current_kasir": current_kasir,
        }
    )

    ctrl._settlement_request_seq = 3
    ctrl._on_settlement_payload_ready(
        2,
        {
            "data": [{"id": 10}],
            "has_other_kasir": True,
            "current_kasir": "kasir-a",
        },
    )
    assert settlement_calls == []

    ctrl._on_settlement_payload_ready(
        3,
        {
            "data": [{"id": 11}],
            "has_other_kasir": False,
            "current_kasir": "kasir-b",
        },
    )
    assert len(settlement_calls) == 1
    assert settlement_calls[0]["data"] == [{"id": 11}]


def test_settlement_check_inflight_rilis_saat_queue_fast_penuh():
    ctrl = _new_controller_stub()
    ctrl._settlement_check_inflight = False
    ctrl.model_settle = SimpleNamespace(cek_transaksi_settlement=lambda **_kwargs: [])
    ctrl.user_info = {"nama": "kasir"}
    ctrl.transaksi_payment_state_service = SimpleNamespace(
        detect_other_kasir=lambda *_args, **_kwargs: False
    )
    ctrl.view = SimpleNamespace()

    original_sem = worker_pool_utils._UI_POOL_PENDING_SEMAPHORES.get("fast")
    try:
        worker_pool_utils._UI_POOL_PENDING_SEMAPHORES["fast"] = threading.Semaphore(0)
        ctrl._run_async_settlement_status_check(
            request_id=1,
            include_today=False,
            enforce_kasir=False,
        )
        assert ctrl._settlement_check_inflight is False
    finally:
        if original_sem is not None:
            worker_pool_utils._UI_POOL_PENDING_SEMAPHORES["fast"] = original_sem


def test_free_produk_outbox_drain_single_flight():
    ctrl = _new_controller_stub()
    ctrl._free_produk_drain_lock = threading.Lock()
    ctrl._free_produk_drain_inflight = False
    ctrl._free_produk_outbox_notice_ts = 0.0
    ctrl.log_debug = lambda *_args, **_kwargs: None
    ctrl.log_info = lambda *_args, **_kwargs: None
    ctrl.log_warning = lambda *_args, **_kwargs: None
    ctrl.free_produk_outbox_service = SimpleNamespace(
        count_pending=lambda: 0,
        count_inflight=lambda: 0,
        count_dead=lambda: 0,
    )

    run_calls = []

    def _run(task, *args, on_done=None, on_error=None, **kwargs):
        run_calls.append(
            {
                "task": task,
                "args": args,
                "on_done": on_done,
                "on_error": on_error,
            }
        )
        _ = kwargs
        return object()

    ctrl.background_task_service = SimpleNamespace(run=_run)

    first = ctrl._drain_free_produk_outbox_async(max_items=3)
    second = ctrl._drain_free_produk_outbox_async(max_items=3)

    assert first is True
    assert second is False
    assert len(run_calls) == 1

    done_callback = run_calls[0]["on_done"]
    assert callable(done_callback)
    done_callback(1)
    assert ctrl._free_produk_drain_inflight is False


def test_free_produk_outbox_dead_memicu_channel_alert_aktif():
    ctrl = _new_controller_stub()
    ctrl._free_produk_outbox_notice_ts = 0.0
    ctrl._free_produk_dead_alert_notice_ts = 0.0
    ctrl.free_produk_outbox_alert_service = FreeProdukOutboxAlertService()

    warning_logs = []
    toast_calls = []
    ctrl.log_info = lambda *_args, **_kwargs: None
    ctrl.log_warning = lambda message: warning_logs.append(str(message))
    ctrl.view = SimpleNamespace(
        show_toast=lambda message, duration_ms=1400, level="info": toast_calls.append(
            {"message": str(message), "duration_ms": int(duration_ms), "level": str(level)}
        )
    )
    ctrl.parent_window = None
    ctrl.free_produk_outbox_service = SimpleNamespace(
        count_pending=lambda: 0,
        count_inflight=lambda: 0,
        count_dead=lambda: 1,
        build_dead_letter_alert=lambda limit=3: {
            "dead_count": 1,
            "top_items_text": "id=11:code=HTTP_500:attempt=8",
            "top_items": [{"id": 11}],
            "limit": int(limit),
        },
    )

    ctrl._log_free_produk_outbox_backlog()

    assert toast_calls
    assert toast_calls[0]["level"] == "warning"
    assert any("OUTBOX_DEAD" in message for message in warning_logs)
