Commit 862ef612 by qlintonger xeno

重设文件结构+解决报错和断线问题

parent a60b12de
...@@ -43,7 +43,10 @@ async fn close_existing_connection(from_id: &str) { ...@@ -43,7 +43,10 @@ async fn close_existing_connection(from_id: &str) {
let mut connections = CONNECTIONS.lock().await; let mut connections = CONNECTIONS.lock().await;
let already_done = connections.get(&from_id.to_string()); let already_done = connections.get(&from_id.to_string());
println!("关闭之前绑定的 WebSocket 连接: {} {:?}", from_id, already_done); println!(
"关闭之前绑定的 WebSocket 连接: {} {:?}",
from_id, already_done
);
if let Some(mut old_connection) = connections.remove(from_id) { if let Some(mut old_connection) = connections.remove(from_id) {
// 尝试优雅地关闭旧连接 // 尝试优雅地关闭旧连接
if let Err(e) = old_connection.sender.close().await { if let Err(e) = old_connection.sender.close().await {
...@@ -55,19 +58,25 @@ async fn close_existing_connection(from_id: &str) { ...@@ -55,19 +58,25 @@ async fn close_existing_connection(from_id: &str) {
// 更新 Redis 中 connected 集合 // 更新 Redis 中 connected 集合
async fn update_connected_redis() { async fn update_connected_redis() {
// 改进后
let from_ids: Vec<String>;
{
let connections = CONNECTIONS.lock().await; let connections = CONNECTIONS.lock().await;
let from_ids: Vec<String> = connections.keys().cloned().collect(); from_ids = connections.keys().cloned().collect();
let mut con = REDIS_POOL.get_connection().expect("Failed to get Redis connection"); }
let mut con = REDIS_POOL
.get_connection()
.expect("Failed to get Redis connection");
// 先清空集合 // 先清空集合
if let Err(e) = con.del::<_, ()>("connected") { if let Err(e) = con.del::<_, ()>("connected") {
println!("Failed to delete connected key in Redis: {}", e); println!("Failed to delete connected key in Redis: {}", e);
} }
// 将每个 fromId 依次添加到集合中 // 将 from_ids 一次性批量添加到集合中
for from_id in from_ids { if !from_ids.is_empty() {
if let Err(e) = con.sadd::<_, _, ()>("connected", from_id) { if let Err(e) = con.sadd::<_, _, ()>("connected", from_ids.as_slice()) {
println!("Failed to add fromId to connected set in Redis: {}", e); println!("Failed to add fromIds to connected set in Redis: {}", e);
} }
} }
} }
...@@ -105,6 +114,25 @@ fn handle_handshake( ...@@ -105,6 +114,25 @@ fn handle_handshake(
Ok(connection_params) Ok(connection_params)
} }
async fn handle_connection_error(from_id: &str) {
// 从全局连接映射中移除该连接
{
let mut connections = CONNECTIONS.lock().await;
connections.remove(from_id);
}
// 更新 Redis 中的 connected 集合
update_connected_redis().await;
// 取消对应的任务
let mut tasks = TASKS.lock().unwrap();
if let Some(task) = tasks.remove(from_id) {
task.abort();
}
println!("断开与用户id: {} 的连接并完成清理操作", from_id);
}
pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), Error> { pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), Error> {
let must_existed_params = ["deviceId", "fromId", "wsPwd"]; let must_existed_params = ["deviceId", "fromId", "wsPwd"];
let mut connection_params = None; let mut connection_params = None;
...@@ -159,7 +187,8 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E ...@@ -159,7 +187,8 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
loop { loop {
let mut connections = CONNECTIONS.lock().await; let mut connections = CONNECTIONS.lock().await;
if let Some(current_connection) = connections.get_mut(&from_id_clone) { // 使用克隆后的 from_id if let Some(current_connection) = connections.get_mut(&from_id_clone) {
// 使用克隆后的 from_id
let receiver_ref = &mut current_connection.receiver; let receiver_ref = &mut current_connection.receiver;
let sender_ref = &mut current_connection.sender; let sender_ref = &mut current_connection.sender;
...@@ -177,11 +206,19 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E ...@@ -177,11 +206,19 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
println!("收到客户端心跳消息 {:?}", &data); println!("收到客户端心跳消息 {:?}", &data);
handle_heartbeat(&mut last_heartbeat_time); handle_heartbeat(&mut last_heartbeat_time);
if let Ok(json_str) = make_common_resp(Default::default(), "Heart") { if let Ok(json_str) = make_common_resp(Default::default(), "Heart") {
sender_ref.send(Message::text(json_str)).await.unwrap(); if let Err(e) = sender_ref.send(Message::text(json_str)).await {
println!("发送心跳信息失败: {}", e);
handle_connection_error(&from_id_clone).await;
break;
}
} }
}, },
_ => { _ => {
handle_other_message(sender_ref, &data).await.unwrap(); if let Err(e) = handle_other_message(sender_ref, &data).await {
println!("Failed to handle other message: {}", e);
handle_connection_error(&from_id_clone).await;
break;
}
} }
} }
} }
...@@ -193,10 +230,12 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E ...@@ -193,10 +230,12 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
} }
Some(Err(e)) => { Some(Err(e)) => {
println!("接受客户端消息出错: {}", e); println!("接受客户端消息出错: {}", e);
handle_connection_error(&from_id_clone).await;
break; break;
} }
None => { None => {
println!("客户端断开连接"); println!("客户端断开连接");
handle_connection_error(&from_id_clone).await;
break; break;
} }
} }
......
pub const STATIC_WS_PWD: &str = "Q8kFm5LzJ2Ab"; pub const STATIC_WS_PWD: &str = "Q8kFm5LzJ2Ab";
pub const STATIC_ADDR: &str = "0.0.0.0:12345";
use crate::utils::json_utils::{make_common_resp, MessageData};
use futures::SinkExt; use futures::SinkExt;
use tungstenite::{Error, Message}; use tungstenite::{Error, Message};
use crate::utils::json_utils::{make_common_resp, MessageData};
// 处理其他类型消息 // 处理其他类型消息
pub(crate) async fn handle_other_message( pub(crate) async fn handle_other_message(
......
extern crate core; extern crate core;
mod client; mod client;
mod utils;
mod config; mod config;
mod handles; mod handles;
mod typing; mod typing;
mod utils;
use client::handle_client; use client::handle_client;
use config::config::STATIC_ADDR as addr;
use tokio::net::TcpListener; use tokio::net::TcpListener;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let addr = "0.0.0.0:12345";
let listener = TcpListener::bind(addr).await.unwrap(); let listener = TcpListener::bind(addr).await.unwrap();
while let Ok((stream, _)) = listener.accept().await { while let Ok((stream, _)) = listener.accept().await {
tokio::spawn(handle_client(stream)); tokio::spawn(handle_client(stream));
} }
} }
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use tokio::sync::Mutex as AsyncMutex;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::WebSocketStream;
use tungstenite::Message; use tungstenite::Message;
use tokio::sync::Mutex as AsyncMutex;
// 自定义结构体来存储发送器和接收器 // 自定义结构体来存储发送器和接收器
#[derive(Debug)] #[derive(Debug)]
......
use std::collections::HashMap; use std::collections::HashMap;
pub(crate) fn get_connection_params(connection_url: String) -> Result<HashMap<String, String>, &'static str> { pub(crate) fn get_connection_params(
connection_url: String,
) -> Result<HashMap<String, String>, &'static str> {
if let Some(query_part) = connection_url.split('?').nth(1) { if let Some(query_part) = connection_url.split('?').nth(1) {
let mut params_mapping = HashMap::new(); let mut params_mapping = HashMap::new();
for param in query_part.split('&') { for param in query_part.split('&') {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment