import sqlite3
import tempfile
from pathlib import Path

from pypos.core.database.schema_migrator import (
    TARGET_SCHEMA_VERSION,
    _create_index_if_columns_exist,
    _ensure_column,
    get_schema_version,
    run_schema_migrations,
)
from pypos.core.utils.db_helper import connect_sqlite


def _table_exists(conn, table_name: str) -> bool:
    """Report whether a table named *table_name* is listed in ``sqlite_master``.

    Only entries of type ``'table'`` are considered, so a view or index that
    happens to share the name does not count.
    """
    lookup_sql = "SELECT name FROM sqlite_master WHERE type='table' AND name=?"
    cursor = conn.cursor()
    match = cursor.execute(lookup_sql, (table_name,)).fetchone()
    return bool(match)


def test_schema_migrator_creates_core_tables_and_version():
    """Migrating a not-yet-existing database file creates the expected tables.

    Runs ``run_schema_migrations`` in strict mode against a path inside a
    temporary directory, then checks the reported schema version and the
    presence of every expected table.
    """
    expected_tables = (
        "schema_version",
        "return_transaksi_penjualan",
        "detail_return_transaksi_penjualan",
        "voucher_return",
        "voucher_usage",
        "pembatalan_transaksi_history",
        "settlement_history",
    )

    with tempfile.TemporaryDirectory() as tmp_dir:
        db_path = str(Path(tmp_dir) / "schema_migrator_v1.db")

        migrated = run_schema_migrations(db_path=db_path, strict=True)
        assert migrated is True
        assert get_schema_version(db_path) == TARGET_SCHEMA_VERSION

        check_conn = sqlite3.connect(db_path)
        try:
            for table in expected_tables:
                assert _table_exists(check_conn, table)
        finally:
            # Close before the temporary directory is removed.
            check_conn.close()


def test_connect_sqlite_auto_runs_schema_migrator_once():
    """Opening a database via ``connect_sqlite`` leaves it at the target version.

    ``connect_sqlite`` is called twice on the same path; afterwards a plain
    ``sqlite3`` connection confirms that ``schema_version`` exists and that
    its ``id=1`` row holds ``TARGET_SCHEMA_VERSION``.
    """
    with tempfile.TemporaryDirectory() as tmp_dir:
        db_path = str(Path(tmp_dir) / "schema_migrator_auto.db")

        # Open and close twice: the second call must remain safe and idempotent.
        for _ in range(2):
            connect_sqlite(db_path=db_path).close()

        raw_conn = sqlite3.connect(db_path)
        try:
            assert _table_exists(raw_conn, "schema_version")
            version_row = raw_conn.cursor().execute(
                "SELECT version FROM schema_version WHERE id=1"
            ).fetchone()
            assert version_row is not None
            assert int(version_row[0]) == TARGET_SCHEMA_VERSION
        finally:
            raw_conn.close()


def test_migration_v5_removes_voucher_return_nilai_column_and_preserves_data():
    """Migrating from version 4 drops ``voucher_return.nilai`` but keeps the row.

    The database is seeded by hand with ``schema_version`` set to 4 and a
    ``voucher_return`` table that still has the ``nilai`` column. It holds one
    row whose ``nilai`` is 25000 while ``nilai_awal`` and ``saldo`` are 0 and
    ``status`` is empty. After the migrations run, the column must be gone and
    the row must carry 25000 in both ``nilai_awal`` and ``saldo`` with status
    ``aktif``.
    """
    # Column order matches the INSERT statement below.
    seed_voucher = (
        11,
        5,
        "TRX-001",
        "CUST-001",
        "VCR-001",
        25000,
        0,
        0,
        0,
        "",
        "2026-03-12 08:00:00",
    )

    with tempfile.TemporaryDirectory() as tmp_dir:
        db_path = str(Path(tmp_dir) / "schema_migrator_v5_drop_nilai.db")

        # Build the pre-migration (version 4) state by hand.
        seed_conn = sqlite3.connect(db_path)
        try:
            seed_cur = seed_conn.cursor()
            seed_cur.execute(
                """
                CREATE TABLE schema_version (
                    id INTEGER PRIMARY KEY CHECK (id = 1),
                    version INTEGER NOT NULL DEFAULT 0,
                    updated_at TEXT NOT NULL
                )
                """
            )
            seed_cur.execute(
                "INSERT INTO schema_version (id, version, updated_at) VALUES (1, 4, datetime('now'))"
            )
            seed_cur.execute(
                """
                CREATE TABLE voucher_return (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    return_id INTEGER NULL,
                    transaksi_id TEXT NULL,
                    customer_id TEXT NULL,
                    kode TEXT UNIQUE NOT NULL,
                    nilai REAL DEFAULT 0,
                    nilai_awal REAL NOT NULL DEFAULT 0,
                    saldo REAL NOT NULL DEFAULT 0,
                    is_used INTEGER DEFAULT 0,
                    status TEXT NOT NULL DEFAULT '',
                    dtime_terbit TEXT NOT NULL,
                    dtime_expired TEXT NULL
                )
                """
            )
            seed_cur.execute(
                """
                INSERT INTO voucher_return (
                    id, return_id, transaksi_id, customer_id, kode, nilai, nilai_awal, saldo, is_used, status, dtime_terbit
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                seed_voucher,
            )
            seed_conn.commit()
        finally:
            seed_conn.close()

        migrated = run_schema_migrations(db_path=db_path, strict=True)
        assert migrated is True
        assert get_schema_version(db_path) == TARGET_SCHEMA_VERSION

        verify_conn = sqlite3.connect(db_path)
        try:
            verify_cur = verify_conn.cursor()

            # Index 1 of each PRAGMA table_info row is the column name.
            verify_cur.execute('PRAGMA table_info("voucher_return")')
            column_names = [
                str(info[1])
                for info in verify_cur.fetchall()
                if info and len(info) > 1
            ]
            assert "nilai" not in column_names
            assert "nilai_awal" in column_names
            assert "saldo" in column_names

            verify_cur.execute(
                """
                SELECT id, kode, nilai_awal, saldo, status, dtime_terbit
                FROM voucher_return WHERE id = 11
                """
            )
            migrated_row = verify_cur.fetchone()
            assert migrated_row is not None

            voucher_id, kode, nilai_awal, saldo, status, dtime_terbit = migrated_row
            assert int(voucher_id) == 11
            assert str(kode) == "VCR-001"
            assert float(nilai_awal) == 25000.0
            assert float(saldo) == 25000.0
            assert str(status) == "aktif"
            assert str(dtime_terbit) == "2026-03-12 08:00:00"
        finally:
            verify_conn.close()


# edited by glg
def test_create_index_if_columns_exist_rejects_invalid_identifier():
    """An index name carrying an SQL-injection payload is refused.

    ``_create_index_if_columns_exist`` must return ``False`` for the malicious
    index name, and ``sample_table`` must still exist afterwards.
    """
    malicious_index_name = 'bad-index"; DROP TABLE sample_table; --'

    conn = sqlite3.connect(":memory:")
    try:
        cursor = conn.cursor()
        cursor.execute("CREATE TABLE sample_table (id INTEGER PRIMARY KEY, name TEXT)")

        created = _create_index_if_columns_exist(
            cursor, "sample_table", malicious_index_name, ("name",)
        )
        assert created is False

        # The table named in the DROP TABLE payload must still be present.
        surviving = cursor.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='sample_table'"
        ).fetchone()
        assert surviving is not None
    finally:
        conn.close()


def test_ensure_column_rejects_invalid_table_identifier():
    """A table name carrying an SQL-injection payload raises ``ValueError``.

    ``_ensure_column`` must raise for the malicious table name, and
    ``sample_table`` must still exist afterwards.
    """
    malicious_table_name = 'sample_table"; DROP TABLE sample_table; --'

    conn = sqlite3.connect(":memory:")
    try:
        cursor = conn.cursor()
        cursor.execute("CREATE TABLE sample_table (id INTEGER PRIMARY KEY)")

        try:
            _ensure_column(cursor, malicious_table_name, "safe_col", "TEXT")
        except ValueError:
            rejected = True
        else:
            rejected = False
        assert rejected is True

        # The table named in the DROP TABLE payload must still be present.
        surviving = cursor.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name='sample_table'"
        ).fetchone()
        assert surviving is not None
    finally:
        conn.close()
