# edited by glg
# Shared bounded worker pools for background UI tasks.
# Goals:
# - avoid thread-per-event (uncontrolled fan-out)
# - keep click/hover/menu latency stable during event bursts.
import logging
import threading
from concurrent.futures import Future, ThreadPoolExecutor, CancelledError, InvalidStateError


LOGGER = logging.getLogger(__name__)

# edited by glg
# Separate pools so periodic tasks do not interfere with user-interaction tasks.
# - fast: user actions (click/filter/list refresh)
# - periodic: recurring polling/probes
# - preview: read-only detail/preview
_UI_FAST_QUERY_POOL = ThreadPoolExecutor(max_workers=4, thread_name_prefix="ui-fast")
_UI_PERIODIC_QUERY_POOL = ThreadPoolExecutor(max_workers=2, thread_name_prefix="ui-periodic")
_UI_PREVIEW_POOL = ThreadPoolExecutor(max_workers=2, thread_name_prefix="ui-preview")
# edited by glg
# Worker queue backpressure:
# - cap the number of outstanding tasks per domain so a burst cannot pile up
#   without bound.
_UI_POOL_PENDING_LIMITS = {
    "fast": 256,
    "periodic": 96,
    "preview": 96,
}
# One slot counter per domain. BoundedSemaphore (rather than Semaphore) is used
# so that releasing more slots than were acquired raises ValueError instead of
# silently raising the limit; the submit path already guards release() against
# ValueError. A limit below 1 is clamped to 1 so every domain can run a task.
_UI_POOL_PENDING_SEMAPHORES = {
    str(domain): threading.BoundedSemaphore(max(1, int(limit)))
    for domain, limit in _UI_POOL_PENDING_LIMITS.items()
}
# edited by glg
# Per-domain sequence numbers, indexed by task key, used to coalesce keyed
# tasks. Each domain's map is guarded by the lock of the same name.
_UI_KEYED_LOCKS = {
    "fast": threading.Lock(),
    "periodic": threading.Lock(),
}
_UI_KEYED_SEQ = {
    "fast": {},
    "periodic": {},
}


def _attach_unhandled_exception_logger(future):
    """Register a done-callback on ``future`` that logs a task exception.

    Args:
        future: The future to observe.

    Returns:
        The same ``future`` object, so the call can be chained.
    """

    # edited by glg
    # Do not let worker exceptions vanish silently; log them so they can be traced.
    def _log_failure(finished):
        try:
            failure = finished.exception()
        except (CancelledError, InvalidStateError, RuntimeError) as exc:
            # The outcome could not be read (e.g. the future was cancelled).
            LOGGER.warning("Gagal memeriksa hasil worker future: %s", exc)
        else:
            if failure is not None:
                LOGGER.warning("Task worker gagal: %s", failure, exc_info=failure)

    future.add_done_callback(_log_failure)
    return future


def _completed_none_future():
    """Return a new, already-completed future whose result is ``None``.

    The future carries a ``_queue_dropped = True`` attribute.
    """
    dropped = Future()
    dropped.set_result(None)
    # edited by glg
    # Mark the future as a queue drop so the caller can fall back to a
    # synchronous path.
    dropped._queue_dropped = True
    return dropped


# edited by glg
def _submit(pool, fn, *args, domain="fast", **kwargs):
    """Submit ``fn(*args, **kwargs)`` to ``pool`` under a per-domain slot limit.

    Args:
        pool: Executor that receives the task through ``pool.submit``.
        fn: Callable to run on a worker.
        *args: Positional arguments forwarded to ``fn``.
        domain: Backpressure domain name. It is stringified, stripped and
            lower-cased; an empty or unknown name uses the "fast" slots.
            Because this keyword is consumed here, ``fn`` cannot receive a
            keyword argument named ``domain`` through this helper.
        **kwargs: Keyword arguments forwarded to ``fn``.

    Returns:
        The future returned by ``pool.submit`` with the slot-release and
        exception-logging callbacks attached, or the already-completed future
        from ``_completed_none_future()`` when no slot is available.

    Raises:
        RuntimeError: Re-raised from ``pool.submit`` after the acquired slot
            has been handed back.
    """
    domain_name = str(domain or "fast").strip().lower() or "fast"
    slots = _UI_POOL_PENDING_SEMAPHORES.get(domain_name)
    if slots is None:
        # Unknown domain: fall back to the "fast" slots.
        domain_name = "fast"
        slots = _UI_POOL_PENDING_SEMAPHORES.get("fast")

    # Non-blocking acquire: when the domain is full the task is dropped
    # instead of making the caller wait.
    if not slots.acquire(blocking=False):
        LOGGER.info(
            "[PERF] worker_pool_drop domain=%s reason=queue_full",
            domain_name,
        )
        return _completed_none_future()

    def _free_slot(_finished):
        # Runs when the future completes (or immediately if it already has).
        try:
            slots.release()
        except ValueError:
            LOGGER.warning("Gagal release slot worker pool domain=%s", domain_name)

    try:
        pending = pool.submit(fn, *args, **kwargs)
    except RuntimeError:
        # Submission failed, so return the slot before propagating the error.
        try:
            slots.release()
        except ValueError:
            pass
        raise

    pending.add_done_callback(_free_slot)
    return _attach_unhandled_exception_logger(pending)


def submit_ui_query_task(fn, *args, **kwargs):
    """Submit ``fn(*args, **kwargs)`` to the "fast" UI pool.

    Returns whatever ``_submit`` returns for the "fast" domain.
    """
    return _submit(
        _UI_FAST_QUERY_POOL,
        fn,
        *args,
        domain="fast",
        **kwargs,
    )


# edited by glg
def submit_ui_periodic_task(fn, *args, **kwargs):
    """Submit ``fn(*args, **kwargs)`` to the "periodic" UI pool.

    Returns whatever ``_submit`` returns for the "periodic" domain.
    """
    return _submit(
        _UI_PERIODIC_QUERY_POOL,
        fn,
        *args,
        domain="periodic",
        **kwargs,
    )


def submit_ui_preview_task(fn, *args, **kwargs):
    """Submit ``fn(*args, **kwargs)`` to the "preview" UI pool.

    Returns whatever ``_submit`` returns for the "preview" domain.
    """
    return _submit(
        _UI_PREVIEW_POOL,
        fn,
        *args,
        domain="preview",
        **kwargs,
    )


# edited by glg
def submit_ui_query_task_keyed(key, fn, *args, **kwargs):
    """Submit a keyed task to the "fast" UI pool.

    Delegates to ``_submit_ui_task_keyed`` with the "fast" pool and domain,
    using ``submit_ui_query_task`` as the un-keyed fallback.
    """
    return _submit_ui_task_keyed(
        key,
        fn,
        _UI_FAST_QUERY_POOL,
        "fast",
        submit_ui_query_task,
        args,
        kwargs,
    )


# edited by glg
def submit_ui_periodic_task_keyed(key, fn, *args, **kwargs):
    """Submit a keyed task to the "periodic" UI pool.

    Delegates to ``_submit_ui_task_keyed`` with the "periodic" pool and
    domain, using ``submit_ui_periodic_task`` as the un-keyed fallback.
    """
    return _submit_ui_task_keyed(
        key,
        fn,
        _UI_PERIODIC_QUERY_POOL,
        "periodic",
        submit_ui_periodic_task,
        args,
        kwargs,
    )


# edited by glg
def is_worker_queue_dropped(future):
    """Return ``True`` when ``future`` has a truthy ``_queue_dropped`` attribute.

    Objects without that attribute (including ``None``) yield ``False``.
    """
    marker = getattr(future, "_queue_dropped", False)
    return True if marker else False


# edited by glg
def _submit_ui_task_keyed(key, fn, pool, domain, fallback_submit, args, kwargs):
    """Submit ``fn`` so that only the newest request for ``key`` runs.

    Args:
        key: Coalescing key; stringified and stripped. An empty key disables
            coalescing and the task goes through ``fallback_submit``.
        fn: Callable to run on a worker.
        pool: Executor passed on to ``_submit``.
        domain: Domain name used to pick the keyed lock/sequence map and
            passed on to ``_submit``.
        fallback_submit: Callable invoked as ``fallback_submit(fn, *args,
            **kwargs)`` when the key is empty or the domain has no keyed state.
        args: Tuple of positional arguments for ``fn``.
        kwargs: Dict of keyword arguments for ``fn``.

    Returns:
        The value returned by ``fallback_submit`` or by ``_submit``. When the
        task is superseded before it starts, the worker returns ``None``
        without calling ``fn``.

    NOTE(review): entries in the per-domain sequence map are never removed
    here, so the map grows with the number of distinct keys.
    """
    task_key = str(key or "").strip()
    if not task_key:
        return fallback_submit(fn, *args, **kwargs)

    guard = _UI_KEYED_LOCKS.get(domain)
    sequences = _UI_KEYED_SEQ.get(domain)
    if guard is None or sequences is None:
        return fallback_submit(fn, *args, **kwargs)

    # Take the next sequence number for this key; it becomes the "latest".
    with guard:
        my_seq = int(sequences.get(task_key, 0)) + 1
        sequences[task_key] = my_seq

    def _run_if_latest():
        with guard:
            newest = int(sequences.get(task_key, 0))
        # edited by glg
        # Skip an older task that has been superseded by a newer request with
        # the same key.
        if newest == my_seq:
            return fn(*args, **kwargs)
        return None

    return _submit(pool, _run_if_latest, domain=domain)
