From 027caba16f425fff95bf6c372b2b6de8eb6d6d91 Mon Sep 17 00:00:00 2001 From: Alfonso <55528394+pifu-dabai@users.noreply.github.com> Date: Fri, 10 Apr 2026 14:16:14 +0800 Subject: [PATCH] feat: add research cohort query and stabilize medication extraction --- database/__init__.py | 3 + database/clinical_multimodal_db.py | 748 +++++++++++++++++++ docs/data_management_research_engine_plan.md | 147 ++++ server/clinical_api.py | 226 ++++++ tests/test_clinical_api.py | 156 ++++ 5 files changed, 1280 insertions(+) create mode 100644 database/clinical_multimodal_db.py create mode 100644 docs/data_management_research_engine_plan.md create mode 100644 server/clinical_api.py create mode 100644 tests/test_clinical_api.py diff --git a/database/__init__.py b/database/__init__.py index e69de29..48f816b 100644 --- a/database/__init__.py +++ b/database/__init__.py @@ -0,0 +1,3 @@ +from .clinical_multimodal_db import ClinicalMultimodalDB, SearchResult + +__all__ = ["ClinicalMultimodalDB", "SearchResult"] diff --git a/database/clinical_multimodal_db.py b/database/clinical_multimodal_db.py new file mode 100644 index 0000000..de55778 --- /dev/null +++ b/database/clinical_multimodal_db.py @@ -0,0 +1,748 @@ +import re +import sqlite3 +from dataclasses import dataclass +from datetime import datetime +from pathlib import Path +from typing import Any, Dict, List, Optional, Sequence + + +@dataclass +class SearchResult: + patient_id: str + visit_id: str + note_text: str + score: float + + +class ClinicalMultimodalDB: + """Notebook-oriented multimodal clinical database on top of SQLite. + + Features included in this lightweight implementation: + - Standardized schema for patient/visit/medication/lesion/lab/assets/audit. + - Intelligent auto-fill extraction from free text (regex-based baseline). + - Natural language semantic retrieval through SQLite FTS5. + - Simple RAG-style answer synthesis with evidence references. + - Longitudinal change detection for lesion and laboratory measurements. 
+ """ + + def __init__(self, db_path: str = "database/clinical_multimodal.db"): + self.db_path = Path(db_path) + self.db_path.parent.mkdir(parents=True, exist_ok=True) + self.conn = sqlite3.connect(str(self.db_path)) + self.conn.row_factory = sqlite3.Row + self.conn.execute("PRAGMA foreign_keys = ON") + self._init_schema() + + def _init_schema(self) -> None: + schema_sql = """ + CREATE TABLE IF NOT EXISTS patient_master ( + patient_id TEXT PRIMARY KEY, + anonymized_id TEXT NOT NULL, + sex TEXT, + birth_year INTEGER, + center TEXT, + enrollment_time TEXT, + created_at TEXT NOT NULL + ); + + CREATE TABLE IF NOT EXISTS visit_event ( + visit_id TEXT PRIMARY KEY, + patient_id TEXT NOT NULL, + visit_time TEXT NOT NULL, + diagnosis TEXT, + stage TEXT, + regimen_version TEXT, + note_text TEXT, + created_at TEXT NOT NULL, + FOREIGN KEY (patient_id) REFERENCES patient_master(patient_id) ON DELETE CASCADE + ); + + CREATE TABLE IF NOT EXISTS medication_timeline ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + visit_id TEXT NOT NULL, + drug_name TEXT NOT NULL, + dose_value REAL, + dose_unit TEXT, + frequency TEXT, + change_type TEXT, + change_reason TEXT, + created_at TEXT NOT NULL, + FOREIGN KEY (visit_id) REFERENCES visit_event(visit_id) ON DELETE CASCADE + ); + + CREATE TABLE IF NOT EXISTS lesion_measurement ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + visit_id TEXT NOT NULL, + lesion_site TEXT, + area_cm2 REAL, + perimeter_cm REAL, + pigmentation_score REAL, + model_version TEXT, + created_at TEXT NOT NULL, + FOREIGN KEY (visit_id) REFERENCES visit_event(visit_id) ON DELETE CASCADE + ); + + CREATE TABLE IF NOT EXISTS lab_result ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + visit_id TEXT NOT NULL, + item_name TEXT NOT NULL, + item_value REAL, + unit TEXT, + ref_range TEXT, + abnormal_flag TEXT, + created_at TEXT NOT NULL, + FOREIGN KEY (visit_id) REFERENCES visit_event(visit_id) ON DELETE CASCADE + ); + + CREATE TABLE IF NOT EXISTS multimodal_asset ( + asset_id TEXT PRIMARY KEY, + 
visit_id TEXT NOT NULL, + asset_type TEXT NOT NULL, + object_uri TEXT NOT NULL, + device TEXT, + iqa_score REAL, + frame_summary TEXT, + created_at TEXT NOT NULL, + FOREIGN KEY (visit_id) REFERENCES visit_event(visit_id) ON DELETE CASCADE + ); + + CREATE TABLE IF NOT EXISTS audit_log ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + table_name TEXT NOT NULL, + row_ref TEXT NOT NULL, + field_name TEXT NOT NULL, + old_value TEXT, + new_value TEXT, + source TEXT NOT NULL, + operator TEXT, + reason TEXT, + created_at TEXT NOT NULL + ); + + CREATE VIRTUAL TABLE IF NOT EXISTS visit_note_fts USING fts5( + visit_id UNINDEXED, + patient_id UNINDEXED, + note_text, + tokenize = 'unicode61' + ); + """ + self.conn.executescript(schema_sql) + self.conn.commit() + + @staticmethod + def _now() -> str: + return datetime.utcnow().isoformat(timespec="seconds") + "Z" + + def close(self) -> None: + self.conn.close() + + def record_audit( + self, + table_name: str, + row_ref: str, + field_name: str, + old_value: Optional[str], + new_value: Optional[str], + source: str, + operator: Optional[str] = None, + reason: Optional[str] = None, + ) -> None: + self.conn.execute( + """ + INSERT INTO audit_log(table_name, row_ref, field_name, old_value, new_value, source, operator, reason, created_at) + VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ """, + (table_name, row_ref, field_name, old_value, new_value, source, operator, reason, self._now()), + ) + self.conn.commit() + + def upsert_patient(self, patient: Dict[str, Any]) -> None: + payload = { + "patient_id": patient["patient_id"], + "anonymized_id": patient.get("anonymized_id", patient["patient_id"]), + "sex": patient.get("sex"), + "birth_year": patient.get("birth_year"), + "center": patient.get("center"), + "enrollment_time": patient.get("enrollment_time"), + "created_at": self._now(), + } + self.conn.execute( + """ + INSERT INTO patient_master(patient_id, anonymized_id, sex, birth_year, center, enrollment_time, created_at) + VALUES(:patient_id, :anonymized_id, :sex, :birth_year, :center, :enrollment_time, :created_at) + ON CONFLICT(patient_id) DO UPDATE SET + anonymized_id=excluded.anonymized_id, + sex=excluded.sex, + birth_year=excluded.birth_year, + center=excluded.center, + enrollment_time=excluded.enrollment_time + """, + payload, + ) + self.conn.commit() + + def add_visit(self, visit: Dict[str, Any]) -> None: + payload = { + "visit_id": visit["visit_id"], + "patient_id": visit["patient_id"], + "visit_time": visit["visit_time"], + "diagnosis": visit.get("diagnosis"), + "stage": visit.get("stage"), + "regimen_version": visit.get("regimen_version"), + "note_text": visit.get("note_text", ""), + "created_at": self._now(), + } + self.conn.execute( + """ + INSERT INTO visit_event(visit_id, patient_id, visit_time, diagnosis, stage, regimen_version, note_text, created_at) + VALUES(:visit_id, :patient_id, :visit_time, :diagnosis, :stage, :regimen_version, :note_text, :created_at) + ON CONFLICT(visit_id) DO UPDATE SET + patient_id=excluded.patient_id, + visit_time=excluded.visit_time, + diagnosis=excluded.diagnosis, + stage=excluded.stage, + regimen_version=excluded.regimen_version, + note_text=excluded.note_text + """, + payload, + ) + self.conn.execute("DELETE FROM visit_note_fts WHERE visit_id = ?", (payload["visit_id"],)) + self.conn.execute( + 
"INSERT INTO visit_note_fts(visit_id, patient_id, note_text) VALUES (?, ?, ?)", + (payload["visit_id"], payload["patient_id"], payload["note_text"]), + ) + self.conn.commit() + + def add_medication(self, visit_id: str, medication: Dict[str, Any]) -> None: + payload = { + "visit_id": visit_id, + "drug_name": medication.get("drug_name"), + "dose_value": medication.get("dose_value"), + "dose_unit": medication.get("dose_unit"), + "frequency": medication.get("frequency"), + "change_type": medication.get("change_type"), + "change_reason": medication.get("change_reason"), + "created_at": self._now(), + } + self.conn.execute( + """ + INSERT INTO medication_timeline(visit_id, drug_name, dose_value, dose_unit, frequency, change_type, change_reason, created_at) + VALUES(:visit_id, :drug_name, :dose_value, :dose_unit, :frequency, :change_type, :change_reason, :created_at) + """, + payload, + ) + self.conn.commit() + self.record_audit( + table_name="medication_timeline", + row_ref=visit_id, + field_name="drug_name", + old_value=None, + new_value=str(payload.get("drug_name")), + source="manual_or_api", + ) + + def add_lesion_measurement(self, visit_id: str, lesion: Dict[str, Any]) -> None: + payload = { + "visit_id": visit_id, + "lesion_site": lesion.get("lesion_site"), + "area_cm2": lesion.get("area_cm2"), + "perimeter_cm": lesion.get("perimeter_cm"), + "pigmentation_score": lesion.get("pigmentation_score"), + "model_version": lesion.get("model_version"), + "created_at": self._now(), + } + self.conn.execute( + """ + INSERT INTO lesion_measurement(visit_id, lesion_site, area_cm2, perimeter_cm, pigmentation_score, model_version, created_at) + VALUES(:visit_id, :lesion_site, :area_cm2, :perimeter_cm, :pigmentation_score, :model_version, :created_at) + """, + payload, + ) + self.conn.commit() + self.record_audit( + table_name="lesion_measurement", + row_ref=visit_id, + field_name="area_cm2", + old_value=None, + new_value=None if payload.get("area_cm2") is None else 
str(payload.get("area_cm2")),
+            source="manual_or_api",
+        )
+
+    def add_lab_result(self, visit_id: str, lab: Dict[str, Any]) -> None:
+        payload = {
+            "visit_id": visit_id,
+            "item_name": lab.get("item_name"),
+            "item_value": lab.get("item_value"),
+            "unit": lab.get("unit"),
+            "ref_range": lab.get("ref_range"),
+            "abnormal_flag": lab.get("abnormal_flag"),
+            "created_at": self._now(),
+        }
+        self.conn.execute(
+            """
+            INSERT INTO lab_result(visit_id, item_name, item_value, unit, ref_range, abnormal_flag, created_at)
+            VALUES(:visit_id, :item_name, :item_value, :unit, :ref_range, :abnormal_flag, :created_at)
+            """,
+            payload,
+        )
+        self.conn.commit()
+        self.record_audit(
+            table_name="lab_result",
+            row_ref=visit_id,
+            field_name=str(payload.get("item_name")),
+            old_value=None,
+            new_value=None if payload.get("item_value") is None else str(payload.get("item_value")),
+            source="manual_or_api",
+        )
+
+    def add_asset(self, asset: Dict[str, Any]) -> None:
+        payload = {
+            "asset_id": asset["asset_id"],
+            "visit_id": asset["visit_id"],
+            "asset_type": asset["asset_type"],
+            "object_uri": asset["object_uri"],
+            "device": asset.get("device"),
+            "iqa_score": asset.get("iqa_score"),
+            "frame_summary": asset.get("frame_summary"),
+            "created_at": self._now(),
+        }
+        self.conn.execute(
+            """
+            INSERT INTO multimodal_asset(asset_id, visit_id, asset_type, object_uri, device, iqa_score, frame_summary, created_at)
+            VALUES(:asset_id, :visit_id, :asset_type, :object_uri, :device, :iqa_score, :frame_summary, :created_at)
+            """,
+            payload,
+        )
+        self.conn.commit()
+
+    def auto_fill_from_text(self, text: str) -> Dict[str, Any]:
+        """Baseline intelligent auto-fill from raw medical note text."""
+        meds = []
+        for match in re.finditer(r"([A-Za-z\u4e00-\u9fa5]+)\s*(\d+(?:\.\d+)?)\s*(mg|g|ml)?\s*(qd|bid|tid|qod|每[日天周]\d*次)?", text):
+            drug, dose, unit, frequency = match.groups()
+            if len(drug) < 2:
+                continue
+            if unit is None and frequency is None:
+                continue
+            if drug.lower() in {"cm", "mm"}:
+ continue + meds.append( + { + "drug_name": drug, + "dose_value": float(dose), + "dose_unit": unit or "mg", + "frequency": frequency, + "change_type": "unknown", + } + ) + + area_match = re.search(r"病灶面积\s*[::]?\s*(\d+(?:\.\d+)?)\s*cm2", text) + pigmentation_match = re.search(r"色素(?:评分|指数)?\s*[::]?\s*(\d+(?:\.\d+)?)", text) + + labs = [] + for item, value, unit in re.findall(r"([A-Za-z\u4e00-\u9fa5]+)\s*[::]\s*(\d+(?:\.\d+)?)\s*([A-Za-z/%μ]+)", text): + if item in {"病灶面积", "色素评分", "色素指数"}: + continue + labs.append({"item_name": item, "item_value": float(value), "unit": unit}) + + return { + "medications": meds, + "lesion": { + "area_cm2": float(area_match.group(1)) if area_match else None, + "pigmentation_score": float(pigmentation_match.group(1)) if pigmentation_match else None, + }, + "labs": labs, + "raw_text": text, + } + + def batch_auto_fill_from_texts(self, texts: Sequence[str]) -> List[Dict[str, Any]]: + return [self.auto_fill_from_text(text) for text in texts] + + def commit_auto_fill( + self, + visit_id: str, + extracted: Dict[str, Any], + operator: str = "system", + manual_overrides: Optional[Dict[str, Any]] = None, + ) -> Dict[str, int]: + """Persist extracted payload with optional manual corrections.""" + manual_overrides = manual_overrides or {} + medications = manual_overrides.get("medications", extracted.get("medications", [])) + lesion = manual_overrides.get("lesion", extracted.get("lesion", {})) + labs = manual_overrides.get("labs", extracted.get("labs", [])) + + med_count = 0 + for medication in medications: + self.add_medication(visit_id, medication) + med_count += 1 + lesion_count = 0 + if lesion and any(v is not None for v in lesion.values()): + self.add_lesion_measurement(visit_id, lesion) + lesion_count = 1 + lab_count = 0 + for lab in labs: + self.add_lab_result(visit_id, lab) + lab_count += 1 + + self.record_audit( + table_name="visit_event", + row_ref=visit_id, + field_name="auto_fill_commit", + old_value=None, + 
new_value=f"medications={med_count},lesion={lesion_count},labs={lab_count}", + source="auto_fill", + operator=operator, + reason="auto_fill_commit_with_optional_manual_overrides", + ) + return {"medications": med_count, "lesions": lesion_count, "labs": lab_count} + + @staticmethod + def score_iqa(blur_score: float, exposure_score: float, lesion_ratio: float) -> Dict[str, Any]: + """Simple IQA scorer for image/video frames. + + All inputs should be normalized to [0, 1], where larger is better. + """ + blur = max(0.0, min(1.0, blur_score)) + exposure = max(0.0, min(1.0, exposure_score)) + ratio = max(0.0, min(1.0, lesion_ratio)) + total = round(0.45 * blur + 0.35 * exposure + 0.20 * ratio, 4) + qualified = total >= 0.65 and ratio >= 0.1 + reasons = [] + if blur < 0.5: + reasons.append("图像清晰度不足") + if exposure < 0.5: + reasons.append("曝光质量不足") + if ratio < 0.1: + reasons.append("病灶占比过低") + return { + "iqa_score": total, + "qualified": qualified, + "reasons": reasons, + } + + def semantic_search(self, query: str, top_k: int = 5) -> List[SearchResult]: + # FTS5 query syntax; quote raw query for better compatibility with Chinese mixed text. + rows = self.conn.execute( + """ + SELECT visit_id, patient_id, note_text, bm25(visit_note_fts) AS score + FROM visit_note_fts + WHERE visit_note_fts MATCH ? + ORDER BY score + LIMIT ? 
+ """, + (query, top_k), + ).fetchall() + return [ + SearchResult( + patient_id=row["patient_id"], + visit_id=row["visit_id"], + note_text=row["note_text"], + score=float(row["score"]), + ) + for row in rows + ] + + def rag_answer(self, query: str, top_k: int = 3) -> Dict[str, Any]: + hits = self.semantic_search(query, top_k=top_k) + if not hits: + return {"answer": "未检索到匹配病例,请放宽筛选条件。", "evidence": []} + + evidence = [ + { + "patient_id": hit.patient_id, + "visit_id": hit.visit_id, + "score": hit.score, + "snippet": hit.note_text[:180], + } + for hit in hits + ] + answer = f"共检索到 {len(hits)} 条高相关记录,已按相关性排序返回证据。" + return {"answer": answer, "evidence": evidence} + + def detect_changes(self, patient_id: str, lab_items: Optional[Sequence[str]] = None) -> Dict[str, Any]: + visits = self.conn.execute( + """ + SELECT visit_id, visit_time + FROM visit_event + WHERE patient_id = ? + ORDER BY visit_time DESC + LIMIT 2 + """, + (patient_id,), + ).fetchall() + if len(visits) < 2: + return {"patient_id": patient_id, "message": "对比至少需要两次就诊记录。"} + + latest_visit, previous_visit = visits[0]["visit_id"], visits[1]["visit_id"] + lesion_delta = self._lesion_delta(latest_visit, previous_visit) + lab_delta = self._lab_delta(latest_visit, previous_visit, lab_items) + return { + "patient_id": patient_id, + "latest_visit_id": latest_visit, + "previous_visit_id": previous_visit, + "lesion_delta": lesion_delta, + "lab_delta": lab_delta, + } + + def _lesion_delta(self, latest_visit_id: str, previous_visit_id: str) -> Dict[str, Any]: + get_area = "SELECT area_cm2 FROM lesion_measurement WHERE visit_id = ? 
ORDER BY id DESC LIMIT 1" + latest = self.conn.execute(get_area, (latest_visit_id,)).fetchone() + previous = self.conn.execute(get_area, (previous_visit_id,)).fetchone() + if not latest or not previous or latest["area_cm2"] is None or previous["area_cm2"] in (None, 0): + return {"message": "病灶面积数据不足,无法计算变化率。"} + + delta = latest["area_cm2"] - previous["area_cm2"] + ratio = delta / previous["area_cm2"] * 100 + return { + "latest_area_cm2": latest["area_cm2"], + "previous_area_cm2": previous["area_cm2"], + "delta_area_cm2": round(delta, 4), + "delta_ratio_pct": round(ratio, 2), + } + + def _lab_delta(self, latest_visit_id: str, previous_visit_id: str, lab_items: Optional[Sequence[str]]) -> List[Dict[str, Any]]: + clause = "" + params: List[Any] = [latest_visit_id] + if lab_items: + placeholders = ",".join(["?" for _ in lab_items]) + clause = f"AND item_name IN ({placeholders})" + params.extend(lab_items) + + latest_rows = self.conn.execute( + f"SELECT item_name, item_value, unit FROM lab_result WHERE visit_id = ? 
{clause}", + params, + ).fetchall() + + previous_map = { + row["item_name"]: row + for row in self.conn.execute( + "SELECT item_name, item_value, unit FROM lab_result WHERE visit_id = ?", + (previous_visit_id,), + ).fetchall() + } + + deltas = [] + for row in latest_rows: + old = previous_map.get(row["item_name"]) + if not old or old["item_value"] is None or row["item_value"] is None: + continue + old_value = float(old["item_value"]) + new_value = float(row["item_value"]) + delta = new_value - old_value + ratio = None if old_value == 0 else (delta / old_value * 100) + deltas.append( + { + "item_name": row["item_name"], + "old_value": old_value, + "new_value": new_value, + "unit": row["unit"] or old["unit"], + "delta": round(delta, 4), + "delta_ratio_pct": None if ratio is None else round(ratio, 2), + } + ) + return deltas + + def export_patient_snapshot(self, patient_id: str) -> Dict[str, Any]: + patient = self.conn.execute( + "SELECT * FROM patient_master WHERE patient_id = ?", + (patient_id,), + ).fetchone() + if not patient: + return {} + + visits = [ + dict(row) + for row in self.conn.execute( + "SELECT * FROM visit_event WHERE patient_id = ? ORDER BY visit_time", + (patient_id,), + ).fetchall() + ] + return {"patient": dict(patient), "visits": visits} + + def extract_knowledge_graph(self, text: str) -> Dict[str, Any]: + """Extract a lightweight diagnosis logic graph from plain text. + + Pattern focus: symptom -> drug -> indicator. 
+ """ + symptom_candidates = re.findall(r"(瘙痒|红斑|脱屑|疼痛|灼热|丘疹|水疱)", text) + drug_candidates = [item["drug_name"] for item in self.auto_fill_from_text(text)["medications"]] + indicator_candidates = [item["item_name"] for item in self.auto_fill_from_text(text)["labs"]] + + nodes = [] + edges = [] + seen = set() + + def _add_node(node_id: str, label: str, node_type: str) -> None: + key = (node_id, node_type) + if key in seen: + return + seen.add(key) + nodes.append({"id": node_id, "label": label, "type": node_type}) + + for symptom in symptom_candidates: + _add_node(f"symptom:{symptom}", symptom, "symptom") + for drug in drug_candidates: + _add_node(f"drug:{drug}", drug, "drug") + for indicator in indicator_candidates: + _add_node(f"indicator:{indicator}", indicator, "indicator") + + for symptom in symptom_candidates: + for drug in drug_candidates: + edges.append( + { + "source": f"symptom:{symptom}", + "target": f"drug:{drug}", + "relation": "treated_by", + } + ) + for drug in drug_candidates: + for indicator in indicator_candidates: + edges.append( + { + "source": f"drug:{drug}", + "target": f"indicator:{indicator}", + "relation": "affects", + } + ) + return {"nodes": nodes, "edges": edges} + + def build_patient_knowledge_graph(self, patient_id: str) -> Dict[str, Any]: + rows = self.conn.execute( + "SELECT note_text FROM visit_event WHERE patient_id = ? 
ORDER BY visit_time", + (patient_id,), + ).fetchall() + merged_nodes: Dict[str, Dict[str, str]] = {} + merged_edges: Dict[str, Dict[str, str]] = {} + for row in rows: + graph = self.extract_knowledge_graph(row["note_text"] or "") + for node in graph["nodes"]: + merged_nodes[node["id"]] = node + for edge in graph["edges"]: + edge_id = f'{edge["source"]}->{edge["relation"]}->{edge["target"]}' + merged_edges[edge_id] = edge + return { + "patient_id": patient_id, + "nodes": list(merged_nodes.values()), + "edges": list(merged_edges.values()), + } + + def lesion_evolution(self, patient_id: str) -> Dict[str, Any]: + rows = self.conn.execute( + """ + SELECT v.visit_time, l.area_cm2, l.pigmentation_score + FROM visit_event v + LEFT JOIN lesion_measurement l ON v.visit_id = l.visit_id + WHERE v.patient_id = ? + ORDER BY v.visit_time + """, + (patient_id,), + ).fetchall() + timeline = [] + prev_area = None + for row in rows: + area = row["area_cm2"] + delta_ratio = None + if prev_area not in (None, 0) and area is not None: + delta_ratio = round((area - prev_area) / prev_area * 100, 2) + timeline.append( + { + "visit_time": row["visit_time"], + "area_cm2": area, + "pigmentation_score": row["pigmentation_score"], + "delta_ratio_pct": delta_ratio, + } + ) + if area is not None: + prev_area = area + return {"patient_id": patient_id, "timeline": timeline} + + def get_audit_logs(self, row_ref: Optional[str] = None, limit: int = 100) -> List[Dict[str, Any]]: + if row_ref: + rows = self.conn.execute( + "SELECT * FROM audit_log WHERE row_ref = ? 
ORDER BY id DESC LIMIT ?", + (row_ref, limit), + ).fetchall() + else: + rows = self.conn.execute( + "SELECT * FROM audit_log ORDER BY id DESC LIMIT ?", + (limit,), + ).fetchall() + return [dict(row) for row in rows] + + def research_query_dose_up_lesion_down( + self, + year: int, + lesion_reduction_pct: float = 20.0, + ) -> List[Dict[str, Any]]: + """Find patients whose dose increases while lesion area decreases over a year.""" + patient_rows = self.conn.execute( + """ + SELECT DISTINCT patient_id + FROM visit_event + WHERE substr(visit_time, 1, 4) = ? + """, + (str(year),), + ).fetchall() + results: List[Dict[str, Any]] = [] + for row in patient_rows: + patient_id = row["patient_id"] + visits = self.conn.execute( + """ + SELECT visit_id, visit_time + FROM visit_event + WHERE patient_id = ? AND substr(visit_time, 1, 4) = ? + ORDER BY visit_time + """, + (patient_id, str(year)), + ).fetchall() + if len(visits) < 2: + continue + first_visit, last_visit = visits[0]["visit_id"], visits[-1]["visit_id"] + + first_area_row = self.conn.execute( + "SELECT area_cm2 FROM lesion_measurement WHERE visit_id = ? ORDER BY id DESC LIMIT 1", + (first_visit,), + ).fetchone() + last_area_row = self.conn.execute( + "SELECT area_cm2 FROM lesion_measurement WHERE visit_id = ? 
ORDER BY id DESC LIMIT 1", + (last_visit,), + ).fetchone() + if not first_area_row or not last_area_row: + continue + first_area = first_area_row["area_cm2"] + last_area = last_area_row["area_cm2"] + if first_area in (None, 0) or last_area is None: + continue + lesion_change_pct = (last_area - first_area) / first_area * 100.0 + + first_dose = self.conn.execute( + "SELECT COALESCE(SUM(dose_value), 0) AS total_dose FROM medication_timeline WHERE visit_id = ?", + (first_visit,), + ).fetchone()["total_dose"] + last_dose = self.conn.execute( + "SELECT COALESCE(SUM(dose_value), 0) AS total_dose FROM medication_timeline WHERE visit_id = ?", + (last_visit,), + ).fetchone()["total_dose"] + dose_change_pct = 0.0 if first_dose in (None, 0) else (last_dose - first_dose) / first_dose * 100.0 + + if dose_change_pct > 0 and lesion_change_pct <= -abs(lesion_reduction_pct): + results.append( + { + "patient_id": patient_id, + "first_visit_id": first_visit, + "last_visit_id": last_visit, + "first_total_dose": round(float(first_dose), 4), + "last_total_dose": round(float(last_dose), 4), + "dose_change_pct": round(float(dose_change_pct), 2), + "first_area_cm2": round(float(first_area), 4), + "last_area_cm2": round(float(last_area), 4), + "lesion_change_pct": round(float(lesion_change_pct), 2), + } + ) + return results + + def __del__(self) -> None: + try: + self.close() + except Exception: + pass diff --git a/docs/data_management_research_engine_plan.md b/docs/data_management_research_engine_plan.md new file mode 100644 index 0000000..0243c0d --- /dev/null +++ b/docs/data_management_research_engine_plan.md @@ -0,0 +1,147 @@ +# 基于笔记本媒介的图像-视频-文本一体化数据库技术落地方案(数据管理场景) + +## 1. 建设目标 +- 以“笔记本媒介”为入口,构建可落地的多模态数据库,覆盖图像、视频、文本及结构化指标。 +- 规范录入诊疗相关全量数据,实现“自动识别 + 精准匹配 + 批量填充 + 人工校正”的协同流程。 +- 将数据库能力从“记录工具”升级为“科研引擎”,支撑实时监测、纵向追踪、可追溯统计与深度研究。 + +## 2. 数据范围与标准化对象 +### 2.1 核心数据域 +1. 用药信息:药物种类、剂量、频次、调整记录(增减量、停药、换药)。 +2. 病灶信息:面积测量值、边界变化、动态趋势。 +3. 色素信息:色素分布、深浅变化、前后差异。 +4. 体征信息:身高、体重等周期性监测数据。 +5. 
化验信息:各类实验室指标、时间序列波动、异常标记。 + +### 2.2 数据治理规范 +- 统一患者主索引(Patient ID)与时间戳标准(就诊时间、采样时间、录入时间)。 +- 统一计量单位(mg、mg/day、cm²、mmol/L 等)与枚举字典(药物名、检测项名、部位编码)。 +- 建立“原始数据层 + 标准化层 + 特征层 + 研究主题层”的分层模型,保证回溯能力。 + +## 3. 智能数据填写模块(新增) +### 3.1 能力定义 +- 自动识别:解析图像病灶参数、视频动态数据、文本诊疗信息与化验指标。 +- 字段匹配:基于实体识别 + 规则映射,将识别结果映射到标准字段。 +- 批量填充:支持同批次病例的批处理导入与自动补全。 +- 人工校正:保留人工确认与修改入口,实现“智能 + 准确”闭环。 + +### 3.2 推荐技术路线 +- 图像:病灶分割模型(如 UNet/Mask2Former)+ 面积量化。 +- 视频:关键帧提取 + 时序特征建模(如 TimeSformer/3D-CNN)。 +- 文本:医疗 NER + 关系抽取 + 指标归一化。 +- 匹配层:规则引擎(单位换算、同义词标准化)+ 向量相似度纠错。 + +### 3.3 质量控制机制 +- 字段置信度阈值(低置信度自动转入人工复核队列)。 +- 双轨日志:记录“模型填充值/人工修正值/修正原因/修正人”。 +- 全流程审计:支持按病例、字段、时间段追踪数据来源与变更轨迹。 + +## 4. 四项核心科研增强功能 + +### 4.1 语义搜索与 RAG(检索增强生成) +- 基于向量数据库(Milvus / FAISS / pgvector)建立病例语义索引。 +- 支持自然语言检索复杂条件病例,例如: + - “调取去年所有用药剂量增加且病灶面积缩小超过 20% 的患者视频”。 +- RAG 输出可追溯答案:返回结论 + 引用病例片段 + 关键字段来源。 + +### 4.2 知识图谱自动化构建 +- 自动抽取“症状 → 药物 → 指标变化”等诊疗路径。 +- 生成可视化关系图谱(患者群体、药物、指标、事件节点)。 +- 支持发现潜在副作用关联、非典型疗效演变路径与组合用药线索。 + +### 4.3 图像质量自动化评分(IQA) +- 图像/视频帧入库即评估:清晰度、曝光度、对焦情况、病灶占比。 +- 对低质量素材(模糊、欠曝、过曝、病灶区域不足)即时告警。 +- 将 IQA 分数纳入采集 KPI,实现源头质量闭环。 + +### 4.4 纵向演变热力图可视化 +- 通过差分图像算法与配准技术,计算病灶区域逐期变化。 +- 将数值变化映射为热力图,直观展示愈合/恶化空间分布。 +- 支持医生在单病例与队列层面进行疗效对比分析。 + +## 5. 数据对比检测模型(变化识别引擎) +### 5.1 模型目标 +- 自动识别并标记“前后变化节点、变化幅度、变化方向、变化速度”。 +- 覆盖数值型、分类型、影像型、文本事件型四类数据。 + +### 5.2 关键方法 +- 时间序列检测:异常点检测 + 趋势分段。 +- 事件对齐:以治疗方案调整为锚点,分析前后窗口期指标变化。 +- 多模态融合:将影像特征、文本事件、化验指标映射到统一时序轴。 + +### 5.3 输出能力 +- 实时监测看板:关键风险与疗效信号自动提醒。 +- 精准统计分析:按药物、病种、亚组自动生成变化统计。 +- 可追溯证据链:每个结论可回溯至原始图像/视频/文本与计算过程。 + +## 6. 系统架构建议(落地导向) +- 采集层:笔记本端录入组件 + 设备接入(相机/检验系统/随访表单)。 +- 处理层:多模态解析服务(CV、NLP、OCR、ASR)+ 规则校验服务。 +- 存储层: + - 对象存储(图像/视频) + - 关系型数据库(结构化字段) + - 向量数据库(语义检索) + - 图数据库(知识图谱) +- 应用层:录入台、质控台、科研检索台、可视化分析台。 + +## 7. 分阶段实施路径 +1. 第一阶段(1-2 个月):字段标准化、数据模型搭建、基础录入与审核流程上线。 +2. 第二阶段(2-4 个月):智能填充、IQA、语义检索初版上线。 +3. 第三阶段(4-6 个月):知识图谱、热力图、变化检测模型与科研报表联动。 +4. 第四阶段(持续迭代):模型校准、规则优化、跨中心数据协同与科研课题沉淀。 + +## 8. 预期价值 +- 管理价值:提升录入效率、降低漏填误填率、强化质量可控。 +- 临床价值:增强疗效判读与动态追踪能力。 +- 科研价值:显著缩短病例检索与证据组织时间,提升高质量研究产出效率。 + +## 9. 
数据库与检索层落地细化 +### 9.1 推荐库表(关系型) +- `patient_master`:患者主索引(匿名 ID、性别、出生年、中心、入组时间)。 +- `visit_event`:就诊事件(就诊时间、诊断、分期、方案版本)。 +- `medication_timeline`:用药时间线(药物、剂量、频次、调整原因)。 +- `lesion_measurement`:病灶结构化测量(面积、周长、部位编码、模型版本)。 +- `lab_result`:化验指标(项目、值、单位、参考区间、异常标记)。 +- `multimodal_asset`:多模态资产元数据(对象存储 URI、采集设备、IQA 分、帧级摘要)。 +- `audit_log`:审计日志(字段旧值/新值、来源、操作人、时间、原因)。 + +### 9.2 向量检索与 RAG 索引设计 +- 分层向量:病例摘要向量、单次就诊向量、段落/句级向量并存。 +- 混合检索:关键词 BM25 + 向量 ANN(避免纯向量漏召回)。 +- 元数据过滤:按时间范围、病种、治疗方案、中心进行 pre-filter。 +- 重排策略:Cross-Encoder 进行 Top-K 重排,输出可追溯证据片段。 + +## 10. 算法指标与验收标准(可量化) +### 10.1 智能填充模块 +- 字段级准确率 ≥ 95%,关键字段(药物剂量、病灶面积、化验值)≥ 98%。 +- 低置信度召回率 ≥ 99%(宁可多提醒,不可漏风险字段)。 +- 单病例录入耗时较基线下降 ≥ 40%。 + +### 10.2 语义检索与 RAG +- Top-5 召回率 ≥ 90%,复杂条件查询平均响应 ≤ 3 秒(离线索引前提)。 +- 生成答案引用完整率 ≥ 95%(每条结论可定位到来源片段)。 + +### 10.3 IQA 与热力图 +- IQA 低质拦截准确率 ≥ 92%,误报率 ≤ 8%。 +- 热力图与人工标注的一致性(Dice/IoU)达到项目阈值(建议 IoU ≥ 0.80)。 + +## 11. 接口与流程编排建议 +### 11.1 关键服务 API(示例) +- `POST /ingest/multimodal`:上传图像/视频/文本并触发解析。 +- `POST /extract/auto-fill`:返回字段候选值、置信度、映射依据。 +- `POST /review/commit`:人工确认后写入标准层并记录审计日志。 +- `POST /search/semantic`:自然语言查询 + 条件过滤 + 证据返回。 +- `GET /evolution/heatmap/{patient_id}`:返回时序热力图与变化统计。 + +### 11.2 流程编排 +1. 数据采集进入消息队列(Kafka/RabbitMQ)并异步解析。 +2. 解析结果写入暂存区,规则引擎完成单位归一、字典映射。 +3. 低置信度字段进入人工复核任务池。 +4. 通过审核后写入正式库,并同步更新向量库与图数据库。 +5. 触发变化检测任务,更新监测看板与科研主题数据集。 + +## 12. 
from flask import Flask, jsonify, request

from database import ClinicalMultimodalDB


def create_app(db_path: str = "database/clinical_multimodal.db") -> Flask:
    """Build the clinical multimodal REST API as a Flask application.

    The app wraps a single :class:`ClinicalMultimodalDB` instance and exposes
    thin JSON endpoints for ingestion (patients/visits/medications/lesions/
    labs/assets), regex-based auto-fill extraction, FTS-backed semantic search
    and RAG answers, longitudinal change detection, knowledge-graph views,
    audit-log queries, and a research cohort query.

    Args:
        db_path: Filesystem path of the SQLite database backing the app.

    Returns:
        A configured :class:`flask.Flask` instance ready to serve requests.
    """
    app = Flask(__name__)
    db = ClinicalMultimodalDB(db_path=db_path)

    def _require(payload: dict, keys: list) -> tuple:
        """Return ``(ok, missing)`` where *missing* lists keys absent from *payload*."""
        missing = [key for key in keys if key not in payload]
        return (len(missing) == 0, missing)

    @app.get("/health")
    def health() -> tuple:
        """Liveness probe."""
        return jsonify({"status": "ok"}), 200

    @app.post("/patients/upsert")
    def upsert_patient() -> tuple:
        """Create or update a patient master record. Requires ``patient_id``."""
        payload = request.get_json(force=True)
        ok, missing = _require(payload, ["patient_id"])
        if not ok:
            return jsonify({"ok": False, "missing": missing}), 400
        db.upsert_patient(payload)
        return jsonify({"ok": True, "patient_id": payload.get("patient_id")}), 200

    @app.post("/visits")
    def add_visit() -> tuple:
        """Record a visit event. Requires ``visit_id``, ``patient_id``, ``visit_time``."""
        payload = request.get_json(force=True)
        ok, missing = _require(payload, ["visit_id", "patient_id", "visit_time"])
        if not ok:
            return jsonify({"ok": False, "missing": missing}), 400
        db.add_visit(payload)
        return jsonify({"ok": True, "visit_id": payload.get("visit_id")}), 201

    @app.post("/medications")
    def add_medication() -> tuple:
        """Append one medication entry to a visit's timeline."""
        payload = request.get_json(force=True)
        ok, missing = _require(payload, ["visit_id", "medication"])
        if not ok:
            return jsonify({"ok": False, "missing": missing}), 400
        db.add_medication(payload["visit_id"], payload["medication"])
        return jsonify({"ok": True}), 201

    @app.post("/lesions")
    def add_lesion() -> tuple:
        """Append one lesion measurement to a visit."""
        payload = request.get_json(force=True)
        ok, missing = _require(payload, ["visit_id", "lesion"])
        if not ok:
            return jsonify({"ok": False, "missing": missing}), 400
        db.add_lesion_measurement(payload["visit_id"], payload["lesion"])
        return jsonify({"ok": True}), 201

    @app.post("/labs")
    def add_lab() -> tuple:
        """Append one laboratory result to a visit."""
        payload = request.get_json(force=True)
        ok, missing = _require(payload, ["visit_id", "lab"])
        if not ok:
            return jsonify({"ok": False, "missing": missing}), 400
        db.add_lab_result(payload["visit_id"], payload["lab"])
        return jsonify({"ok": True}), 201

    @app.post("/assets")
    def add_asset() -> tuple:
        """Register a multimodal asset (image/audio/etc.) against a visit."""
        payload = request.get_json(force=True)
        ok, missing = _require(payload, ["asset_id", "visit_id", "asset_type", "object_uri"])
        if not ok:
            return jsonify({"ok": False, "missing": missing}), 400
        db.add_asset(payload)
        return jsonify({"ok": True, "asset_id": payload.get("asset_id")}), 201

    @app.post("/extract/auto-fill")
    def auto_fill() -> tuple:
        """Run structured auto-fill extraction on a single free-text note."""
        payload = request.get_json(force=True)
        ok, missing = _require(payload, ["text"])
        if not ok:
            return jsonify({"ok": False, "missing": missing}), 400
        result = db.auto_fill_from_text(payload["text"])
        return jsonify(result), 200

    @app.post("/extract/auto-fill/batch")
    def auto_fill_batch() -> tuple:
        """Run auto-fill extraction over a list of texts; returns count + items."""
        payload = request.get_json(force=True)
        ok, missing = _require(payload, ["texts"])
        if not ok:
            return jsonify({"ok": False, "missing": missing}), 400
        result = db.batch_auto_fill_from_texts(payload["texts"])
        return jsonify({"count": len(result), "items": result}), 200

    @app.post("/quality/iqa")
    def score_iqa() -> tuple:
        """Score image quality from blur/exposure/lesion-ratio metrics."""
        payload = request.get_json(force=True)
        ok, missing = _require(payload, ["blur_score", "exposure_score", "lesion_ratio"])
        if not ok:
            return jsonify({"ok": False, "missing": missing}), 400
        result = db.score_iqa(
            blur_score=float(payload["blur_score"]),
            exposure_score=float(payload["exposure_score"]),
            lesion_ratio=float(payload["lesion_ratio"]),
        )
        return jsonify(result), 200

    @app.post("/search/semantic")
    def semantic_search() -> tuple:
        """Full-text semantic search over visit notes; returns scored hits."""
        payload = request.get_json(force=True)
        ok, missing = _require(payload, ["query"])
        if not ok:
            return jsonify({"ok": False, "missing": missing}), 400
        query = payload["query"]
        top_k = int(payload.get("top_k", 5))
        hits = db.semantic_search(query, top_k=top_k)
        return (
            jsonify(
                {
                    "query": query,
                    "hits": [
                        {
                            "patient_id": h.patient_id,
                            "visit_id": h.visit_id,
                            "score": h.score,
                            "note_text": h.note_text,
                        }
                        for h in hits
                    ],
                }
            ),
            200,
        )

    @app.post("/search/rag")
    def rag() -> tuple:
        """RAG-style answer synthesis grounded in retrieved notes."""
        payload = request.get_json(force=True)
        ok, missing = _require(payload, ["query"])
        if not ok:
            return jsonify({"ok": False, "missing": missing}), 400
        query = payload["query"]
        top_k = int(payload.get("top_k", 3))
        return jsonify(db.rag_answer(query, top_k=top_k)), 200

    @app.post("/ingest/multimodal")
    def ingest_multimodal() -> tuple:
        """One-shot ingestion: upsert patient, add visit, extract, optionally commit.

        ``persist_auto_fill`` (default True) controls whether the extracted
        structure is committed to the database under the given operator.
        """
        payload = request.get_json(force=True)
        ok, missing = _require(payload, ["patient", "visit", "text"])
        if not ok:
            return jsonify({"ok": False, "missing": missing}), 400
        db.upsert_patient(payload["patient"])
        db.add_visit(payload["visit"])
        extracted = db.auto_fill_from_text(payload["text"])
        persist = bool(payload.get("persist_auto_fill", True))
        committed = None
        if persist:
            committed = db.commit_auto_fill(
                visit_id=payload["visit"]["visit_id"],
                extracted=extracted,
                operator=payload.get("operator", "system"),
                manual_overrides=payload.get("manual_overrides"),
            )
        return jsonify({"ok": True, "extracted": extracted, "committed": committed}), 201

    @app.post("/review/commit")
    def review_commit() -> tuple:
        """Commit reviewed auto-fill output, with optional manual overrides."""
        payload = request.get_json(force=True)
        ok, missing = _require(payload, ["visit_id", "extracted"])
        if not ok:
            return jsonify({"ok": False, "missing": missing}), 400
        result = db.commit_auto_fill(
            visit_id=payload["visit_id"],
            extracted=payload["extracted"],
            operator=payload.get("operator", "reviewer"),
            manual_overrides=payload.get("manual_overrides"),
        )
        return jsonify({"ok": True, "committed": result}), 200

    # NOTE: the four routes below carry a `<patient_id>` URL converter; the
    # view functions require that argument, so the converter is mandatory.
    @app.get("/patients/<patient_id>/changes")
    def detect_changes(patient_id: str) -> tuple:
        """Longitudinal change detection; ``items`` is a comma-separated lab filter."""
        item_names = request.args.get("items")
        lab_items = item_names.split(",") if item_names else None
        return jsonify(db.detect_changes(patient_id, lab_items=lab_items)), 200

    @app.get("/patients/<patient_id>/snapshot")
    def snapshot(patient_id: str) -> tuple:
        """Export a full patient snapshot, or 404 if the patient is unknown."""
        data = db.export_patient_snapshot(patient_id)
        if not data:
            return jsonify({"message": "patient not found"}), 404
        return jsonify(data), 200

    @app.post("/graph/extract")
    def extract_graph() -> tuple:
        """Extract a knowledge graph (nodes/edges) from free text."""
        payload = request.get_json(force=True)
        ok, missing = _require(payload, ["text"])
        if not ok:
            return jsonify({"ok": False, "missing": missing}), 400
        return jsonify(db.extract_knowledge_graph(payload["text"])), 200

    @app.get("/patients/<patient_id>/graph")
    def patient_graph(patient_id: str) -> tuple:
        """Build the aggregated knowledge graph for one patient."""
        return jsonify(db.build_patient_knowledge_graph(patient_id)), 200

    @app.get("/patients/<patient_id>/evolution")
    def patient_evolution(patient_id: str) -> tuple:
        """Return the lesion evolution timeline for one patient."""
        return jsonify(db.lesion_evolution(patient_id)), 200

    @app.get("/audits")
    def audits() -> tuple:
        """List audit-log entries, optionally filtered by ``row_ref``."""
        row_ref = request.args.get("row_ref")
        limit = int(request.args.get("limit", 100))
        return jsonify({"items": db.get_audit_logs(row_ref=row_ref, limit=limit)}), 200

    @app.get("/research/query")
    def research_query() -> tuple:
        """Research cohort query: dose increased while lesion area dropped.

        Requires ``year``; ``lesion_reduction_pct`` defaults to 20 (percent).
        """
        year = request.args.get("year")
        if not year:
            return jsonify({"ok": False, "missing": ["year"]}), 400
        reduction = float(request.args.get("lesion_reduction_pct", 20))
        items = db.research_query_dose_up_lesion_down(
            year=int(year),
            lesion_reduction_pct=reduction,
        )
        return jsonify({"ok": True, "count": len(items), "items": items}), 200

    return app


