Commit 13b576df by qlintonger xeno

尝试使用事件机制来实现多线程沟通+1

parent b6126d1e
use crate::config::config::STATIC_WS_PWD; use crate::config::config::STATIC_WS_PWD;
use crate::events::{Event}; use crate::events::{Event, register_client, ClientMessage};
use crate::handles::close_connection::handle_connection_error; use crate::handles::close_connection::handle_connection_error;
use crate::handles::handle_messages::handle_other_message; use crate::handles::handle_messages::handle_other_message;
use crate::handles::handshake::handle_handshake; use crate::handles::handshake::handle_handshake;
use crate::handles::heartbeat::{handle_heartbeat, heart_resp}; use crate::handles::heartbeat::{handle_heartbeat, heart_resp};
use crate::handles::online_users_update::send_online_users_resp; use crate::handles::online_users_update::send_online_users_resp;
use crate::handles::redis::{insert_this_connection, remove_this_connection}; use crate::handles::redis::{insert_this_connection, remove_this_connection};
use crate::typing::used_typed::{Connection, ConnectionMap};
use crate::utils::json_utils::parse_message; use crate::utils::json_utils::parse_message;
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::sync::mpsc::{UnboundedSender, Receiver};
use tokio::sync::{mpsc, RwLock as AsyncRwLock}; use tokio::sync::{mpsc, RwLock as AsyncRwLock};
use tokio::sync::mpsc::UnboundedSender;
use tokio::time; use tokio::time;
use tokio_tungstenite::accept_hdr_async; use tokio_tungstenite::accept_hdr_async;
use tungstenite::handshake::server::{Request, Response}; use tungstenite::handshake::server::{Request, Response};
use tungstenite::{Error, Message}; use tungstenite::{Error, Message};
lazy_static! { lazy_static! {
pub static ref CONNECTIONS: ConnectionMap = Arc::new(AsyncRwLock::new(HashMap::new()));
pub static ref ONLINE_USERS: Arc<AsyncRwLock<HashMap<String, String>>> = pub static ref ONLINE_USERS: Arc<AsyncRwLock<HashMap<String, String>>> =
Arc::new(AsyncRwLock::new(HashMap::new())); Arc::new(AsyncRwLock::new(HashMap::new()));
} }
// 关闭之前绑定的 WebSocket 连接 // 关闭之前绑定的 WebSocket 连接
async fn close_existing_connection(from_id: &str) { async fn close_existing_connection(event_sender: &UnboundedSender<Event>, from_id: &str) {
let old_connection = { event_sender.send(Event::CloseConnection(from_id.to_string())).unwrap();
let mut connections = CONNECTIONS.write().await;
let already_done = connections.get(&from_id.to_string());
println!(
"关闭之前绑定的 WebSocket 连接: {} {:?}",
from_id, already_done
);
connections.remove(from_id)
};
if let Some(mut old_connection) = old_connection {
// 尝试优雅地关闭旧连接
if let Err(e) = old_connection.sender.close().await {
println!("关闭旧的 WebSocket 发送器时出错: {}", e);
}
println!("关闭旧的 WebSocket 连接: {}", from_id);
}
// 从 Redis 中移除该用户的信息
if let Err(e) = remove_this_connection(from_id).await { if let Err(e) = remove_this_connection(from_id).await {
println!("从 Redis 中移除用户信息时出错: {}", e); println!("从 Redis 中移除用户信息时出错: {}", e);
} }
} }
// 添加连接到全局连接映射 pub(crate) async fn handle_client(
async fn add_connection(from_id: String, connection: Connection) { stream: tokio::net::TcpStream,
let mut connections = CONNECTIONS.write().await; event_sender: &UnboundedSender<Event>,
connections.insert(from_id, connection); mut center_to_client_receiver: Receiver<ClientMessage>,
} ) -> Result<(), Error> {
// 从全局连接映射中移除连接
async fn remove_connection(from_id: &str) {
let mut connections = CONNECTIONS.write().await;
connections.remove(from_id);
}
pub(crate) async fn handle_client(stream: tokio::net::TcpStream,
event_sender: UnboundedSender<Event>,
) -> Result<(), Error> {
let must_existed_params = ["deviceId", "fromId", "wsPwd"]; let must_existed_params = ["deviceId", "fromId", "wsPwd"];
let mut connection_params = None; let mut connection_params = None;
...@@ -96,52 +66,30 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream, ...@@ -96,52 +66,30 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream,
} }
}; };
let (sender, receiver) = ws_stream.split(); let (mut sender, mut receiver) = ws_stream.split();
if let Some(params) = connection_params { if let Some(params) = connection_params {
if let Some(from_id) = params.get("fromId") { if let Some(from_id) = params.get("fromId") {
let from_id = from_id.clone(); let from_id = from_id.clone();
let from_id_clone = from_id.clone(); // 克隆一份 from_id 用于闭包 let from_id_clone = from_id.clone();
// 检查 Redis 中是否已经存在该 fromId // 检查 Redis 中是否已经存在该 fromId
close_existing_connection(&from_id).await; close_existing_connection(&event_sender, &from_id).await;
// 将该用户的信息插入到 Redis 中 // 将该用户的信息插入到 Redis 中
if let Err(e) = insert_this_connection(&from_id, &params).await { if let Err(e) = insert_this_connection(&from_id, &params).await {
println!("将用户信息插入到 Redis 中时出错: {}", e); println!("将用户信息插入到 Redis 中时出错: {}", e);
} }
// 将新连接添加到全局连接映射 // 发送新连接事件
add_connection(from_id.clone(), Connection { sender, receiver }).await; event_sender.send(Event::NewConnection(from_id.clone())).unwrap();
event_sender
.send(Event::NewConnection(from_id.clone()))
.unwrap();
// 准备更新用户链接
if let Err(e) = send_online_users_resp().await {
println!(
"在处理新用户id {} 之时广播,处理在线用户列表出错了:{:?}",
&from_id, e
);
} else {
println!("广播消息 来源id {} 成功", &from_id);
}
let mut last_heartbeat_time = Instant::now(); let mut last_heartbeat_time = Instant::now();
loop {
let mut connection_opt = None;
{
let mut connections = CONNECTIONS.write().await;
connection_opt = connections.remove(&from_id_clone);
}
if let Some(connection) = connection_opt { loop {
let (mut receiver_ref, mut sender_ref) =
(connection.receiver, connection.sender);
tokio::select! { tokio::select! {
// 处理消息接收 // 处理消息接收
maybe_msg = receiver_ref.next() => { maybe_msg = receiver.next() => {
match maybe_msg { match maybe_msg {
Some(Ok(msg)) => { Some(Ok(msg)) => {
if msg.is_text() { if msg.is_text() {
...@@ -153,11 +101,11 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream, ...@@ -153,11 +101,11 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream,
println!("收到客户端心跳消息 {:?}", &data); println!("收到客户端心跳消息 {:?}", &data);
handle_heartbeat(&mut last_heartbeat_time); handle_heartbeat(&mut last_heartbeat_time);
if let Ok(json_str) = heart_resp(&from_id_clone) { if let Ok(json_str) = heart_resp(&from_id_clone) {
if let Err(e) = sender_ref.send(Message::text(json_str)).await { if let Err(e) = sender.send(Message::text(json_str)).await {
println!("发送心跳信息失败: {}", e); println!("发送心跳信息失败: {}", e);
// 发送关闭连接事件 // 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap(); event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone, &CONNECTIONS).await; handle_connection_error(&from_id_clone, &event_sender).await;
break; break;
} }
} }
...@@ -168,16 +116,16 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream, ...@@ -168,16 +116,16 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream,
println!("处理在线用户列表出错了:{:?}", e); println!("处理在线用户列表出错了:{:?}", e);
// 发送关闭连接事件 // 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap(); event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone, &CONNECTIONS).await; handle_connection_error(&from_id_clone, &event_sender).await;
break; break;
} }
}, },
_ => { _ => {
if let Err(e) = handle_other_message(&mut sender_ref, &data, &from_id_clone).await { if let Err(e) = handle_other_message(&mut sender, &data, &from_id_clone).await {
println!("Failed to handle other message: {}", e); println!("Failed to handle other message: {}", e);
// 发送关闭连接事件 // 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap(); event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone, &CONNECTIONS).await; handle_connection_error(&from_id_clone, &event_sender).await;
break; break;
} }
} }
...@@ -193,14 +141,14 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream, ...@@ -193,14 +141,14 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream,
println!("接受客户端消息出错: {}", e); println!("接受客户端消息出错: {}", e);
// 发送关闭连接事件 // 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap(); event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone, &CONNECTIONS).await; handle_connection_error(&from_id_clone, &event_sender).await;
break; break;
} }
None => { None => {
println!("客户端断开连接"); println!("客户端断开连接");
// 发送关闭连接事件 // 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap(); event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone, &CONNECTIONS).await; handle_connection_error(&from_id_clone, &event_sender).await;
break; break;
} }
} }
...@@ -210,26 +158,31 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream, ...@@ -210,26 +158,31 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream,
println!("用户id-{} 20秒内没有发送心跳,挂断连接", from_id_clone); println!("用户id-{} 20秒内没有发送心跳,挂断连接", from_id_clone);
// 发送关闭连接事件 // 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap(); event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone, &CONNECTIONS).await; handle_connection_error(&from_id_clone, &event_sender).await;
break; break;
} }
} // 处理来自事件中心的消息
// 处理完一轮后,若连接还未断开,将其重新放回 CONNECTIONS maybe_msg = center_to_client_receiver.recv() => {
add_connection( if let Some(msg) = maybe_msg {
from_id_clone.clone(), match msg {
Connection { ClientMessage::CmdUpdateOnlineUsers => {
sender: sender_ref, if let Err(e) = send_online_users_resp().await {
receiver: receiver_ref, println!("处理在线用户列表出错了:{:?}", e);
}, // 发送关闭连接事件
) event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
.await; handle_connection_error(&from_id_clone, &event_sender).await;
} else {
break; break;
} }
} }
_ => {}
}
}
}
}
}
println!("断开与用户id: {},连接", from_id_clone); println!("断开与用户id: {},连接", from_id_clone);
// 从全局连接映射中移除该连接 // 发送关闭连接事件
remove_connection(&from_id_clone).await; event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
// 从 Redis 中移除该用户的信息 // 从 Redis 中移除该用户的信息
if let Err(e) = remove_this_connection(&from_id_clone).await { if let Err(e) = remove_this_connection(&from_id_clone).await {
println!("从 Redis 中移除用户信息时出错: {}", e); println!("从 Redis 中移除用户信息时出错: {}", e);
......
use crate::handles::online_users_update::send_online_users_resp; use crate::handles::online_users_update::{send_online_users_resp};
use crate::typing::used_typed::Connection;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use futures::SinkExt;
use lazy_static::lazy_static;
use tungstenite::{Error, Message};
// 假设的用户详细信息结构体 // 假设的用户详细信息结构体
#[derive(Debug)] #[derive(Debug)]
...@@ -15,35 +22,69 @@ pub enum Event { ...@@ -15,35 +22,69 @@ pub enum Event {
UpdateOnlineUsers, UpdateOnlineUsers,
} }
// 定义事件中心发送给客户端的消息类型
#[derive(Debug)]
pub enum ClientMessage {
CmdUpdateOnlineUsers,
SendMessage(String),
}
lazy_static! {
// 存储每个客户端的发送者,用于事件中心向客户端发送消息
pub static ref CLIENT_SENDERS: Arc<RwLock<HashMap<String, mpsc::Sender<ClientMessage>>>> =
Arc::new(RwLock::new(HashMap::new()));
}
// 注册客户端的发送者
pub async fn register_client(_event_sender: mpsc::UnboundedSender<Event>, center_to_client_sender: mpsc::Sender<ClientMessage>) {
// 这里假设可以从某个地方获取到客户端的 from_id
let from_id = "dummy_id".to_string(); // 实际需要替换为真实的 from_id
let mut senders = CLIENT_SENDERS.write().await;
senders.insert(from_id, center_to_client_sender);
}
// 处理事件的任务 // 处理事件的任务
pub async fn handle_events(mut receiver: mpsc::UnboundedReceiver<Event>) { pub async fn handle_events(mut receiver: mpsc::UnboundedReceiver<Event>) {
while let Some(event) = receiver.recv().await { while let Some(event) = receiver.recv().await {
match event { match event {
Event::NewConnection(from_id) => { Event::NewConnection(from_id) => {
println!("新连接: {}", from_id); println!("新连接: {}", from_id);
// 这里假设在其他地方有获取所有连接发送器的逻辑,暂时省略
// 实际应用中,需要实现向所有连接发送消息的功能
if let Err(e) = send_online_users_resp().await {
println!(
"在处理新用户id {} 之时广播,处理在线用户列表出错了:{:?}",
&from_id, e
);
}
} }
Event::CloseConnection(from_id) => { Event::CloseConnection(from_id) => {
println!("关闭连接: {}", from_id); println!("关闭连接: {}", from_id);
if let Err(e) = send_online_users_resp().await {
println!( // 移除客户端的发送者
"在处理新用户id {} 之时广播,处理在线用户列表出错了:{:?}", let mut senders = CLIENT_SENDERS.write().await;
&from_id, e senders.remove(&from_id);
);
}
} }
Event::UpdateOnlineUsers => { Event::UpdateOnlineUsers => {
// 这里可以实现其他触发更新在线用户列表的逻辑 // 这里可以实现其他触发更新在线用户列表的逻辑
// 为简单起见,暂未详细实现 // 为简单起见,暂未详细实现
println!("更新在线用户列表事件触发"); println!("更新在线用户列表事件触发");
}
}
}
}
// 向指定客户端发送消息
async fn send_message_to_client(from_id: &str, message: String) -> Result<(), Error> {
let senders = CLIENT_SENDERS.read().await;
if let Some(sender) = senders.get(from_id) {
if let Err(e) = sender.send(ClientMessage::SendMessage(message)).await {
return Err(Error::Io(std::io::Error::new(std::io::ErrorKind::Other, e)));
}
} }
Ok(())
}
// 通知所有客户端线程发送 CmdUpdateOnlineUsers 消息
async fn notify_all_clients_to_update_online_users() {
let senders = CLIENT_SENDERS.read().await;
for (_, sender) in senders.iter() {
if let Err(e) = sender.send(ClientMessage::CmdUpdateOnlineUsers).await {
println!("通知客户端更新在线用户列表失败: {:?}", e);
} }
} }
} }
\ No newline at end of file
use crate::events::Event;
use crate::handles::online_users_update::send_online_users_resp; use crate::handles::online_users_update::send_online_users_resp;
use crate::handles::redis::remove_this_connection; use crate::handles::redis::remove_this_connection;
use crate::typing::used_typed::{ConnectionMap}; use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::RwLockWriteGuard;
pub async fn handle_connection_error( pub async fn handle_connection_error(
from_id: &str, from_id: &str,
connections: &ConnectionMap, event_sender: &UnboundedSender<Event>,
) { ) {
println!("开始处理用户id: {} 的连接错误", from_id); println!("开始处理用户id: {} 的连接错误", from_id);
// 从全局连接映射中移除该连接
let removed = {
let mut connections: RwLockWriteGuard<'_, _> = connections.write().await;
println!("清除之前的keys {:?}", connections.keys());
let result = connections.remove(from_id).is_some();
println!("清除之后的keys {:?}", connections.keys());
result
};
println!( // 发送关闭连接事件
"是否成功从全局连接映射中移除用户id: {},结果: {}", event_sender.send(Event::CloseConnection(from_id.to_string())).unwrap();
from_id, removed
);
// 从 Redis 中移除该用户的信息 // 从 Redis 中移除该用户的信息
if removed {
if let Err(e) = remove_this_connection(from_id).await { if let Err(e) = remove_this_connection(from_id).await {
println!("从 Redis 中移除用户信息时出错: {}", e); println!("从 Redis 中移除用户信息时出错: {}", e);
} }
}
// 准备更新用户链接 // 准备更新用户链接
if let Err(e) = send_online_users_resp().await { if let Err(e) = send_online_users_resp().await {
println!("在处理新用户id {} 之时广播,处理在线用户列表出错了:{:?}", &from_id,e); println!("在处理新用户id {} 之时广播,处理在线用户列表出错了:{:?}", from_id, e);
} else { } else {
println!("成功将用户id {} 退出广播至其余用户", &from_id); println!("成功将用户id {} 退出广播至其余用户", from_id);
} }
} }
\ No newline at end of file
use crate::client::{CONNECTIONS, ONLINE_USERS}; use crate::client::ONLINE_USERS;
use crate::utils::json_utils::get_current_timestamp; use crate::utils::json_utils::get_current_timestamp;
use futures::SinkExt;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tungstenite::{Error, Message};
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub struct OnlineUserMessage { pub struct OnlineUserMessage {
...@@ -42,13 +40,14 @@ pub struct OnlineUsersMessage { ...@@ -42,13 +40,14 @@ pub struct OnlineUsersMessage {
pub time: u128, pub time: u128,
} }
pub async fn send_online_users_resp() -> Result<Vec<(String, String)>, serde_json::Error> {
pub async fn send_online_users_resp() -> Result<(), Error> {
let mut msg_data = Vec::new(); let mut msg_data = Vec::new();
let mut user_ids = Vec::new();
{ {
let online_users = ONLINE_USERS.read().await; let online_users = ONLINE_USERS.read().await;
println!("当前所有用户数据信息 ONLINE_USERS: {:?}", online_users); println!("当前所有用户数据信息 ONLINE_USERS: {:?}", online_users);
for (_, user_info_str) in online_users.iter() { for (user_id, user_info_str) in online_users.iter() {
let parts: Vec<&str> = user_info_str.split(',').collect(); let parts: Vec<&str> = user_info_str.split(',').collect();
if parts.len() == 9 { if parts.len() == 9 {
let user_msg = OnlineUserMessage { let user_msg = OnlineUserMessage {
...@@ -63,14 +62,14 @@ pub async fn send_online_users_resp() -> Result<(), Error> { ...@@ -63,14 +62,14 @@ pub async fn send_online_users_resp() -> Result<(), Error> {
from_name: parts[8].to_string(), from_name: parts[8].to_string(),
}; };
msg_data.push(user_msg); msg_data.push(user_msg);
user_ids.push(user_id.clone());
} }
} }
} }
let mut connections = CONNECTIONS.write().await; let mut messages = Vec::new();
println!("当前的connections的keys是 {:?}", connections.keys()); for user_id in user_ids {
for (from_id, connection) in connections.iter_mut() { let to_id = user_id.clone();
let to_id = from_id.to_string();
let message = OnlineUsersMessage { let message = OnlineUsersMessage {
msg_type: "CmdUpdateOnlineUsers".to_string(), msg_type: "CmdUpdateOnlineUsers".to_string(),
from_id: "0".to_string(), from_id: "0".to_string(),
...@@ -80,16 +79,9 @@ pub async fn send_online_users_resp() -> Result<(), Error> { ...@@ -80,16 +79,9 @@ pub async fn send_online_users_resp() -> Result<(), Error> {
time: get_current_timestamp(), time: get_current_timestamp(),
}; };
let json_result = serde_json::to_string(&message); let json = serde_json::to_string(&message)?;
if let Err(e) = json_result { messages.push((user_id, json));
println!("序列化消息给用户 {} 失败: {}", from_id.to_string(), e);
continue;
} }
let json = json_result.unwrap();
if let Err(e) = connection.sender.send(Message::text(json)).await { Ok(messages)
println!("发送消息给用户 {} 失败: {}", from_id.to_string(), e);
}
}
Ok(())
} }
\ No newline at end of file
...@@ -21,6 +21,11 @@ async fn main() { ...@@ -21,6 +21,11 @@ async fn main() {
// 启动事件处理任务 // 启动事件处理任务
tokio::spawn(handle_events(event_receiver)); tokio::spawn(handle_events(event_receiver));
while let Ok((stream, _)) = listener.accept().await { while let Ok((stream, _)) = listener.accept().await {
tokio::spawn(handle_client(stream, event_sender.clone())); let client_event_sender = event_sender.clone();
// 创建一个用于事件中心向客户端发送消息的通道
let (center_to_client_sender, center_to_client_receiver) = mpsc::channel(10);
tokio::spawn(handle_client(stream, &client_event_sender, center_to_client_receiver));
// 将客户端的发送者存储到事件中心,以便事件中心可以向客户端发送消息
events::register_client(client_event_sender, center_to_client_sender).await;
} }
} }
\ No newline at end of file
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use tokio::sync::RwLock as AsyncRwLock;
use tokio::task::JoinHandle;
use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::WebSocketStream;
use tungstenite::Message; use tungstenite::Message;
...@@ -11,6 +7,3 @@ pub struct Connection { ...@@ -11,6 +7,3 @@ pub struct Connection {
pub(crate) sender: futures::stream::SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>, pub(crate) sender: futures::stream::SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>,
pub(crate) receiver: futures::stream::SplitStream<WebSocketStream<tokio::net::TcpStream>>, pub(crate) receiver: futures::stream::SplitStream<WebSocketStream<tokio::net::TcpStream>>,
} }
\ No newline at end of file
pub type ConnectionMap = Arc<AsyncRwLock<HashMap<String, Connection>>>;
pub type TaskMap = Arc<RwLock<HashMap<String, JoinHandle<()>>>>;
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment