mirror of
https://gitee.com/myxzgzs/RSJselet
synced 2025-08-08 00:02:41 +08:00
738 lines
27 KiB
Python
738 lines
27 KiB
Python
from fastapi import FastAPI, HTTPException, Depends, Query, BackgroundTasks
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from fastapi.staticfiles import StaticFiles
|
||
from fastapi.responses import RedirectResponse
|
||
from pydantic import BaseModel
|
||
from typing import Optional, List, Dict, Any
|
||
import uvicorn
|
||
import aiomysql
|
||
import asyncio
|
||
import time
|
||
import os
|
||
import json
|
||
import aioredis
|
||
from contextlib import asynccontextmanager
|
||
import random
|
||
from config import * # 导入配置
|
||
import monitoring # 导入监控模块
|
||
from prometheus_client import make_asgi_app
|
||
from fastapi.staticfiles import StaticFiles
|
||
from fastapi.responses import HTMLResponse
|
||
from fastapi.responses import JSONResponse
|
||
|
||
# 数据库配置
|
||
# DB_CONFIG = {
|
||
# "host": "127.0.0.1",
|
||
# "port": 3306,
|
||
# "user": "root", # 请根据实际情况修改
|
||
# "password": "123456", # 请根据实际情况修改
|
||
# "db": "harsjselect",
|
||
# "charset": "utf8mb4"
|
||
# }
|
||
|
||
# Redis配置
|
||
# REDIS_CONFIG = {
|
||
# "host": "127.0.0.1",
|
||
# "port": 6379,
|
||
# "db": 0,
|
||
# "password": 123456, # 如果有密码,请设置
|
||
# "encoding": "utf-8"
|
||
# }
|
||
|
||
# 缓存过期时间(秒)
|
||
# CACHE_EXPIRE = 3600 # 1小时
|
||
|
||
# 连接池
|
||
pool = None
|
||
redis = None
|
||
|
||
# 测试数据
|
||
TEST_DATA = [
|
||
{
|
||
"zkzh": 2023001,
|
||
"xm": "张三",
|
||
"sj": "13800000001",
|
||
"dw": "北京市第一中学",
|
||
"subjects": [
|
||
{"km": "语文", "cj": "98", "pm": "1", "bz": "优秀"},
|
||
{"km": "数学", "cj": "92", "pm": "3", "bz": "良好"},
|
||
{"km": "英语", "cj": "95", "pm": "2", "bz": "优秀"},
|
||
{"km": "物理", "cj": "88", "pm": "5", "bz": "良好"},
|
||
{"km": "化学", "cj": "94", "pm": "2", "bz": "优秀"}
|
||
]
|
||
},
|
||
{
|
||
"zkzh": 2023002,
|
||
"xm": "李四",
|
||
"sj": "13800000002",
|
||
"dw": "北京市第二中学",
|
||
"subjects": [
|
||
{"km": "语文", "cj": "85", "pm": "12", "bz": "良好"},
|
||
{"km": "数学", "cj": "98", "pm": "1", "bz": "优秀"},
|
||
{"km": "英语", "cj": "88", "pm": "8", "bz": "良好"},
|
||
{"km": "物理", "cj": "92", "pm": "3", "bz": "优秀"},
|
||
{"km": "化学", "cj": "86", "pm": "10", "bz": "良好"}
|
||
]
|
||
}
|
||
]
|
||
|
||
# 等待室配置
|
||
# WAITING_ROOM_CAPACITY = 15000 # 等待室容量增加到15000
|
||
# CONCURRENT_QUERIES = 1000 # 并发查询数量增加到1000
|
||
waiting_room = asyncio.Queue()
|
||
current_processing = 0
|
||
processing_lock = asyncio.Lock()
|
||
|
||
# 初始化测试数据
|
||
async def init_test_data():
|
||
print("开始初始化测试数据...")
|
||
if not pool:
|
||
print("数据库连接池未初始化,无法添加测试数据")
|
||
return
|
||
|
||
try:
|
||
async with pool.acquire() as conn:
|
||
async with conn.cursor() as cursor:
|
||
# 检查表是否存在
|
||
try:
|
||
await cursor.execute("SELECT 1 FROM rsjcjselect LIMIT 1")
|
||
table_exists = True
|
||
except Exception:
|
||
table_exists = False
|
||
|
||
# 如果表不存在,创建表
|
||
if not table_exists:
|
||
print("表不存在,创建表...")
|
||
await cursor.execute("""
|
||
CREATE TABLE IF NOT EXISTS `rsjcjselect` (
|
||
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '序号',
|
||
`pm` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '排名',
|
||
`cj` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '成绩',
|
||
`zkzh` int(11) NOT NULL COMMENT '准考证号',
|
||
`sj` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '手机号',
|
||
`dw` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '单位',
|
||
`xm` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '姓名',
|
||
`km` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '科目',
|
||
`bz` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT '备注',
|
||
PRIMARY KEY (`id`)
|
||
) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci ROW_FORMAT = Dynamic
|
||
""")
|
||
|
||
# 检查表中是否有数据
|
||
await cursor.execute("SELECT COUNT(*) as count FROM rsjcjselect")
|
||
result = await cursor.fetchone()
|
||
record_count = result[0] if result else 0
|
||
|
||
# 如果表中已有数据,不再添加测试数据
|
||
if record_count > 0:
|
||
print(f"表中已有 {record_count} 条数据,跳过初始化")
|
||
return
|
||
|
||
# 清空原有数据
|
||
await cursor.execute("DELETE FROM rsjcjselect")
|
||
|
||
# 准备插入语句
|
||
sql = """
|
||
INSERT INTO rsjcjselect (pm, cj, zkzh, sj, dw, xm, km, bz)
|
||
VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
|
||
"""
|
||
|
||
# 插入测试数据
|
||
count = 0
|
||
for student in TEST_DATA:
|
||
for subject in student["subjects"]:
|
||
await cursor.execute(sql, (
|
||
subject["pm"],
|
||
subject["cj"],
|
||
student["zkzh"],
|
||
student["sj"],
|
||
student["dw"],
|
||
student["xm"],
|
||
subject["km"],
|
||
subject["bz"]
|
||
))
|
||
count += 1
|
||
|
||
# 提交事务
|
||
await conn.commit()
|
||
|
||
print(f"成功初始化了 {count} 条测试数据")
|
||
except Exception as e:
|
||
print(f"初始化测试数据时发生错误: {str(e)}")
|
||
|
||
# 初始化数据库连接池
|
||
async def init_db():
|
||
global pool
|
||
try:
|
||
print(f"正在连接到MySQL数据库: {DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['db']}")
|
||
pool = await aiomysql.create_pool(
|
||
host=DB_CONFIG["host"],
|
||
port=DB_CONFIG["port"],
|
||
user=DB_CONFIG["user"],
|
||
password=DB_CONFIG["password"],
|
||
db=DB_CONFIG["db"],
|
||
charset=DB_CONFIG["charset"],
|
||
maxsize=DB_CONFIG["maxsize"],
|
||
minsize=DB_CONFIG["minsize"],
|
||
pool_recycle=DB_CONFIG["pool_recycle"],
|
||
autocommit=True
|
||
)
|
||
print("MySQL数据库连接池初始化成功")
|
||
|
||
# 连接池预热
|
||
print("正在预热数据库连接池...")
|
||
prewarming_conns = []
|
||
for _ in range(min(10, DB_CONFIG["maxsize"])):
|
||
try:
|
||
conn = await asyncio.wait_for(pool.acquire(), timeout=5.0)
|
||
prewarming_conns.append(conn)
|
||
except Exception as e:
|
||
print(f"预热连接失败: {str(e)}")
|
||
|
||
# 释放预热连接
|
||
for conn in prewarming_conns:
|
||
pool.release(conn)
|
||
|
||
print(f"数据库连接池预热完成,预热了 {len(prewarming_conns)} 个连接")
|
||
return True
|
||
except Exception as e:
|
||
print(f"数据库连接失败: {str(e)}")
|
||
print("请检查以下可能的问题:")
|
||
print("1. MySQL服务是否已启动")
|
||
print("2. 数据库用户名和密码是否正确")
|
||
print("3. 数据库名是否存在")
|
||
print("4. 数据库主机是否可访问")
|
||
print(f"当前配置: 主机={DB_CONFIG['host']}, 端口={DB_CONFIG['port']}, 用户={DB_CONFIG['user']}, 数据库={DB_CONFIG['db']}")
|
||
return False
|
||
|
||
# 初始化Redis连接池
|
||
async def init_redis():
|
||
global redis
|
||
try:
|
||
print(f"正在连接到Redis: {REDIS_CONFIG['host']}:{REDIS_CONFIG['port']}/{REDIS_CONFIG['db']}")
|
||
redis = aioredis.from_url(
|
||
f"redis://{REDIS_CONFIG['host']}:{REDIS_CONFIG['port']}/{REDIS_CONFIG['db']}",
|
||
password=REDIS_CONFIG.get('password'),
|
||
encoding=REDIS_CONFIG.get('encoding', 'utf-8'),
|
||
decode_responses=True,
|
||
socket_timeout=5.0, # 设置超时
|
||
socket_connect_timeout=5.0,
|
||
max_connections=100 # 最大连接数
|
||
)
|
||
# 测试Redis连接
|
||
await redis.ping()
|
||
print("Redis连接池初始化成功")
|
||
return True
|
||
except Exception as e:
|
||
print(f"Redis连接失败: {str(e)}")
|
||
print("请检查以下可能的问题:")
|
||
print("1. Redis服务是否已启动")
|
||
print("2. Redis密码是否正确")
|
||
print("3. Redis主机是否可访问")
|
||
print(f"当前配置: 主机={REDIS_CONFIG['host']}, 端口={REDIS_CONFIG['port']}, 数据库={REDIS_CONFIG['db']}")
|
||
redis = None
|
||
return False
|
||
|
||
# 应用启动和关闭事件
|
||
@asynccontextmanager
|
||
async def lifespan(app: FastAPI):
|
||
# 启动时执行
|
||
global pool, redis
|
||
|
||
# 初始化数据库
|
||
db_success = await init_db()
|
||
if not db_success:
|
||
print("警告: 数据库连接失败,应用程序将以有限功能运行")
|
||
|
||
# 初始化Redis
|
||
redis_success = await init_redis()
|
||
if not redis_success:
|
||
print("警告: Redis连接失败,将禁用缓存功能")
|
||
|
||
# 启动等待室处理协程
|
||
asyncio.create_task(process_waiting_room())
|
||
|
||
print("成绩查询系统启动完成")
|
||
print(f"访问地址: http://{SERVER_HOST}:{SERVER_PORT}")
|
||
|
||
yield
|
||
|
||
# 关闭时执行
|
||
if pool:
|
||
pool.close()
|
||
await pool.wait_closed()
|
||
print("数据库连接池已关闭")
|
||
|
||
if redis:
|
||
await redis.close()
|
||
print("Redis连接池已关闭")
|
||
|
||
# 创建FastAPI应用
|
||
app = FastAPI(lifespan=lifespan)
|
||
|
||
# 添加Prometheus指标端点
|
||
metrics_app = make_asgi_app()
|
||
app.mount("/metrics", metrics_app)
|
||
|
||
# 允许跨域请求
|
||
app.add_middleware(
|
||
CORSMiddleware,
|
||
allow_origins=["*"],
|
||
allow_credentials=True,
|
||
allow_methods=["*"],
|
||
allow_headers=["*"],
|
||
)
|
||
|
||
# 定义查询请求模型
|
||
class ScoreQuery(BaseModel):
|
||
zkzh: str # 准考证号
|
||
sfzh: Optional[str] = None # 身份证号作为验证
|
||
|
||
# 定义查询响应模型
|
||
class ScoreResponse(BaseModel):
|
||
success: bool
|
||
message: str
|
||
data: Optional[dict] = None
|
||
queue_position: Optional[int] = None
|
||
estimated_wait_time: Optional[int] = None
|
||
|
||
# API路由应该定义在挂载静态文件之前
|
||
|
||
# 定义API路由处理函数
|
||
@app.post("/api/query_score")
|
||
async def query_score(query: ScoreQuery):
|
||
try:
|
||
# 验证输入
|
||
if not query.zkzh:
|
||
return JSONResponse(
|
||
status_code=400,
|
||
content={"success": False, "message": "准考证号不能为空", "data": None}
|
||
)
|
||
|
||
# 打印请求参数
|
||
print("接收到查询请求:", query)
|
||
|
||
# 直接查询,跳过等待室机制
|
||
try:
|
||
result = await query_score_from_db(query)
|
||
print("查询结果:", result)
|
||
return result
|
||
except Exception as e:
|
||
print("查询失败:", str(e))
|
||
return JSONResponse(
|
||
status_code=500,
|
||
content={
|
||
"success": False,
|
||
"message": f"查询失败: {str(e)}",
|
||
"data": None
|
||
}
|
||
)
|
||
except Exception as e:
|
||
print("查询处理异常:", str(e))
|
||
return JSONResponse(
|
||
status_code=500,
|
||
content={
|
||
"success": False,
|
||
"message": f"查询失败: {str(e)}",
|
||
"data": None
|
||
}
|
||
)
|
||
|
||
@app.get("/api/query_status/{task_id}")
|
||
async def query_status(task_id: str):
|
||
# 此端点将在前端轮询获取结果时使用
|
||
# 在实际实现中,需要一个存储任务状态的机制
|
||
# 简化版实现
|
||
return {"status": "completed", "result": {}}
|
||
|
||
# 处理等待室队列
|
||
async def process_waiting_room():
|
||
global current_processing
|
||
|
||
print("启动等待室处理协程")
|
||
|
||
while True:
|
||
try:
|
||
# 如果当前处理数量小于并发上限,从等待室获取任务处理
|
||
async with processing_lock:
|
||
can_process = current_processing < CONCURRENT_QUERIES
|
||
if can_process:
|
||
current_processing += 1
|
||
monitoring.update_concurrent_queries(current_processing)
|
||
|
||
if can_process:
|
||
try:
|
||
# 尝试获取任务,非阻塞方式
|
||
task_info = await asyncio.wait_for(waiting_room.get(), timeout=0.1)
|
||
monitoring.update_waiting_room_size(waiting_room.qsize())
|
||
|
||
# 创建任务处理协程
|
||
asyncio.create_task(process_query_task(task_info))
|
||
except asyncio.TimeoutError:
|
||
# 等待室为空,减少计数
|
||
async with processing_lock:
|
||
current_processing -= 1
|
||
monitoring.update_concurrent_queries(current_processing)
|
||
|
||
# 更新连接池指标
|
||
monitoring.update_pool_metrics(pool, redis)
|
||
|
||
# 短暂等待,避免CPU过度使用
|
||
await asyncio.sleep(0.1)
|
||
except Exception as e:
|
||
print(f"等待室处理出错: {str(e)}")
|
||
monitoring.record_query_error("waiting_room_error")
|
||
await asyncio.sleep(1) # 出错后稍长延迟
|
||
|
||
# 处理单个查询任务
|
||
async def process_query_task(task_info):
|
||
global current_processing
|
||
|
||
query_data, future = task_info
|
||
|
||
try:
|
||
# 如果future已经被取消,直接释放资源不执行查询
|
||
if future.cancelled():
|
||
print("任务已被取消,跳过处理")
|
||
return
|
||
|
||
# 执行实际查询
|
||
result = await query_score_from_db(query_data)
|
||
|
||
# 输出调试信息
|
||
print("查询任务结果:", result)
|
||
if hasattr(result, "data") and result.data is not None:
|
||
print("查询结果data字段:", result.data)
|
||
if "subjects" in result.data:
|
||
print("subjects字段存在,有", len(result.data["subjects"]), "个科目")
|
||
else:
|
||
print("警告: subjects字段不存在!")
|
||
else:
|
||
print("警告: data字段为空!")
|
||
|
||
# 设置future的结果
|
||
if not future.done():
|
||
future.set_result(result)
|
||
except Exception as e:
|
||
print("处理查询任务异常:", str(e))
|
||
if not future.done():
|
||
future.set_exception(e)
|
||
finally:
|
||
# 减少当前处理计数
|
||
async with processing_lock:
|
||
current_processing -= 1
|
||
|
||
# 显式清理结果对象,帮助垃圾回收
|
||
result = None
|
||
|
||
# 从数据库查询成绩
|
||
async def query_score_from_db(query: ScoreQuery):
|
||
if not pool:
|
||
raise HTTPException(status_code=503, detail="数据库服务不可用,请稍后再试")
|
||
|
||
# 记录查询尝试
|
||
monitoring.record_query_attempt()
|
||
|
||
# 设置查询超时
|
||
QUERY_TIMEOUT = 5.0 # 5秒超时
|
||
conn = None
|
||
cursor = None
|
||
|
||
try:
|
||
# 构建查询条件
|
||
zkzh_val = query.zkzh.strip()
|
||
|
||
# 构建缓存键
|
||
cache_key = f"score:{zkzh_val}"
|
||
if query.sfzh: # 使用身份证号字段进行缓存
|
||
cache_key += f":sfzh:{query.sfzh}"
|
||
|
||
# 尝试从缓存获取数据
|
||
if redis:
|
||
try:
|
||
cached_data = await redis.get(cache_key)
|
||
if cached_data:
|
||
print(f"从Redis缓存获取数据: {cache_key}")
|
||
# 记录缓存命中
|
||
monitoring.record_cache_hit()
|
||
student_info = json.loads(cached_data)
|
||
return ScoreResponse(
|
||
success=True,
|
||
message="成绩查询成功(来自缓存)",
|
||
data=student_info
|
||
)
|
||
else:
|
||
# 记录缓存未命中
|
||
monitoring.record_cache_miss()
|
||
except Exception as e:
|
||
print(f"从Redis获取缓存失败: {str(e)}")
|
||
# 记录缓存错误
|
||
monitoring.record_query_error("redis_error")
|
||
|
||
# 使用计时器测量数据库查询性能
|
||
with monitoring.query_timer():
|
||
# 如果没有缓存,从数据库查询,设置超时
|
||
try:
|
||
# 获取数据库连接
|
||
with monitoring.connection_timer():
|
||
conn = await asyncio.wait_for(pool.acquire(), timeout=QUERY_TIMEOUT)
|
||
cursor = await conn.cursor(aiomysql.DictCursor)
|
||
|
||
# 构建查询条件
|
||
conditions = ["zkzh = %s"]
|
||
params = [zkzh_val]
|
||
|
||
# 打印查询参数
|
||
print("查询条件:", conditions, "参数:", params)
|
||
|
||
# 使用sfzh字段进行验证 (使用身份证号验证)
|
||
if query.sfzh:
|
||
conditions.append("sfzh = %s")
|
||
params.append(query.sfzh)
|
||
else:
|
||
raise HTTPException(status_code=400, detail="必须提供身份证号作为验证条件")
|
||
|
||
# 构建SQL查询 - 获取考生信息
|
||
sql = f"""
|
||
SELECT id, dwdm, dwmc, zwdm, zwmc, zprs, kdmc, kch, zwh, zkzh, xm, sjh, sfzh, bscj, pm, bz
|
||
FROM rsjcjselect
|
||
WHERE {' AND '.join(conditions)}
|
||
LIMIT 1
|
||
"""
|
||
|
||
# 执行查询
|
||
await asyncio.wait_for(cursor.execute(sql, params), timeout=QUERY_TIMEOUT)
|
||
result = await asyncio.wait_for(cursor.fetchone(), timeout=QUERY_TIMEOUT)
|
||
|
||
if not result:
|
||
monitoring.record_query_error("no_results")
|
||
return ScoreResponse(
|
||
success=False,
|
||
message="未找到匹配的成绩记录",
|
||
data=None
|
||
)
|
||
|
||
# 处理查询结果
|
||
student_info = {
|
||
"zkzh": result["zkzh"],
|
||
"xm": result["xm"],
|
||
"sjh": result["sjh"],
|
||
"zprs": result["zprs"],
|
||
"sfzh": result["sfzh"],
|
||
"dwmc": result["dwmc"],
|
||
"dwdm": result["dwdm"],
|
||
"zwmc": result["zwmc"],
|
||
"zwdm": result["zwdm"],
|
||
"kdmc": result["kdmc"],
|
||
"kch": result["kch"],
|
||
"zwh": result["zwh"],
|
||
"bscj": result["bscj"],
|
||
"pm": result["pm"],
|
||
"bz": result["bz"]
|
||
}
|
||
|
||
# 打印调试信息
|
||
print("查询结果:", student_info)
|
||
|
||
# 将结果存入Redis缓存
|
||
if redis:
|
||
try:
|
||
await redis.set(cache_key, json.dumps(student_info), ex=CACHE_EXPIRE)
|
||
print(f"成绩数据已缓存: {cache_key}, 过期时间: {CACHE_EXPIRE}秒")
|
||
except Exception as e:
|
||
print(f"缓存数据到Redis失败: {str(e)}")
|
||
monitoring.record_query_error("redis_cache_error")
|
||
|
||
# 查询成功
|
||
return ScoreResponse(
|
||
success=True,
|
||
message="成绩查询成功",
|
||
data=student_info
|
||
)
|
||
except asyncio.TimeoutError:
|
||
monitoring.record_query_error("db_timeout")
|
||
raise HTTPException(status_code=504, detail="数据库查询超时,请稍后再试")
|
||
except Exception as e:
|
||
print("数据库查询错误:", str(e))
|
||
monitoring.record_query_error("db_error")
|
||
raise HTTPException(status_code=500, detail=f"数据库查询错误: {str(e)}")
|
||
except HTTPException:
|
||
raise
|
||
except Exception as e:
|
||
print("查询过程中发生错误:", str(e))
|
||
monitoring.record_query_error("general_error")
|
||
raise HTTPException(status_code=500, detail=f"查询过程中发生错误: {str(e)}")
|
||
finally:
|
||
# 确保资源释放
|
||
try:
|
||
if cursor:
|
||
await cursor.close()
|
||
if conn:
|
||
pool.release(conn)
|
||
except Exception as e:
|
||
print(f"释放资源错误: {str(e)}")
|
||
monitoring.record_query_error("resource_cleanup_error")
|
||
|
||
# 提交查询到等待室
|
||
async def submit_to_waiting_room(query: ScoreQuery):
|
||
# 如果等待室已满,拒绝请求
|
||
if waiting_room.qsize() >= WAITING_ROOM_CAPACITY:
|
||
monitoring.record_query_error("waiting_room_full")
|
||
return ScoreResponse(
|
||
success=False,
|
||
message=f"系统繁忙,等待人数已达上限({WAITING_ROOM_CAPACITY}),请稍后再试",
|
||
data=None
|
||
), None
|
||
|
||
# 放入等待室并获取位置
|
||
queue_position = waiting_room.qsize() + 1
|
||
future = asyncio.Future()
|
||
await waiting_room.put((query, future))
|
||
|
||
# 更新监控指标
|
||
monitoring.update_waiting_room_size(waiting_room.qsize())
|
||
|
||
# 计算预计等待时间(每个请求平均处理时间假设为1秒)
|
||
estimated_wait_time = queue_position * 1 # 秒
|
||
|
||
return ScoreResponse(
|
||
success=True,
|
||
message=f"您的请求已提交,当前排队位置: {queue_position},预计等待时间: {estimated_wait_time}秒",
|
||
data=None,
|
||
queue_position=queue_position,
|
||
estimated_wait_time=estimated_wait_time
|
||
), future
|
||
|
||
# 启动事件
|
||
@app.on_event("startup")
|
||
async def startup():
|
||
# 初始化数据库连接池
|
||
db_success = await init_db()
|
||
if not db_success:
|
||
print("警告: 数据库连接失败,应用程序将以有限功能运行")
|
||
print("您可以尝试以下操作:")
|
||
print("1. 检查MySQL服务是否已启动")
|
||
print("2. 确认config.py中的数据库配置是否正确")
|
||
print("3. 运行run_with_test_data.bat或run_with_test_data.sh脚本导入测试数据")
|
||
|
||
# 初始化Redis连接池
|
||
redis_success = await init_redis()
|
||
if not redis_success:
|
||
print("警告: Redis连接失败,将禁用缓存功能")
|
||
print("您可以尝试以下操作:")
|
||
print("1. 检查Redis服务是否已启动")
|
||
print("2. 确认config.py中的Redis配置是否正确")
|
||
print("3. 如果不需要Redis,可以忽略此警告")
|
||
|
||
print("成绩查询系统启动完成")
|
||
print(f"访问地址: http://{SERVER_HOST}:{SERVER_PORT}")
|
||
print("按Ctrl+C停止服务")
|
||
|
||
# 关闭事件
|
||
@app.on_event("shutdown")
|
||
async def shutdown():
|
||
# 关闭数据库连接池
|
||
if pool:
|
||
pool.close()
|
||
await pool.wait_closed()
|
||
print("数据库连接池已关闭")
|
||
|
||
# 关闭Redis连接池
|
||
if redis:
|
||
await redis.close()
|
||
print("Redis连接池已关闭")
|
||
|
||
# 根路由 - 返回主页
|
||
@app.get("/", response_class=HTMLResponse)
|
||
async def read_root():
|
||
with open("html/index.html", "r", encoding="utf-8") as f:
|
||
html_content = f.read()
|
||
return HTMLResponse(content=html_content)
|
||
|
||
@app.get("/api/test_data")
|
||
async def test_data():
|
||
"""测试端点,用于检查数据库中的数据"""
|
||
if not pool:
|
||
return {"error": "数据库连接未初始化"}
|
||
|
||
async with pool.acquire() as conn:
|
||
async with conn.cursor(aiomysql.DictCursor) as cursor:
|
||
# 查询所有数据
|
||
await cursor.execute("SELECT * FROM rsjcjselect")
|
||
results = await cursor.fetchall()
|
||
|
||
# 统计每个准考证号的科目数量
|
||
stats = {}
|
||
for row in results:
|
||
zkzh = row["zkzh"]
|
||
if zkzh not in stats:
|
||
stats[zkzh] = {
|
||
"xm": row["xm"],
|
||
"subjects": []
|
||
}
|
||
stats[zkzh]["subjects"].append(row["km"])
|
||
|
||
return {
|
||
"total_records": len(results),
|
||
"students": stats
|
||
}
|
||
|
||
@app.post("/api/init_test_data")
|
||
async def api_init_test_data():
|
||
"""手动初始化测试数据的API端点"""
|
||
try:
|
||
await init_test_data()
|
||
return {"success": True, "message": "测试数据初始化成功"}
|
||
except Exception as e:
|
||
return {"success": False, "message": f"测试数据初始化失败: {str(e)}"}
|
||
|
||
# 添加缓存清除端点
|
||
@app.post("/api/clear_cache")
|
||
async def clear_cache(zkzh: Optional[str] = None):
|
||
"""清除缓存的API端点
|
||
|
||
如果提供了准考证号,则只清除该学生的缓存
|
||
否则清除所有成绩缓存
|
||
"""
|
||
if not redis:
|
||
return {"success": False, "message": "Redis未连接,无法清除缓存"}
|
||
|
||
try:
|
||
if zkzh:
|
||
# 清除特定学生的缓存
|
||
pattern = f"score:{zkzh}:*"
|
||
keys = await redis.keys(pattern)
|
||
if keys:
|
||
count = 0
|
||
for key in keys:
|
||
await redis.delete(key)
|
||
count += 1
|
||
return {"success": True, "message": f"已清除准考证号 {zkzh} 的 {count} 条缓存记录"}
|
||
else:
|
||
return {"success": False, "message": f"未找到准考证号 {zkzh} 的缓存记录"}
|
||
else:
|
||
# 清除所有成绩缓存
|
||
pattern = "score:*"
|
||
keys = await redis.keys(pattern)
|
||
if keys:
|
||
count = 0
|
||
for key in keys:
|
||
await redis.delete(key)
|
||
count += 1
|
||
return {"success": True, "message": f"已清除所有 {count} 条成绩缓存记录"}
|
||
else:
|
||
return {"success": False, "message": "缓存中没有成绩记录"}
|
||
except Exception as e:
|
||
return {"success": False, "message": f"清除缓存失败: {str(e)}"}
|
||
|
||
# 挂载静态文件 - 必须在定义所有路由后挂载
|
||
app.mount("/static", StaticFiles(directory="static"), name="static")
|
||
|
||
if __name__ == "__main__":
|
||
uvicorn.run("main:app",
|
||
host=SERVER_HOST,
|
||
port=SERVER_PORT,
|
||
reload=False, # 生产环境关闭重载
|
||
workers=WORKERS, # 根据配置使用多个工作进程
|
||
timeout_keep_alive=TIMEOUT) # 长连接超时时间
|