Skip to content
Projects
Groups
Snippets
Help
This project
Loading...
Sign in / Register
Toggle navigation
W
ws-rst
Overview
Overview
Details
Activity
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
0
Issues
0
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
qlintonger xeno
ws-rst
Commits
f0a9c691
Commit
f0a9c691
authored
Feb 14, 2025
by
qlintonger xeno
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Delete simple_test.py
parent
7d671bed
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
0 additions
and
79 deletions
+0
-79
test/simple_test.py
+0
-79
No files found.
test/simple_test.py
deleted
100644 → 0
View file @
7d671bed
import
websocket
import
threading
import
random
import
time
import
json
# 服务器地址
SERVER_URL
=
"ws://localhost:12345"
# 客户端数量
CLIENT_COUNT
=
10
# wsPWD
WS_PWD
=
"Q8kFm5LzJ2Ab"
def
generate_random_id
(
length
=
10
):
"""生成指定长度的随机字符串作为 ID"""
import
string
import
random
letters
=
string
.
ascii_letters
+
string
.
digits
return
''
.
join
(
random
.
choice
(
letters
)
for
i
in
range
(
length
))
def
send_heartbeat
(
ws
):
"""发送心跳消息的函数"""
while
True
:
from_id
=
generate_random_id
()
from_name
=
generate_random_id
()
heartbeat_msg
=
{
"fromId"
:
from_id
,
"fromName"
:
from_name
,
"msgType"
:
"Heart"
,
"msgData"
:
None
}
try
:
ws
.
send
(
json
.
dumps
(
heartbeat_msg
))
print
(
f
"Sent heartbeat: {heartbeat_msg}"
)
except
websocket
.
WebSocketException
as
e
:
print
(
f
"Error sending heartbeat: {e}"
)
break
time
.
sleep
(
5
)
def
client_task
():
"""每个客户端的任务函数"""
ws
=
websocket
.
WebSocket
()
try
:
ws
.
connect
(
SERVER_URL
+
"?fromId="
+
generate_random_id
()
+
"&deviceId="
+
generate_random_id
()
+
"&wsPwd={}"
.
format
(
WS_PWD
))
print
(
"Connected to the server"
)
# 启动心跳发送线程
heartbeat_thread
=
threading
.
Thread
(
target
=
send_heartbeat
,
args
=
(
ws
,))
heartbeat_thread
.
daemon
=
True
heartbeat_thread
.
start
()
# 随机 40 秒内关闭连接
close_time
=
random
.
randint
(
20
,
40
)
print
(
f
"Will close the connection in {close_time} seconds"
)
time
.
sleep
(
close_time
)
try
:
ws
.
close
()
print
(
"Connection closed"
)
except
websocket
.
WebSocketException
as
e
:
print
(
f
"Error closing connection: {e}"
)
except
websocket
.
WebSocketException
as
e
:
print
(
f
"Failed to connect to the server: {e}"
)
if
__name__
==
"__main__"
:
threads
=
[]
for
_
in
range
(
CLIENT_COUNT
):
thread
=
threading
.
Thread
(
target
=
client_task
)
threads
.
append
(
thread
)
thread
.
start
()
# 等待所有线程完成
for
thread
in
threads
:
thread
.
join
()
\ No newline at end of file
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment