import sqlite3
import threading
import time

import pytest

# edited by glg
# Module-level pytest marks: pytest applies every mark in this list to each
# test collected from this file, so the tests here can be selected or
# deselected with `-m` (e.g. `-m perf_smoke` or `-m "not integration"`).
pytestmark = [pytest.mark.integration, pytest.mark.non_functional, pytest.mark.perf_smoke]


def test_sqlite_lock_contention_soak(tmp_path, *, soak_seconds=1.6, min_attempts=80):
    """Soak a WAL-mode SQLite file with one churning writer and one churning reader.

    A background thread repeatedly opens a connection, increments a single
    counter row inside a ``BEGIN IMMEDIATE`` transaction (sleeping a few
    milliseconds while the transaction is open) and closes the connection
    again.  Meanwhile the calling thread opens short-lived connections and
    reads the same row until ``soak_seconds`` have elapsed.  Reads that fail
    with a "locked" ``OperationalError`` are counted and tolerated; any other
    reader error propagates and fails the test.

    Args:
        tmp_path: Directory in which the database file is created (pytest's
            ``tmp_path`` fixture when run under pytest).
        soak_seconds: How long the reader loop runs.  Has a default, so
            pytest does not try to resolve it as a fixture.
        min_attempts: Minimum number of reader attempts that must complete
            within ``soak_seconds`` (coarse throughput floor).

    Raises:
        AssertionError: If the writer thread fails to stop, records any
            error, never commits, loses an increment, or if the reader is
            too slow or never succeeds.
        sqlite3.OperationalError: If a read fails for a reason other than
            lock contention.
    """
    db_path = tmp_path / "soak_lock_contention.db"
    conn = sqlite3.connect(str(db_path))
    try:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("CREATE TABLE IF NOT EXISTS t (id INTEGER PRIMARY KEY, value INTEGER NOT NULL)")
        conn.execute("INSERT INTO t (id, value) VALUES (1, 0)")
        conn.commit()
    finally:
        conn.close()

    stop_event = threading.Event()
    writer_errors = []
    commits = 0

    def _writer():
        """Increment the counter row in a loop until stopped or until the first error."""
        nonlocal commits
        while not stop_event.is_set():
            db = None
            try:
                # Connect inside the try so a failed open is recorded instead of
                # silently killing the thread (which would let the test pass).
                db = sqlite3.connect(str(db_path), timeout=0.05, isolation_level=None)
                db.execute("PRAGMA journal_mode=WAL")
                db.execute("BEGIN IMMEDIATE")
                db.execute("UPDATE t SET value = value + 1 WHERE id = 1")
                # Keep the write transaction open briefly to create contention.
                time.sleep(0.004)
                db.execute("COMMIT")
                commits += 1
            except Exception as exc:  # pragma: no cover - final validation happens via assert.
                writer_errors.append(exc)
                # Only roll back when a transaction is actually open; a blind
                # ROLLBACK would raise "no transaction is active" otherwise.
                if db is not None and db.in_transaction:
                    try:
                        db.execute("ROLLBACK")
                    except sqlite3.Error as rollback_exc:
                        writer_errors.append(rollback_exc)
                # Any writer error already fails the test; stop instead of
                # spinning and accumulating an unbounded list of exceptions.
                break
            finally:
                if db is not None:
                    db.close()

    writer_thread = threading.Thread(target=_writer, daemon=True)
    writer_thread.start()

    attempts = 0
    locked_count = 0
    success_count = 0
    deadline = time.perf_counter() + soak_seconds

    try:
        while time.perf_counter() < deadline:
            reader = sqlite3.connect(str(db_path), timeout=0.01)
            try:
                reader.execute("PRAGMA journal_mode=WAL")
                reader.execute("SELECT value FROM t WHERE id = 1").fetchone()
                success_count += 1
            except sqlite3.OperationalError as exc:
                # Lock contention is expected under soak; anything else is a real failure.
                if "locked" in str(exc).lower():
                    locked_count += 1
                else:
                    raise
            finally:
                reader.close()
            attempts += 1
    finally:
        # Always stop the writer, even when the reader loop raised, so the
        # daemon thread does not keep hammering the database after the test.
        stop_event.set()
        writer_thread.join(timeout=2.0)

    stats = "attempts=%d success=%d locked=%d commits=%d" % (
        attempts,
        success_count,
        locked_count,
        commits,
    )

    assert not writer_thread.is_alive(), "writer thread did not stop within 2s (%s)" % stats
    assert not writer_errors, "writer recorded errors: %r (%s)" % (writer_errors, stats)

    # The writer has stopped, so the persisted counter can be read without contention.
    conn = sqlite3.connect(str(db_path))
    try:
        final_value = conn.execute("SELECT value FROM t WHERE id = 1").fetchone()[0]
    finally:
        conn.close()

    assert commits > 0, "writer never committed, so no contention was exercised (%s)" % stats
    assert final_value == commits, "persisted counter %r != committed increments (%s)" % (
        final_value,
        stats,
    )
    assert attempts >= min_attempts, "reader too slow (%s)" % stats
    assert success_count > 0, "reader never succeeded (%s)" % stats
