Commit b6614cc5 by qlintonger xeno

成功解决CmdUpdateOnlineUsers的问题+完毕+准备测试

parent e49f3e28
...@@ -4,9 +4,9 @@ use crate::handles::close_connection::handle_connection_error; ...@@ -4,9 +4,9 @@ use crate::handles::close_connection::handle_connection_error;
use crate::handles::handle_messages::handle_other_message; use crate::handles::handle_messages::handle_other_message;
use crate::handles::handshake::handle_handshake; use crate::handles::handshake::handle_handshake;
use crate::handles::heartbeat::{handle_heartbeat, heart_resp}; use crate::handles::heartbeat::{handle_heartbeat, heart_resp};
use crate::handles::online_users_update::send_online_users_resp; use crate::handles::online_users_update::{send_online_users_resp, OnlineUserMessage, ServerOnlineUserMessage};
use crate::handles::redis::{insert_this_connection, remove_this_connection}; use crate::handles::redis::{insert_this_connection, remove_this_connection};
use crate::utils::json_utils::parse_message; use crate::utils::json_utils::{get_current_timestamp, parse_message};
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use std::collections::HashMap; use std::collections::HashMap;
...@@ -15,6 +15,7 @@ use std::time::{Duration, Instant}; ...@@ -15,6 +15,7 @@ use std::time::{Duration, Instant};
use tokio::sync::mpsc::{Receiver, Sender, UnboundedSender}; use tokio::sync::mpsc::{Receiver, Sender, UnboundedSender};
use tokio::sync::RwLock as AsyncRwLock; use tokio::sync::RwLock as AsyncRwLock;
use tokio::time; use tokio::time;
use tokio::time::interval;
use tokio_tungstenite::accept_hdr_async; use tokio_tungstenite::accept_hdr_async;
use tungstenite::handshake::server::{Request, Response}; use tungstenite::handshake::server::{Request, Response};
use tungstenite::{Error, Message}; use tungstenite::{Error, Message};
...@@ -29,7 +30,7 @@ async fn close_existing_connection(from_id: &str) { ...@@ -29,7 +30,7 @@ async fn close_existing_connection(from_id: &str) {
{ {
// 移除客户端的发送者 // 移除客户端的发送者
let mut senders = CLIENT_SENDERS.write().await; let mut senders = CLIENT_SENDERS.write().await;
senders.remove(&from_id); senders.remove(from_id.clone());
} }
if let Err(e) = remove_this_connection(from_id).await { if let Err(e) = remove_this_connection(from_id).await {
println!("从 Redis 中移除用户信息时出错: {}", e); println!("从 Redis 中移除用户信息时出错: {}", e);
...@@ -95,6 +96,8 @@ pub(crate) async fn handle_client( ...@@ -95,6 +96,8 @@ pub(crate) async fn handle_client(
.unwrap(); .unwrap();
let mut last_heartbeat_time = Instant::now(); let mut last_heartbeat_time = Instant::now();
let msg_check_time = Duration::from_millis(100); // 检查事件中心消息的时间间隔
let mut interval = time::interval(msg_check_time);
loop { loop {
tokio::select! { tokio::select! {
...@@ -192,15 +195,14 @@ pub(crate) async fn handle_client( ...@@ -192,15 +195,14 @@ pub(crate) async fn handle_client(
} }
} }
} }
// 处理来自事件中心的消息 // 定期检查事件中心的消息
maybe_msg = center_to_client_receiver.try_recv() => { _ = interval.tick() => {
if let Some(msg) = maybe_msg { if let Ok(msg) = center_to_client_receiver.try_recv() {
match msg { match msg {
ClientMessage::CmdUpdateOnlineUsers => { ClientMessage::CmdUpdateOnlineUsers => {
println!("消息中心:==> 收到 CmdUpdateOnlineUsers 消息"); println!("消息中心:==> 收到 CmdUpdateOnlineUsers 消息 发送给 {}", &from_id_clone);
if let Err(e) = send_online_users_and_send(&mut sender, &from_id_clone).await { if let Err(e) = send_online_users_and_send(&mut sender, &from_id_clone).await {
println!("处理在线用户列表出错了:{:?}", e); println!("处理在线用户列表出错了:{:?}", e);
// 发送关闭连接事件
{ {
// 移除客户端的发送者 // 移除客户端的发送者
let mut senders = CLIENT_SENDERS.write().await; let mut senders = CLIENT_SENDERS.write().await;
...@@ -248,11 +250,17 @@ async fn send_online_users_and_send( ...@@ -248,11 +250,17 @@ async fn send_online_users_and_send(
let messages = send_online_users_resp() let messages = send_online_users_resp()
.await .await
.map_err(|e| Error::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))?; .map_err(|e| Error::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
for (_, json) in messages { let json_message = serde_json::to_string(&ServerOnlineUserMessage {
if let Err(e) = sender.send(Message::text(json)).await { msg_type: "CmdUpdateOnlineUsers".to_string(),
println!("发送在线用户列表消息给用户 {} 失败: {}", from_id, e); from_name: "Server".to_string(),
return Err(e); from_id: "0".to_string(),
} to_id: from_id.to_string(),
msg_data: messages,
time: get_current_timestamp()
}).map_err(|e| Error::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
if let Err(e) = sender.send(Message::text(json_message)).await {
println!("发送在线用户列表消息给用户 {} 失败: {}", from_id, e);
return Err(e);
} }
println!("发送在线用户列表消息给用户 {} 成功", from_id); println!("发送在线用户列表消息给用户 {} 成功", from_id);
Ok(()) Ok(())
......
...@@ -52,7 +52,6 @@ pub async fn handle_events(mut receiver: mpsc::UnboundedReceiver<Event>) { ...@@ -52,7 +52,6 @@ pub async fn handle_events(mut receiver: mpsc::UnboundedReceiver<Event>) {
// 这里可以实现其他触发更新在线用户列表的逻辑 // 这里可以实现其他触发更新在线用户列表的逻辑
// 为简单起见,暂未详细实现 // 为简单起见,暂未详细实现
println!("更新在线用户列表事件触发"); println!("更新在线用户列表事件触发");
notify_all_clients_to_update_online_users().await;
} }
} }
} }
......
...@@ -25,7 +25,7 @@ pub struct OnlineUserMessage { ...@@ -25,7 +25,7 @@ pub struct OnlineUserMessage {
} }
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct OnlineUsersMessage { pub struct ServerOnlineUserMessage {
#[serde(rename = "msgType")] #[serde(rename = "msgType")]
pub msg_type: String, pub msg_type: String,
#[serde(rename = "fromId")] #[serde(rename = "fromId")]
...@@ -40,9 +40,8 @@ pub struct OnlineUsersMessage { ...@@ -40,9 +40,8 @@ pub struct OnlineUsersMessage {
pub time: u128, pub time: u128,
} }
pub async fn send_online_users_resp() -> Result<Vec<(String, String)>, serde_json::Error> { pub async fn send_online_users_resp() -> Result<Vec<OnlineUserMessage>, serde_json::Error> {
let mut msg_data = Vec::new(); let mut msg_data = Vec::new();
let mut user_ids = Vec::new();
{ {
let online_users = ONLINE_USERS.read().await; let online_users = ONLINE_USERS.read().await;
...@@ -62,26 +61,9 @@ pub async fn send_online_users_resp() -> Result<Vec<(String, String)>, serde_jso ...@@ -62,26 +61,9 @@ pub async fn send_online_users_resp() -> Result<Vec<(String, String)>, serde_jso
from_name: parts[8].to_string(), from_name: parts[8].to_string(),
}; };
msg_data.push(user_msg); msg_data.push(user_msg);
user_ids.push(user_id.clone());
} }
} }
} }
let mut messages = Vec::new(); Ok(msg_data)
for user_id in user_ids {
let to_id = user_id.clone();
let message = OnlineUsersMessage {
msg_type: "CmdUpdateOnlineUsers".to_string(),
from_id: "0".to_string(),
from_name: "Server".to_string(),
to_id,
msg_data: msg_data.clone(),
time: get_current_timestamp(),
};
let json = serde_json::to_string(&message)?;
messages.push((user_id, json));
}
Ok(messages)
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment