import sqlite3
import threading
from datetime import datetime, timezone

import pytest

from pypos.modules.sinkronisasi.services.event_outbox_service import EventOutboxService

# edited by glg
pytestmark = [pytest.mark.unit]


def _read_row(db_path, row_id):
    """Fetch a single ``event_outbox`` row by id directly from the SQLite file.

    A dedicated connection is opened on ``db_path`` and always closed before
    returning. ``row_id`` is coerced with ``int()`` before being bound to the
    query.

    Returns:
        The matching row as a ``sqlite3.Row`` (columns addressable by name),
        or ``None`` when no row has that id.
    """
    query = """
            SELECT id, status, attempt_count, available_at, lease_until, error_code, error_message
            FROM event_outbox
            WHERE id = ?
            """
    connection = sqlite3.connect(str(db_path))
    # Row factory lets callers read columns by name, e.g. row["status"].
    connection.row_factory = sqlite3.Row
    try:
        return connection.execute(query, (int(row_id),)).fetchone()
    finally:
        connection.close()


def test_outbox_enqueue_and_dedup(tmp_path):
    """Enqueueing the same event twice inserts it once and reports the original.

    The second call with identical topic, payload and source must come back
    with ``inserted`` False and the same ``event_id`` / ``dedup_key`` as the
    first, and the queue metrics must show exactly one pending event.
    """
    outbox = EventOutboxService(str(tmp_path / "outbox.db"))

    def _enqueue_sample():
        # A fresh payload dict is built per call; the content is identical each time.
        return outbox.enqueue_event(
            topic="transaksi.created",
            payload={"trx_id": 1001, "nominal": 12000},
            source_id="CBG-01",
        )

    original = _enqueue_sample()
    duplicate = _enqueue_sample()

    assert original["inserted"] is True
    assert duplicate["inserted"] is False
    for field in ("event_id", "dedup_key"):
        assert duplicate[field] == original[field]

    queue_state = outbox.get_queue_metrics()
    for counter, expected in (("pending", 1), ("inflight", 0), ("sent", 0)):
        assert queue_state[counter] == expected


def test_outbox_claim_ack_and_purge_sent(tmp_path):
    """Claim two events, ack one, then purge the acked row once it is old.

    After the ack the metrics must report one ``sent`` and one ``inflight``
    event. The acked row's ``updated_at`` is then rewritten to a date far in
    the past and ``purge_sent`` is expected to remove exactly that one row.
    """
    db_file = str(tmp_path / "outbox.db")
    outbox = EventOutboxService(db_file)

    for idx in (1, 2):
        outbox.enqueue_event(topic="a", payload={"idx": idx}, source_id="CBG-01")

    batch = outbox.claim_batch(limit=2, lease_seconds=30, max_inflight=10)
    assert len(batch) == 2
    acked_id, _still_leased_id = (int(item["id"]) for item in batch)

    assert outbox.ack_events([acked_id]) == 1

    after_ack = outbox.get_queue_metrics()
    assert after_ack["sent"] == 1
    assert after_ack["inflight"] == 1

    # Backdate the acked row; presumably purge_sent compares updated_at
    # against retention_days, so this pushes it outside the one-day window.
    raw = sqlite3.connect(db_file)
    try:
        raw.execute(
            "UPDATE event_outbox SET updated_at = ? WHERE id = ?",
            ("2000-01-01 00:00:00", acked_id),
        )
        raw.commit()
    finally:
        raw.close()

    assert outbox.purge_sent(retention_days=1, limit=10) == 1


