import os
import json
import sqlite3
import tempfile
import unittest
import base64
from unittest.mock import patch

from pypos.core.utils import config_utils
from pypos.modules.customer.models.customer_search_model import CustomerSearchModel
from pypos.modules.sinkronisasi.config import get_export_tables
from pypos.modules.sinkronisasi.models.sinkron_model import SinkronModel
from pypos.modules.sinkronisasi.services.transaction_export_service import TransactionExportService


class PhaseHardeningGuardTests(unittest.TestCase):
    def test_managed_policy_runtime_file_merges_only_whitelisted_keys(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            config_file = os.path.join(tmpdir, "config.json")
            app_settings_file = os.path.join(tmpdir, "app_settings.json")
            policy_file = os.path.join(tmpdir, "app_settings.managed_policy.json")
            with open(config_file, "w", encoding="utf-8") as fh:
                fh.write("{}")
            with open(app_settings_file, "w", encoding="utf-8") as fh:
                json.dump(
                    {
                        "settlement_delivery_mode": "dual",
                        "settlement_direct_enabled": 0,
                        "export_on_settlement": 0,
                        "export_transaksi_enabled": 1,
                        "export_transaksi_data_enabled": 1,
                        "export_transaksi_data_registry_enabled": 1,
                        "printer_default_paper_size": "80mm",
                    },
                    fh,
                )
            with open(policy_file, "w", encoding="utf-8") as fh:
                json.dump(
                    {
                        "enabled": 1,
                        "version": 7,
                        "managed_keys": [
                            "settlement_delivery_mode",
                            "settlement_direct_enabled",
                            "export_on_settlement",
                            "export_transaksi_enabled",
                            "export_transaksi_data_enabled",
                            "export_transaksi_data_registry_enabled",
                        ],
                        "values": {
                            "settlement_delivery_mode": "direct_only",
                            "settlement_direct_enabled": 1,
                            "export_on_settlement": 1,
                            "export_transaksi_enabled": 1,
                            "export_transaksi_data_enabled": 0,
                            "export_transaksi_data_registry_enabled": 0,
                        },
                    },
                    fh,
                )

            with patch.object(config_utils, "CONFIG_FILE", config_file), patch.object(
                config_utils, "APP_SETTINGS_FILE", app_settings_file
            ):
                config_utils.reload_config()
                app_cfg = config_utils.read_app_settings()
                self.assertEqual(str(app_cfg.get("settlement_delivery_mode")), "direct_only")
                self.assertEqual(int(app_cfg.get("settlement_direct_enabled") or 0), 1)
                self.assertEqual(int(app_cfg.get("export_on_settlement") or 0), 1)
                self.assertEqual(int(app_cfg.get("export_transaksi_enabled") or 0), 1)
                self.assertEqual(int(app_cfg.get("export_transaksi_data_enabled") or 0), 0)
                self.assertEqual(int(app_cfg.get("export_transaksi_data_registry_enabled") or 0), 0)
                self.assertEqual(int(app_cfg.get("managed_settings_policy_applied_version") or 0), 7)
                # Non-whitelist key harus dipertahankan.
                self.assertEqual(str(app_cfg.get("printer_default_paper_size") or ""), "80mm")

                tables = get_export_tables()
                self.assertIn("transaksi", tables)
                self.assertNotIn("transaksi_data", tables)
                self.assertNotIn("transaksi_data_registry", tables)
                self.assertNotIn("settlement_history", tables)
                self.assertNotIn("transaksi_settlement", tables)

    def test_managed_policy_disabled_does_not_override_app_settings(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            config_file = os.path.join(tmpdir, "config.json")
            app_settings_file = os.path.join(tmpdir, "app_settings.json")
            policy_file = os.path.join(tmpdir, "app_settings.managed_policy.json")
            with open(config_file, "w", encoding="utf-8") as fh:
                fh.write("{}")
            with open(app_settings_file, "w", encoding="utf-8") as fh:
                json.dump(
                    {
                        "settlement_delivery_mode": "dual",
                        "export_transaksi_data_enabled": 1,
                    },
                    fh,
                )
            with open(policy_file, "w", encoding="utf-8") as fh:
                json.dump(
                    {
                        "enabled": 0,
                        "version": 9,
                        "managed_keys": ["settlement_delivery_mode", "export_transaksi_data_enabled"],
                        "values": {
                            "settlement_delivery_mode": "direct_only",
                            "export_transaksi_data_enabled": 0,
                        },
                    },
                    fh,
                )

            with patch.object(config_utils, "CONFIG_FILE", config_file), patch.object(
                config_utils, "APP_SETTINGS_FILE", app_settings_file
            ):
                config_utils.reload_config()
                app_cfg = config_utils.read_app_settings()
                self.assertEqual(str(app_cfg.get("settlement_delivery_mode")), "dual")
                self.assertEqual(int(app_cfg.get("export_transaksi_data_enabled") or 0), 1)

    def test_read_endpoint_config_allows_empty_base_url_on_bootstrap(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            config_file = os.path.join(tmpdir, "config.json")
            app_settings_file = os.path.join(tmpdir, "app_settings.json")
            with open(config_file, "w", encoding="utf-8") as fh:
                fh.write("{}")

            with patch.object(config_utils, "CONFIG_FILE", config_file), patch.object(
                config_utils, "APP_SETTINGS_FILE", app_settings_file
            ):
                config_utils.reload_config()
                endpoint = config_utils.read_endpoint_config()
                self.assertEqual(endpoint.get("api_base_url"), "")
                self.assertTrue(os.path.exists(app_settings_file))

    def test_save_config_app_only_does_not_require_api_base_url(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            config_file = os.path.join(tmpdir, "config.json")
            app_settings_file = os.path.join(tmpdir, "app_settings.json")
            with open(config_file, "w", encoding="utf-8") as fh:
                fh.write("{}")

            with patch.object(config_utils, "CONFIG_FILE", config_file), patch.object(
                config_utils, "APP_SETTINGS_FILE", app_settings_file
            ):
                config_utils.reload_config()
                config_utils.save_config({"voucher_enabled_for_kasir": 1})
                app_settings = config_utils.read_app_settings()
                self.assertEqual(int(app_settings.get("voucher_enabled_for_kasir", 0)), 1)

    def test_customer_model_returns_empty_when_table_not_exists(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = os.path.join(tmpdir, "empty.db")
            sqlite3.connect(db_path).close()

            model = CustomerSearchModel(db_path=db_path)
            self.assertEqual(model.load_all_customers(), [])

    def test_sinkron_model_rejects_invalid_table_name(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = os.path.join(tmpdir, "sync.db")
            sqlite3.connect(db_path).close()

            model = SinkronModel(db_path=db_path)
            try:
                with self.assertRaises(ValueError):
                    model.apply_sync_result('price;DROP TABLE "price"', [{"id": 1}])
            finally:
                model.close_connections()

    def test_sinkron_model_skips_invalid_server_columns(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = os.path.join(tmpdir, "sync_cols.db")
            sqlite3.connect(db_path).close()

            model = SinkronModel(db_path=db_path)
            try:
                updated = model.apply_sync_result(
                    "sync_guard_demo",
                    [{"id": "1", "valid_col": "OK", "bad-col": "SKIP"}],
                )
                self.assertEqual(updated, 1)
            finally:
                model.close_connections()

            conn = sqlite3.connect(db_path)
            cur = conn.cursor()
            cur.execute('PRAGMA table_info("sync_guard_demo")')
            cols = [r[1] for r in cur.fetchall()]
            conn.close()
            self.assertIn("valid_col", cols)
            self.assertNotIn("bad-col", cols)

    def test_export_service_rejects_invalid_table_name(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = os.path.join(tmpdir, "export.db")
            sqlite3.connect(db_path).close()

            service = TransactionExportService()
            with patch(
                "pypos.modules.sinkronisasi.services.transaction_export_service.get_db_path",
                return_value=db_path,
            ):
                with self.assertRaises(ValueError):
                    service._fetch_table_rows("transaksi; DROP TABLE x", 0, 10)

    def test_get_export_tables_default_includes_settlement(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            config_file = os.path.join(tmpdir, "config.json")
            app_settings_file = os.path.join(tmpdir, "app_settings.json")
            with open(config_file, "w", encoding="utf-8") as fh:
                fh.write("{}")
            with open(app_settings_file, "w", encoding="utf-8") as fh:
                fh.write("{}")

            with patch.object(config_utils, "CONFIG_FILE", config_file), patch.object(
                config_utils, "APP_SETTINGS_FILE", app_settings_file
            ):
                config_utils.reload_config()
                tables = get_export_tables()
                self.assertIn("transaksi", tables)
                self.assertIn("transaksi_data", tables)
                self.assertIn("settlement_history", tables)

    def test_get_export_tables_direct_only_filters_settlement_when_direct_enabled(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            config_file = os.path.join(tmpdir, "config.json")
            app_settings_file = os.path.join(tmpdir, "app_settings.json")
            with open(config_file, "w", encoding="utf-8") as fh:
                fh.write("{}")

            with patch.object(config_utils, "CONFIG_FILE", config_file), patch.object(
                config_utils, "APP_SETTINGS_FILE", app_settings_file
            ):
                config_utils.reload_config()
                config_utils.save_config(
                    {
                        "settlement_delivery_mode": "direct_only",
                        "settlement_direct_enabled": 1,
                        "export_transaksi_enabled": 1,
                        "export_transaksi_data_enabled": 1,
                        "export_transaksi_data_registry_enabled": 1,
                    }
                )
                tables = get_export_tables()
                self.assertIn("transaksi", tables)
                self.assertIn("transaksi_data", tables)
                self.assertIn("transaksi_data_registry", tables)
                self.assertNotIn("settlement_history", tables)
                self.assertNotIn("transaksi_settlement", tables)

    def test_get_export_tables_direct_only_fallback_when_direct_disabled(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            config_file = os.path.join(tmpdir, "config.json")
            app_settings_file = os.path.join(tmpdir, "app_settings.json")
            with open(config_file, "w", encoding="utf-8") as fh:
                fh.write("{}")

            with patch.object(config_utils, "CONFIG_FILE", config_file), patch.object(
                config_utils, "APP_SETTINGS_FILE", app_settings_file
            ):
                config_utils.reload_config()
                config_utils.save_config(
                    {
                        "settlement_delivery_mode": "direct_only",
                        "settlement_direct_enabled": 0,
                    }
                )
                tables = get_export_tables()
                self.assertIn("settlement_history", tables)
                self.assertIn("transaksi_settlement", tables)

    def test_get_export_tables_export_only_keeps_settlement_tables(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            config_file = os.path.join(tmpdir, "config.json")
            app_settings_file = os.path.join(tmpdir, "app_settings.json")
            with open(config_file, "w", encoding="utf-8") as fh:
                fh.write("{}")

            with patch.object(config_utils, "CONFIG_FILE", config_file), patch.object(
                config_utils, "APP_SETTINGS_FILE", app_settings_file
            ):
                config_utils.reload_config()
                config_utils.save_config(
                    {
                        "settlement_delivery_mode": "export_only",
                        "settlement_direct_enabled": 1,
                    }
                )
                tables = get_export_tables()
                self.assertIn("settlement_history", tables)
                self.assertIn("transaksi_settlement", tables)

    def test_get_export_tables_toggle_transaksi_off_disables_detail_and_registry(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            config_file = os.path.join(tmpdir, "config.json")
            app_settings_file = os.path.join(tmpdir, "app_settings.json")
            with open(config_file, "w", encoding="utf-8") as fh:
                fh.write("{}")

            with patch.object(config_utils, "CONFIG_FILE", config_file), patch.object(
                config_utils, "APP_SETTINGS_FILE", app_settings_file
            ):
                config_utils.reload_config()
                config_utils.save_config(
                    {
                        "export_transaksi_enabled": 0,
                        "export_transaksi_data_enabled": 1,
                        "export_transaksi_data_registry_enabled": 1,
                    }
                )
                tables = get_export_tables()
                self.assertNotIn("transaksi", tables)
                self.assertNotIn("transaksi_data", tables)
                self.assertNotIn("transaksi_data_registry", tables)

    def test_get_export_tables_toggle_transaksi_data_off_disables_registry(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            config_file = os.path.join(tmpdir, "config.json")
            app_settings_file = os.path.join(tmpdir, "app_settings.json")
            with open(config_file, "w", encoding="utf-8") as fh:
                fh.write("{}")

            with patch.object(config_utils, "CONFIG_FILE", config_file), patch.object(
                config_utils, "APP_SETTINGS_FILE", app_settings_file
            ):
                config_utils.reload_config()
                config_utils.save_config(
                    {
                        "export_transaksi_enabled": 1,
                        "export_transaksi_data_enabled": 0,
                        "export_transaksi_data_registry_enabled": 1,
                    }
                )
                tables = get_export_tables()
                self.assertIn("transaksi", tables)
                self.assertNotIn("transaksi_data", tables)
                self.assertNotIn("transaksi_data_registry", tables)

    def test_get_export_tables_toggle_registry_off_keeps_transaksi_and_detail(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            config_file = os.path.join(tmpdir, "config.json")
            app_settings_file = os.path.join(tmpdir, "app_settings.json")
            with open(config_file, "w", encoding="utf-8") as fh:
                fh.write("{}")

            with patch.object(config_utils, "CONFIG_FILE", config_file), patch.object(
                config_utils, "APP_SETTINGS_FILE", app_settings_file
            ):
                config_utils.reload_config()
                config_utils.save_config(
                    {
                        "export_transaksi_enabled": 1,
                        "export_transaksi_data_enabled": 1,
                        "export_transaksi_data_registry_enabled": 0,
                    }
                )
                tables = get_export_tables()
                self.assertIn("transaksi", tables)
                self.assertIn("transaksi_data", tables)
                self.assertNotIn("transaksi_data_registry", tables)

    def test_export_service_get_exportable_tables_filters_invalid_table(self):
        with tempfile.TemporaryDirectory() as tmpdir:
            db_path = os.path.join(tmpdir, "exportable.db")
            conn = sqlite3.connect(db_path)
            cur = conn.cursor()
            cur.execute("CREATE TABLE transaksi (id INTEGER PRIMARY KEY AUTOINCREMENT, nomer TEXT)")
            cur.execute("CREATE TABLE no_id_table (nama TEXT)")
            conn.commit()
            conn.close()

            service = TransactionExportService()
            with patch(
                "pypos.modules.sinkronisasi.services.transaction_export_service.get_db_path",
                return_value=db_path,
            ):
                result = service._get_exportable_tables(["transaksi", "no_id_table", "missing_table"])
                self.assertEqual(result, ["transaksi"])

    def test_export_normalize_transaksi_enforces_strict_required_defaults(self):
        # edited by glg
        service = TransactionExportService()
        row = {
            "id": 10,
            "x_id": None,
            "nomer": None,
            "dtime": None,
            "fulldate": None,
            "machine_id": None,
            "oleh_id": 7,
            "oleh_nama": "kasir glg",
            "print_oleh_id": None,
            "print_oleh_nama": None,
            "kontainer_size": None,
            "ekspedisi_status": None,
            "bank_rekening_id": None,
            "ekspedisi_id": None,
            "kontainer_size_id": None,
            "kontainer_status": None,
            "kontainer_approve_id": None,
            "jatuh_tempo": None,
            "bank_rekening_id_from": None,
            "cli": None,
            "inv_dollar": None,
            "next_step_num": None,
            "counters": None,
            "returned": None,
            "returns": None,
            "deposit_persen_in": None,
            "transaksi_nilai_ori": None,
            "trash2": None,
            "r_jenis": None,
            "_temp": None,
            "cli_loop": None,
            "status_edit": None,
            "print_counter": None,
            "print_status": None,
            "setor_status": None,
            "sinkron_cache": None,
            "auto_pemindahan": None,
            "counters_customers_id_all": None,
            "counters_gudang_id_all": None,
            "batal_kirim": None,
            "kontainer_valas_nilai": None,
            "tambahan_nilai": None,
            "cek": None,
            "status_4": None,
            "trash_4": None,
            "r_sales": None,
            "r_maju": None,
            "r_mundur": None,
        }
        with patch(
            "pypos.modules.sinkronisasi.services.transaction_export_service.get_active_device_info",
            return_value={"machine_id": "M-001", "cabang_id": 100, "toko_id": 1001},
        ), patch(
            "pypos.modules.sinkronisasi.services.transaction_export_service.get_device_id",
            return_value="M-001",
        ), patch(
            "pypos.modules.sinkronisasi.services.transaction_export_service.read_config",
            return_value={},
        ):
            out = service._normalize_rows_for_export("transaksi", [row], "2026-03-02 10:00:00")[0]

        self.assertEqual(out.get("machine_id"), "M-001")
        self.assertEqual(out.get("x_id"), 10)
        self.assertEqual(out.get("nomer"), "10")
        self.assertEqual(out.get("fulldate"), "2026-03-02")
        self.assertEqual(out.get("print_oleh_nama"), "kasir glg")
        self.assertEqual(out.get("kontainer_size"), "none")
        self.assertEqual(out.get("ekspedisi_status"), "none")
        self.assertEqual(int(out.get("bank_rekening_id")), 0)
        self.assertEqual(int(out.get("ekspedisi_id")), 0)
        self.assertEqual(int(out.get("kontainer_size_id")), 0)
        self.assertEqual(int(out.get("kontainer_status")), 0)
        self.assertEqual(int(out.get("kontainer_approve_id")), 0)
        self.assertEqual(int(out.get("bank_rekening_id_from")), 0)
        self.assertEqual(int(out.get("jatuh_tempo")), 0)
        self.assertEqual(int(out.get("print_oleh_id")), 7)
        self.assertEqual(int(out.get("cli")), 0)
        self.assertEqual(int(out.get("inv_dollar")), 0)
        self.assertEqual(int(out.get("next_step_num")), 0)
        self.assertEqual(int(out.get("counters")), 0)
        self.assertEqual(int(out.get("returned")), 0)
        self.assertEqual(int(out.get("returns")), 0)
        self.assertEqual(int(out.get("deposit_persen_in")), 0)
        self.assertEqual(int(out.get("transaksi_nilai_ori")), 0)
        self.assertEqual(int(out.get("trash2")), 0)
        self.assertEqual(int(out.get("r_jenis")), 0)
        self.assertEqual(int(out.get("_temp")), 0)
        self.assertEqual(int(out.get("cli_loop")), 0)
        self.assertEqual(int(out.get("status_edit")), 0)
        self.assertEqual(int(out.get("print_counter")), 0)
        self.assertEqual(int(out.get("print_status")), 0)
        self.assertEqual(int(out.get("setor_status")), 0)
        self.assertEqual(int(out.get("sinkron_cache")), 0)
        self.assertEqual(int(out.get("auto_pemindahan")), 0)
        self.assertEqual(int(out.get("counters_customers_id_all")), 0)
        self.assertEqual(int(out.get("counters_gudang_id_all")), 0)
        self.assertEqual(int(out.get("batal_kirim")), 0)
        self.assertEqual(int(out.get("kontainer_valas_nilai")), 0)
        self.assertEqual(int(out.get("tambahan_nilai")), 0)
        self.assertEqual(int(out.get("cek")), 0)
        self.assertEqual(int(out.get("status_4")), 0)
        self.assertEqual(int(out.get("trash_4")), 0)
        self.assertEqual(int(out.get("r_sales")), 0)
        self.assertEqual(int(out.get("r_maju")), 0)
        self.assertEqual(int(out.get("r_mundur")), 0)

    def test_export_align_registry_rebuilds_main_with_dtime(self):
        # edited by glg
        service = TransactionExportService()
        rows = [
            {
                "transaksi_id": "5",
                "machine_id": "M-OLD",
                "main": "",
                "items": "",
            }
        ]
        master_index = {
            "5": {
                "dtime": "2026-03-02 12:05:40",
                "machine_id": "M-001",
                "nomer": "INV-001",
                "jenis": 582,
            }
        }
        out = service._align_detail_and_registry_rows(rows, "transaksi_data_registry", master_index)
        self.assertEqual(len(out), 1)
        row = out[0]
        self.assertEqual(row.get("machine_id"), "M-001")
        self.assertFalse(service._is_blank_value(row.get("main")))
        self.assertFalse(service._is_blank_value(row.get("items")))

        decoded = base64.b64decode(str(row.get("main"))).decode("utf-8", errors="ignore")
        self.assertIn("dtime", decoded)
        self.assertIn("INV-001", decoded)
        self.assertEqual(row.get("items"), "YTowOnt9")


if __name__ == "__main__":
    unittest.main()
