Commit 683aaa2a by qlintonger xeno

CmdUpdateOnlineUsers完毕+1!需要通过线程通信来重新设计

parent e6b0f4c6
...@@ -21,8 +21,6 @@ use crate::handles::online_users_update::send_online_users_resp; ...@@ -21,8 +21,6 @@ use crate::handles::online_users_update::send_online_users_resp;
lazy_static! { lazy_static! {
pub static ref CONNECTIONS: ConnectionMap = Arc::new(AsyncRwLock::new(HashMap::new())); pub static ref CONNECTIONS: ConnectionMap = Arc::new(AsyncRwLock::new(HashMap::new()));
static ref TASKS: TaskMap = Arc::new(RwLock::new(HashMap::new())); static ref TASKS: TaskMap = Arc::new(RwLock::new(HashMap::new()));
}
lazy_static! {
pub static ref ONLINE_USERS: Arc<AsyncRwLock<HashMap<String, String>>> = Arc::new(AsyncRwLock::new(HashMap::new())); pub static ref ONLINE_USERS: Arc<AsyncRwLock<HashMap<String, String>>> = Arc::new(AsyncRwLock::new(HashMap::new()));
} }
...@@ -61,6 +59,18 @@ async fn close_existing_connection(from_id: &str) { ...@@ -61,6 +59,18 @@ async fn close_existing_connection(from_id: &str) {
} }
} }
// 添加连接到全局连接映射
async fn add_connection(from_id: String, connection: Connection) {
let mut connections = CONNECTIONS.write().await;
connections.insert(from_id, connection);
}
// 从全局连接映射中移除连接
async fn remove_connection(from_id: &str) {
let mut connections = CONNECTIONS.write().await;
connections.remove(from_id);
}
pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), Error> { pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), Error> {
let must_existed_params = ["deviceId", "fromId", "wsPwd"]; let must_existed_params = ["deviceId", "fromId", "wsPwd"];
let mut connection_params = None; let mut connection_params = None;
...@@ -101,20 +111,17 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E ...@@ -101,20 +111,17 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
// 检查 Redis 中是否已经存在该 fromId // 检查 Redis 中是否已经存在该 fromId
close_existing_connection(&from_id).await; close_existing_connection(&from_id).await;
// 将新连接添加到全局连接映射
{
let mut connections = CONNECTIONS.write().await;
connections.insert(from_id.clone(), Connection { sender, receiver, from_id: from_id.to_string() });
}
// 将该用户的信息插入到 Redis 中 // 将该用户的信息插入到 Redis 中
if let Err(e) = insert_this_connection(&from_id, &params).await { if let Err(e) = insert_this_connection(&from_id, &params).await {
println!("将用户信息插入到 Redis 中时出错: {}", e); println!("将用户信息插入到 Redis 中时出错: {}", e);
} }
// 将新连接添加到全局连接映射
add_connection(from_id.clone(), Connection { sender, receiver }).await;
// 准备更新用户链接 // 准备更新用户链接
if let Err(e) = send_online_users_resp().await { if let Err(e) = send_online_users_resp().await {
println!("在处理新用户id {} 之时广播,处理在线用户列表出错了:{:?}", &from_id,e); println!("在处理新用户id {} 之时广播,处理在线用户列表出错了:{:?}", &from_id, e);
} else { } else {
println!("广播消息 来源id {} 成功", &from_id); println!("广播消息 来源id {} 成功", &from_id);
} }
...@@ -122,11 +129,13 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E ...@@ -122,11 +129,13 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
let task = tokio::spawn(async move { let task = tokio::spawn(async move {
let mut last_heartbeat_time = Instant::now(); let mut last_heartbeat_time = Instant::now();
loop { loop {
let current_connection = { let mut connection_opt = None;
{
let mut connections = CONNECTIONS.write().await; let mut connections = CONNECTIONS.write().await;
connections.remove(&from_id_clone) // 移除连接,将其从 map 中取出 connection_opt = connections.remove(&from_id_clone);
}; }
if let Some(connection) = current_connection {
if let Some(mut connection) = connection_opt {
let (mut receiver_ref, mut sender_ref) = let (mut receiver_ref, mut sender_ref) =
(connection.receiver, connection.sender); (connection.receiver, connection.sender);
tokio::select! { tokio::select! {
...@@ -187,32 +196,20 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E ...@@ -187,32 +196,20 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
} }
// 处理心跳超时 // 处理心跳超时
_ = time::sleep_until(tokio::time::Instant::from(last_heartbeat_time + Duration::from_secs(20))) => { _ = time::sleep_until(tokio::time::Instant::from(last_heartbeat_time + Duration::from_secs(20))) => {
println!("用户id-{} 20秒内没有发送心跳,挂断连接", from_id_clone); // 使用克隆后的 from_id println!("用户id-{} 20秒内没有发送心跳,挂断连接", from_id_clone);
handle_connection_error(&from_id_clone, &CONNECTIONS, &TASKS).await;
break; break;
} }
} }
// 处理完一轮后,若连接还未断开,将其重新放回 CONNECTIONS // 处理完一轮后,若连接还未断开,将其重新放回 CONNECTIONS
{ add_connection(from_id_clone.clone(), Connection { sender: sender_ref, receiver: receiver_ref }).await;
let mut connections = CONNECTIONS.write().await;
connections.insert(
from_id_clone.clone(),
Connection {
sender: sender_ref,
receiver: receiver_ref,
from_id: from_id_clone.to_string(),
},
);
}
} else { } else {
break; break;
} }
} }
println!("断开与用户id: {},连接", from_id_clone); // 使用克隆后的 from_id println!("断开与用户id: {},连接", from_id_clone);
// 从全局连接映射中移除该连接 // 从全局连接映射中移除该连接
{ remove_connection(&from_id_clone).await;
let mut connections = CONNECTIONS.write().await;
connections.remove(&from_id_clone); // 使用克隆后的 from_id
}
// 从 Redis 中移除该用户的信息 // 从 Redis 中移除该用户的信息
if let Err(e) = remove_this_connection(&from_id_clone).await { if let Err(e) = remove_this_connection(&from_id_clone).await {
println!("从 Redis 中移除用户信息时出错: {}", e); println!("从 Redis 中移除用户信息时出错: {}", e);
...@@ -221,7 +218,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E ...@@ -221,7 +218,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
// 将任务句柄存储到全局任务映射中 // 将任务句柄存储到全局任务映射中
let mut tasks = TASKS.write().unwrap(); let mut tasks = TASKS.write().unwrap();
tasks.insert(from_id, task); // 使用原始的 from_id tasks.insert(from_id, task);
} }
} else { } else {
println!("无法获取连接参数"); println!("无法获取连接参数");
......
use crate::handles::online_users_update::send_online_users_resp;
use crate::handles::redis::remove_this_connection; use crate::handles::redis::remove_this_connection;
use crate::typing::used_typed::{ConnectionMap, TaskMap}; use crate::typing::used_typed::{ConnectionMap, TaskMap};
use tokio::sync::RwLockWriteGuard; use tokio::sync::RwLockWriteGuard;
use crate::handles::online_users_update::send_online_users_resp;
pub async fn handle_connection_error( pub async fn handle_connection_error(
from_id: &str, from_id: &str,
...@@ -12,8 +12,9 @@ pub async fn handle_connection_error( ...@@ -12,8 +12,9 @@ pub async fn handle_connection_error(
// 从全局连接映射中移除该连接 // 从全局连接映射中移除该连接
let removed = { let removed = {
let mut connections: RwLockWriteGuard<'_, _> = connections.write().await; let mut connections: RwLockWriteGuard<'_, _> = connections.write().await;
println!("清除之前的keys {:?}", connections.keys());
let result = connections.remove(from_id).is_some(); let result = connections.remove(from_id).is_some();
drop(connections); // 提前释放锁 println!("清除之后的keys {:?}", connections.keys());
result result
}; };
......
...@@ -2,7 +2,6 @@ use crate::client::{CONNECTIONS, ONLINE_USERS}; ...@@ -2,7 +2,6 @@ use crate::client::{CONNECTIONS, ONLINE_USERS};
use crate::utils::json_utils::get_current_timestamp; use crate::utils::json_utils::get_current_timestamp;
use futures::SinkExt; use futures::SinkExt;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tungstenite::{Error, Message}; use tungstenite::{Error, Message};
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
...@@ -38,27 +37,11 @@ pub struct OnlineUsersMessage { ...@@ -38,27 +37,11 @@ pub struct OnlineUsersMessage {
#[serde(rename = "toId")] #[serde(rename = "toId")]
pub to_id: String, pub to_id: String,
#[serde(rename = "msgData")] #[serde(rename = "msgData")]
#[serde(serialize_with = "serialize_arc_vec")] pub msg_data: Vec<OnlineUserMessage>,
#[serde(deserialize_with = "deserialize_arc_vec")]
pub msg_data: Arc<Vec<OnlineUserMessage>>,
#[serde(rename = "time")] #[serde(rename = "time")]
pub time: u128, pub time: u128,
} }
fn serialize_arc_vec<S>(data: &Arc<Vec<OnlineUserMessage>>, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
(*data).serialize(serializer)
}
fn deserialize_arc_vec<'de, D>(deserializer: D) -> Result<Arc<Vec<OnlineUserMessage>>, D::Error>
where
D: serde::Deserializer<'de>,
{
let data = Vec::<OnlineUserMessage>::deserialize(deserializer)?;
Ok(Arc::new(data))
}
pub async fn send_online_users_resp() -> Result<(), Error> { pub async fn send_online_users_resp() -> Result<(), Error> {
let mut msg_data = Vec::new(); let mut msg_data = Vec::new();
...@@ -84,10 +67,10 @@ pub async fn send_online_users_resp() -> Result<(), Error> { ...@@ -84,10 +67,10 @@ pub async fn send_online_users_resp() -> Result<(), Error> {
} }
} }
let msg_data = Arc::new(msg_data);
let mut connections = CONNECTIONS.write().await; let mut connections = CONNECTIONS.write().await;
for (_, connection) in connections.iter_mut() { println!("当前的connections的keys是 {:?}", connections.keys());
let to_id = connection.from_id.to_string(); for (from_id, connection) in connections.iter_mut() {
let to_id = from_id.to_string();
let message = OnlineUsersMessage { let message = OnlineUsersMessage {
msg_type: "CmdUpdateOnlineUsers".to_string(), msg_type: "CmdUpdateOnlineUsers".to_string(),
from_id: "0".to_string(), from_id: "0".to_string(),
...@@ -99,13 +82,13 @@ pub async fn send_online_users_resp() -> Result<(), Error> { ...@@ -99,13 +82,13 @@ pub async fn send_online_users_resp() -> Result<(), Error> {
let json_result = serde_json::to_string(&message); let json_result = serde_json::to_string(&message);
if let Err(e) = json_result { if let Err(e) = json_result {
println!("序列化消息给用户 {} 失败: {}", connection.from_id, e); println!("序列化消息给用户 {} 失败: {}", from_id.to_string(), e);
continue; continue;
} }
let json = json_result.unwrap(); let json = json_result.unwrap();
if let Err(e) = connection.sender.send(Message::text(json)).await { if let Err(e) = connection.sender.send(Message::text(json)).await {
println!("发送消息给用户 {} 失败: {}", connection.from_id, e); println!("发送消息给用户 {} 失败: {}", from_id.to_string(), e);
} }
} }
Ok(()) Ok(())
......
...@@ -10,7 +10,6 @@ use tungstenite::Message; ...@@ -10,7 +10,6 @@ use tungstenite::Message;
pub struct Connection { pub struct Connection {
pub(crate) sender: futures::stream::SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>, pub(crate) sender: futures::stream::SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>,
pub(crate) receiver: futures::stream::SplitStream<WebSocketStream<tokio::net::TcpStream>>, pub(crate) receiver: futures::stream::SplitStream<WebSocketStream<tokio::net::TcpStream>>,
pub(crate) from_id: String,
} }
pub type ConnectionMap = Arc<AsyncRwLock<HashMap<String, Connection>>>; pub type ConnectionMap = Arc<AsyncRwLock<HashMap<String, Connection>>>;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment