Commit 2b845919 by qlintonger xeno

200短线和redis读写等完毕,

parent 862ef612
use crate::config::config::STATIC_WS_PWD; use crate::config::config::{REDIS_ADDR, STATIC_WS_PWD};
use crate::handles::handle_messages::handle_other_message; use crate::handles::handle_messages::handle_other_message;
use crate::handles::heartbeat::handle_heartbeat; use crate::handles::heartbeat::handle_heartbeat;
use crate::typing::used_typed::{Connection, ConnectionMap, TaskMap}; use crate::typing::used_typed::{Connection, ConnectionMap, TaskMap};
...@@ -20,7 +20,7 @@ use tungstenite::{Error, Message}; ...@@ -20,7 +20,7 @@ use tungstenite::{Error, Message};
lazy_static! { lazy_static! {
static ref REDIS_POOL: SingleRedisPool = { static ref REDIS_POOL: SingleRedisPool = {
let client = Client::open("redis://127.0.0.1:6379/").expect("Failed to connect to Redis"); let client = Client::open(REDIS_ADDR).expect("Failed to connect to Redis");
redis_pool::RedisPool::new(client, 16, Some(512)) redis_pool::RedisPool::new(client, 16, Some(512))
}; };
} }
...@@ -41,13 +41,17 @@ async fn close_existing_connection(from_id: &str) { ...@@ -41,13 +41,17 @@ async fn close_existing_connection(from_id: &str) {
task.abort(); task.abort();
} }
let old_connection = {
let mut connections = CONNECTIONS.lock().await; let mut connections = CONNECTIONS.lock().await;
let already_done = connections.get(&from_id.to_string()); let already_done = connections.get(&from_id.to_string());
println!( println!(
"关闭之前绑定的 WebSocket 连接: {} {:?}", "关闭之前绑定的 WebSocket 连接: {} {:?}",
from_id, already_done from_id, already_done
); );
if let Some(mut old_connection) = connections.remove(from_id) { connections.remove(from_id)
};
if let Some(mut old_connection) = old_connection {
// 尝试优雅地关闭旧连接 // 尝试优雅地关闭旧连接
if let Err(e) = old_connection.sender.close().await { if let Err(e) = old_connection.sender.close().await {
println!("关闭旧的 WebSocket 发送器时出错: {}", e); println!("关闭旧的 WebSocket 发送器时出错: {}", e);
...@@ -56,29 +60,40 @@ async fn close_existing_connection(from_id: &str) { ...@@ -56,29 +60,40 @@ async fn close_existing_connection(from_id: &str) {
} }
} }
// 更新 Redis 中 connected 集合 // 更新 update_connected_redis 函数,使其返回 Result 类型
async fn update_connected_redis() { async fn update_connected_redis() -> Result<(), redis::RedisError> {
// 改进后
let from_ids: Vec<String>; let from_ids: Vec<String>;
{ {
let connections = CONNECTIONS.lock().await; let connections = CONNECTIONS.lock().await;
from_ids = connections.keys().cloned().collect(); from_ids = connections.keys().cloned().collect();
} }
println!("当前全局连接映射中的 from_id 数量: {}", from_ids.len());
let mut con = REDIS_POOL let mut con = REDIS_POOL
.get_connection() .get_connection()
.expect("Failed to get Redis connection"); .expect("Failed to get Redis connection");
// 先清空集合 // 先清空集合
println!("开始清空 Redis 中的 connected 集合");
if let Err(e) = con.del::<_, ()>("connected") { if let Err(e) = con.del::<_, ()>("connected") {
println!("Failed to delete connected key in Redis: {}", e); println!("清空 Redis 中的 connected 集合时出错: {}", e);
return Err(e);
} }
// 将 from_ids 一次性批量添加到集合中 // 将 from_ids 一次性批量添加到集合中
if !from_ids.is_empty() { if !from_ids.is_empty() {
println!("开始将 from_id 批量添加到 Redis 中的 connected 集合");
if let Err(e) = con.sadd::<_, _, ()>("connected", from_ids.as_slice()) { if let Err(e) = con.sadd::<_, _, ()>("connected", from_ids.as_slice()) {
println!("Failed to add fromIds to connected set in Redis: {}", e); println!(
"将 from_id 批量添加到 Redis 中的 connected 集合时出错: {}",
e
);
return Err(e);
} }
} }
println!("成功更新 Redis 中的 connected 集合");
Ok(())
} }
// 提取出来的处理握手的函数 // 提取出来的处理握手的函数
...@@ -115,19 +130,46 @@ fn handle_handshake( ...@@ -115,19 +130,46 @@ fn handle_handshake(
} }
async fn handle_connection_error(from_id: &str) { async fn handle_connection_error(from_id: &str) {
println!("开始处理用户id: {} 的连接错误", from_id);
// 从全局连接映射中移除该连接 // 从全局连接映射中移除该连接
{ let removed = {
let mut connections = CONNECTIONS.lock().await; let mut connections = CONNECTIONS.lock().await;
connections.remove(from_id); let result = connections.remove(from_id).is_some();
} drop(connections); // 提前释放锁
result
};
println!(
"是否成功从全局连接映射中移除用户id: {},结果: {}",
from_id, removed
);
// 更新 Redis 中的 connected 集合 // 更新 Redis 中的 connected 集合
update_connected_redis().await; if removed {
match update_connected_redis().await {
Ok(_) => println!(
"成功更新 Redis 中的 connected 集合,移除用户id: {}",
from_id
),
Err(e) => println!(
"更新 Redis 中的 connected 集合时出错,用户id: {},错误信息: {}",
from_id, e
),
}
}
println!("开始尝试关闭用户id: {} 的 redis 连接", from_id);
// 取消对应的任务 // 取消对应的任务
{
let mut tasks = TASKS.lock().unwrap(); let mut tasks = TASKS.lock().unwrap();
if let Some(task) = tasks.remove(from_id) { match tasks.remove(from_id) {
Some(task) => {
task.abort(); task.abort();
println!("成功取消用户id: {} 的任务", from_id);
}
None => println!("未在全局任务映射中找到用户id: {} 的任务", from_id),
}
} }
println!("断开与用户id: {} 的连接并完成清理操作", from_id); println!("断开与用户id: {} 的连接并完成清理操作", from_id);
...@@ -180,18 +222,18 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E ...@@ -180,18 +222,18 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
} }
// 更新 Redis 中的 connected 集合 // 更新 Redis 中的 connected 集合
update_connected_redis().await; let _ = update_connected_redis().await;
let task = tokio::spawn(async move { let task = tokio::spawn(async move {
let mut last_heartbeat_time = Instant::now(); let mut last_heartbeat_time = Instant::now();
loop { loop {
let current_connection = {
let mut connections = CONNECTIONS.lock().await; let mut connections = CONNECTIONS.lock().await;
if let Some(current_connection) = connections.get_mut(&from_id_clone) { connections.remove(&from_id_clone) // 移除连接,将其从 map 中取出
// 使用克隆后的 from_id };
let receiver_ref = &mut current_connection.receiver; if let Some(mut connection) = current_connection {
let sender_ref = &mut current_connection.sender; let (mut receiver_ref, mut sender_ref) =
(connection.receiver, connection.sender);
tokio::select! { tokio::select! {
// 处理消息接收 // 处理消息接收
maybe_msg = receiver_ref.next() => { maybe_msg = receiver_ref.next() => {
...@@ -214,7 +256,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E ...@@ -214,7 +256,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
} }
}, },
_ => { _ => {
if let Err(e) = handle_other_message(sender_ref, &data).await { if let Err(e) = handle_other_message(&mut sender_ref, &data).await {
println!("Failed to handle other message: {}", e); println!("Failed to handle other message: {}", e);
handle_connection_error(&from_id_clone).await; handle_connection_error(&from_id_clone).await;
break; break;
...@@ -246,11 +288,21 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E ...@@ -246,11 +288,21 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
break; break;
} }
} }
// 处理完一轮后,若连接还未断开,将其重新放回 CONNECTIONS
{
let mut connections = CONNECTIONS.lock().await;
connections.insert(
from_id_clone.clone(),
Connection {
sender: sender_ref,
receiver: receiver_ref,
},
);
}
} else { } else {
break; break;
} }
} }
println!("断开与用户id: {},连接", from_id_clone); // 使用克隆后的 from_id println!("断开与用户id: {},连接", from_id_clone); // 使用克隆后的 from_id
// 从全局连接映射中移除该连接 // 从全局连接映射中移除该连接
{ {
...@@ -258,7 +310,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E ...@@ -258,7 +310,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
connections.remove(&from_id_clone); // 使用克隆后的 from_id connections.remove(&from_id_clone); // 使用克隆后的 from_id
} }
// 更新 Redis 中的 connected 集合 // 更新 Redis 中的 connected 集合
update_connected_redis().await; let _ = update_connected_redis().await;
}); });
// 将任务句柄存储到全局任务映射中 // 将任务句柄存储到全局任务映射中
......
pub const STATIC_WS_PWD: &str = "Q8kFm5LzJ2Ab"; pub const STATIC_WS_PWD: &str = "Q8kFm5LzJ2Ab";
pub const STATIC_ADDR: &str = "0.0.0.0:12345"; pub const STATIC_ADDR: &str = "0.0.0.0:12345";
pub const REDIS_ADDR: &str = "redis://127.0.0.1/";
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment