# RSJselet/cstest.py
# 文件名: cstest.py
2025-07-10 13:42:54 +08:00
import asyncio
import aiohttp
2025-07-11 10:43:14 +08:00
import aiomysql
2025-07-10 13:42:54 +08:00
import time
import json
import random
import argparse
from collections import defaultdict
from tqdm import tqdm
2025-07-11 10:43:14 +08:00
import sys
import os
2025-07-10 13:42:54 +08:00
2025-07-11 10:43:14 +08:00
# Import database settings from the project-level config module; fall back
# to built-in defaults so the script still runs standalone.
try:
    from config import DB_CONFIG, REDIS_CONFIG, CACHE_EXPIRE, WAITING_ROOM_CAPACITY
    print("成功导入数据库配置")
except ImportError:
    print("警告: 无法导入配置文件,将使用默认配置")
    # Default database configuration
    DB_CONFIG = {
        "host": "127.0.0.1",
        "port": 3306,
        "user": "root",
        "password": "123456",
        "db": "harsjselect",
        "charset": "utf8mb4",
        "minsize": 1,
        "maxsize": 100,
        "pool_recycle": 3600
    }
    # Fix: the success path binds four names, but the fallback previously
    # defined only DB_CONFIG and WAITING_ROOM_CAPACITY, leaving
    # REDIS_CONFIG / CACHE_EXPIRE undefined (NameError for any later use).
    # Placeholder values — confirm against config.py if Redis is used here.
    REDIS_CONFIG = {}
    CACHE_EXPIRE = 300
    # Default waiting-room capacity
    WAITING_ROOM_CAPACITY = 15000
2025-07-10 13:42:54 +08:00
2025-07-11 10:43:14 +08:00
# Fetch real test data straight from the database
async def fetch_db_test_data(limit=100):
    """Pull up to *limit* (zkzh, sfzh) pairs straight from the database.

    Rows are sampled randomly and NOT de-duplicated. Returns a list of
    {"zkzh": ..., "sfzh": ...} dicts, or an empty list on any failure.
    """
    print(f"正在从数据库获取真实测试数据 (最多 {limit} 条)...")
    pool = None
    try:
        # Small dedicated pool — this helper only needs a couple of connections.
        pool = await aiomysql.create_pool(
            host=DB_CONFIG["host"],
            port=DB_CONFIG["port"],
            user=DB_CONFIG["user"],
            password=DB_CONFIG["password"],
            db=DB_CONFIG["db"],
            charset=DB_CONFIG["charset"],
            minsize=1,
            maxsize=5
        )
        async with pool.acquire() as conn:
            async with conn.cursor(aiomysql.DictCursor) as cur:
                # Report how many rows the table holds.
                await cur.execute("SELECT COUNT(*) as total_count FROM rsjcjselect")
                count_row = await cur.fetchone()
                print(f"数据库中共有 {count_row['total_count']} 条记录")

                # Inspect the schema to confirm the expected columns exist.
                await cur.execute("SHOW COLUMNS FROM rsjcjselect")
                field_names = {col['Field'] for col in await cur.fetchall()}
                if 'zkzh' not in field_names or 'sfzh' not in field_names:
                    print("警告: 数据库中缺少必要的字段 (zkzh 或 sfzh)")
                    return []

                # Randomly sample rows that have both identifiers present.
                sql = """
                    SELECT zkzh, sfzh
                    FROM rsjcjselect
                    WHERE zkzh IS NOT NULL AND sfzh IS NOT NULL
                    ORDER BY RAND()
                    LIMIT %s
                """
                await cur.execute(sql, (limit,))
                rows = await cur.fetchall()
                if not rows:
                    print("警告: 数据库中没有找到符合条件的测试数据")
                    return []

                # Normalise to the test-payload shape.
                records = [{"zkzh": str(r['zkzh']), "sfzh": str(r['sfzh'])} for r in rows]
                distinct = {(r["zkzh"], r["sfzh"]) for r in records}
                print(f"成功从数据库获取 {len(records)} 条真实测试数据")
                print(f"其中包含 {len(distinct)} 个不同的考生")
                return records
    except Exception as e:
        print(f"从数据库获取测试数据失败: {str(e)}")
        return []
    finally:
        # Always dispose of the pool, even on error paths.
        if pool:
            pool.close()
            await pool.wait_closed()
# Fallback test data — real records extracted from the SQL dump, used when
# the database is unreachable.
# NOTE(review): the entries from "101081100203" onward repeat earlier pairs —
# possibly intentional (duplicates exercise the cache-hit path); confirm.
FALLBACK_TEST_DATA = [
    {"zkzh": "101081100101", "sfzh": "320106199001015432"},
    {"zkzh": "101081100102", "sfzh": "320106199203023625"},
    {"zkzh": "101081100103", "sfzh": "320106199104125486"},
    {"zkzh": "101081100104", "sfzh": "320106199209089635"},
    {"zkzh": "101081100105", "sfzh": "320106199305075218"},
    {"zkzh": "101081100201", "sfzh": "320106199407125896"},
    {"zkzh": "101081100202", "sfzh": "320106199501234785"},
    {"zkzh": "101081100203", "sfzh": "320106199602153695"},
    {"zkzh": "101081100204", "sfzh": "320106199703126547"},
    {"zkzh": "101081100301", "sfzh": "320106199804234152"},
    {"zkzh": "101081100302", "sfzh": "320106199905124863"},
    {"zkzh": "101081100303", "sfzh": "320106199012154789"},
    {"zkzh": "101081100401", "sfzh": "320111199106152365"},
    {"zkzh": "101081100402", "sfzh": "320111199207136548"},
    {"zkzh": "101081100403", "sfzh": "320111199308242536"},
    {"zkzh": "101081100404", "sfzh": "320111199409123568"},
    {"zkzh": "101081100405", "sfzh": "320111199510256398"},
    {"zkzh": "101081100406", "sfzh": "320505199611236548"},
    {"zkzh": "101081100501", "sfzh": "320505199712154862"},
    {"zkzh": "101081100203", "sfzh": "320106199602153695"},
    {"zkzh": "101081100204", "sfzh": "320106199703126547"},
    {"zkzh": "101081100301", "sfzh": "320106199804234152"},
    {"zkzh": "101081100302", "sfzh": "320106199905124863"},
    {"zkzh": "101081100303", "sfzh": "320106199012154789"},
    {"zkzh": "101081100401", "sfzh": "320111199106152365"},
    {"zkzh": "101081100402", "sfzh": "320111199207136548"},
    {"zkzh": "101081100403", "sfzh": "320111199308242536"},
    {"zkzh": "101081100404", "sfzh": "320111199409123568"},
    {"zkzh": "101081100405", "sfzh": "320111199510256398"},
    {"zkzh": "101081100406", "sfzh": "320505199611236548"},
    {"zkzh": "101081100501", "sfzh": "320505199712154862"},
    {"zkzh": "101081100502", "sfzh": "320505199801235674"}
]
# Pad the test data set out to the requested size
def expand_test_data(base_data, target_count):
    """Return exactly *target_count* records, padding with look-alikes.

    Real records always come first; synthetic ones mimic the zkzh/sfzh
    format of the first real record. Falls back to FALLBACK_TEST_DATA when
    no real data is available.
    """
    if not base_data:
        print("没有真实测试数据,使用备用测试数据")
        base_data = FALLBACK_TEST_DATA

    records = base_data.copy()
    have = len(base_data)
    # Enough real rows already — just trim.
    if have >= target_count:
        return records[:target_count]

    missing = target_count - have
    print(f"真实数据不足,基于 {have} 条真实数据生成 {missing} 条类似数据...")

    # Use the first record as a format template.
    template = base_data[0]
    zk_len = len(template["zkzh"])
    sf_len = len(template["sfzh"])
    # Keep the leading digits of the admission-ticket number as a prefix.
    prefix = template["zkzh"][:6] if zk_len > 6 else template["zkzh"][:3]

    for _ in range(missing):
        # Synthetic ticket number matching the real format/length.
        ticket = f"{prefix}{random.randint(100000, 999999)}"[:zk_len]
        if sf_len == 18:
            # Synthesize an 18-digit ID following the national format.
            y = random.randint(1990, 1999)
            m = random.randint(1, 12)
            d = random.randint(1, 28)
            sid = f"320106{y}{m:02d}{d:02d}{random.randint(1000, 9999)}"
        else:
            sid = ''.join(str(random.randint(0, 9)) for _ in range(sf_len))
        records.append({"zkzh": ticket, "sfzh": sid})
    return records
# Single query request — handles the waiting-room mechanism and result polling
async def query_score_with_polling(session, base_url, data, timeout=30, max_polls=10, poll_interval=0.5):
    """POST a score query and, if the server parks us in the waiting queue,
    poll until a result arrives or *max_polls* attempts are exhausted.

    Args:
        session: aiohttp ClientSession used for all requests.
        base_url: service root, e.g. "http://127.0.0.1:80".
        data: request payload, {"zkzh": ..., "sfzh": ...}.
        timeout: per-request timeout in seconds.
        max_polls: maximum number of follow-up polls while queued.
        poll_interval: upper bound on the delay between polls (seconds).

    Returns:
        A metrics dict: status_code, response_time (total, including queue
        waits), success, message, in_queue, queue_position, cached, data
        (the request payload), has_result, polls.
    """
    url = f"{base_url}/api/query_score"
    start_time = time.time()
    try:
        # Initial query
        async with session.post(url, json=data, timeout=timeout) as response:
            result = await response.json()

        # Queue markers present => we were placed in the waiting room and
        # must poll for the real result.
        if result.get("queue_position") is not None and result.get("estimated_wait_time") is not None:
            queue_position = result.get("queue_position")
            wait_time = result.get("estimated_wait_time")

            polls_count = 0
            while polls_count < max_polls:
                # Dynamic poll interval: half the server's estimate, capped
                # at poll_interval so we never wait too long between polls.
                await asyncio.sleep(min(wait_time / 2, poll_interval))

                # Re-issue the query.
                async with session.post(url, json=data, timeout=timeout) as poll_response:
                    poll_result = await poll_response.json()

                # Done when the server no longer reports a queue position
                # (fix: use `is None` to match the entry check — position 0
                # still means "queued") or a data payload has arrived.
                if poll_result.get("queue_position") is None or poll_result.get("data") is not None:
                    end_time = time.time()
                    return {
                        "status_code": poll_response.status,
                        "response_time": end_time - start_time,  # includes time spent queued
                        "success": poll_result.get("success", False),
                        "message": poll_result.get("message", ""),
                        "in_queue": False,  # dequeued
                        "queue_position": None,
                        "cached": "来自缓存" in poll_result.get("message", ""),
                        "data": data,
                        "has_result": poll_result.get("data") is not None,
                        "polls": polls_count + 1  # total polls performed
                    }

                # Fix: refresh the count AND the queue state from the latest
                # poll (previously only the count changed, so a timed-out
                # request reported its original, stale queue position).
                polls_count += 1
                if poll_result.get("queue_position") is not None:
                    queue_position = poll_result.get("queue_position")
                if poll_result.get("estimated_wait_time") is not None:
                    wait_time = poll_result.get("estimated_wait_time")

            # Gave up after max_polls attempts — report the latest known state.
            end_time = time.time()
            return {
                "status_code": 408,  # request timeout
                "response_time": end_time - start_time,
                "success": False,
                "message": "等待队列轮询超时",
                "in_queue": True,
                "queue_position": queue_position,
                "cached": False,
                "data": data,
                "has_result": False,
                "polls": polls_count
            }
        else:
            # No queuing — return the first response directly.
            end_time = time.time()
            return {
                "status_code": response.status,
                "response_time": end_time - start_time,
                "success": result.get("success", False),
                "message": result.get("message", ""),
                "in_queue": False,
                "queue_position": None,
                "cached": "来自缓存" in result.get("message", ""),
                "data": data,
                "has_result": result.get("data") is not None,
                "polls": 0
            }
    except asyncio.TimeoutError:
        return {
            "status_code": 0,
            "response_time": time.time() - start_time,
            "success": False,
            "message": "请求超时",
            "in_queue": False,
            "queue_position": None,
            "cached": False,  # fix: key now present on every return path
            "data": data,
            "has_result": False,
            "polls": 0
        }
    except Exception as e:
        return {
            "status_code": 0,
            "response_time": time.time() - start_time,
            "success": False,
            "message": f"发生错误: {str(e)}",
            "in_queue": False,
            "queue_position": None,
            "cached": False,  # fix: key now present on every return path
            "data": data,
            "has_result": False,
            "polls": 0
        }
2025-07-11 10:43:14 +08:00
# Force the system into waiting-room mode
async def force_waiting_room_mode(session, base_url):
    """Fire a burst of concurrent dummy queries to push the server over its
    concurrency threshold and activate the waiting-room mechanism.

    Best-effort: individual request failures are deliberately ignored — the
    load itself is the point, not the responses.
    """
    url = f"{base_url}/api/query_score"

    print("正在尝试激活等待室模式...")
    tasks = []
    # Enough concurrent requests to trip the waiting room.
    batch_size = 200
    for i in range(batch_size):
        # Randomly generated (likely nonexistent) candidate identifiers.
        data = {"zkzh": f"10{random.randint(10000000, 99999999)}",
                "sfzh": f"32010619{random.randint(90, 99)}{random.randint(1, 12):02d}{random.randint(1, 28):02d}{random.randint(1000, 9999)}"}
        task = asyncio.create_task(session.post(url, json=data))
        tasks.append(task)

    # Drain all requests; swallow per-request errors.
    # NOTE(review): responses are never read/released — may pin aiohttp
    # connections for the session's lifetime; acceptable for a test burst.
    for future in asyncio.as_completed(tasks):
        try:
            await future
        except Exception:
            # Fix: was a bare `except:`, which also swallowed
            # CancelledError / KeyboardInterrupt.
            pass
    print(f"已发送 {batch_size} 个请求以激活等待室机制")
2025-07-10 13:42:54 +08:00
2025-07-11 10:43:14 +08:00
# Measure cache effectiveness
async def test_cache_effect(session, base_url, test_data, repeat=3):
    """Run the same queries *repeat* times and collect per-round results.

    Round 1 should miss the cache; later rounds should hit it and respond
    faster. Returns a list of per-round result lists (len == repeat).
    """
    # Fix: removed the unused local `url` (queries go through
    # query_score_with_polling, which builds its own URL) and the unused
    # enumerate() indices in both loops.
    cache_results = []

    # Round 1 — nothing cached yet, every query should miss.
    print("\n===== 第1轮查询(无缓存) =====")
    round1_results = []
    for data in tqdm(test_data, desc="第1轮查询"):
        result = await query_score_with_polling(session, base_url, data)
        round1_results.append(result)
        await asyncio.sleep(0.05)  # throttle the query rate
    cache_results.append(round1_results)

    # Later rounds — should be served from cache, hence faster.
    for r in range(2, repeat + 1):
        print(f"\n===== 第{r}轮查询(应命中缓存) =====")
        round_results = []
        for data in tqdm(test_data, desc=f"{r}轮查询"):
            result = await query_score_with_polling(session, base_url, data)
            round_results.append(result)
            await asyncio.sleep(0.05)  # throttle the query rate
        cache_results.append(round_results)
    return cache_results
# Main concurrency test driver — includes the waiting-room scenario
async def run_concurrent_test(base_url, concurrency, total_requests, delay=0, test_data=None, test_waiting_room=True):
    """Fire *total_requests* queries in batches of *concurrency* and gather
    the per-request result dicts.

    When *test_data* is None it is fetched from the database and padded to
    size. When *test_waiting_room* is true, the waiting room is activated
    with a request burst before the measured run starts.
    """
    collected = []
    async with aiohttp.ClientSession() as session:
        # Optionally push the server into waiting-room mode first.
        if test_waiting_room:
            await force_waiting_room_mode(session, base_url)

        # Lazily prepare the request payloads.
        if test_data is None:
            test_data = await fetch_db_test_data(total_requests)
            test_data = expand_test_data(test_data, total_requests)
        print(f"准备发送 {len(test_data)} 条测试请求...")

        # Launch requests, pausing `delay` seconds between each batch of
        # `concurrency` tasks.
        pending = []
        for index, payload in enumerate(test_data):
            if index and index % concurrency == 0:
                await asyncio.sleep(delay)
            pending.append(asyncio.create_task(
                query_score_with_polling(session, base_url, payload)))

        # Progress bar over completions, in whatever order they finish.
        for done in tqdm(asyncio.as_completed(pending), total=len(pending), desc="处理请求"):
            collected.append(await done)
    return collected
2025-07-11 10:43:14 +08:00
# Build the test report — includes waiting-room and polling metrics
def generate_report(results, include_cache_analysis=False, rounds=None):
    """Aggregate per-request result dicts into a summary report dict.

    Args:
        results: flat list of result dicts from query_score_with_polling.
        include_cache_analysis: when true (and *rounds* is given), append a
            per-round cache-effect breakdown.
        rounds: list of per-round result lists, as produced by
            test_cache_effect.

    Returns:
        A JSON-serializable dict of aggregate metrics (Chinese keys).
    """
    def pct(part, whole):
        # Formatted percentage, safe against an empty denominator.
        return f"{(part / whole * 100):.2f}%" if whole else "0%"

    total = len(results)
    success_count = sum(1 for r in results if r["success"])
    in_queue_count = sum(1 for r in results if r["in_queue"])
    cached_count = sum(1 for r in results if r.get("cached", False))
    has_result_count = sum(1 for r in results if r.get("has_result", False))

    # Polling statistics (average is over requests that actually polled).
    polled = [r for r in results if r.get("polls", 0) > 0]
    avg_polls = sum(r.get("polls", 0) for r in results) / len(polled) if polled else 0
    max_polls = max((r.get("polls", 0) for r in results), default=0)

    # Latency statistics and percentiles over the sorted response times.
    times = sorted(r["response_time"] for r in results)
    if times:
        avg_rt = sum(times) / len(times)
        min_rt, max_rt = times[0], times[-1]
        p50, p90, p95, p99 = (times[int(total * q)] for q in (0.5, 0.9, 0.95, 0.99))
    else:
        avg_rt = min_rt = max_rt = p50 = p90 = p95 = p99 = 0

    # Status-code and error-message distributions.
    status_codes = defaultdict(int)
    error_messages = defaultdict(int)
    for r in results:
        status_codes[r["status_code"]] += 1
        if not r["success"]:
            error_messages[r["message"]] += 1

    report = {
        "总请求数": total,
        "成功请求": success_count,
        "失败请求": total - success_count,
        "成功率": pct(success_count, total),
        "返回结果数": has_result_count,
        "返回结果率": pct(has_result_count, total),
        "进入等待队列数": in_queue_count,
        "等待队列率": pct(in_queue_count, total),
        "命中缓存数": cached_count,
        "轮询统计": {
            "需要轮询请求数": len(polled),
            "平均轮询次数": f"{avg_polls:.2f}",
            "最大轮询次数": max_polls
        },
        "响应时间(秒)": {
            "平均": f"{avg_rt:.4f}",
            "最小": f"{min_rt:.4f}",
            "最大": f"{max_rt:.4f}",
            "P50": f"{p50:.4f}",
            "P90": f"{p90:.4f}",
            "P95": f"{p95:.4f}",
            "P99": f"{p99:.4f}"
        },
        "状态码分布": dict(status_codes),
        "错误消息分类": dict(error_messages)
    }

    # Optional per-round cache-effect breakdown.
    if include_cache_analysis and rounds:
        analysis = {}
        for i, round_results in enumerate(rounds):
            n = len(round_results)
            round_avg = sum(r["response_time"] for r in round_results) / n if n else 0
            round_cached = sum(1 for r in round_results if r.get("cached", False))
            round_queued = sum(1 for r in round_results if r.get("in_queue", False))
            analysis[f"{i + 1}"] = {
                "平均响应时间": f"{round_avg:.4f}",
                "成功率": pct(sum(1 for r in round_results if r["success"]), n),
                "返回结果率": pct(sum(1 for r in round_results if r.get("has_result", False)), n),
                "命中缓存数": round_cached,
                "命中缓存率": pct(round_cached, n),
                "进入等待队列数": round_queued,
                "等待队列率": pct(round_queued, n),
            }
        report["缓存效果分析"] = analysis

    return report
2025-07-11 10:43:14 +08:00
# Clear the server-side cache (test helper)
async def clear_cache(session, base_url):
    """Ask the service to drop its cache.

    Returns a (success, message) tuple; failures never raise.
    """
    try:
        async with session.post(f"{base_url}/api/clear_cache") as resp:
            payload = await resp.json()
        return payload.get("success", False), payload.get("message", "未知结果")
    except Exception as exc:
        return False, f"清除缓存失败: {str(exc)}"
2025-07-10 13:42:54 +08:00
async def main():
    """CLI entry point: parse arguments, then run the selected test modes.

    Side effects: prints progress/metrics to stdout and writes JSON report
    files (args.output, plus "cache_" + args.output for the cache test).
    """
    parser = argparse.ArgumentParser(description="江苏省人事考试成绩查询系统并发性能测试工具")
    parser.add_argument("--url", default="http://127.0.0.1:80", help="API基础URL")
    parser.add_argument("--concurrency", type=int, default=100, help="并发请求数")
    parser.add_argument("--total", type=int, default=1000, help="总请求数")
    parser.add_argument("--delay", type=float, default=0.1, help="批次间延迟(秒)")
    parser.add_argument("--output", default="test_report.json", help="报告输出文件")
    parser.add_argument("--mode", choices=["concurrent", "cache", "waiting-room", "all"], default="all",
                        help="测试模式: concurrent=并发测试, cache=缓存测试, waiting-room=等待室测试, all=全部测试")
    parser.add_argument("--no-waiting-room", action="store_true", help="跳过等待室测试")
    args = parser.parse_args()

    print(f"成绩查询系统高并发性能测试工具 - 连接到 {args.url}")

    # Fetch real test data from the database first.
    test_data = await fetch_db_test_data(args.total)
    if not test_data:
        print("警告: 无法从数据库获取测试数据,将使用备用数据")

    # Pad the data set so there is one payload per request.
    test_data = expand_test_data(test_data, args.total)

    # One HTTP session shared across all test phases.
    async with aiohttp.ClientSession() as session:
        if args.mode in ["concurrent", "waiting-room", "all"]:
            print(f"\n开始并发测试: URL={args.url}, 并发数={args.concurrency}, 总请求数={args.total}")
            start_time = time.time()
            # Clear the server cache so the run starts cold.
            clear_success, clear_message = await clear_cache(session, args.url)
            print(f"清除缓存: {clear_message}")
            # Run the concurrent load test.
            results = await run_concurrent_test(
                args.url,
                args.concurrency,
                args.total,
                args.delay,
                test_data,
                test_waiting_room=not args.no_waiting_room
            )
            end_time = time.time()
            total_time = end_time - start_time
            # Build the report, adding wall-clock time and overall RPS.
            report = generate_report(results)
            report["总测试时间(秒)"] = f"{total_time:.2f}"
            report["每秒请求数(RPS)"] = f"{args.total/total_time:.2f}" if total_time > 0 else "0"
            # Persist the report as JSON.
            with open(args.output, 'w', encoding='utf-8') as f:
                json.dump(report, f, ensure_ascii=False, indent=2)
            # Print the headline metrics.
            print("\n== 并发测试结果 ==")
            print(f"总请求数: {report['总请求数']}")
            print(f"成功率: {report['成功率']}")
            print(f"返回结果率: {report['返回结果率']}")
            print(f"进入等待队列数: {report['进入等待队列数']} ({report['等待队列率']})")
            print(f"平均响应时间: {report['响应时间(秒)']['平均']}")
            print(f"RPS: {report['每秒请求数(RPS)']}请求/秒")
            # Polling statistics, when any request had to poll.
            if report["轮询统计"]["需要轮询请求数"] > 0:
                print(f"需要轮询请求数: {report['轮询统计']['需要轮询请求数']}")
                print(f"平均轮询次数: {report['轮询统计']['平均轮询次数']}")
                print(f"最大轮询次数: {report['轮询统计']['最大轮询次数']}")
            print(f"详细报告已保存至: {args.output}")
        if args.mode in ["cache", "all"]:
            print("\n开始缓存效果测试...")
            # Start from a cold cache again.
            clear_success, clear_message = await clear_cache(session, args.url)
            print(f"清除缓存: {clear_message}")
            # Use a small slice of the data set for the cache test.
            cache_test_count = min(20, len(test_data))
            cache_test_data = test_data[:cache_test_count]
            # Run the repeated-query cache test (3 rounds over the same data).
            cache_results = await test_cache_effect(session, args.url, cache_test_data, repeat=3)
            # Build the cache-effect report from the flattened rounds.
            cache_report = generate_report([item for sublist in cache_results for item in sublist], True, cache_results)
            # Persist the cache report alongside the main one.
            cache_report_file = "cache_" + args.output
            with open(cache_report_file, 'w', encoding='utf-8') as f:
                json.dump(cache_report, f, ensure_ascii=False, indent=2)
            # Print the per-round cache effect.
            print("\n== 缓存效果测试结果 ==")
            if "缓存效果分析" in cache_report:
                for round_name, round_data in cache_report["缓存效果分析"].items():
                    print(f"{round_name}: 平均响应时间={round_data['平均响应时间']}, 命中缓存率={round_data['命中缓存率']}")
            print(f"详细缓存报告已保存至: {cache_report_file}")
2025-07-10 13:42:54 +08:00
if __name__ == "__main__":
    # Script entry point: drive the async main() to completion.
    asyncio.run(main())