Commit 8fd6a07a by qlintonger xeno

新增python测试script

parent 2b845919
import websocket
import threading
import random
import time
import json
# 服务器地址
SERVER_URL = "ws://localhost:12345"
# 客户端数量
CLIENT_COUNT = 200
# wsPWD
WS_PWD = "Q8kFm5LzJ2Ab"
def generate_random_id(length=10):
"""生成指定长度的随机字符串作为 ID"""
import string
import random
letters = string.ascii_letters + string.digits
return ''.join(random.choice(letters) for i in range(length))
def send_heartbeat(ws):
"""发送心跳消息的函数"""
while True:
from_id = generate_random_id()
from_name = generate_random_id()
heartbeat_msg = {
"fromId": from_id,
"fromName": from_name,
"msgType": "Heart",
"msgData": None
}
try:
ws.send(json.dumps(heartbeat_msg))
print(f"Sent heartbeat: {heartbeat_msg}")
except websocket.WebSocketException as e:
print(f"Error sending heartbeat: {e}")
break
time.sleep(5)
def client_task():
"""每个客户端的任务函数"""
ws = websocket.WebSocket()
try:
ws.connect(SERVER_URL + "?fromId=" + generate_random_id() + "&deviceId=" + generate_random_id() + "&wsPwd={}".format(WS_PWD))
print("Connected to the server")
# 启动心跳发送线程
heartbeat_thread = threading.Thread(target=send_heartbeat, args=(ws,))
heartbeat_thread.daemon = True
heartbeat_thread.start()
# 随机 40 秒内关闭连接
close_time = random.randint(20, 40)
print(f"Will close the connection in {close_time} seconds")
time.sleep(close_time)
try:
ws.close()
print("Connection closed")
except websocket.WebSocketException as e:
print(f"Error closing connection: {e}")
except websocket.WebSocketException as e:
print(f"Failed to connect to the server: {e}")
if __name__ == "__main__":
threads = []
for _ in range(CLIENT_COUNT):
thread = threading.Thread(target=client_task)
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
\ No newline at end of file
import websocket
import threading
import random
import time
import json
# 服务器地址
SERVER_URL = "ws://localhost:12345"
# 客户端数量
CLIENT_COUNT = 20
# wsPWD
WS_PWD = "Q8kFm5LzJ2Ab"
def generate_random_id(length=10):
"""生成指定长度的随机字符串作为 ID"""
import string
letters = string.ascii_letters + string.digits
return ''.join(random.choice(letters) for i in range(length))
def send_heartbeat(ws, from_id):
"""发送心跳消息的函数"""
while True:
from_name = generate_random_id()
heartbeat_msg = {
"fromId": from_id,
"fromName": from_name,
"msgType": "Heart",
"msgData": None
}
try:
ws.send(json.dumps(heartbeat_msg))
print(f"Sent heartbeat: {heartbeat_msg}")
except websocket.WebSocketException as e:
print(f"Error sending heartbeat: {e}")
break
time.sleep(5)
def client_task():
"""每个客户端的任务函数"""
# 随机决定是否使用重复的 fromId
if random.random() < 0.5: # 30% 的概率使用重复的 fromId
from_id = random.choice(existing_from_ids)
else:
from_id = generate_random_id()
existing_from_ids.append(from_id)
ws = websocket.WebSocket()
try:
ws.connect(SERVER_URL + f"?fromId={from_id}&deviceId={generate_random_id()}&wsPwd={WS_PWD}")
print("Connected to the server")
# 启动心跳发送线程
heartbeat_thread = threading.Thread(target=send_heartbeat, args=(ws, from_id))
heartbeat_thread.daemon = True
heartbeat_thread.start()
# 随机 20 - 30 秒内关闭连接
close_time = random.randint(20, 30)
print(f"Will close the connection in {close_time} seconds")
time.sleep(close_time)
try:
ws.close()
print("Connection closed")
except websocket.WebSocketException as e:
print(f"Error closing connection: {e}")
except websocket.WebSocketException as e:
print(f"Failed to connect to the server: {e}")
if __name__ == "__main__":
existing_from_ids = []
threads = []
for _ in range(CLIENT_COUNT):
thread = threading.Thread(target=client_task)
threads.append(thread)
thread.start()
# 等待所有线程完成
for thread in threads:
thread.join()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment