Commit 1b49c3a6 by qlintonger xeno

尝试使用事件机制来实现多线程沟通+1.5

parent 13b576df
......@@ -12,7 +12,7 @@ use lazy_static::lazy_static;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::{UnboundedSender, Receiver};
use tokio::sync::mpsc::{UnboundedSender, Sender, Receiver};
use tokio::sync::{mpsc, RwLock as AsyncRwLock};
use tokio::time;
use tokio_tungstenite::accept_hdr_async;
......@@ -34,7 +34,8 @@ async fn close_existing_connection(event_sender: &UnboundedSender<Event>, from_i
pub(crate) async fn handle_client(
stream: tokio::net::TcpStream,
event_sender: &UnboundedSender<Event>,
event_sender: UnboundedSender<Event>,
center_to_client_sender: Sender<ClientMessage>,
mut center_to_client_receiver: Receiver<ClientMessage>,
) -> Result<(), Error> {
let must_existed_params = ["deviceId", "fromId", "wsPwd"];
......@@ -81,6 +82,9 @@ pub(crate) async fn handle_client(
println!("将用户信息插入到 Redis 中时出错: {}", e);
}
// 注册客户端到事件中心
register_client(from_id.clone(), center_to_client_sender).await;
// 发送新连接事件
event_sender.send(Event::NewConnection(from_id.clone())).unwrap();
......@@ -112,7 +116,7 @@ pub(crate) async fn handle_client(
},
"GetOnlineUserList" => {
println!("收到客户端获取在线用户列表 {:?}", &data);
if let Err(e) = send_online_users_resp().await {
if let Err(e) = send_online_users_and_send(&mut sender, &from_id_clone).await {
println!("处理在线用户列表出错了:{:?}", e);
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
......@@ -166,7 +170,7 @@ pub(crate) async fn handle_client(
if let Some(msg) = maybe_msg {
match msg {
ClientMessage::CmdUpdateOnlineUsers => {
if let Err(e) = send_online_users_resp().await {
if let Err(e) = send_online_users_and_send(&mut sender, &from_id_clone).await {
println!("处理在线用户列表出错了:{:?}", e);
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
......@@ -194,3 +198,14 @@ pub(crate) async fn handle_client(
Ok(())
}
async fn send_online_users_and_send(sender: &mut (impl SinkExt<Message, Error = Error> + std::marker::Unpin), from_id: &str) -> Result<(), Error> {
let messages = send_online_users_resp().await.map_err(|e| Error::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
for (_, json) in messages {
if let Err(e) = sender.send(Message::text(json)).await {
println!("发送在线用户列表消息给用户 {} 失败: {}", from_id, e);
return Err(e);
}
}
Ok(())
}
\ No newline at end of file
use crate::handles::online_users_update::{send_online_users_resp};
use crate::typing::used_typed::Connection;
use tokio::sync::mpsc;
use futures::SinkExt;
use lazy_static::lazy_static;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio::sync::RwLock;
use futures::SinkExt;
use lazy_static::lazy_static;
use tungstenite::{Error, Message};
use tungstenite::Error;
// 假设的用户详细信息结构体
#[derive(Debug)]
......@@ -36,9 +34,7 @@ lazy_static! {
}
// 注册客户端的发送者
pub async fn register_client(_event_sender: mpsc::UnboundedSender<Event>, center_to_client_sender: mpsc::Sender<ClientMessage>) {
// 这里假设可以从某个地方获取到客户端的 from_id
let from_id = "dummy_id".to_string(); // 实际需要替换为真实的 from_id
pub async fn register_client(from_id: String, center_to_client_sender: mpsc::Sender<ClientMessage>) {
let mut senders = CLIENT_SENDERS.write().await;
senders.insert(from_id, center_to_client_sender);
}
......@@ -49,11 +45,9 @@ pub async fn handle_events(mut receiver: mpsc::UnboundedReceiver<Event>) {
match event {
Event::NewConnection(from_id) => {
println!("新连接: {}", from_id);
}
Event::CloseConnection(from_id) => {
println!("关闭连接: {}", from_id);
// 移除客户端的发送者
let mut senders = CLIENT_SENDERS.write().await;
senders.remove(&from_id);
......@@ -62,7 +56,7 @@ pub async fn handle_events(mut receiver: mpsc::UnboundedReceiver<Event>) {
// 这里可以实现其他触发更新在线用户列表的逻辑
// 为简单起见,暂未详细实现
println!("更新在线用户列表事件触发");
notify_all_clients_to_update_online_users().await;
}
}
}
......
use crate::events::Event;
use crate::handles::online_users_update::send_online_users_resp;
use crate::handles::redis::remove_this_connection;
use tokio::sync::mpsc::UnboundedSender;
......@@ -17,10 +16,4 @@ pub async fn handle_connection_error(
println!("从 Redis 中移除用户信息时出错: {}", e);
}
// 准备更新用户链接
if let Err(e) = send_online_users_resp().await {
println!("在处理新用户id {} 之时广播,处理在线用户列表出错了:{:?}", from_id, e);
} else {
println!("成功将用户id {} 退出广播至其余用户", from_id);
}
}
\ No newline at end of file
......@@ -24,8 +24,6 @@ async fn main() {
let client_event_sender = event_sender.clone();
// 创建一个用于事件中心向客户端发送消息的通道
let (center_to_client_sender, center_to_client_receiver) = mpsc::channel(10);
tokio::spawn(handle_client(stream, &client_event_sender, center_to_client_receiver));
// 将客户端的发送者存储到事件中心,以便事件中心可以向客户端发送消息
events::register_client(client_event_sender, center_to_client_sender).await;
tokio::spawn(handle_client(stream, client_event_sender, center_to_client_sender, center_to_client_receiver));
}
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment