Commit b6126d1e by qlintonger xeno

尝试使用事件机制来实现多线程沟通

parent 683aaa2a
use crate::config::config::STATIC_WS_PWD; use crate::config::config::STATIC_WS_PWD;
use crate::events::{Event};
use crate::handles::close_connection::handle_connection_error; use crate::handles::close_connection::handle_connection_error;
use crate::handles::handle_messages::handle_other_message; use crate::handles::handle_messages::handle_other_message;
use crate::handles::handshake::handle_handshake; use crate::handles::handshake::handle_handshake;
use crate::handles::heartbeat::{handle_heartbeat, heart_resp}; use crate::handles::heartbeat::{handle_heartbeat, heart_resp};
use crate::handles::online_users_update::send_online_users_resp;
use crate::handles::redis::{insert_this_connection, remove_this_connection}; use crate::handles::redis::{insert_this_connection, remove_this_connection};
use crate::typing::used_typed::{Connection, ConnectionMap, TaskMap}; use crate::typing::used_typed::{Connection, ConnectionMap};
use crate::utils::json_utils::parse_message; use crate::utils::json_utils::parse_message;
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, RwLock}; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::sync::RwLock as AsyncRwLock; use tokio::sync::{mpsc, RwLock as AsyncRwLock};
use tokio::sync::mpsc::UnboundedSender;
use tokio::time; use tokio::time;
use tokio_tungstenite::accept_hdr_async; use tokio_tungstenite::accept_hdr_async;
use tungstenite::handshake::server::{Request, Response}; use tungstenite::handshake::server::{Request, Response};
use tungstenite::{Error, Message}; use tungstenite::{Error, Message};
use crate::handles::online_users_update::send_online_users_resp;
lazy_static! { lazy_static! {
pub static ref CONNECTIONS: ConnectionMap = Arc::new(AsyncRwLock::new(HashMap::new())); pub static ref CONNECTIONS: ConnectionMap = Arc::new(AsyncRwLock::new(HashMap::new()));
static ref TASKS: TaskMap = Arc::new(RwLock::new(HashMap::new())); pub static ref ONLINE_USERS: Arc<AsyncRwLock<HashMap<String, String>>> =
pub static ref ONLINE_USERS: Arc<AsyncRwLock<HashMap<String, String>>> = Arc::new(AsyncRwLock::new(HashMap::new())); Arc::new(AsyncRwLock::new(HashMap::new()));
} }
// 关闭之前绑定的 WebSocket 连接并取消对应的任务 // 关闭之前绑定的 WebSocket 连接
async fn close_existing_connection(from_id: &str) { async fn close_existing_connection(from_id: &str) {
let task_to_abort = {
let mut tasks = TASKS.write().unwrap();
tasks.remove(from_id)
};
if let Some(task) = task_to_abort {
task.abort();
}
let old_connection = { let old_connection = {
let mut connections = CONNECTIONS.write().await; let mut connections = CONNECTIONS.write().await;
let already_done = connections.get(&from_id.to_string()); let already_done = connections.get(&from_id.to_string());
...@@ -71,7 +64,9 @@ async fn remove_connection(from_id: &str) { ...@@ -71,7 +64,9 @@ async fn remove_connection(from_id: &str) {
connections.remove(from_id); connections.remove(from_id);
} }
pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), Error> { pub(crate) async fn handle_client(stream: tokio::net::TcpStream,
event_sender: UnboundedSender<Event>,
) -> Result<(), Error> {
let must_existed_params = ["deviceId", "fromId", "wsPwd"]; let must_existed_params = ["deviceId", "fromId", "wsPwd"];
let mut connection_params = None; let mut connection_params = None;
...@@ -92,7 +87,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E ...@@ -92,7 +87,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
} }
} }
}) })
.await .await
{ {
Ok(ws) => ws, Ok(ws) => ws,
Err(e) => { Err(e) => {
...@@ -106,7 +101,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E ...@@ -106,7 +101,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
if let Some(params) = connection_params { if let Some(params) = connection_params {
if let Some(from_id) = params.get("fromId") { if let Some(from_id) = params.get("fromId") {
let from_id = from_id.clone(); let from_id = from_id.clone();
let from_id_clone = from_id.clone(); // 新增:克隆一份 from_id 用于闭包 let from_id_clone = from_id.clone(); // 克隆一份 from_id 用于闭包
// 检查 Redis 中是否已经存在该 fromId // 检查 Redis 中是否已经存在该 fromId
close_existing_connection(&from_id).await; close_existing_connection(&from_id).await;
...@@ -119,110 +114,130 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E ...@@ -119,110 +114,130 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
// 将新连接添加到全局连接映射 // 将新连接添加到全局连接映射
add_connection(from_id.clone(), Connection { sender, receiver }).await; add_connection(from_id.clone(), Connection { sender, receiver }).await;
event_sender
.send(Event::NewConnection(from_id.clone()))
.unwrap();
// 准备更新用户链接 // 准备更新用户链接
if let Err(e) = send_online_users_resp().await { if let Err(e) = send_online_users_resp().await {
println!("在处理新用户id {} 之时广播,处理在线用户列表出错了:{:?}", &from_id, e); println!(
"在处理新用户id {} 之时广播,处理在线用户列表出错了:{:?}",
&from_id, e
);
} else { } else {
println!("广播消息 来源id {} 成功", &from_id); println!("广播消息 来源id {} 成功", &from_id);
} }
let task = tokio::spawn(async move { let mut last_heartbeat_time = Instant::now();
let mut last_heartbeat_time = Instant::now(); loop {
loop { let mut connection_opt = None;
let mut connection_opt = None; {
{ let mut connections = CONNECTIONS.write().await;
let mut connections = CONNECTIONS.write().await; connection_opt = connections.remove(&from_id_clone);
connection_opt = connections.remove(&from_id_clone); }
}
if let Some(mut connection) = connection_opt { if let Some(connection) = connection_opt {
let (mut receiver_ref, mut sender_ref) = let (mut receiver_ref, mut sender_ref) =
(connection.receiver, connection.sender); (connection.receiver, connection.sender);
tokio::select! { tokio::select! {
// 处理消息接收 // 处理消息接收
maybe_msg = receiver_ref.next() => { maybe_msg = receiver_ref.next() => {
match maybe_msg { match maybe_msg {
Some(Ok(msg)) => { Some(Ok(msg)) => {
if msg.is_text() { if msg.is_text() {
let text = msg.to_text().unwrap(); let text = msg.to_text().unwrap();
match parse_message(text) { match parse_message(text) {
Ok(data) => { Ok(data) => {
match data.msg_type.as_str() { match data.msg_type.as_str() {
"Heart" => { "Heart" => {
println!("收到客户端心跳消息 {:?}", &data); println!("收到客户端心跳消息 {:?}", &data);
handle_heartbeat(&mut last_heartbeat_time); handle_heartbeat(&mut last_heartbeat_time);
if let Ok(json_str) = heart_resp(&from_id_clone) { if let Ok(json_str) = heart_resp(&from_id_clone) {
if let Err(e) = sender_ref.send(Message::text(json_str)).await { if let Err(e) = sender_ref.send(Message::text(json_str)).await {
println!("发送心跳信息失败: {}", e); println!("发送心跳信息失败: {}", e);
handle_connection_error(&from_id_clone, &CONNECTIONS, &TASKS).await; // 发送关闭连接事件
break; event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
} handle_connection_error(&from_id_clone, &CONNECTIONS).await;
}
},
"GetOnlineUserList" => {
println!("收到客户端获取在线用户列表 {:?}", &data);
if let Err(e) = send_online_users_resp().await {
println!("处理在线用户列表出错了:{:?}", e);
handle_connection_error(&from_id_clone, &CONNECTIONS, &TASKS).await;
break;
}
},
_ => {
if let Err(e) = handle_other_message(&mut sender_ref, &data, &from_id_clone).await {
println!("Failed to handle other message: {}", e);
handle_connection_error(&from_id_clone, &CONNECTIONS, &TASKS).await;
break; break;
} }
} }
},
"GetOnlineUserList" => {
println!("收到客户端获取在线用户列表 {:?}", &data);
if let Err(e) = send_online_users_resp().await {
println!("处理在线用户列表出错了:{:?}", e);
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone, &CONNECTIONS).await;
break;
}
},
_ => {
if let Err(e) = handle_other_message(&mut sender_ref, &data, &from_id_clone).await {
println!("Failed to handle other message: {}", e);
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone, &CONNECTIONS).await;
break;
}
} }
} }
Err(e) => { }
println!("解析JSON数据出错: {}", e); Err(e) => {
} println!("解析JSON数据出错: {}", e);
} }
} }
} }
Some(Err(e)) => {
println!("接受客户端消息出错: {}", e);
handle_connection_error(&from_id_clone, &CONNECTIONS, &TASKS).await;
break;
}
None => {
println!("客户端断开连接");
handle_connection_error(&from_id_clone, &CONNECTIONS, &TASKS).await;
break;
}
} }
} Some(Err(e)) => {
// 处理心跳超时 println!("接受客户端消息出错: {}", e);
_ = time::sleep_until(tokio::time::Instant::from(last_heartbeat_time + Duration::from_secs(20))) => { // 发送关闭连接事件
println!("用户id-{} 20秒内没有发送心跳,挂断连接", from_id_clone); event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone, &CONNECTIONS, &TASKS).await; handle_connection_error(&from_id_clone, &CONNECTIONS).await;
break; break;
}
None => {
println!("客户端断开连接");
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone, &CONNECTIONS).await;
break;
}
} }
} }
// 处理完一轮后,若连接还未断开,将其重新放回 CONNECTIONS // 处理心跳超时
add_connection(from_id_clone.clone(), Connection { sender: sender_ref, receiver: receiver_ref }).await; _ = time::sleep_until(tokio::time::Instant::from(last_heartbeat_time + Duration::from_secs(20))) => {
} else { println!("用户id-{} 20秒内没有发送心跳,挂断连接", from_id_clone);
break; // 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone, &CONNECTIONS).await;
break;
}
} }
// 处理完一轮后,若连接还未断开,将其重新放回 CONNECTIONS
add_connection(
from_id_clone.clone(),
Connection {
sender: sender_ref,
receiver: receiver_ref,
},
)
.await;
} else {
break;
} }
println!("断开与用户id: {},连接", from_id_clone); }
// 从全局连接映射中移除该连接 println!("断开与用户id: {},连接", from_id_clone);
remove_connection(&from_id_clone).await; // 从全局连接映射中移除该连接
// 从 Redis 中移除该用户的信息 remove_connection(&from_id_clone).await;
if let Err(e) = remove_this_connection(&from_id_clone).await { // 从 Redis 中移除该用户的信息
println!("从 Redis 中移除用户信息时出错: {}", e); if let Err(e) = remove_this_connection(&from_id_clone).await {
} println!("从 Redis 中移除用户信息时出错: {}", e);
}); }
// 将任务句柄存储到全局任务映射中
let mut tasks = TASKS.write().unwrap();
tasks.insert(from_id, task);
} }
} else { } else {
println!("无法获取连接参数"); println!("无法获取连接参数");
} }
Ok(()) Ok(())
} }
\ No newline at end of file
use crate::handles::online_users_update::send_online_users_resp;
use tokio::sync::mpsc;
// 假设的用户详细信息结构体
#[derive(Debug)]
pub struct UserDetails {
pub username: String,
}
// 定义事件类型
#[derive(Debug)]
pub enum Event {
NewConnection(String),
CloseConnection(String),
UpdateOnlineUsers,
}
// 处理事件的任务
pub async fn handle_events(mut receiver: mpsc::UnboundedReceiver<Event>) {
while let Some(event) = receiver.recv().await {
match event {
Event::NewConnection(from_id) => {
println!("新连接: {}", from_id);
// 这里假设在其他地方有获取所有连接发送器的逻辑,暂时省略
// 实际应用中,需要实现向所有连接发送消息的功能
if let Err(e) = send_online_users_resp().await {
println!(
"在处理新用户id {} 之时广播,处理在线用户列表出错了:{:?}",
&from_id, e
);
}
}
Event::CloseConnection(from_id) => {
println!("关闭连接: {}", from_id);
if let Err(e) = send_online_users_resp().await {
println!(
"在处理新用户id {} 之时广播,处理在线用户列表出错了:{:?}",
&from_id, e
);
}
}
Event::UpdateOnlineUsers => {
// 这里可以实现其他触发更新在线用户列表的逻辑
// 为简单起见,暂未详细实现
println!("更新在线用户列表事件触发");
}
}
}
}
use crate::handles::online_users_update::send_online_users_resp; use crate::handles::online_users_update::send_online_users_resp;
use crate::handles::redis::remove_this_connection; use crate::handles::redis::remove_this_connection;
use crate::typing::used_typed::{ConnectionMap, TaskMap}; use crate::typing::used_typed::{ConnectionMap};
use tokio::sync::RwLockWriteGuard; use tokio::sync::RwLockWriteGuard;
pub async fn handle_connection_error( pub async fn handle_connection_error(
from_id: &str, from_id: &str,
connections: &ConnectionMap, connections: &ConnectionMap,
tasks: &TaskMap,
) { ) {
println!("开始处理用户id: {} 的连接错误", from_id); println!("开始处理用户id: {} 的连接错误", from_id);
// 从全局连接映射中移除该连接 // 从全局连接映射中移除该连接
...@@ -30,22 +29,6 @@ pub async fn handle_connection_error( ...@@ -30,22 +29,6 @@ pub async fn handle_connection_error(
} }
} }
println!("开始尝试关闭用户id: {} 的 redis 连接", from_id);
// 取消对应的任务
{
let mut tasks = tasks.write().unwrap();
match tasks.remove(from_id) {
Some(task) => {
task.abort();
println!("成功取消用户id: {} 的任务", from_id);
}
None => println!("未在全局任务映射中找到用户id: {} 的任务", from_id),
}
}
println!("断开与用户id: {} 的连接并完成清理操作", from_id);
// 准备更新用户链接 // 准备更新用户链接
if let Err(e) = send_online_users_resp().await { if let Err(e) = send_online_users_resp().await {
println!("在处理新用户id {} 之时广播,处理在线用户列表出错了:{:?}", &from_id,e); println!("在处理新用户id {} 之时广播,处理在线用户列表出错了:{:?}", &from_id,e);
......
...@@ -5,15 +5,22 @@ mod config; ...@@ -5,15 +5,22 @@ mod config;
mod handles; mod handles;
mod typing; mod typing;
mod utils; mod utils;
mod events;
use crate::events::handle_events;
use client::handle_client; use client::handle_client;
use config::config::STATIC_ADDR as addr; use config::config::STATIC_ADDR as addr;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::sync::mpsc;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let listener = TcpListener::bind(addr).await.unwrap(); let listener = TcpListener::bind(addr).await.unwrap();
// 创建事件通道
let (event_sender, event_receiver) = mpsc::unbounded_channel();
// 启动事件处理任务
tokio::spawn(handle_events(event_receiver));
while let Ok((stream, _)) = listener.accept().await { while let Ok((stream, _)) = listener.accept().await {
tokio::spawn(handle_client(stream)); tokio::spawn(handle_client(stream, event_sender.clone()));
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment