# 文件名: cstest.py import asyncio import aiohttp import aiomysql import time import json import random import argparse from collections import defaultdict from tqdm import tqdm import sys import os # 导入配置文件中的数据库设置 try: from config import DB_CONFIG, REDIS_CONFIG, CACHE_EXPIRE, WAITING_ROOM_CAPACITY print("成功导入数据库配置") except ImportError: print("警告: 无法导入配置文件,将使用默认配置") # 默认数据库配置 DB_CONFIG = { "host": "127.0.0.1", "port": 3306, "user": "root", "password": "123456", "db": "harsjselect", "charset": "utf8mb4", "minsize": 1, "maxsize": 100, "pool_recycle": 3600 } # 等待室容量默认值 WAITING_ROOM_CAPACITY = 15000 # 直接从数据库获取真实测试数据 async def fetch_db_test_data(limit=100): """直接从数据库中获取有效的测试数据,不做去重处理""" print(f"正在从数据库获取真实测试数据 (最多 {limit} 条)...") db_pool = None try: # 创建数据库连接池 db_pool = await aiomysql.create_pool( host=DB_CONFIG["host"], port=DB_CONFIG["port"], user=DB_CONFIG["user"], password=DB_CONFIG["password"], db=DB_CONFIG["db"], charset=DB_CONFIG["charset"], minsize=1, maxsize=5 ) # 获取连接并查询数据 async with db_pool.acquire() as conn: async with conn.cursor(aiomysql.DictCursor) as cursor: # 查询总记录数 await cursor.execute("SELECT COUNT(*) as total_count FROM rsjcjselect") result = await cursor.fetchone() total_records = result['total_count'] print(f"数据库中共有 {total_records} 条记录") # 查询表结构以确认字段名 await cursor.execute("SHOW COLUMNS FROM rsjcjselect") columns = await cursor.fetchall() column_names = [col['Field'] for col in columns] # 检查必要的字段是否存在 zkzh_field = 'zkzh' if 'zkzh' in column_names else None sfzh_field = 'sfzh' if 'sfzh' in column_names else None if not zkzh_field or not sfzh_field: print("警告: 数据库中缺少必要的字段 (zkzh 或 sfzh)") return [] # 方式1: 随机抽取记录 (最推荐) query = f""" SELECT {zkzh_field}, {sfzh_field} FROM rsjcjselect WHERE {zkzh_field} IS NOT NULL AND {sfzh_field} IS NOT NULL ORDER BY RAND() LIMIT %s """ # 执行查询 await cursor.execute(query, (limit,)) rows = await cursor.fetchall() if not rows: print("警告: 数据库中没有找到符合条件的测试数据") return [] # 转换为测试数据格式 test_data = [] for row in rows: test_data.append({ "zkzh": str(row[zkzh_field]), "sfzh": str(row[sfzh_field]) }) # 记录唯一考生数量 unique_pairs = set((item["zkzh"], item["sfzh"]) for item in test_data) print(f"成功从数据库获取 {len(test_data)} 条真实测试数据") print(f"其中包含 {len(unique_pairs)} 个不同的考生") return test_data except Exception as e: print(f"从数据库获取测试数据失败: {str(e)}") return [] finally: # 关闭连接池 if db_pool: db_pool.close() await db_pool.wait_closed() # 备用测试数据 - 从SQL文件中提取的实际数据 FALLBACK_TEST_DATA = [ {"zkzh": "101081100101", "sfzh": "320106199001015432"}, {"zkzh": "101081100102", "sfzh": "320106199203023625"}, {"zkzh": "101081100103", "sfzh": "320106199104125486"}, {"zkzh": "101081100104", "sfzh": "320106199209089635"}, {"zkzh": "101081100105", "sfzh": "320106199305075218"}, {"zkzh": "101081100201", "sfzh": "320106199407125896"}, {"zkzh": "101081100202", "sfzh": "320106199501234785"}, {"zkzh": "101081100203", "sfzh": "320106199602153695"}, {"zkzh": "101081100204", "sfzh": "320106199703126547"}, {"zkzh": "101081100301", "sfzh": "320106199804234152"}, {"zkzh": "101081100302", "sfzh": "320106199905124863"}, {"zkzh": "101081100303", "sfzh": "320106199012154789"}, {"zkzh": "101081100401", "sfzh": "320111199106152365"}, {"zkzh": "101081100402", "sfzh": "320111199207136548"}, {"zkzh": "101081100403", "sfzh": "320111199308242536"}, {"zkzh": "101081100404", "sfzh": "320111199409123568"}, {"zkzh": "101081100405", "sfzh": "320111199510256398"}, {"zkzh": "101081100406", "sfzh": "320505199611236548"}, {"zkzh": "101081100501", "sfzh": "320505199712154862"}, {"zkzh": "101081100203", "sfzh": "320106199602153695"}, {"zkzh": "101081100204", "sfzh": "320106199703126547"}, {"zkzh": "101081100301", "sfzh": "320106199804234152"}, {"zkzh": "101081100302", "sfzh": "320106199905124863"}, {"zkzh": "101081100303", "sfzh": "320106199012154789"}, {"zkzh": "101081100401", "sfzh": "320111199106152365"}, {"zkzh": "101081100402", "sfzh": "320111199207136548"}, {"zkzh": "101081100403", "sfzh": "320111199308242536"}, {"zkzh": "101081100404", "sfzh": "320111199409123568"}, {"zkzh": "101081100405", "sfzh": "320111199510256398"}, {"zkzh": "101081100406", "sfzh": "320505199611236548"}, {"zkzh": "101081100501", "sfzh": "320505199712154862"}, {"zkzh": "101081100502", "sfzh": "320505199801235674"} ] # 扩充测试数据 def expand_test_data(base_data, target_count): if not base_data: print("没有真实测试数据,使用备用测试数据") base_data = FALLBACK_TEST_DATA result = base_data.copy() base_count = len(base_data) # 如果真实数据足够,直接返回 if base_count >= target_count: return result[:target_count] # 不足则基于已有数据生成类似格式的数据 needed = target_count - base_count print(f"真实数据不足,基于 {base_count} 条真实数据生成 {needed} 条类似数据...") # 获取格式模板 sample = base_data[0] zkzh_format = sample["zkzh"] sfzh_format = sample["sfzh"] zkzh_length = len(zkzh_format) sfzh_length = len(sfzh_format) # 分析准考证号格式 zkzh_prefix = zkzh_format[:6] if zkzh_length > 6 else zkzh_format[:3] for i in range(needed): # 生成与真实数据格式匹配的测试数据 new_zkzh = f"{zkzh_prefix}{random.randint(100000, 999999)}" new_zkzh = new_zkzh[:zkzh_length] # 生成符合身份证号格式的数据 if sfzh_length == 18: # 生成符合18位身份证规则的数据 year = random.randint(1990, 1999) month = random.randint(1, 12) day = random.randint(1, 28) new_sfzh = f"320106{year}{month:02d}{day:02d}{random.randint(1000, 9999)}" else: new_sfzh = ''.join([str(random.randint(0, 9)) for _ in range(sfzh_length)]) result.append({"zkzh": new_zkzh, "sfzh": new_sfzh}) return result # 单个查询请求 - 包括等待室机制和轮询处理 async def query_score_with_polling(session, base_url, data, timeout=30, max_polls=10, poll_interval=0.5): url = f"{base_url}/api/query_score" start_time = time.time() try: # 首次查询 async with session.post(url, json=data, timeout=timeout) as response: result = await response.json() # 检查是否需要轮询(进入等待队列) if result.get("queue_position") is not None and result.get("estimated_wait_time") is not None: # 进入轮询模式 queue_position = result.get("queue_position") wait_time = result.get("estimated_wait_time") # 轮询直到获取结果或达到最大轮询次数 polls_count = 0 while polls_count < max_polls: # 等待一段时间 poll_delay = min(wait_time / 2, poll_interval) # 动态调整轮询间隔 await asyncio.sleep(poll_delay) # 再次发送请求 async with session.post(url, json=data, timeout=timeout) as poll_response: poll_result = await poll_response.json() # 如果获取到结果或不再需要等待 if not poll_result.get("queue_position") or poll_result.get("data") is not None: end_time = time.time() return { "status_code": poll_response.status, "response_time": end_time - start_time, # 总响应时间包括轮询等待时间 "success": poll_result.get("success", False), "message": poll_result.get("message", ""), "in_queue": False, # 已经出队 "queue_position": None, "cached": "来自缓存" in poll_result.get("message", ""), "data": data, "has_result": poll_result.get("data") is not None, "polls": polls_count + 1 # 记录总共轮询次数 } # 更新计数和队列位置 polls_count += 1 # 轮询超过最大次数,返回最终状态 end_time = time.time() return { "status_code": 408, # 请求超时 "response_time": end_time - start_time, "success": False, "message": "等待队列轮询超时", "in_queue": True, "queue_position": queue_position, "cached": False, "data": data, "has_result": False, "polls": polls_count } else: # 不需要轮询,直接返回结果 end_time = time.time() return { "status_code": response.status, "response_time": end_time - start_time, "success": result.get("success", False), "message": result.get("message", ""), "in_queue": False, "queue_position": None, "cached": "来自缓存" in result.get("message", ""), "data": data, "has_result": result.get("data") is not None, "polls": 0 } except asyncio.TimeoutError: return { "status_code": 0, "response_time": time.time() - start_time, "success": False, "message": "请求超时", "in_queue": False, "queue_position": None, "data": data, "has_result": False, "polls": 0 } except Exception as e: return { "status_code": 0, "response_time": time.time() - start_time, "success": False, "message": f"发生错误: {str(e)}", "in_queue": False, "queue_position": None, "data": data, "has_result": False, "polls": 0 } # 强制使系统进入等待室模式 async def force_waiting_room_mode(session, base_url): """发送大量请求迫使系统进入等待室模式""" url = f"{base_url}/api/query_score" print("正在尝试激活等待室模式...") tasks = [] # 创建足够多的并发请求以激活等待室 batch_size = 200 for i in range(batch_size): data = {"zkzh": f"10{random.randint(10000000, 99999999)}", "sfzh": f"32010619{random.randint(90, 99)}{random.randint(1, 12):02d}{random.randint(1, 28):02d}{random.randint(1000, 9999)}"} task = asyncio.create_task(session.post(url, json=data)) tasks.append(task) # 等待所有请求完成 for future in asyncio.as_completed(tasks): try: await future except: pass print(f"已发送 {batch_size} 个请求以激活等待室机制") # 测试缓存效果 async def test_cache_effect(session, base_url, test_data, repeat=3): url = f"{base_url}/api/query_score" cache_results = [] # 第一轮查询 - 应该全部未命中缓存 print("\n===== 第1轮查询(无缓存) =====") round1_results = [] for i, data in enumerate(tqdm(test_data, desc="第1轮查询")): result = await query_score_with_polling(session, base_url, data) round1_results.append(result) await asyncio.sleep(0.05) # 降低查询速率 cache_results.append(round1_results) # 后续轮查询 - 应该命中缓存,响应更快 for r in range(2, repeat+1): print(f"\n===== 第{r}轮查询(应命中缓存) =====") round_results = [] for i, data in enumerate(tqdm(test_data, desc=f"第{r}轮查询")): result = await query_score_with_polling(session, base_url, data) round_results.append(result) await asyncio.sleep(0.05) # 降低查询速率 cache_results.append(round_results) return cache_results # 并发测试主函数 - 包括等待室测试 async def run_concurrent_test(base_url, concurrency, total_requests, delay=0, test_data=None, test_waiting_room=True): results = [] async with aiohttp.ClientSession() as session: # 如果需要测试等待室,先激活等待室模式 if test_waiting_room: await force_waiting_room_mode(session, base_url) # 准备测试数据 if test_data is None: test_data = await fetch_db_test_data(total_requests) test_data = expand_test_data(test_data, total_requests) print(f"准备发送 {len(test_data)} 条测试请求...") # 分批次发送请求 tasks = [] for i, data in enumerate(test_data): if i > 0 and i % concurrency == 0: await asyncio.sleep(delay) task = asyncio.create_task(query_score_with_polling(session, base_url, data)) tasks.append(task) # 使用tqdm显示进度条 for future in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="处理请求"): result = await future results.append(result) return results # 生成测试报告 - 增加等待室和轮询相关指标 def generate_report(results, include_cache_analysis=False, rounds=None): total = len(results) success_count = sum(1 for r in results if r["success"]) failure_count = total - success_count in_queue_count = sum(1 for r in results if r["in_queue"]) cached_count = sum(1 for r in results if r.get("cached", False)) has_result_count = sum(1 for r in results if r.get("has_result", False)) # 轮询相关统计 polled_requests = [r for r in results if r.get("polls", 0) > 0] avg_polls = sum(r.get("polls", 0) for r in results) / len(polled_requests) if polled_requests else 0 max_polls = max((r.get("polls", 0) for r in results), default=0) response_times = [r["response_time"] for r in results] avg_response_time = sum(response_times) / len(response_times) if response_times else 0 min_response_time = min(response_times) if response_times else 0 max_response_time = max(response_times) if response_times else 0 # 计算各百分位响应时间 if response_times: response_times.sort() p50 = response_times[int(total * 0.5)] if total > 0 else 0 p90 = response_times[int(total * 0.9)] if total > 0 else 0 p95 = response_times[int(total * 0.95)] if total > 0 else 0 p99 = response_times[int(total * 0.99)] if total > 0 else 0 else: p50 = p90 = p95 = p99 = 0 # 状态码分布 status_codes = defaultdict(int) for r in results: status_codes[r["status_code"]] += 1 # 错误消息分类 error_messages = defaultdict(int) for r in results: if not r["success"]: error_messages[r["message"]] += 1 report = { "总请求数": total, "成功请求": success_count, "失败请求": failure_count, "成功率": f"{(success_count/total*100):.2f}%" if total > 0 else "0%", "返回结果数": has_result_count, "返回结果率": f"{(has_result_count/total*100):.2f}%" if total > 0 else "0%", "进入等待队列数": in_queue_count, "等待队列率": f"{(in_queue_count/total*100):.2f}%" if total > 0 else "0%", "命中缓存数": cached_count, "轮询统计": { "需要轮询请求数": len(polled_requests), "平均轮询次数": f"{avg_polls:.2f}", "最大轮询次数": max_polls }, "响应时间(秒)": { "平均": f"{avg_response_time:.4f}", "最小": f"{min_response_time:.4f}", "最大": f"{max_response_time:.4f}", "P50": f"{p50:.4f}", "P90": f"{p90:.4f}", "P95": f"{p95:.4f}", "P99": f"{p99:.4f}" }, "状态码分布": dict(status_codes), "错误消息分类": dict(error_messages) } # 缓存效果分析 if include_cache_analysis and rounds: cache_analysis = {} for i, round_results in enumerate(rounds): round_avg_time = sum(r["response_time"] for r in round_results) / len(round_results) if round_results else 0 round_success = sum(1 for r in round_results if r["success"]) round_has_result = sum(1 for r in round_results if r.get("has_result", False)) round_cached = sum(1 for r in round_results if r.get("cached", False)) round_in_queue = sum(1 for r in round_results if r.get("in_queue", False)) cache_analysis[f"第{i+1}轮"] = { "平均响应时间": f"{round_avg_time:.4f}秒", "成功率": f"{(round_success/len(round_results)*100):.2f}%" if round_results else "0%", "返回结果率": f"{(round_has_result/len(round_results)*100):.2f}%" if round_results else "0%", "命中缓存数": round_cached, "命中缓存率": f"{(round_cached/len(round_results)*100):.2f}%" if round_results else "0%", "进入等待队列数": round_in_queue, "等待队列率": f"{(round_in_queue/len(round_results)*100):.2f}%" if round_results else "0%", } report["缓存效果分析"] = cache_analysis return report # 清除缓存,用于测试 async def clear_cache(session, base_url): try: async with session.post(f"{base_url}/api/clear_cache") as response: result = await response.json() return result.get("success", False), result.get("message", "未知结果") except Exception as e: return False, f"清除缓存失败: {str(e)}" async def main(): parser = argparse.ArgumentParser(description="江苏省人事考试成绩查询系统并发性能测试工具") parser.add_argument("--url", default="http://127.0.0.1:80", help="API基础URL") parser.add_argument("--concurrency", type=int, default=100, help="并发请求数") parser.add_argument("--total", type=int, default=1000, help="总请求数") parser.add_argument("--delay", type=float, default=0.1, help="批次间延迟(秒)") parser.add_argument("--output", default="test_report.json", help="报告输出文件") parser.add_argument("--mode", choices=["concurrent", "cache", "waiting-room", "all"], default="all", help="测试模式: concurrent=并发测试, cache=缓存测试, waiting-room=等待室测试, all=全部测试") parser.add_argument("--no-waiting-room", action="store_true", help="跳过等待室测试") args = parser.parse_args() print(f"成绩查询系统高并发性能测试工具 - 连接到 {args.url}") # 首先从数据库获取测试数据 test_data = await fetch_db_test_data(args.total) if not test_data: print("警告: 无法从数据库获取测试数据,将使用备用数据") # 确保测试数据量足够 test_data = expand_test_data(test_data, args.total) # 创建会话 async with aiohttp.ClientSession() as session: if args.mode in ["concurrent", "waiting-room", "all"]: print(f"\n开始并发测试: URL={args.url}, 并发数={args.concurrency}, 总请求数={args.total}") start_time = time.time() # 先清除缓存 clear_success, clear_message = await clear_cache(session, args.url) print(f"清除缓存: {clear_message}") # 运行并发测试 results = await run_concurrent_test( args.url, args.concurrency, args.total, args.delay, test_data, test_waiting_room=not args.no_waiting_room ) end_time = time.time() total_time = end_time - start_time # 生成报告 report = generate_report(results) report["总测试时间(秒)"] = f"{total_time:.2f}" report["每秒请求数(RPS)"] = f"{args.total/total_time:.2f}" if total_time > 0 else "0" # 保存报告 with open(args.output, 'w', encoding='utf-8') as f: json.dump(report, f, ensure_ascii=False, indent=2) # 打印关键指标 print("\n== 并发测试结果 ==") print(f"总请求数: {report['总请求数']}") print(f"成功率: {report['成功率']}") print(f"返回结果率: {report['返回结果率']}") print(f"进入等待队列数: {report['进入等待队列数']} ({report['等待队列率']})") print(f"平均响应时间: {report['响应时间(秒)']['平均']}秒") print(f"RPS: {report['每秒请求数(RPS)']}请求/秒") # 如果有轮询,显示轮询统计 if report["轮询统计"]["需要轮询请求数"] > 0: print(f"需要轮询请求数: {report['轮询统计']['需要轮询请求数']}") print(f"平均轮询次数: {report['轮询统计']['平均轮询次数']}") print(f"最大轮询次数: {report['轮询统计']['最大轮询次数']}") print(f"详细报告已保存至: {args.output}") if args.mode in ["cache", "all"]: print("\n开始缓存效果测试...") # 先清除缓存 clear_success, clear_message = await clear_cache(session, args.url) print(f"清除缓存: {clear_message}") # 使用部分测试数据进行缓存测试 cache_test_count = min(20, len(test_data)) cache_test_data = test_data[:cache_test_count] # 测试缓存效果 cache_results = await test_cache_effect(session, args.url, cache_test_data, repeat=3) # 生成缓存效果报告 cache_report = generate_report([item for sublist in cache_results for item in sublist], True, cache_results) # 保存缓存报告 cache_report_file = "cache_" + args.output with open(cache_report_file, 'w', encoding='utf-8') as f: json.dump(cache_report, f, ensure_ascii=False, indent=2) # 打印缓存效果 print("\n== 缓存效果测试结果 ==") if "缓存效果分析" in cache_report: for round_name, round_data in cache_report["缓存效果分析"].items(): print(f"{round_name}: 平均响应时间={round_data['平均响应时间']}, 命中缓存率={round_data['命中缓存率']}") print(f"详细缓存报告已保存至: {cache_report_file}") if __name__ == "__main__": asyncio.run(main())