Commit 5924414a by qlintonger xeno

同一连接下线完成!

parent 0ed404af
......@@ -8,8 +8,9 @@ use crate::utils::json_utils::parse_message;
use dashmap::DashMap;
use futures::{SinkExt, StreamExt};
use lazy_static::lazy_static;
use std::time::{Duration, Instant};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc::{Receiver, Sender, UnboundedSender};
use tokio::sync::watch;
use tokio::time;
use tokio_tungstenite::accept_hdr_async;
use tungstenite::handshake::server::{Request, Response};
......@@ -60,9 +61,14 @@ pub(crate) async fn handle_client(
if let Some(params) = connection_params {
if let Some(from_id) = params.get("fromID") {
let from_id = from_id.clone();
// 获取当前时间戳
let connection_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("时间获取失败")
.as_nanos();
// 注册客户端到事件中心
register_client(from_id.clone(), center_to_client_sender).await;
register_client((from_id.clone(), connection_time), center_to_client_sender).await;
// 发送新连接事件
event_sender
......@@ -71,6 +77,9 @@ pub(crate) async fn handle_client(
println!("用户 {} 已连接", from_id);
// 使用 watch 通道来发送关闭信号
let (close_tx, mut close_rx) = watch::channel(false);
// 启动事件中心调度处理任务
let event_task = tokio::spawn({
let from_id = from_id.clone();
......@@ -93,11 +102,19 @@ pub(crate) async fn handle_client(
ClientMessage::SendClientMessage(
from_id,
client_message,
close
) => {
let real_user_id = from_id.0;
if let Err(e) = sender.send(Message::text(client_message)).await {
println!("发送给用户id {} 独立消息失败:{:?}", from_id.clone(), e);
println!("发送给用户id {} 独立消息失败:{:?}", real_user_id, e);
break;
}
if close {
// 通知外层循环关闭
close_tx.send(true).unwrap();
println!("发送给用户id {} 要求关闭连接", real_user_id);
break
}
}
}
}
......@@ -119,7 +136,7 @@ pub(crate) async fn handle_client(
println!("收到客户端心跳消息 {:?}", &data);
last_heartbeat_time = Instant::now();
if let Ok(json_str) = heart_resp(&from_id) {
event_sender.clone().send(Event::SendClientMessage(from_id.clone(), json_str)).unwrap();
event_sender.clone().send(Event::SendClientMessage((from_id.clone(), connection_time), json_str, false)).unwrap();
}
},
_ => {
......@@ -140,15 +157,22 @@ pub(crate) async fn handle_client(
_ = time::sleep_until(tokio::time::Instant::from(last_heartbeat_time + Duration::from_secs(20))) => {
println!("用户id-{} 20秒内没有发送心跳,挂断连接", from_id);
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id.clone())).unwrap();
event_sender.send(Event::CloseConnection((from_id.clone(), connection_time))).unwrap();
break;
}
// 监听关闭通知
_ = close_rx.changed() => {
if *close_rx.borrow() {
println!("收到关闭通知,退出循环");
break;
}
}
}
}
// 发送关闭连接事件
event_sender
.send(Event::CloseConnection(from_id.clone()))
.send(Event::CloseConnection((from_id.clone(), connection_time)))
.unwrap();
// 等待事件中心调度任务结束
......
use crate::handles::redis::{insert_this_connection, remove_this_connection};
use crate::typing::message_typed::OtherLoginMessageData;
use dashmap::DashMap;
use lazy_static::lazy_static;
use serde_json::Value::Null;
use std::collections::HashMap;
use tokio::sync::mpsc;
......@@ -8,43 +10,86 @@ use tokio::sync::mpsc;
#[derive(Debug)]
pub enum Event {
NewConnection(String, HashMap<String, String>),
CloseConnection(String),
SendClientMessage(String, String),
CloseConnection((String, u128)),
SendClientMessage((String, u128), String, bool),
}
// 定义事件中心发送给客户端的消息类型
#[derive(Debug)]
pub enum ClientMessage {
CmdUpdateOnlineUsers,
SendClientMessage(String, String),
SendClientMessage((String, u128), String, bool),
}
lazy_static! {
// 存储每个客户端的发送者,用于事件中心向客户端发送消息
pub static ref CLIENT_SENDERS: DashMap<String, mpsc::Sender<ClientMessage>> = DashMap::new();
pub static ref CLIENT_SENDERS: DashMap<(String, u128), mpsc::Sender<ClientMessage>> =
DashMap::new();
}
// 注册客户端的发送者
pub async fn register_client(
from_id: String,
from_id: (String, u128),
center_to_client_sender: mpsc::Sender<ClientMessage>,
) {
close_existing_connection(&from_id).await;
println!("注册用户 {} 前数量 {}", &from_id, CLIENT_SENDERS.len());
close_existing_connection(&from_id, true).await;
println!("注册用户 {:?} 前数量 {}", &from_id, CLIENT_SENDERS.len());
CLIENT_SENDERS.insert(from_id.clone(), center_to_client_sender);
println!(
"注册用户 {} 后数量 {}",
from_id.clone(),
CLIENT_SENDERS.len()
);
println!("注册用户 {:?} 后数量 {}", from_id, CLIENT_SENDERS.len());
}
// 关闭之前绑定的 WebSocket 连接
pub async fn close_existing_connection(from_id: &str) {
CLIENT_SENDERS.remove(from_id);
if let Err(e) = remove_this_connection(from_id).await {
// 关闭之前绑定的 WebSocket 连接
pub async fn close_existing_connection(from_id: &(String, u128), close_old: bool) {
let (real_user_id, current_time) = (&from_id.0, from_id.1);
if close_old {
// 关闭旧连接:找到所有连接时间小于当前时间的连接
let mut keys_to_remove = Vec::new();
for entry in CLIENT_SENDERS.iter() {
if entry.key().0 == real_user_id.to_string() && entry.key().1 < current_time {
keys_to_remove.push(entry.key().clone());
}
}
for key in keys_to_remove {
if let Some((_, sender)) = CLIENT_SENDERS.remove(&key) {
let other_login_msg = OtherLoginMessageData {
msg_type: "OtherLoginIn".to_string(),
from_id: "0".to_string(),
from_name: "Server".to_string(),
msg_data: Null,
to_id: key.0.clone(),
};
if let Err(e) = sender.send(ClientMessage::SendClientMessage(key.clone(), serde_json::to_string(&other_login_msg).unwrap(), true)).await {
println!("通知客户端 {:?} 关闭连接失败: {:?}", key, e);
} else {
println!("通知客户端 {:?} 关闭连接成功", key);
}
}
if let Err(e) = remove_this_connection(&key.0).await {
println!("从 Redis 中移除用户信息时出错: {}", e);
}
}
} else {
// 关闭当前连接
if let Some((_, sender)) = CLIENT_SENDERS.remove(from_id) {
let other_login_msg = OtherLoginMessageData {
msg_type: "OtherLoginIn".to_string(),
from_id: "0".to_string(),
from_name: "Server".to_string(),
msg_data: Null,
to_id: real_user_id.clone(),
};
if let Err(e) = sender.send(ClientMessage::SendClientMessage(from_id.clone(), serde_json::to_string(&other_login_msg).unwrap(), true)).await {
println!("通知客户端 {:?} 关闭连接失败: {:?}", from_id, e);
} else {
println!("通知客户端 {:?} 关闭连接成功", from_id);
}
}
if let Err(e) = remove_this_connection(real_user_id).await {
println!("从 Redis 中移除用户信息时出错: {}", e);
}
}
}
// 处理事件的任务
......@@ -60,16 +105,14 @@ pub async fn handle_events(mut receiver: mpsc::UnboundedReceiver<Event>) {
notify_all_clients_to_update_online_users().await;
}
Event::CloseConnection(from_id) => {
println!("有关闭的连接 用户id {} 更新在线用户列表事件触发", from_id);
close_existing_connection(&from_id).await;
println!("有关闭的连接 用户id {:?} 更新在线用户列表事件触发", from_id);
close_existing_connection(&from_id, false).await;
notify_all_clients_to_update_online_users().await;
}
Event::SendClientMessage(from_id, msg) => {
Event::SendClientMessage(from_id, msg, close) => {
let target_sender = CLIENT_SENDERS.get(&from_id).unwrap();
if let Err(e) = target_sender
.send(ClientMessage::SendClientMessage(
from_id, msg,
))
.send(ClientMessage::SendClientMessage(from_id, msg, close))
.await
{
println!("通知对应sender发送ws消息失败 {}", e);
......@@ -91,9 +134,9 @@ async fn notify_all_clients_to_update_online_users() {
let sender: &mpsc::Sender<ClientMessage> = entry.value();
let from_id = entry.key();
if let Err(e) = sender.send(ClientMessage::CmdUpdateOnlineUsers).await {
println!("通知客户端 {} 更新在线用户列表失败: {:?}", from_id, e);
println!("通知客户端 {:?} 更新在线用户列表失败: {:?}", from_id, e);
} else {
println!("通知客户端 {} 更新在线用户列表成功 ===> $$$", from_id);
println!("通知客户端 {:?} 更新在线用户列表成功 ===> $$$", from_id);
}
}
}
use crate::typing::message_typed::ClientMessageData;
use futures::SinkExt;
use tungstenite::{Error, Message};
// 处理其他类型消息
pub(crate) async fn handle_other_message(
// 增加 + std::marker::Unpin 限制
sender: &mut (impl SinkExt<Message, Error = Error> + Unpin),
data: &ClientMessageData,
from_id: &str,
) -> Result<(), Error> {
println!("收到客户端消息: {:?} 来自用户id: {}", data, &from_id);
Ok(())
}
pub mod handle_messages;
pub mod handshake;
pub mod heartbeat;
pub mod online_users_update;
......
......@@ -14,3 +14,17 @@ pub struct ClientMessageData {
#[serde(rename = "toID")]
pub to_id: String
}
#[derive(Serialize, Deserialize, Debug)]
pub struct OtherLoginMessageData {
#[serde(rename = "msgType")]
pub msg_type: String,
#[serde(rename = "fromID")]
pub from_id: String,
#[serde(rename = "fromName")]
pub from_name: String,
#[serde(rename = "msgData")]
pub msg_data: serde_json::Value,
#[serde(rename = "toID")]
pub to_id: String
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment