Commit 5742ebd6 by qlintonger xeno

尝试多线程处理

parent 6dbb4cda
......@@ -13,3 +13,4 @@ futures = "0.3.31"
redis = "0.28.2"
redis_pool = "0.7.0"
lazy_static = "1.4"
dashmap = "4.0.0"
\ No newline at end of file
use crate::config::config::STATIC_WS_PWD;
use crate::events::{register_client, ClientMessage, Event, CLIENT_SENDERS};
use crate::handles::close_connection::handle_connection_error;
use crate::handles::handle_messages::handle_other_message;
use crate::deport::handle_ws_msg;
use crate::events::{register_client, ClientMessage, Event};
use crate::handles::handshake::handle_handshake;
use crate::handles::heartbeat::{handle_heartbeat, heart_resp};
use crate::handles::online_users_update::{send_online_users_resp, ServerOnlineUserMessage};
use crate::handles::redis::{insert_this_connection, remove_this_connection};
use crate::utils::json_utils::{get_current_timestamp, parse_message};
use futures::{SinkExt, StreamExt};
use crate::handles::online_users_update::send_online_users_and_send;
use dashmap::DashMap;
use futures::{StreamExt};
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::{Receiver, Sender, UnboundedSender};
use tokio::sync::RwLock as AsyncRwLock;
use tokio::sync::Mutex;
use tokio::time;
use tokio_tungstenite::accept_hdr_async;
use tungstenite::handshake::server::{Request, Response};
use tungstenite::{Error, Message};
use tungstenite::Error;
lazy_static! {
pub static ref ONLINE_USERS: Arc<AsyncRwLock<HashMap<String, String>>> =
Arc::new(AsyncRwLock::new(HashMap::new()));
}
// 关闭之前绑定的 WebSocket 连接
async fn close_existing_connection(from_id: &str) {
{
// 移除客户端的发送者
let mut senders = CLIENT_SENDERS.write().await;
senders.remove(from_id.clone());
}
if let Err(e) = remove_this_connection(from_id).await {
println!("从 Redis 中移除用户信息时出错: {}", e);
}
pub static ref ONLINE_USERS: DashMap<String, String> = DashMap::new();
}
pub(crate) async fn handle_client(
......@@ -71,169 +54,68 @@ pub(crate) async fn handle_client(
}
};
let (mut sender, mut receiver) = ws_stream.split();
let (sender, mut receiver) = ws_stream.split();
let sender = Arc::new(Mutex::new(sender)); // 包装为 Arc<Mutex<...>>
let last_heartbeat_time = Arc::new(Mutex::new(Instant::now())); // 包装为 Arc<Mutex<Instant>>
if let Some(params) = connection_params {
if let Some(from_id) = params.get("fromId") {
let from_id = from_id.clone();
let from_id_clone = from_id.clone();
// 检查 Redis 中是否已经存在该 fromId
close_existing_connection(&from_id).await;
// 将该用户的信息插入到 Redis 中
if let Err(e) = insert_this_connection(&from_id, &params).await {
println!("将用户信息插入到 Redis 中时出错: {}", e);
}
// 注册客户端到事件中心
register_client(from_id.clone(), center_to_client_sender).await;
// 发送新连接事件
event_sender
.send(Event::NewConnection(from_id.clone()))
.send(Event::NewConnection(from_id.clone(), params))
.unwrap();
let mut last_heartbeat_time = Instant::now();
let msg_check_time = Duration::from_millis(100); // 检查事件中心消息的时间间隔
let mut interval = time::interval(msg_check_time);
println!("断开与用户id: {},连接", from_id_clone);
loop {
let sender_clone = Arc::clone(&sender);
let from_id_clone = from_id.clone();
let event_sender_clone = event_sender.clone();
let last_heartbeat_time_clone = Arc::clone(&last_heartbeat_time);
tokio::select! {
// 处理消息接收
maybe_msg = receiver.next() => {
match maybe_msg {
Some(Ok(msg)) => {
if msg.is_text() {
let text = msg.to_text().unwrap();
match parse_message(text) {
Ok(data) => {
match data.msg_type.as_str() {
"Heart" => {
println!("收到客户端心跳消息 {:?}", &data);
handle_heartbeat(&mut last_heartbeat_time);
if let Ok(json_str) = heart_resp(&from_id_clone) {
if let Err(e) = sender.send(Message::text(json_str)).await {
println!("发送心跳信息失败: {}", e);
{
// 移除客户端的发送者
let mut senders = CLIENT_SENDERS.write().await;
senders.remove(&from_id);
}
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone).await;
break;
}
}
},
"GetOnlineUserList" => {
println!("收到客户端获取在线用户列表 {:?}", &data);
if let Err(e) = send_online_users_and_send(&mut sender, &from_id_clone).await {
println!("处理在线用户列表出错了:{:?}", e);
// 发送关闭连接事件
{
// 移除客户端的发送者
let mut senders = CLIENT_SENDERS.write().await;
senders.remove(&from_id);
}
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone).await;
break;
}
},
_ => {
if let Err(e) = handle_other_message(&mut sender, &data, &from_id_clone).await {
println!("Failed to handle other message: {}", e);
// 发送关闭连接事件
{
// 移除客户端的发送者
let mut senders = CLIENT_SENDERS.write().await;
senders.remove(&from_id);
}
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone).await;
break;
}
}
}
}
Err(e) => {
println!("解析JSON数据出错: {}", e);
}
}
}
}
Some(Err(e)) => {
println!("接受客户端消息出错: {}", e);
// 发送关闭连接事件
{
// 移除客户端的发送者
let mut senders = CLIENT_SENDERS.write().await;
senders.remove(&from_id);
}
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone).await;
break;
}
None => {
println!("客户端断开连接");
// 发送关闭连接事件
{
// 移除客户端的发送者
let mut senders = CLIENT_SENDERS.write().await;
senders.remove(&from_id);
}
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone).await;
break;
}
if let Some(msg) = maybe_msg {
tokio::spawn(handle_ws_msg(
msg?,
sender_clone,
from_id_clone,
event_sender_clone,
last_heartbeat_time_clone,
));
}
}
// 定期检查事件中心的消息
_ = interval.tick() => {
if let Ok(msg) = center_to_client_receiver.try_recv() {
msg = center_to_client_receiver.recv() => {
if let Some(msg) = msg {
println!("消息中心:==> 收到消息 {:?}", &msg);
match msg {
ClientMessage::CmdUpdateOnlineUsers => {
println!("消息中心:==> 收到 CmdUpdateOnlineUsers 消息 发送给 {}", &from_id_clone);
if let Err(e) = send_online_users_and_send(&mut sender, &from_id_clone).await {
if let Err(e) = send_online_users_and_send(&mut *sender_clone.lock().await, &from_id_clone).await {
println!("处理在线用户列表出错了:{:?}", e);
{
// 移除客户端的发送者
let mut senders = CLIENT_SENDERS.write().await;
senders.remove(&from_id);
}
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone).await;
break;
}
}
_ => {}
}
}
}
// 处理心跳超时
_ = time::sleep_until(tokio::time::Instant::from(last_heartbeat_time + Duration::from_secs(20))) => {
_ = time::sleep_until(tokio::time::Instant::from(*last_heartbeat_time.lock().await) + Duration::from_secs(20)) => {
println!("用户id-{} 20秒内没有发送心跳,挂断连接", from_id_clone);
// 发送关闭连接事件
{
// 移除客户端的发送者
let mut senders = CLIENT_SENDERS.write().await;
senders.remove(&from_id);
}
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone).await;
break;
}
}
}
println!("断开与用户id: {},连接", from_id_clone);
}
} else {
println!("无法获取连接参数");
......@@ -241,27 +123,3 @@ pub(crate) async fn handle_client(
Ok(())
}
async fn send_online_users_and_send(
sender: &mut (impl SinkExt<Message, Error = Error> + std::marker::Unpin),
from_id: &str,
) -> Result<(), Error> {
let messages = send_online_users_resp()
.await
.map_err(|e| Error::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
let json_message = serde_json::to_string(&ServerOnlineUserMessage {
msg_type: "CmdUpdateOnlineUsers".to_string(),
from_name: "Server".to_string(),
from_id: "0".to_string(),
to_id: from_id.to_string(),
msg_data: messages,
time: get_current_timestamp(),
})
.map_err(|e| Error::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
if let Err(e) = sender.send(Message::text(json_message)).await {
println!("发送在线用户列表消息给用户 {} 失败: {}", from_id, e);
return Err(e);
}
println!("发送在线用户列表消息给用户 {} 成功", from_id);
Ok(())
}
use std::sync::{Arc};
use std::time::Instant;
use futures::SinkExt;
use crate::events::Event;
use futures::stream::SplitSink;
use tokio::net::TcpStream;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::Mutex;
use tokio_tungstenite::WebSocketStream;
use tungstenite::Message;
use crate::handles::handle_messages::handle_other_message;
use crate::handles::heartbeat::{heart_resp};
use crate::handles::online_users_update::send_online_users_and_send;
use crate::utils::json_utils::parse_message;
pub async fn handle_ws_msg(
msg: Message,
sender: Arc<Mutex<SplitSink<WebSocketStream<TcpStream>, Message>>>,
from_id: String,
event_sender: UnboundedSender<Event>,
last_heartbeat_time: Arc<Mutex<Instant>>,
) {
if msg.is_text() {
let text = msg.to_text().unwrap();
match parse_message(text) {
Ok(data) => {
match data.msg_type.as_str() {
"Heart" => {
println!("收到客户端心跳消息 {:?}", &data);
*last_heartbeat_time.lock().await = Instant::now();
if let Ok(json_str) = heart_resp(&from_id) {
let mut sender = sender.lock().await;
if let Err(e) = sender.send(Message::text(json_str)).await {
println!("发送心跳信息失败: {}", e);
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id.clone())).unwrap();
}
}
},
"GetOnlineUserList" => {
println!("收到客户端获取在线用户列表 {:?}", &data);
let mut sender = sender.lock().await;
if let Err(e) = send_online_users_and_send(&mut *sender, &from_id).await {
println!("处理在线用户列表出错了:{:?}", e);
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id.clone())).unwrap();
}
},
_ => {
let mut sender = sender.lock().await;
if let Err(e) = handle_other_message(&mut *sender, &data, &from_id).await {
println!("Failed to handle other message: {}", e);
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id.clone())).unwrap();
}
}
}
}
Err(e) => {
println!("解析JSON数据出错: {}", e);
}
}
}
}
\ No newline at end of file
use crate::handles::redis::{insert_this_connection, remove_this_connection};
use dashmap::DashMap;
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::RwLock;
// 定义事件类型
#[derive(Debug)]
pub enum Event {
NewConnection(String),
NewConnection(String, HashMap<String, String>),
CloseConnection(String),
UpdateOnlineUsers,
}
// 定义事件中心发送给客户端的消息类型
#[derive(Debug)]
pub enum ClientMessage {
CmdUpdateOnlineUsers,
SendMessage(String),
}
lazy_static! {
// 存储每个客户端的发送者,用于事件中心向客户端发送消息
pub static ref CLIENT_SENDERS: Arc<RwLock<HashMap<String, mpsc::Sender<ClientMessage>>>> =
Arc::new(RwLock::new(HashMap::new()));
pub static ref CLIENT_SENDERS: DashMap<String, mpsc::Sender<ClientMessage>> = DashMap::new();
}
// 注册客户端的发送者
......@@ -30,39 +27,55 @@ pub async fn register_client(
from_id: String,
center_to_client_sender: mpsc::Sender<ClientMessage>,
) {
let mut senders = CLIENT_SENDERS.write().await;
println!("注册客户端的发送者数量---》注册前: {:?}", &senders.len());
senders.insert(from_id, center_to_client_sender);
println!("注册客户端的发送者数量---》注册后: {:?}", &senders.len());
close_existing_connection(&from_id).await;
println!("注册用户 {} 前数量 {}", &from_id, CLIENT_SENDERS.len());
CLIENT_SENDERS.insert(from_id.clone(), center_to_client_sender);
println!(
"注册用户 {} 后数量 {}",
from_id.clone(),
CLIENT_SENDERS.len()
);
}
// 关闭之前绑定的 WebSocket 连接
pub async fn close_existing_connection(from_id: &str) {
CLIENT_SENDERS.remove(from_id);
if let Err(e) = remove_this_connection(from_id).await {
println!("从 Redis 中移除用户信息时出错: {}", e);
}
}
// 处理事件的任务
pub async fn handle_events(mut receiver: mpsc::UnboundedReceiver<Event>) {
while let Some(event) = receiver.recv().await {
match event {
Event::NewConnection(from_id) => {
Event::NewConnection(from_id, params) => {
println!("有新的连接 用户id {} 更新在线用户列表事件触发", from_id);
// 将该用户的信息插入到 Redis 中
if let Err(e) = insert_this_connection(&from_id, &params).await {
println!("将用户信息插入到 Redis 中时出错: {}", e);
}
notify_all_clients_to_update_online_users().await;
}
Event::CloseConnection(from_id) => {
println!("有关闭的连接 用户id {} 更新在线用户列表事件触发", from_id);
close_existing_connection(&from_id).await;
notify_all_clients_to_update_online_users().await;
}
Event::UpdateOnlineUsers => {
// 这里可以实现其他触发更新在线用户列表的逻辑
// 为简单起见,暂未详细实现
println!("更新在线用户列表事件触发");
}
}
}
}
// 通知所有客户端线程发送 CmdUpdateOnlineUsers 消息
async fn notify_all_clients_to_update_online_users() {
let senders = CLIENT_SENDERS.read().await;
for (from_id, sender) in senders.iter() {
println!("尝试发送在新用户更新消息体,目前总人数为:{}", CLIENT_SENDERS.len());
for entry in CLIENT_SENDERS.iter() {
let sender: &mpsc::Sender<ClientMessage> = entry.value();
let from_id = entry.key();
if let Err(e) = sender.send(ClientMessage::CmdUpdateOnlineUsers).await {
println!("通知客户端 {} 更新在线用户列表失败: {:?}", from_id, e);
} else {
println!("通知客户端 {} 更新在线用户列表成功 ===> $$$", from_id);
}
}
}
use crate::handles::redis::remove_this_connection;
pub async fn handle_connection_error(from_id: &str) {
println!("开始处理用户id: {} 的连接错误", from_id);
// 从 Redis 中移除该用户的信息
if let Err(e) = remove_this_connection(from_id).await {
println!("从 Redis 中移除用户信息时出错: {}", e);
}
}
......@@ -8,7 +8,6 @@ pub(crate) async fn handle_other_message(
data: &ClientMessageData,
from_id: &str,
) -> Result<(), Error> {
println!("收到客户端消息: {:?}", data);
println!("收到客户端消息: {:?} 来自用户id: {}", data, &from_id);
Ok(())
}
use crate::utils::json_utils;
use serde::{Deserialize, Serialize};
use std::time::Instant;
pub fn handle_heartbeat(last_time: &mut Instant) {
*last_time = Instant::now();
}
// 定义服务器心跳数据消息结构
#[derive(Serialize, Deserialize, Debug)]
......
pub mod close_connection;
pub mod handle_messages;
pub mod handshake;
pub mod heartbeat;
pub mod online_users_update;
pub mod redis;
mod handle_agora_call;
use futures::SinkExt;
use crate::client::ONLINE_USERS;
use serde::{Deserialize, Serialize};
use tungstenite::{Error, Message};
use crate::utils::json_utils::get_current_timestamp;
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct OnlineUserMessage {
......@@ -39,13 +42,35 @@ pub struct ServerOnlineUserMessage {
pub time: u128,
}
pub async fn send_online_users_and_send(
sender: &mut (impl SinkExt<Message, Error = Error> + Unpin),
from_id: &str,
) -> Result<(), Error> {
let messages = send_online_users_resp()
.await
.map_err(|e| Error::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
let json_message = serde_json::to_string(&ServerOnlineUserMessage {
msg_type: "CmdUpdateOnlineUsers".to_string(),
from_name: "Server".to_string(),
from_id: "0".to_string(),
to_id: from_id.to_string(),
msg_data: messages,
time: get_current_timestamp(),
})
.map_err(|e| Error::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
if let Err(e) = sender.send(Message::text(json_message)).await {
println!("发送在线用户列表消息给用户 {} 失败: {}", from_id, e);
return Err(e);
}
println!("发送在线用户列表消息给用户 {} 成功", from_id);
Ok(())
}
pub async fn send_online_users_resp() -> Result<Vec<OnlineUserMessage>, serde_json::Error> {
let mut msg_data = Vec::new();
{
let online_users = ONLINE_USERS.read().await;
println!("当前所有用户数据信息 ONLINE_USERS: {:?}", online_users);
for (user_id, user_info_str) in online_users.iter() {
for entry in ONLINE_USERS.iter() {
let user_info_str: &String = entry.value();
let parts: Vec<&str> = user_info_str.split(',').collect();
if parts.len() == 9 {
let user_msg = OnlineUserMessage {
......
......@@ -25,9 +25,7 @@ pub async fn remove_this_connection(from_id: &str) -> Result<(), redis::RedisErr
}
{
// 获取写锁以进行写操作
let mut online_users = ONLINE_USERS.write().await;
online_users.remove(from_id);
ONLINE_USERS.remove(from_id);
println!("成功从全局变量中移除用户id: {} 的信息", from_id);
}
Ok(())
......@@ -62,8 +60,7 @@ pub async fn insert_this_connection(
{
// 获取写锁以进行写操作
let mut online_users = ONLINE_USERS.write().await;
online_users.insert(from_id.to_string(), user_info_str);
ONLINE_USERS.insert(from_id.to_string(), user_info_str);
println!("成功将用户id: {} 的信息插入到 ONLINE_USERS 中", from_id);
}
Ok(())
......
......@@ -6,6 +6,7 @@ mod events;
mod handles;
mod typing;
mod utils;
mod deport;
use crate::events::handle_events;
use client::handle_client;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment