From d5df801673d6a2c1ad26916db96aed99d1a360ec Mon Sep 17 00:00:00 2001 From: junjun Date: Fri, 3 Apr 2026 16:20:49 +0800 Subject: [PATCH] feat: SQLBot supports custom data types in data sources #768 --- backend/apps/datasource/api/datasource.py | 106 +++++++++++++++++-- backend/apps/datasource/models/datasource.py | 15 +++ backend/apps/datasource/utils/__init__.py | 6 ++ backend/apps/datasource/utils/excel.py | 62 +++++++++++ backend/apps/swagger/locales/en.json | 2 + backend/apps/swagger/locales/zh.json | 2 + 6 files changed, 183 insertions(+), 10 deletions(-) create mode 100644 backend/apps/datasource/utils/excel.py diff --git a/backend/apps/datasource/api/datasource.py b/backend/apps/datasource/api/datasource.py index 309d3a71e..225c9d643 100644 --- a/backend/apps/datasource/api/datasource.py +++ b/backend/apps/datasource/api/datasource.py @@ -8,29 +8,29 @@ from typing import List from urllib.parse import quote -import orjson import pandas as pd -from psycopg2 import sql from fastapi import APIRouter, File, UploadFile, HTTPException, Path from fastapi.responses import StreamingResponse +from psycopg2 import sql from sqlalchemy import and_ from apps.db.db import get_schema from apps.db.engine import get_engine_conn from apps.swagger.i18n import PLACEHOLDER_PREFIX from apps.system.schemas.permission import SqlbotPermission, require_permissions +from common.audit.models.log_model import OperationType, OperationModules +from common.audit.schemas.logger_decorator import LogConfig, system_log from common.core.config import settings from common.core.deps import SessionDep, CurrentUser, Trans from common.utils.utils import SQLBotLogUtil from ..crud.datasource import get_datasource_list, check_status, create_ds, update_ds, delete_ds, getTables, getFields, \ - execSql, update_table_and_fields, getTablesByDs, chooseTables, preview, updateTable, updateField, get_ds, fieldEnum, \ + update_table_and_fields, getTablesByDs, chooseTables, preview, updateTable, updateField, 
get_ds, fieldEnum, \ check_status_by_id, sync_single_fields from ..crud.field import get_fields_by_table_id from ..crud.table import get_tables_by_ds_id from ..models.datasource import CoreDatasource, CreateDatasource, TableObj, CoreTable, CoreField, FieldObj, \ - TableSchemaResponse, ColumnSchemaResponse, PreviewResponse -from common.audit.models.log_model import OperationType, OperationModules -from common.audit.schemas.logger_decorator import LogConfig, system_log + TableSchemaResponse, ColumnSchemaResponse, PreviewResponse, ImportRequest +from ..utils.excel import parse_excel_preview, USER_TYPE_TO_PANDAS router = APIRouter(tags=["Datasource"], prefix="/datasource") path = settings.EXCEL_PATH @@ -81,7 +81,7 @@ async def add(session: SessionDep, trans: Trans, user: CurrentUser, ds: CreateDa return await asyncio.to_thread(inner) """ loop = asyncio.get_event_loop() - + def sync_wrapper(): loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) @@ -89,12 +89,13 @@ def sync_wrapper(): return loop.run_until_complete(create_ds(session, trans, user, ds)) finally: loop.close() - + return await loop.run_in_executor(None, sync_wrapper) @router.post("/chooseTables/{id}", response_model=None, summary=f"{PLACEHOLDER_PREFIX}ds_choose_tables") -@require_permissions(permission=SqlbotPermission(role=['ws_admin'], permission=SqlbotPermission(type='ds', keyExpression="id"))) +@require_permissions( + permission=SqlbotPermission(role=['ws_admin'], permission=SqlbotPermission(type='ds', keyExpression="id"))) async def choose_tables(session: SessionDep, trans: Trans, tables: List[CoreTable], id: int = Path(..., description=f"{PLACEHOLDER_PREFIX}ds_id")): def inner(): @@ -104,7 +105,8 @@ def inner(): @router.post("/update", response_model=CoreDatasource, summary=f"{PLACEHOLDER_PREFIX}ds_update") -@require_permissions(permission=SqlbotPermission(role=['ws_admin'], permission=SqlbotPermission(type='ds', keyExpression="ds.id"))) +@require_permissions( + 
class FieldInfo(BaseModel):
    # One column selected for import: its name plus the user-chosen type
    # ('int' | 'float' | 'datetime' | 'string' — see USER_TYPE_TO_PANDAS).
    fieldName: str
    fieldType: str


class SheetFields(BaseModel):
    # Column typing for a single worksheet.
    sheetName: str
    fields: List[FieldInfo]


class ImportRequest(BaseModel):
    # filePath is the server-side name returned by /parseExcel, never a client path.
    filePath: str
    sheets: List[SheetFields]


@router.post("/parseExcel", response_model=None, summary=f"{PLACEHOLDER_PREFIX}ds_parse_excel")
async def parse_excel(file: UploadFile = File(..., description=f"{PLACEHOLDER_PREFIX}ds_excel")):
    """Save the uploaded spreadsheet under EXCEL_PATH and return a per-sheet
    preview: inferred field types and the first rows of each sheet.

    Returns ``{"filePath": <server-side name>, "data": [<sheet summaries>]}``;
    the caller passes ``filePath`` back to /importToDb.
    """
    ALLOWED_EXTENSIONS = {"xlsx", "xls", "csv"}
    if not file.filename.lower().endswith(tuple(ALLOWED_EXTENSIONS)):
        raise HTTPException(400, "Only support .xlsx/.xls/.csv")

    os.makedirs(path, exist_ok=True)
    # rsplit keeps the real extension when the name contains extra dots;
    # the previous split('.')[0]/[1] turned "report.v2.xlsx" into ext "v2",
    # which the read path below could not parse.
    stem, ext = file.filename.rsplit('.', 1)
    token = hashlib.sha256(uuid.uuid4().bytes).hexdigest()[:10]
    filename = f"{stem}_{token}.{ext}"
    save_path = os.path.join(path, filename)
    with open(save_path, "wb") as f:
        f.write(await file.read())

    def inner():
        # parse_excel_preview does blocking file I/O — keep it off the event loop.
        return {
            "filePath": filename,
            "data": parse_excel_preview(save_path),
        }

    return await asyncio.to_thread(inner)


@router.post("/importToDb", response_model=None, summary=f"{PLACEHOLDER_PREFIX}ds_import_to_db")
async def import_to_db(session: SessionDep, import_req: ImportRequest):
    """Load a previously uploaded file (see /parseExcel) into the engine DB,
    one table per requested sheet, casting columns to the user-selected types.

    Raises HTTPException(400) when the file is missing or a sheet fails to load.
    """
    save_path = os.path.join(path, import_req.filePath)
    if not os.path.exists(save_path):
        raise HTTPException(400, "File not found")

    def inner():
        engine = get_engine_conn()
        results = []

        for sheet_info in import_req.sheets:
            sheet_name = sheet_info.sheetName
            # Random suffix keeps repeated imports of the same sheet from colliding.
            table_name = f"{sheet_name}_{hashlib.sha256(uuid.uuid4().bytes).hexdigest()[:10]}"

            field_mapping = {f.fieldName: f.fieldType for f in sheet_info.fields}
            # NOTE(review): pandas may reject 'datetime64[ns]' in read_csv's
            # dtype= (it asks for parse_dates) — confirm against the pandas
            # version pinned by the project.
            dtype_dict = {
                col: USER_TYPE_TO_PANDAS.get(user_type, 'string')
                for col, user_type in field_mapping.items()
            }

            if save_path.endswith(".csv"):
                df = pd.read_csv(save_path, engine='c', dtype=dtype_dict)
                sheet_name = "Sheet1"  # CSV has no sheets; mirror /parseExcel's label
            else:
                df = pd.read_excel(save_path, sheet_name=sheet_name, engine='calamine', dtype=dtype_dict)

            conn = engine.raw_connection()
            cursor = conn.cursor()
            try:
                # Create the table from the dataframe schema only (zero rows).
                # The original called to_sql on the full frame, which already
                # inserted every row — and then COPY below was meant to insert
                # them again (duplicates), but silently copied nothing because
                # the StringIO was never rewound. Schema-only + COPY is both
                # correct and the fast path.
                df.head(0).to_sql(
                    table_name,
                    engine,
                    if_exists='replace',
                    index=False
                )
                output = StringIO()
                df.to_csv(output, sep='\t', header=False, index=False)
                output.seek(0)  # rewind so COPY reads from the start

                query = sql.SQL("COPY {} FROM STDIN WITH CSV DELIMITER E'\t'").format(
                    sql.Identifier(table_name)
                )
                cursor.copy_expert(sql=query.as_string(cursor.connection), file=output)
                conn.commit()
                results.append({
                    "sheetName": sheet_name,
                    "tableName": table_name,
                    "tableComment": "",
                    "rows": len(df)
                })
            except Exception as e:
                conn.rollback()  # drop the partial table/rows of this sheet
                raise HTTPException(400, f"Insert data failed for {table_name}: {str(e)}") from e
            finally:
                cursor.close()
                conn.close()

        return {"filename": import_req.filePath, "sheets": results}

    return await asyncio.to_thread(inner)
import pandas as pd

# pandas dtype string -> user-facing field type shown in the preview UI.
FIELD_TYPE_MAP = {
    'int64': 'int',
    'int32': 'int',
    'float64': 'float',
    'float32': 'float',
    'datetime64': 'datetime',
    'datetime64[ns]': 'datetime',
    'object': 'string',
    'string': 'string',
    'bool': 'string',
}

# Inverse direction: user-chosen field type -> pandas dtype used when re-reading
# the file for import.
USER_TYPE_TO_PANDAS = {
    'int': 'int64',
    'float': 'float64',
    'datetime': 'datetime64[ns]',
    'string': 'string',
}


def infer_field_type(dtype) -> str:
    """Map a pandas dtype (or its string form) to a user-facing field type.

    Unknown dtypes fall back to 'string'.
    """
    return FIELD_TYPE_MAP.get(str(dtype), 'string')


def _describe_sheet(df: pd.DataFrame, sheet_name: str, max_rows: int) -> dict:
    """Summarise one dataframe as a preview payload (fields, sample rows, count)."""
    columns = [
        {"fieldName": column, "fieldType": infer_field_type(df[column].dtype)}
        for column in df.columns
    ]
    return {
        "sheetName": sheet_name,
        "fields": columns,
        "data": df.head(max_rows).to_dict(orient='records'),
        "rows": len(df),
    }


def parse_excel_preview(save_path: str, max_rows: int = 10):
    """Read a .csv/.xls/.xlsx file and return one preview summary per sheet.

    A CSV is reported as a single sheet named "Sheet1"; Excel workbooks yield
    one entry per worksheet. Each summary holds the inferred field types, the
    first ``max_rows`` rows as records, and the total row count.
    """
    if save_path.endswith(".csv"):
        frame = pd.read_csv(save_path, engine='c')
        return [_describe_sheet(frame, "Sheet1", max_rows)]

    workbook = pd.ExcelFile(save_path)
    return [
        _describe_sheet(
            pd.read_excel(save_path, sheet_name=name, engine='calamine'),
            name,
            max_rows,
        )
        for name in workbook.sheet_names
    ]