import sqlite3
import uuid
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List

from pypos.core.base_model import BaseModel
from pypos.core.utils.db_helper import connect_sqlite, connect_sqlite_read_fast
from pypos.core.utils.path_utils import get_db_path
from pypos.core.utils.sql_identifier_utils import quote_sql_identifier
from pypos.core.utils.sql_query_builder import render_sql_template
from pypos.core.database.schema_migrator import run_schema_migrations_once
from pypos.modules.penjualan.errors import ReturnProcessError
from pypos.modules.penjualan.models.voucher_model import VoucherModel


@dataclass
class ReturnItem(BaseModel):
    """One product line of a sales return request."""

    produk_id: str
    produk_nama: str
    jumlah: int
    harga: float
    jenis_return: str  # full/partial


class ReturnModel(BaseModel):
    """Data-access model for sales returns.

    Writes go through the long-lived connection ``self.conn``; read-only
    queries open a short-lived connection via ``_connect_read``.
    """

    def __init__(self, db_path: str = None, settlement_lock_service=None):
        """Open the write connection and make sure the schema is migrated.

        Args:
            db_path: SQLite database path; falls back to ``get_db_path()``.
            settlement_lock_service: Optional object exposing
                ``is_transaksi_locked(transaksi_id)``. When omitted (or
                falsy) the default service is built lazily.

        Raises:
            RuntimeError: When the schema migration reports failure.
        """
        super().__init__()
        self.db_path = db_path or get_db_path()
        self.conn = connect_sqlite(self.db_path)
        try:
            self.conn.row_factory = sqlite3.Row
            self._ensure_tables()
            self.voucher_model = VoucherModel(self.db_path)
            # edited by glg
            # Settlement-lock validation for returns is centralized in a
            # shared service.
            self.settlement_lock_service = (
                settlement_lock_service
                or self._build_default_settlement_lock_service()
            )
        except Exception:
            # Do not leak the write connection when construction fails
            # half-way; the original exception is re-raised unchanged.
            self.close()
            raise

    def _build_default_settlement_lock_service(self):
        """Build the default ``SettlementMutationLockService`` for this DB.

        The module is imported by name at call time rather than at module
        import time (presumably to avoid an import cycle - TODO confirm).
        """
        import importlib

        module = importlib.import_module(
            "pypos.modules.penjualan.services.settlement_mutation_lock_service"
        )
        return module.SettlementMutationLockService(
            db_path=self.db_path,
            log_warning=self.log_warning,
        )

    def _ensure_tables(self):
        """Run the schema checks required before the model can be used."""
        self._ensure_schema_ready_or_fail("RETURN_SCHEMA_MIGRATION_FAILED")
        self._ensure_return_columns()

    def _ensure_return_columns(self):
        """Run the schema check, tagged with the column-migration reason code."""
        self._ensure_schema_ready_or_fail("RETURN_SCHEMA_COLUMN_MIGRATION_FAILED")

    # edited by glg
    def _ensure_schema_ready_or_fail(self, reason_code="RETURN_SCHEMA_MIGRATION_FAILED"):
        """Run the schema migrations and fail loudly when they do not succeed.

        Args:
            reason_code: Prefix placed in the logged/raised message.

        Returns:
            True when ``run_schema_migrations_once`` returns a truthy value.

        Raises:
            RuntimeError: When the migration helper returns a falsy value.
        """
        if run_schema_migrations_once(self.db_path, strict=False):
            return True
        message = (
            f"{reason_code}: run_schema_migrations_once(strict=False) mengembalikan False "
            f"untuk db_path={self.db_path}"
        )
        self.log_error(message)
        raise RuntimeError(message)

    # edited by glg
    # The read path is separate so it is safe to call from an async worker
    # thread.
    def _connect_read(self):
        """Open a new read connection that yields ``sqlite3.Row`` rows.

        The caller owns the returned connection and must close it.
        """
        conn = connect_sqlite_read_fast(self.db_path)
        conn.row_factory = sqlite3.Row
        return conn

    # edited by glg
    def _get_table_columns(self, table_name: str):
        """Return the set of column names of ``table_name``.

        Best-effort: an invalid identifier or a database error yields an
        empty set instead of an exception.
        """
        cur = self.conn.cursor()
        try:
            table_sql = quote_sql_identifier(table_name)
            cur.execute(f"PRAGMA table_info({table_sql})")
            # Column 1 of PRAGMA table_info is the column name.
            return {
                str(row[1])
                for row in (cur.fetchall() or [])
                if row and len(row) > 1
            }
        except (sqlite3.Error, ValueError):
            return set()
        finally:
            cur.close()

    # edited by glg
    def _is_transaksi_in_settlement_history(self, transaksi_id: str):
        """Return True when the lock service reports the transaction as locked."""
        return bool(self.settlement_lock_service.is_transaksi_locked(transaksi_id))

    # edited by glg
    # Use a datetime range so the date filter stays index-friendly.
    def _build_datetime_range(self, start_date: str, end_date: str):
        """Convert two ``YYYY-MM-DD`` dates into a half-open datetime range.

        Returns:
            ``(start, end_exclusive)`` formatted as ``YYYY-MM-DD 00:00:00``,
            where ``end_exclusive`` is the day after the later date, or
            ``None`` when either input cannot be parsed. Reversed inputs are
            swapped.
        """
        try:
            start_dt = datetime.strptime(str(start_date or "").strip(), "%Y-%m-%d")
            end_dt = datetime.strptime(str(end_date or "").strip(), "%Y-%m-%d")
        except (TypeError, ValueError):
            return None
        if end_dt < start_dt:
            start_dt, end_dt = end_dt, start_dt
        return (
            start_dt.strftime("%Y-%m-%d 00:00:00"),
            (end_dt + timedelta(days=1)).strftime("%Y-%m-%d 00:00:00"),
        )

    def _build_date_filter(self, column: str, start_date, end_date):
        """Build the optional date filter shared by the list queries.

        Args:
            column: Column expression to filter on. Callers pass hard-coded
                names only; it is interpolated into the SQL text.
            start_date: Start date, normally ``YYYY-MM-DD``.
            end_date: End date, normally ``YYYY-MM-DD``.

        Returns:
            ``(sql_fragment, params)``. The fragment is empty when either
            date is missing. Parsable dates use a half-open range on the raw
            column; otherwise the filter falls back to ``date(column)
            BETWEEN`` with the values as given.
        """
        if not (start_date and end_date):
            return "", []
        date_range = self._build_datetime_range(start_date, end_date)
        if date_range:
            return f" AND {column} >= ? AND {column} < ?", list(date_range)
        return f" AND date({column}) BETWEEN ? AND ?", [start_date, end_date]

    @staticmethod
    def _like_pattern(keyword):
        """Wrap ``keyword`` in ``%`` wildcards; ``None`` means "match all"."""
        # Without this guard a None keyword would search for the literal
        # text "None".
        return f"%{'' if keyword is None else keyword}%"

    @staticmethod
    def _query_returned_qty_map(cur, transaksi_id: str):
        """Return ``{produk_id: total quantity already returned}``.

        Args:
            cur: Open cursor on the connection to query.
            transaksi_id: Id of the original sales transaction.
        """
        cur.execute(
            """
            SELECT produk_id, SUM(COALESCE(jumlah, 0)) AS qty_returned
            FROM detail_return_transaksi_penjualan dr
            JOIN return_transaksi_penjualan r ON r.id = dr.return_id
            WHERE r.transaksi_id = ?
            GROUP BY produk_id
            """,
            (transaksi_id,),
        )
        return {str(row[0]): int(row[1] or 0) for row in cur.fetchall()}

    def _rollback_quietly(self, trace_tag: str = "-"):
        """Roll back the write connection without masking the active error.

        A failing rollback is logged as a warning instead of being raised,
        so the exception that triggered it is the one the caller sees.
        """
        try:
            self.conn.rollback()
        except sqlite3.Error as exc:
            self.log_warning(f"[RETURN][{trace_tag}] rollback gagal: {exc}")

    # === Transaksi master search ===
    def search_transaksi_master(
        self,
        keyword: str,
        start_date: str = None,
        end_date: str = None,
        limit: int = 200,
    ):
        """Search non-trashed invoices by number, datetime or customer name.

        Args:
            keyword: Substring matched with ``LIKE``; ``None`` matches all.
            start_date: Optional start date; applied only with ``end_date``.
            end_date: Optional end date; applied only with ``start_date``.
            limit: Maximum number of invoices returned.

        Returns:
            Rows ``(id, nomer, tanggal, customers_nama, transaksi_nilai)``,
            newest first, where ``transaksi_nilai`` is the invoice value
            minus the total already refunded.
        """
        kw = self._like_pattern(keyword)
        date_clause, params = self._build_date_filter("dtime", start_date, end_date)
        params.extend([kw, kw, kw, int(limit)])

        conn = self._connect_read()
        try:
            # edited by glg
            # Avoid a per-row correlated subquery: the refund aggregation is
            # limited to the candidate transactions on the active page
            # (CTE base + return_total).
            query = render_sql_template(
                """
                WITH base AS (
                    SELECT
                        t.id,
                        t.nomer,
                        t.dtime,
                        t.customers_nama,
                        COALESCE(t.transaksi_nilai, 0) AS transaksi_nilai
                    FROM transaksi t
                    WHERE t.jenis_label = 'invoice'
                      AND COALESCE(t.trash, 0) = 0
                      {date_clause}
                      AND (t.nomer LIKE ? OR t.dtime LIKE ? OR t.customers_nama LIKE ?)
                    ORDER BY t.dtime DESC
                    LIMIT ?
                ),
                return_total AS (
                    SELECT
                        r.transaksi_id,
                        SUM(COALESCE(dr.harga, 0) * COALESCE(dr.jumlah, 0)) AS total_refund
                    FROM return_transaksi_penjualan r
                    JOIN detail_return_transaksi_penjualan dr ON dr.return_id = r.id
                    WHERE r.transaksi_id IN (SELECT id FROM base)
                    GROUP BY r.transaksi_id
                )
                SELECT
                    b.id,
                    b.nomer,
                    b.dtime AS tanggal,
                    b.customers_nama,
                    (COALESCE(b.transaksi_nilai, 0) - COALESCE(rt.total_refund, 0)) AS transaksi_nilai
                FROM base b
                LEFT JOIN return_total rt ON rt.transaksi_id = b.id
                ORDER BY b.dtime DESC
                """,
                date_clause=date_clause,
            )
            return conn.execute(query, params).fetchall()
        finally:
            conn.close()

    def load_transaksi_detail(self, transaksi_id: str):
        """Load the sale lines of a transaction that can still be returned.

        Returns:
            A list of dicts, one per non-trashed ``transaksi_data`` row that
            still has returnable quantity. Each dict holds the row's columns
            plus ``qty_returnable`` and ``qty_returned``.
        """
        conn = self._connect_read()
        cur = conn.cursor()
        try:
            # Units already returned per product, consumed row by row below.
            returned_left = self._query_returned_qty_map(cur, transaksi_id)

            cur.execute(
                "SELECT * FROM transaksi_data WHERE transaksi_id = ? AND COALESCE(trash, 0) = 0",
                (transaksi_id,),
            )
            results = []
            for row in cur.fetchall():
                data = dict(row)
                produk_id = str(data.get("produk_id"))
                qty_jual = int(data.get("produk_ord_jml") or 0)
                # Returns are recorded per product, not per sale row. When a
                # product appears on several rows, allocate the returned
                # units across them instead of subtracting the full total
                # from every row, so the per-row figures add up to the
                # per-product quantity enforced by _get_returnable_qty_map.
                pending = returned_left.get(produk_id, 0)
                qty_returned = min(max(qty_jual, 0), pending)
                returned_left[produk_id] = pending - qty_returned
                qty_returnable = max(0, qty_jual - qty_returned)
                if qty_returnable <= 0:
                    continue
                data["qty_returnable"] = qty_returnable
                data["qty_returned"] = qty_returned
                results.append(data)
            return results
        finally:
            cur.close()
            conn.close()

    def _get_returnable_qty_map(self, transaksi_id: str):
        """Return ``{produk_id: quantity still returnable}`` for a transaction.

        Sold quantity is summed per product over non-trashed rows, reduced
        by the quantity already returned, and floored at zero.
        """
        cur = self.conn.cursor()
        try:
            returned_map = self._query_returned_qty_map(cur, transaksi_id)
            cur.execute(
                "SELECT produk_id, SUM(COALESCE(produk_ord_jml, 0)) AS qty_jual "
                "FROM transaksi_data WHERE transaksi_id = ? AND COALESCE(trash, 0) = 0 "
                "GROUP BY produk_id",
                (transaksi_id,),
            )
            result = {}
            for row in cur.fetchall():
                produk_id = str(row[0])
                qty_jual = int(row[1] or 0)
                qty_returned = int(returned_map.get(produk_id, 0))
                result[produk_id] = max(0, qty_jual - qty_returned)
            return result
        finally:
            cur.close()

    def _normalize_return_items(self, transaksi_id: str, items: List[ReturnItem]):
        """Validate the requested items and return cleaned ``ReturnItem`` objects.

        Items with a non-positive or unparsable quantity are dropped.
        Quantities are checked against what is still returnable, counting
        earlier items of the same product in this batch.

        Raises:
            ValueError: When there are no items, nothing is returnable, a
                product id is empty, a quantity exceeds the remaining
                quantity, a price is negative, no valid quantity remains,
                or the batch would return everything still returnable.
        """
        if not items:
            raise ValueError("Belum ada item return.")

        returnable_map = self._get_returnable_qty_map(transaksi_id)
        if not returnable_map:
            raise ValueError("Transaksi tidak memiliki item yang bisa direturn.")

        used_in_batch = {}
        normalized = []
        for it in items:
            produk_id = str(getattr(it, "produk_id", "") or "").strip()
            if not produk_id:
                raise ValueError("Produk return tidak valid.")
            try:
                qty = int(getattr(it, "jumlah", 0) or 0)
            except (TypeError, ValueError):
                qty = 0
            if qty <= 0:
                continue

            max_qty = int(returnable_map.get(produk_id, 0))
            batch_used = int(used_in_batch.get(produk_id, 0))
            qty_available = max(0, max_qty - batch_used)
            if qty > qty_available:
                raise ValueError(
                    f"Qty return produk {getattr(it, 'produk_nama', produk_id)} melebihi sisa qty ({qty_available})."
                )

            try:
                harga = float(getattr(it, "harga", 0) or 0)
            except (TypeError, ValueError):
                harga = 0.0
            if harga < 0:
                raise ValueError("Harga return tidak valid.")

            used_in_batch[produk_id] = batch_used + qty
            normalized.append(
                ReturnItem(
                    produk_id=produk_id,
                    produk_nama=str(getattr(it, "produk_nama", "") or ""),
                    jumlah=qty,
                    harga=harga,
                    jenis_return=str(getattr(it, "jenis_return", "partial") or "partial"),
                )
            )

        if not normalized:
            raise ValueError("Belum ada qty return yang valid.")

        # Prevent a full-note return from the sales-return tab: the batch
        # must leave at least one product with remaining quantity.
        is_full_note = all(
            int(used_in_batch.get(str(produk_id), 0)) >= int(qty_sisa)
            for produk_id, qty_sisa in returnable_map.items()
        )
        if is_full_note:
            raise ValueError(
                "Return penuh satu nota tidak diizinkan. Gunakan tab Pembatalan Transaksi."
            )
        return normalized

    def insert_return(
        self,
        transaksi_id: str,
        items: List[ReturnItem],
        jenis_return: str = 'partial',
        refund_method: str = 'cash',
        trace_id: str = "",
    ):
        """Persist a return (master row, detail rows and optional voucher).

        Args:
            transaksi_id: Id of the original sales transaction.
            items: Requested return lines; validated by
                ``_normalize_return_items``.
            jenis_return: Stored on the master row as given.
            refund_method: ``"cash"`` or ``"voucher"``; anything else is
                treated as ``"cash"``.
            trace_id: Optional tag included in log messages.

        Returns:
            The generated return code (prefix ``VCR`` for vouchers, ``RTR``
            otherwise).

        Raises:
            ReturnProcessError: For an invalid/unknown/locked transaction, a
                non-positive total, or a database/data error while saving.
            ValueError: When item validation fails.
        """
        trace_tag = str(trace_id or "").strip() or "-"
        transaksi_id = str(transaksi_id or "").strip()
        if not transaksi_id:
            raise ReturnProcessError(
                "RETURN_INVALID_TRANSAKSI_ID",
                "Transaksi return tidak valid.",
            )
        if self._is_transaksi_in_settlement_history(transaksi_id):
            raise ReturnProcessError(
                "RETURN_SETTLEMENT_LOCKED",
                "Transaksi sudah disettle dan tidak dapat diproses return.",
            )

        items = self._normalize_return_items(transaksi_id, items)
        total_return = sum(it.harga * it.jumlah for it in items)
        if total_return <= 0:
            raise ReturnProcessError(
                "RETURN_TOTAL_INVALID",
                "Total return harus lebih besar dari 0.",
            )

        refund_method = str(refund_method or "cash").lower().strip()
        if refund_method not in ("cash", "voucher"):
            refund_method = "cash"

        prefix = "VCR" if refund_method == "voucher" else "RTR"
        kode_return = (
            prefix
            + datetime.now().strftime("%Y%m%d%H%M%S")
            + uuid.uuid4().hex[:3].upper()
        )

        cur = self.conn.cursor()
        try:
            self.log_debug(f"[RETURN][{trace_tag}] total return = {total_return}")

            # 1. Fetch customers_id for the given transaksi_id.
            transaksi_cols = self._get_table_columns("transaksi")
            has_settlement_col = "settlement_id" in transaksi_cols
            select_cols = ["customers_id", "transaksi_nilai"]
            if has_settlement_col:
                select_cols.append("COALESCE(settlement_id, 1) AS settlement_id")
            transaksi_query = render_sql_template(
                "SELECT {select_cols} FROM transaksi WHERE id = ?",
                select_cols=", ".join(select_cols),
            )
            cur.execute(transaksi_query, (transaksi_id,))
            row = cur.fetchone()
            if row is None:
                raise ReturnProcessError(
                    "RETURN_TRANSAKSI_NOT_FOUND",
                    "Transaksi tidak ditemukan.",
                )
            customers_id = row[0]

            if has_settlement_col:
                try:
                    settlement_id = int(row["settlement_id"])
                except (TypeError, ValueError):
                    settlement_id = 1
                # NOTE(review): a value of 0 is what marks the transaction
                # as settled here - confirm against the schema.
                if settlement_id == 0:
                    raise ReturnProcessError(
                        "RETURN_SETTLEMENT_LOCKED",
                        "Transaksi sudah disettle dan tidak dapat diproses return.",
                    )

            # 2. Insert into the return master table.
            cur.execute(
                """
                INSERT INTO return_transaksi_penjualan
                (transaksi_id, customer_id, tanggal_return, total_return,
                 kode_voucher, nilai_voucher, jenis_return, refund_method, refund_amount)
                VALUES (?,?,?,?,?,?,?,?,?)
                """,
                (
                    transaksi_id,
                    customers_id,
                    datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
                    total_return,
                    kode_return,
                    total_return,
                    jenis_return,
                    refund_method,
                    total_return,
                ),
            )
            ret_id = cur.lastrowid

            # 3. Insert the return detail rows.
            for it in items:
                cur.execute(
                    """
                    INSERT INTO detail_return_transaksi_penjualan
                    (return_id, produk_id, produk_nama, jumlah, jenis_return, harga, subtotal)
                    VALUES (?,?,?,?,?,?,?)
                    """,
                    (
                        ret_id,
                        it.produk_id,
                        it.produk_nama,
                        it.jumlah,
                        it.jenis_return,
                        it.harga,
                        it.harga * it.jumlah,
                    ),
                )

            # edited by glg
            # Must be atomic: the voucher is created on the same connection
            # so it belongs to the same DB transaction.
            if refund_method == "voucher":
                self.voucher_model.create_voucher(
                    kode_return,
                    total_return,
                    return_id=ret_id,
                    transaksi_id=transaksi_id,
                    customer_id=customers_id,
                    conn=self.conn,
                )

            self.conn.commit()
            self.log_info(
                f"[RETURN][{trace_tag}] insert_return sukses kode={kode_return} "
                f"transaksi_id={transaksi_id} refund_method={refund_method}"
            )
            return kode_return
        except ReturnProcessError:
            self._rollback_quietly(trace_tag)
            raise
        except sqlite3.Error as exc:
            self._rollback_quietly(trace_tag)
            raise ReturnProcessError(
                "RETURN_DB_ERROR",
                "Gagal menyimpan return transaksi.",
                cause=exc,
            ) from exc
        except (TypeError, ValueError, KeyError) as exc:
            self._rollback_quietly(trace_tag)
            raise ReturnProcessError(
                "RETURN_DATA_ERROR",
                "Data return transaksi tidak valid.",
                cause=exc,
            ) from exc
        except (AttributeError, RuntimeError) as exc:
            self._rollback_quietly(trace_tag)
            raise ReturnProcessError(
                "RETURN_UNEXPECTED_ERROR",
                "Terjadi kesalahan saat memproses return transaksi.",
                cause=exc,
            ) from exc
        except Exception:
            # Any other failure must not leave partially written rows
            # pending on the shared connection, where a later commit could
            # persist them. The exception itself is re-raised unchanged.
            self._rollback_quietly(trace_tag)
            raise
        finally:
            cur.close()

    def close(self):
        """Close the write connection; a failure is logged, not raised."""
        try:
            if self.conn:
                self.conn.close()
        except (sqlite3.Error, RuntimeError, AttributeError) as exc:
            self.log_warning(f"Gagal menutup koneksi ReturnModel: {exc}")
        finally:
            self.conn = None

    def get_history(
        self,
        keyword: str = "",
        start_date: str = None,
        end_date: str = None,
        limit: int = 200,
    ):
        """List recorded returns, newest first.

        Args:
            keyword: Substring matched with ``LIKE`` against the return
                code, invoice number, customer name and cashier name;
                ``None`` matches all.
            start_date: Optional start date; applied only with ``end_date``.
            end_date: Optional end date; applied only with ``start_date``.
            limit: Maximum number of returns listed.

        Returns:
            Rows ``(return_id, tanggal_return, kode_voucher, total_return,
            refund_method, nomer_nota, customers_nama, kasir_nama,
            jumlah_item)``.
        """
        kw = self._like_pattern(keyword)
        date_clause, params = self._build_date_filter(
            "r.tanggal_return", start_date, end_date
        )
        params.extend([kw, kw, kw, kw])
        params.append(int(limit))

        conn = self._connect_read()
        try:
            # edited by glg
            # Avoid a per-row correlated COUNT subquery on the return
            # history. The item count is aggregated only for the candidate
            # returns on the active page.
            query = render_sql_template(
                """
                WITH base AS (
                    SELECT
                        r.id AS return_id,
                        r.tanggal_return,
                        r.kode_voucher,
                        r.total_return,
                        r.refund_method,
                        t.nomer AS nomer_nota,
                        t.customers_nama,
                        t.oleh_nama AS kasir_nama
                    FROM return_transaksi_penjualan r
                    LEFT JOIN transaksi t ON t.id = r.transaksi_id
                    WHERE 1=1
                      {date_clause}
                      AND (
                        COALESCE(r.kode_voucher, '') LIKE ?
                        OR COALESCE(t.nomer, '') LIKE ?
                        OR COALESCE(t.customers_nama, '') LIKE ?
                        OR COALESCE(t.oleh_nama, '') LIKE ?
                      )
                    ORDER BY r.tanggal_return DESC
                    LIMIT ?
                ),
                detail_count AS (
                    SELECT
                        dr.return_id,
                        COUNT(1) AS jumlah_item
                    FROM detail_return_transaksi_penjualan dr
                    WHERE dr.return_id IN (SELECT return_id FROM base)
                    GROUP BY dr.return_id
                )
                SELECT
                    b.return_id,
                    b.tanggal_return,
                    b.kode_voucher,
                    b.total_return,
                    b.refund_method,
                    b.nomer_nota,
                    b.customers_nama,
                    b.kasir_nama,
                    COALESCE(dc.jumlah_item, 0) AS jumlah_item
                FROM base b
                LEFT JOIN detail_count dc ON dc.return_id = b.return_id
                ORDER BY b.tanggal_return DESC
                """,
                date_clause=date_clause,
            )
            return conn.execute(query, params).fetchall()
        finally:
            conn.close()