if __name__ == "__main__":
    # Development entry point only: the Werkzeug debug server must not be
    # exposed in production (debug=True enables the interactive debugger).
    app = create_app()
    app.run(host="0.0.0.0", port=8000, debug=True)
import tempfile
import unittest
from pathlib import Path

from server.clinical_api import create_app


class ClinicalApiTestCase(unittest.TestCase):
    """End-to-end and validation tests for the clinical multimodal API.

    Each test runs against a fresh Flask app backed by a throwaway SQLite
    file inside a temporary directory, exercised via the Flask test client.
    """

    def setUp(self):
        self._workdir = tempfile.TemporaryDirectory()
        db_file = Path(self._workdir.name) / "clinical_api_test.db"
        self.app = create_app(db_path=str(db_file))
        self.client = self.app.test_client()

    def tearDown(self):
        self._workdir.cleanup()

    # -- small request helpers -------------------------------------------

    def _post_created(self, path, payload):
        """POST *payload* to *path* and assert a 201 Created response."""
        self.assertEqual(self.client.post(path, json=payload).status_code, 201)

    def test_end_to_end_flow(self):
        patient = {
            "patient_id": "P100",
            "anonymized_id": "AX100",
            "sex": "F",
            "birth_year": 1992,
            "center": "center-a",
        }
        self.assertEqual(self.client.post("/patients/upsert", json=patient).status_code, 200)

        visit_1 = {
            "visit_id": "V100-1",
            "patient_id": "P100",
            "visit_time": "2026-03-01T10:00:00Z",
            "note_text": "他克莫司 2 mg bid,病灶面积: 12.5 cm2,CRP: 8.2 mg/L",
        }
        visit_2 = {
            "visit_id": "V100-2",
            "patient_id": "P100",
            "visit_time": "2026-04-01T10:00:00Z",
            "note_text": "剂量增加,病灶面积: 9.0 cm2,CRP: 4.1 mg/L",
        }
        self._post_created("/visits", visit_1)
        self._post_created("/visits", visit_2)

        # Combined ingestion: patient upsert + visit + auto-fill + commit.
        baseline_note = "红斑,他克莫司 1 mg qd,CRP: 10.0 mg/L,病灶面积: 15.0 cm2"
        ingest_resp = self.client.post(
            "/ingest/multimodal",
            json={
                "patient": patient,
                "visit": {
                    "visit_id": "V100-0",
                    "patient_id": "P100",
                    "visit_time": "2026-02-01T10:00:00Z",
                    "note_text": baseline_note,
                },
                "text": baseline_note,
                "persist_auto_fill": True,
                "operator": "bot",
            },
        )
        self.assertEqual(ingest_resp.status_code, 201)
        self.assertTrue(ingest_resp.get_json()["ok"])

        # Single and batch extraction endpoints.
        single = self.client.post("/extract/auto-fill", json={"text": visit_1["note_text"]})
        self.assertEqual(single.status_code, 200)
        self.assertTrue(len(single.get_json()["medications"]) >= 1)

        batch = self.client.post(
            "/extract/auto-fill/batch",
            json={"texts": [visit_1["note_text"], visit_2["note_text"]]},
        )
        self.assertEqual(batch.status_code, 200)
        self.assertEqual(batch.get_json()["count"], 2)

        # Structured measurements attached to each visit.
        self._post_created("/lesions", {"visit_id": "V100-1", "lesion": {"area_cm2": 12.5}})
        self._post_created("/lesions", {"visit_id": "V100-2", "lesion": {"area_cm2": 9.0}})
        self._post_created(
            "/labs",
            {"visit_id": "V100-1", "lab": {"item_name": "CRP", "item_value": 8.2, "unit": "mg/L"}},
        )
        self._post_created(
            "/labs",
            {"visit_id": "V100-2", "lab": {"item_name": "CRP", "item_value": 4.1, "unit": "mg/L"}},
        )
        self._post_created(
            "/medications",
            {
                "visit_id": "V100-0",
                "medication": {"drug_name": "他克莫司", "dose_value": 1.0, "dose_unit": "mg", "frequency": "qd"},
            },
        )
        self._post_created(
            "/medications",
            {
                "visit_id": "V100-2",
                "medication": {"drug_name": "他克莫司", "dose_value": 2.0, "dose_unit": "mg", "frequency": "bid"},
            },
        )

        # Retrieval: FTS search and RAG answer synthesis.
        semantic = self.client.post("/search/semantic", json={"query": "病灶面积 CRP", "top_k": 3})
        self.assertEqual(semantic.status_code, 200)
        self.assertTrue(len(semantic.get_json()["hits"]) >= 1)

        rag = self.client.post("/search/rag", json={"query": "病灶面积变化"})
        self.assertEqual(rag.status_code, 200)
        self.assertIn("answer", rag.get_json())

        iqa = self.client.post(
            "/quality/iqa",
            json={"blur_score": 0.9, "exposure_score": 0.8, "lesion_ratio": 0.2},
        )
        self.assertEqual(iqa.status_code, 200)
        self.assertTrue(iqa.get_json()["qualified"])

        # Longitudinal change detection: 12.5 -> 9.0 cm2 is a -28% delta.
        changes = self.client.get("/patients/P100/changes?items=CRP")
        self.assertEqual(changes.status_code, 200)
        change_data = changes.get_json()
        self.assertEqual(change_data["patient_id"], "P100")
        self.assertEqual(change_data["lesion_delta"]["delta_ratio_pct"], -28.0)

        snapshot = self.client.get("/patients/P100/snapshot")
        self.assertEqual(snapshot.status_code, 200)
        self.assertEqual(snapshot.get_json()["patient"]["patient_id"], "P100")

        # Reviewer commit with a manual override of the lesion measurement.
        review = self.client.post(
            "/review/commit",
            json={
                "visit_id": "V100-2",
                "extracted": {
                    "medications": [
                        {"drug_name": "他克莫司", "dose_value": 2.0, "dose_unit": "mg", "frequency": "bid"}
                    ],
                    "lesion": {"area_cm2": 9.0},
                    "labs": [{"item_name": "CRP", "item_value": 4.1, "unit": "mg/L"}],
                },
                "operator": "doctor_a",
                "manual_overrides": {"lesion": {"area_cm2": 8.8, "pigmentation_score": 0.4}},
            },
        )
        self.assertEqual(review.status_code, 200)
        self.assertEqual(review.get_json()["committed"]["lesions"], 1)

        # Knowledge-graph extraction and patient-level graph/evolution views.
        graph = self.client.post(
            "/graph/extract",
            json={"text": "患者红斑,使用他克莫司 2 mg bid,CRP: 8.2 mg/L"},
        )
        self.assertEqual(graph.status_code, 200)
        self.assertTrue(len(graph.get_json()["nodes"]) >= 1)

        patient_graph = self.client.get("/patients/P100/graph")
        self.assertEqual(patient_graph.status_code, 200)
        self.assertEqual(patient_graph.get_json()["patient_id"], "P100")

        evolution = self.client.get("/patients/P100/evolution")
        self.assertEqual(evolution.status_code, 200)
        self.assertTrue(len(evolution.get_json()["timeline"]) >= 2)

        audits = self.client.get("/audits?row_ref=V100-2&limit=20")
        self.assertEqual(audits.status_code, 200)
        self.assertTrue(len(audits.get_json()["items"]) >= 1)

        research = self.client.get("/research/query?year=2026&lesion_reduction_pct=20")
        self.assertEqual(research.status_code, 200)
        self.assertTrue(research.get_json()["count"] >= 1)

    def test_validation_guardrails(self):
        """Every write endpoint rejects a payload missing its required keys."""
        for path in (
            "/patients/upsert",
            "/extract/auto-fill",
            "/search/semantic",
            "/graph/extract",
            "/ingest/multimodal",
            "/review/commit",
        ):
            self.assertEqual(self.client.post(path, json={}).status_code, 400)
        # /research/query requires a `year` query parameter.
        self.assertEqual(self.client.get("/research/query").status_code, 400)


if __name__ == "__main__":
    unittest.main()