def test_outbox_release_expired_and_fail_events(tmp_path):
    """Exercise lease expiry, re-claiming and explicit failure of one event.

    Steps checked, in order:

    1. A single enqueued event is claimed; a second claim returns nothing.
    2. After ``lease_until`` is rewritten to a past date,
       ``release_expired_leases`` releases one row, which then has status
       ``FAILED`` and error code ``LEASE_EXPIRED``.
    3. The same row can be claimed again.
    4. ``fail_events`` stores the given error code/message, leaves the row
       ``FAILED``, and sets ``available_at`` no earlier than the current time.
    """
    db_path = tmp_path / "outbox.db"
    service = EventOutboxService(str(db_path))

    service.enqueue_event(topic="b", payload={"idx": 10}, source_id="CBG-02")
    claimed = service.claim_batch(limit=1, lease_seconds=60, max_inflight=1)
    assert len(claimed) == 1
    row_id = int(claimed[0]["id"])

    # The only event is already leased (and presumably max_inflight=1 is
    # reached as well), so a second claim must come back empty.
    blocked = service.claim_batch(limit=1, lease_seconds=60, max_inflight=1)
    assert blocked == []

    # Force the lease into the past so the service treats it as expired.
    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute(
            "UPDATE event_outbox SET lease_until = ? WHERE id = ?",
            ("2000-01-01 00:00:00", row_id),
        )
        conn.commit()
    finally:
        conn.close()

    released = service.release_expired_leases()
    assert released == 1

    expired = _read_row(db_path, row_id)
    assert str(expired["status"]) == "FAILED"
    assert str(expired["error_code"]) == "LEASE_EXPIRED"

    reclaimed = service.claim_batch(limit=1, lease_seconds=60, max_inflight=2)
    assert len(reclaimed) == 1
    assert int(reclaimed[0]["id"]) == row_id

    failed = service.fail_events(
        [row_id],
        error_code="UPSTREAM_500",
        error_message="server error",
        retry_delay_seconds=7,
    )
    assert failed == 1

    failed_row = _read_row(db_path, row_id)
    assert str(failed_row["status"]) == "FAILED"
    assert str(failed_row["error_code"]) == "UPSTREAM_500"
    assert str(failed_row["error_message"]) == "server error"

    available_at = datetime.strptime(str(failed_row["available_at"]), "%Y-%m-%d %H:%M:%S")
    # Naive UTC "now" without the deprecated datetime.utcnow(); the parsed
    # column value is naive too, so both sides stay comparable.
    # NOTE(review): assumes the service stores available_at in UTC - confirm.
    now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
    assert available_at >= now_utc


def test_outbox_claim_batch_concurrent_no_duplicate_ids(tmp_path):
    """Two threads claiming at the same time must never receive the same event.

    Six events are enqueued and two workers each claim up to three of them
    after being released together by a barrier. The union of claimed ids must
    contain six distinct values and the metrics must report six inflight
    events.
    """
    # edited by glg
    db_path = tmp_path / "outbox_concurrent.db"
    service = EventOutboxService(str(db_path))

    for idx in range(1, 7):
        service.enqueue_event(
            topic="bulk.created",
            payload={"idx": idx},
            source_id="CBG-TEST",
        )

    worker_count = 2
    # The workers and the main thread all meet at the barrier, so the claims
    # start as close to simultaneously as possible.
    start_gate = threading.Barrier(worker_count + 1)
    thread_results = []
    errors = []

    def _claim_worker():
        try:
            start_gate.wait(timeout=5)
            claimed = service.claim_batch(limit=3, lease_seconds=60, max_inflight=10)
            ids = [int(item["id"]) for item in (claimed or [])]
            thread_results.append(ids)
        except Exception as exc:  # pragma: no cover - guard test infra
            errors.append(exc)

    workers = [
        threading.Thread(target=_claim_worker, daemon=True)
        for _ in range(worker_count)
    ]
    for worker in workers:
        worker.start()
    start_gate.wait(timeout=5)
    for worker in workers:
        worker.join(timeout=5)

    # join(timeout=...) returns silently when the timeout elapses; fail loudly
    # here instead of asserting on results a hung worker may still be writing.
    assert not any(worker.is_alive() for worker in workers), "claim worker did not finish"
    assert not errors
    claimed_ids = [item for batch in thread_results for item in batch]
    assert len(claimed_ids) == 6
    assert len(set(claimed_ids)) == 6

    metrics = service.get_queue_metrics()
    assert metrics["inflight"] == 6
