RSJselet/cstest.py

197 lines
6.9 KiB
Python
Raw Normal View History

2025-07-10 13:42:54 +08:00
# 文件名: concurrent_test.py
import asyncio
import aiohttp
import time
import json
import random
import argparse
from collections import defaultdict
from tqdm import tqdm
# 测试数据生成
def generate_test_data(count):
test_data = []
# 基于系统中看到的测试数据格式
for i in range(count):
zkzh = random.randint(2000000, 2999999)
sj = f"138{random.randint(10000000, 99999999)}"
test_data.append({"zkzh": str(zkzh), "sj": sj})
return test_data
# 单个查询请求
async def query_score(session, url, data, timeout=30):
start_time = time.time()
try:
async with session.post(url, json=data, timeout=timeout) as response:
result = await response.json()
end_time = time.time()
return {
"status_code": response.status,
"response_time": end_time - start_time,
"success": result.get("success", False),
"message": result.get("message", ""),
"in_queue": "queue_position" in result,
"queue_position": result.get("queue_position", None)
}
except asyncio.TimeoutError:
return {
"status_code": 0,
"response_time": time.time() - start_time,
"success": False,
"message": "请求超时",
"in_queue": False,
"queue_position": None
}
except Exception as e:
return {
"status_code": 0,
"response_time": time.time() - start_time,
"success": False,
"message": f"发生错误: {str(e)}",
"in_queue": False,
"queue_position": None
}
# 轮询状态(如果被放入等待队列)
async def poll_status(session, base_url, task_id, max_attempts=20, interval=1):
status_url = f"{base_url}/api/query_status/{task_id}"
for attempt in range(max_attempts):
try:
async with session.get(status_url) as response:
result = await response.json()
if result.get("success") and "data" in result:
return {"success": True, "data": result["data"], "attempts": attempt + 1}
elif not result.get("in_progress", True):
return {"success": False, "message": result.get("message", "查询失败"), "attempts": attempt + 1}
await asyncio.sleep(interval)
except Exception:
await asyncio.sleep(interval)
return {"success": False, "message": "轮询超时", "attempts": max_attempts}
# 并发测试主函数
async def run_concurrent_test(base_url, concurrency, total_requests, delay=0):
url = f"{base_url}/api/query_score"
test_data = generate_test_data(total_requests)
results = []
# 创建一个共享的 ClientSession
async with aiohttp.ClientSession() as session:
# 分批次发送请求
tasks = []
for i, data in enumerate(test_data):
if i > 0 and i % concurrency == 0:
await asyncio.sleep(delay)
task = asyncio.create_task(query_score(session, url, data))
tasks.append(task)
# 使用tqdm显示进度条
for future in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="发送请求"):
result = await future
results.append(result)
return results
# 生成测试报告
def generate_report(results):
total = len(results)
success_count = sum(1 for r in results if r["success"])
failure_count = total - success_count
in_queue_count = sum(1 for r in results if r["in_queue"])
response_times = [r["response_time"] for r in results]
avg_response_time = sum(response_times) / len(response_times)
min_response_time = min(response_times)
max_response_time = max(response_times)
# 计算各百分位响应时间
response_times.sort()
p50 = response_times[int(total * 0.5)]
p90 = response_times[int(total * 0.9)]
p95 = response_times[int(total * 0.95)]
p99 = response_times[int(total * 0.99)]
# 状态码分布
status_codes = defaultdict(int)
for r in results:
status_codes[r["status_code"]] += 1
# 错误消息分类
error_messages = defaultdict(int)
for r in results:
if not r["success"]:
error_messages[r["message"]] += 1
report = {
"总请求数": total,
"成功请求": success_count,
"失败请求": failure_count,
"成功率": f"{(success_count/total*100):.2f}%",
"进入等待队列数": in_queue_count,
"响应时间(秒)": {
"平均": f"{avg_response_time:.4f}",
"最小": f"{min_response_time:.4f}",
"最大": f"{max_response_time:.4f}",
"P50": f"{p50:.4f}",
"P90": f"{p90:.4f}",
"P95": f"{p95:.4f}",
"P99": f"{p99:.4f}"
},
"状态码分布": dict(status_codes),
"错误消息分类": dict(error_messages)
}
return report
async def main():
parser = argparse.ArgumentParser(description="成绩查询系统并发测试工具")
parser.add_argument("--url", default="http://localhost:8000", help="API基础URL")
parser.add_argument("--concurrency", type=int, default=100, help="并发请求数")
parser.add_argument("--total", type=int, default=1000, help="总请求数")
parser.add_argument("--delay", type=float, default=0.1, help="批次间延迟(秒)")
parser.add_argument("--output", default="test_report.json", help="报告输出文件")
args = parser.parse_args()
print(f"开始并发测试: URL={args.url}, 并发数={args.concurrency}, 总请求数={args.total}")
start_time = time.time()
results = await run_concurrent_test(
args.url,
args.concurrency,
args.total,
args.delay
)
end_time = time.time()
total_time = end_time - start_time
report = generate_report(results)
report["总测试时间(秒)"] = f"{total_time:.2f}"
report["每秒请求数(RPS)"] = f"{args.total/total_time:.2f}"
# 保存报告到文件
with open(args.output, 'w', encoding='utf-8') as f:
json.dump(report, f, ensure_ascii=False, indent=2)
print(f"测试完成! 总耗时: {total_time:.2f}")
print(f"报告已保存至: {args.output}")
# 打印关键指标
print("\n关键性能指标:")
print(f"总请求数: {report['总请求数']}")
print(f"成功率: {report['成功率']}")
print(f"平均响应时间: {report['响应时间(秒)']['平均']}")
print(f"RPS: {report['每秒请求数(RPS)']}请求/秒")
if __name__ == "__main__":
asyncio.run(main())
# # 默认本地测试
# python concurrent_test.py
# # 指定URL和并发参数
# python concurrent_test.py --url http://192.168.0.46:8000 --concurrency 200 --total 2000