Commit 043688e3 by qlintonger xeno

完美解决CmdOnlineUserLists

parent 5742ebd6
......@@ -2,18 +2,18 @@ use crate::config::config::STATIC_WS_PWD;
use crate::deport::handle_ws_msg;
use crate::events::{register_client, ClientMessage, Event};
use crate::handles::handshake::handle_handshake;
use crate::handles::heartbeat::heart_resp;
use crate::handles::online_users_update::send_online_users_and_send;
use crate::utils::json_utils::parse_message;
use dashmap::DashMap;
use futures::{StreamExt};
use futures::{SinkExt, StreamExt};
use lazy_static::lazy_static;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc::{Receiver, Sender, UnboundedSender};
use tokio::sync::Mutex;
use tokio::time;
use tokio_tungstenite::accept_hdr_async;
use tungstenite::handshake::server::{Request, Response};
use tungstenite::Error;
use tungstenite::{Error, Message};
lazy_static! {
pub static ref ONLINE_USERS: DashMap<String, String> = DashMap::new();
......@@ -54,14 +54,12 @@ pub(crate) async fn handle_client(
}
};
let (sender, mut receiver) = ws_stream.split();
let sender = Arc::new(Mutex::new(sender)); // 包装为 Arc<Mutex<...>>
let last_heartbeat_time = Arc::new(Mutex::new(Instant::now())); // 包装为 Arc<Mutex<Instant>>
let (mut sender, mut receiver) = ws_stream.split();
let mut last_heartbeat_time = Instant::now(); // 包装为 Arc<Mutex<Instant>>
if let Some(params) = connection_params {
if let Some(from_id) = params.get("fromId") {
let from_id = from_id.clone();
let from_id_clone = from_id.clone();
// 注册客户端到事件中心
register_client(from_id.clone(), center_to_client_sender).await;
......@@ -71,51 +69,90 @@ pub(crate) async fn handle_client(
.send(Event::NewConnection(from_id.clone(), params))
.unwrap();
println!("断开与用户id: {},连接", from_id_clone);
println!("用户 {} 已连接", from_id);
// 启动事件中心调度处理任务
let event_task = tokio::spawn({
let from_id = from_id.clone();
async move {
while let Some(msg) = center_to_client_receiver.recv().await {
println!("消息中心:==> 收到消息 {:?}", &msg);
match msg {
ClientMessage::CmdUpdateOnlineUsers => {
println!(
"消息中心:==> 收到 CmdUpdateOnlineUsers 消息 发送给 {}",
&from_id
);
if let Err(e) =
send_online_users_and_send(&mut sender, &from_id).await
{
println!("处理在线用户列表出错了:{:?}", e);
}
}
ClientMessage::SendClientMessage(
from_id,
client_message,
) => {
if let Err(e) = sender.send(Message::text(client_message)).await {
println!("发送给用户id {} 独立消息失败:{:?}", from_id.clone(), e);
}
}
}
}
}
});
// 处理 WebSocket 消息和心跳
loop {
let sender_clone = Arc::clone(&sender);
let from_id_clone = from_id.clone();
let event_sender_clone = event_sender.clone();
let last_heartbeat_time_clone = Arc::clone(&last_heartbeat_time);
tokio::select! {
// 处理消息接收
maybe_msg = receiver.next() => {
if let Some(msg) = maybe_msg {
tokio::spawn(handle_ws_msg(
msg?,
sender_clone,
from_id_clone,
event_sender_clone,
last_heartbeat_time_clone,
));
}
}
// 定期检查事件中心的消息
msg = center_to_client_receiver.recv() => {
if let Some(msg) = msg {
println!("消息中心:==> 收到消息 {:?}", &msg);
match msg {
ClientMessage::CmdUpdateOnlineUsers => {
println!("消息中心:==> 收到 CmdUpdateOnlineUsers 消息 发送给 {}", &from_id_clone);
if let Err(e) = send_online_users_and_send(&mut *sender_clone.lock().await, &from_id_clone).await {
println!("处理在线用户列表出错了:{:?}", e);
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
break;
if let Some(Ok(msg)) = maybe_msg {
if msg.is_text() {
let message = msg.to_text()?;
match parse_message(message) {
Ok(data) => {
match data.msg_type.as_str() {
"Heart" => {
println!("收到客户端心跳消息 {:?}", &data);
last_heartbeat_time = Instant::now();
if let Ok(json_str) = heart_resp(&from_id) {
event_sender.clone().send(Event::SendClientMessage(from_id.clone(), json_str)).unwrap();
}
},
_ => {
handle_ws_msg(data.msg_type, from_id.clone(), event_sender.clone());
}
}
}
Err(e) => {
println!("解析JSON数据出错: {}", e);
}
}
}
} else {
break; // 客户端断开连接
}
}
// 处理心跳超时
_ = time::sleep_until(tokio::time::Instant::from(*last_heartbeat_time.lock().await) + Duration::from_secs(20)) => {
println!("用户id-{} 20秒内没有发送心跳,挂断连接", from_id_clone);
_ = time::sleep_until(tokio::time::Instant::from(last_heartbeat_time + Duration::from_secs(20))) => {
println!("用户id-{} 20秒内没有发送心跳,挂断连接", from_id);
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
event_sender.send(Event::CloseConnection(from_id.clone())).unwrap();
break;
}
}
}
// 发送关闭连接事件
event_sender
.send(Event::CloseConnection(from_id.clone()))
.unwrap();
// 等待事件中心调度任务结束
if let Err(e) = event_task.await {
println!("事件中心调度任务出错: {:?}", e);
}
}
} else {
println!("无法获取连接参数");
......
use std::sync::{Arc};
use std::time::Instant;
use futures::SinkExt;
use crate::events::Event;
use futures::stream::SplitSink;
use tokio::net::TcpStream;
use tokio::sync::mpsc::UnboundedSender;
use tokio::sync::Mutex;
use tokio_tungstenite::WebSocketStream;
use tungstenite::Message;
use crate::handles::handle_messages::handle_other_message;
use crate::handles::heartbeat::{heart_resp};
use crate::handles::online_users_update::send_online_users_and_send;
use crate::utils::json_utils::parse_message;
pub async fn handle_ws_msg(
msg: Message,
sender: Arc<Mutex<SplitSink<WebSocketStream<TcpStream>, Message>>>,
pub fn handle_ws_msg(
msg_type: String,
from_id: String,
event_sender: UnboundedSender<Event>,
last_heartbeat_time: Arc<Mutex<Instant>>,
) {
if msg.is_text() {
let text = msg.to_text().unwrap();
match parse_message(text) {
Ok(data) => {
match data.msg_type.as_str() {
"Heart" => {
println!("收到客户端心跳消息 {:?}", &data);
*last_heartbeat_time.lock().await = Instant::now();
if let Ok(json_str) = heart_resp(&from_id) {
let mut sender = sender.lock().await;
if let Err(e) = sender.send(Message::text(json_str)).await {
println!("发送心跳信息失败: {}", e);
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id.clone())).unwrap();
}
}
},
"GetOnlineUserList" => {
println!("收到客户端获取在线用户列表 {:?}", &data);
let mut sender = sender.lock().await;
if let Err(e) = send_online_users_and_send(&mut *sender, &from_id).await {
println!("处理在线用户列表出错了:{:?}", e);
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id.clone())).unwrap();
}
},
_ => {
let mut sender = sender.lock().await;
if let Err(e) = handle_other_message(&mut *sender, &data, &from_id).await {
println!("Failed to handle other message: {}", e);
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id.clone())).unwrap();
}
}
}
}
Err(e) => {
println!("解析JSON数据出错: {}", e);
}
match msg_type.as_str() {
"GetOnlineUserList" => {
println!("收到客户端获取在线用户列表 {}", &from_id);
},
_ => {
}
}
}
\ No newline at end of file
......@@ -9,12 +9,14 @@ use tokio::sync::mpsc;
pub enum Event {
NewConnection(String, HashMap<String, String>),
CloseConnection(String),
SendClientMessage(String, String),
}
// 定义事件中心发送给客户端的消息类型
#[derive(Debug)]
pub enum ClientMessage {
CmdUpdateOnlineUsers,
SendClientMessage(String, String),
}
lazy_static! {
......@@ -62,13 +64,29 @@ pub async fn handle_events(mut receiver: mpsc::UnboundedReceiver<Event>) {
close_existing_connection(&from_id).await;
notify_all_clients_to_update_online_users().await;
}
Event::SendClientMessage(from_id, msg) => {
let target_sender = CLIENT_SENDERS.get(&from_id).unwrap();
if let Err(e) = target_sender
.send(ClientMessage::SendClientMessage(
from_id, msg,
))
.await
{
println!("通知对应sender发送ws消息失败 {}", e);
} else {
println!("通知对应sender发送ws消息成功");
}
}
}
}
}
// 通知所有客户端线程发送 CmdUpdateOnlineUsers 消息
async fn notify_all_clients_to_update_online_users() {
println!("尝试发送在新用户更新消息体,目前总人数为:{}", CLIENT_SENDERS.len());
println!(
"尝试发送在新用户更新消息体,目前总人数为:{}",
CLIENT_SENDERS.len()
);
for entry in CLIENT_SENDERS.iter() {
let sender: &mpsc::Sender<ClientMessage> = entry.value();
let from_id = entry.key();
......
......@@ -43,7 +43,7 @@ pub struct ServerOnlineUserMessage {
}
pub async fn send_online_users_and_send(
sender: &mut (impl SinkExt<Message, Error = Error> + Unpin),
sender: &mut (impl SinkExt<Message, Error=Error> + Unpin),
from_id: &str,
) -> Result<(), Error> {
let messages = send_online_users_resp()
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment