import json import sqlite3, uuid from datetime import datetime, timedelta from dataclasses import dataclass from typing import List from pypos.core.base_model import BaseModel from pypos.core.utils.db_helper import connect_sqlite, connect_sqlite_read_fast from pypos.core.utils.path_utils import get_db_path from pypos.core.utils.sql_identifier_utils import quote_sql_identifier from pypos.core.database.schema_migrator import run_schema_migrations_once from pypos.modules.penjualan.models.voucher_model import VoucherModel @dataclass class ReturnItem(BaseModel): produk_id: str produk_nama: str jumlah: int harga: float jenis_return: str # full/partial class ReturnModel(BaseModel): def __init__(self, db_path: str = None): super().__init__() self.db_path = db_path or get_db_path() self.conn = connect_sqlite(self.db_path) self.conn.row_factory = sqlite3.Row self._ensure_tables() self.voucher_model = VoucherModel(self.db_path) # edited by glg # Cache membership transaksi settle untuk menghindari parsing JSON berulang. self._settlement_txn_cache = set() self._settlement_txn_cache_last_max_id = 0 self._settlement_txn_cache_ready = False # edited by glg # Cache keberadaan map settlement history agar lookup lock transaksi O(1) di DB. self._settlement_map_table_checked = False self._settlement_map_table_available = False def _ensure_tables(self): run_schema_migrations_once(self.db_path, strict=False) self._ensure_return_columns() def _ensure_return_columns(self): run_schema_migrations_once(self.db_path, strict=False) # edited by glg # Read path dipisah agar aman dipanggil dari worker thread async. def _connect_read(self): conn = connect_sqlite_read_fast(self.db_path) conn.row_factory = sqlite3.Row return conn # edited by glg def _get_table_columns(self, table_name: str): cur = self.conn.cursor() try: table_sql = quote_sql_identifier(table_name) cur.execute(f"PRAGMA table_info({table_sql})") return {str(row[1]) for row in (cur.fetchall() or []) if row and len(row) > 1} except (sqlite3.Error, ValueError): return set() # edited by glg def _is_transaksi_in_settlement_history(self, transaksi_id: str): normalized = str(transaksi_id or "").strip() if not normalized: return False try: target_id = int(normalized) except Exception: return False if target_id <= 0: return False cur = self.conn.cursor() try: if self._has_settlement_history_map_table(cur): cur.execute( """ SELECT 1 FROM settlement_history_transaksi_map WHERE transaksi_id = ? LIMIT 1 """, (target_id,), ) if cur.fetchone() is not None: return True self._refresh_settlement_txn_cache(cur) return target_id in self._settlement_txn_cache except Exception: return False finally: cur.close() # edited by glg def _has_settlement_history_map_table(self, cursor): if bool(self._settlement_map_table_checked): return bool(self._settlement_map_table_available) try: cursor.execute( """ SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'settlement_history_transaksi_map' LIMIT 1 """ ) self._settlement_map_table_available = cursor.fetchone() is not None except Exception: self._settlement_map_table_available = False self._settlement_map_table_checked = True return bool(self._settlement_map_table_available) # edited by glg # Cache append-only berbasis max(id) agar cek settle tetap cepat dan konsisten. def _refresh_settlement_txn_cache(self, cursor): cursor.execute("SELECT COALESCE(MAX(id), 0) FROM settlement_history") row = cursor.fetchone() latest_max_id = int((row[0] if row else 0) or 0) if self._settlement_txn_cache_ready and latest_max_id == self._settlement_txn_cache_last_max_id: return if latest_max_id <= 0: self._settlement_txn_cache.clear() self._settlement_txn_cache_last_max_id = 0 self._settlement_txn_cache_ready = True return cursor.execute( """ SELECT id, data_transaksi_id FROM settlement_history WHERE COALESCE(data_transaksi_id, '') <> '' ORDER BY id DESC LIMIT 5000 """ ) rows = cursor.fetchall() or [] cached_ids = set() for row_item in rows: raw_blob = row_item[1] if row_item and len(row_item) > 1 else None if raw_blob is None: continue text = str(raw_blob).strip() if not text: continue try: payload = json.loads(text) except Exception: payload = [] if not isinstance(payload, list): continue for item in payload: try: cached_ids.add(int(item)) except Exception: continue self._settlement_txn_cache = cached_ids self._settlement_txn_cache_last_max_id = latest_max_id self._settlement_txn_cache_ready = True # edited by glg # Gunakan range datetime agar filter tanggal tetap index-friendly. def _build_datetime_range(self, start_date: str, end_date: str): try: start_dt = datetime.strptime(str(start_date or "").strip(), "%Y-%m-%d") end_dt = datetime.strptime(str(end_date or "").strip(), "%Y-%m-%d") except Exception: return None if end_dt < start_dt: start_dt, end_dt = end_dt, start_dt return ( start_dt.strftime("%Y-%m-%d 00:00:00"), (end_dt + timedelta(days=1)).strftime("%Y-%m-%d 00:00:00"), ) # === Transaksi master search === def search_transaksi_master(self, keyword: str, start_date: str = None, end_date: str = None, limit: int = 200): kw = f"%{keyword}%" params = [] date_clause = "" if start_date and end_date: date_range = self._build_datetime_range(start_date, end_date) if date_range: date_clause = " AND dtime >= ? AND dtime < ?" params.extend([date_range[0], date_range[1]]) else: date_clause = " AND date(dtime) BETWEEN ? AND ?" params.extend([start_date, end_date]) params.extend([kw, kw, kw, int(limit)]) conn = self._connect_read() try: # edited by glg # Hindari subquery korelatif per-row: agregasi refund dibatasi ke kandidat # transaksi pada halaman aktif (CTE base + return_total). return conn.execute( """ WITH base AS ( SELECT t.id, t.nomer, t.dtime, t.customers_nama, COALESCE(t.transaksi_nilai, 0) AS transaksi_nilai FROM transaksi t WHERE t.jenis_label = 'invoice' AND COALESCE(t.trash, 0) = 0 {date_clause} AND (t.nomer LIKE ? OR t.dtime LIKE ? OR t.customers_nama LIKE ?) ORDER BY t.dtime DESC LIMIT ? ), return_total AS ( SELECT r.transaksi_id, SUM(COALESCE(dr.harga, 0) * COALESCE(dr.jumlah, 0)) AS total_refund FROM return_transaksi_penjualan r JOIN detail_return_transaksi_penjualan dr ON dr.return_id = r.id WHERE r.transaksi_id IN (SELECT id FROM base) GROUP BY r.transaksi_id ) SELECT b.id, b.nomer, b.dtime AS tanggal, b.customers_nama, (COALESCE(b.transaksi_nilai, 0) - COALESCE(rt.total_refund, 0)) AS transaksi_nilai FROM base b LEFT JOIN return_total rt ON rt.transaksi_id = b.id ORDER BY b.dtime DESC """.format(date_clause=date_clause), params ).fetchall() finally: conn.close() def load_transaksi_detail(self, transaksi_id: str): conn = self._connect_read() cur = conn.cursor() try: cur.execute( """ SELECT produk_id, SUM(COALESCE(jumlah, 0)) AS qty_returned FROM detail_return_transaksi_penjualan dr JOIN return_transaksi_penjualan r ON r.id = dr.return_id WHERE r.transaksi_id = ? GROUP BY produk_id """, (transaksi_id,), ) returned_map = {str(row[0]): int(row[1] or 0) for row in cur.fetchall()} cur.execute( "SELECT * FROM transaksi_data WHERE transaksi_id = ? AND COALESCE(trash, 0) = 0", (transaksi_id,), ) rows = cur.fetchall() results = [] for row in rows: data = dict(row) produk_id = str(data.get("produk_id")) qty_jual = int(data.get("produk_ord_jml") or 0) qty_returned = int(returned_map.get(produk_id, 0)) qty_returnable = max(0, qty_jual - qty_returned) if qty_returnable <= 0: continue data["qty_returnable"] = qty_returnable data["qty_returned"] = qty_returned results.append(data) return results finally: cur.close() conn.close() def _get_returnable_qty_map(self, transaksi_id: str): cur = self.conn.cursor() cur.execute( """ SELECT produk_id, SUM(COALESCE(jumlah, 0)) AS qty_returned FROM detail_return_transaksi_penjualan dr JOIN return_transaksi_penjualan r ON r.id = dr.return_id WHERE r.transaksi_id = ? GROUP BY produk_id """, (transaksi_id,), ) returned_map = {str(row[0]): int(row[1] or 0) for row in cur.fetchall()} cur.execute( "SELECT produk_id, SUM(COALESCE(produk_ord_jml, 0)) AS qty_jual FROM transaksi_data WHERE transaksi_id = ? AND COALESCE(trash, 0) = 0 GROUP BY produk_id", (transaksi_id,), ) sold_rows = cur.fetchall() result = {} for row in sold_rows: produk_id = str(row[0]) qty_jual = int(row[1] or 0) qty_returned = int(returned_map.get(produk_id, 0)) result[produk_id] = max(0, qty_jual - qty_returned) return result def _normalize_return_items(self, transaksi_id: str, items: List[ReturnItem]): if not items: raise ValueError("Belum ada item return.") returnable_map = self._get_returnable_qty_map(transaksi_id) if not returnable_map: raise ValueError("Transaksi tidak memiliki item yang bisa direturn.") used_in_batch = {} normalized = [] for it in items: produk_id = str(getattr(it, "produk_id", "") or "").strip() if not produk_id: raise ValueError("Produk return tidak valid.") try: qty = int(getattr(it, "jumlah", 0) or 0) except Exception: qty = 0 if qty <= 0: continue max_qty = int(returnable_map.get(produk_id, 0)) batch_used = int(used_in_batch.get(produk_id, 0)) qty_available = max(0, max_qty - batch_used) if qty > qty_available: raise ValueError( f"Qty return produk {getattr(it, 'produk_nama', produk_id)} melebihi sisa qty ({qty_available})." ) try: harga = float(getattr(it, "harga", 0) or 0) except Exception: harga = 0.0 if harga < 0: raise ValueError("Harga return tidak valid.") used_in_batch[produk_id] = batch_used + qty normalized.append( ReturnItem( produk_id=produk_id, produk_nama=str(getattr(it, "produk_nama", "") or ""), jumlah=qty, harga=harga, jenis_return=str(getattr(it, "jenis_return", "partial") or "partial"), ) ) if not normalized: raise ValueError("Belum ada qty return yang valid.") # Cegah full-note return dari tab return penjualan. is_full_note = True for produk_id, qty_sisa in returnable_map.items(): qty_batch = int(used_in_batch.get(str(produk_id), 0)) if qty_batch < int(qty_sisa): is_full_note = False break if is_full_note: raise ValueError( "Return penuh satu nota tidak diizinkan. Gunakan tab Pembatalan Transaksi." ) return normalized def insert_return(self, transaksi_id: str, items: List[ReturnItem], jenis_return: str = 'partial', refund_method: str = 'cash'): transaksi_id = str(transaksi_id or "").strip() if not transaksi_id: raise ValueError("Transaksi return tidak valid.") if self._is_transaksi_in_settlement_history(transaksi_id): raise ValueError("Transaksi sudah disettle dan tidak dapat diproses return.") items = self._normalize_return_items(transaksi_id, items) total_return = sum(it.harga * it.jumlah for it in items) if total_return <= 0: raise ValueError("Total return harus lebih besar dari 0.") refund_method = str(refund_method or "cash").lower().strip() if refund_method not in ("cash", "voucher"): refund_method = "cash" prefix = "VCR" if refund_method == "voucher" else "RTR" kode_return = prefix + datetime.now().strftime("%Y%m%d%H%M%S") + uuid.uuid4().hex[:3].upper() cur = self.conn.cursor() self.log_debug(f"total return = {total_return}") # 1. Ambil customers_id berdasarkan transaksi_id transaksi_cols = self._get_table_columns("transaksi") select_cols = ["customers_id", "transaksi_nilai"] if "settlement_id" in transaksi_cols: select_cols.append("COALESCE(settlement_id, 1) AS settlement_id") cur.execute( f"SELECT {', '.join(select_cols)} FROM transaksi WHERE id = ?", (transaksi_id,), ) row = cur.fetchone() if not row: raise ValueError("Transaksi tidak ditemukan.") customers_id = row[0] if row else None _nota_total = row[1] if row else 0 if "settlement_id" in transaksi_cols: raw_settlement_id = row["settlement_id"] if row else 1 try: settlement_id = int(raw_settlement_id) except Exception: settlement_id = 1 if settlement_id == 0: raise ValueError("Transaksi sudah disettle dan tidak dapat diproses return.") # 2. Insert ke tabel master return cur.execute(""" INSERT INTO return_transaksi_penjualan (transaksi_id, customer_id, tanggal_return, total_return, kode_voucher, nilai_voucher, jenis_return, refund_method, refund_amount) VALUES (?,?,?,?,?,?,?,?,?) """, ( transaksi_id, customers_id, datetime.now().strftime('%Y-%m-%d %H:%M:%S'), total_return, kode_return, total_return, jenis_return, refund_method, total_return )) ret_id = cur.lastrowid # 3. Insert detail return for it in items: cur.execute(""" INSERT INTO detail_return_transaksi_penjualan (return_id, produk_id, produk_nama, jumlah, jenis_return, harga, subtotal) VALUES (?,?,?,?,?,?,?) """, ( ret_id, it.produk_id, it.produk_nama, it.jumlah, it.jenis_return, it.harga, it.harga * it.jumlah )) self.conn.commit() if refund_method == "voucher": self.voucher_model.create_voucher( kode_return, total_return, return_id=ret_id, transaksi_id=transaksi_id, customer_id=customers_id ) return kode_return def close(self): try: if self.conn: self.conn.close() except Exception as exc: self.log_warning(f"Gagal menutup koneksi ReturnModel: {exc}") finally: self.conn = None def get_history(self, keyword: str = "", start_date: str = None, end_date: str = None, limit: int = 200): kw = f"%{keyword}%" params = [] date_clause = "" if start_date and end_date: date_range = self._build_datetime_range(start_date, end_date) if date_range: date_clause = " AND r.tanggal_return >= ? AND r.tanggal_return < ?" params.extend([date_range[0], date_range[1]]) else: date_clause = " AND date(r.tanggal_return) BETWEEN ? AND ?" params.extend([start_date, end_date]) params.extend([kw, kw, kw, kw]) params.append(int(limit)) conn = self._connect_read() try: # edited by glg # Hindari subquery korelatif COUNT per-row pada history return. # Agregasi jumlah item dibatasi hanya untuk kandidat return pada halaman aktif. return conn.execute( """ WITH base AS ( SELECT r.id AS return_id, r.tanggal_return, r.kode_voucher, r.total_return, r.refund_method, t.nomer AS nomer_nota, t.customers_nama, t.oleh_nama AS kasir_nama FROM return_transaksi_penjualan r LEFT JOIN transaksi t ON t.id = r.transaksi_id WHERE 1=1 {date_clause} AND ( COALESCE(r.kode_voucher, '') LIKE ? OR COALESCE(t.nomer, '') LIKE ? OR COALESCE(t.customers_nama, '') LIKE ? OR COALESCE(t.oleh_nama, '') LIKE ? ) ORDER BY r.tanggal_return DESC LIMIT ? ), detail_count AS ( SELECT dr.return_id, COUNT(1) AS jumlah_item FROM detail_return_transaksi_penjualan dr WHERE dr.return_id IN (SELECT return_id FROM base) GROUP BY dr.return_id ) SELECT b.return_id, b.tanggal_return, b.kode_voucher, b.total_return, b.refund_method, b.nomer_nota, b.customers_nama, b.kasir_nama, COALESCE(dc.jumlah_item, 0) AS jumlah_item FROM base b LEFT JOIN detail_count dc ON dc.return_id = b.return_id ORDER BY b.tanggal_return DESC """.format(date_clause=date_clause), params ).fetchall() finally: conn.close()