Commit e49f3e28 by qlintonger xeno

成功解决CmdUpdateOnlineUsers的问题

parent 1996b323
use crate::config::config::STATIC_WS_PWD; use crate::config::config::STATIC_WS_PWD;
use crate::events::{register_client, ClientMessage, Event}; use crate::events::{register_client, ClientMessage, Event, CLIENT_SENDERS};
use crate::handles::close_connection::handle_connection_error; use crate::handles::close_connection::handle_connection_error;
use crate::handles::handle_messages::handle_other_message; use crate::handles::handle_messages::handle_other_message;
use crate::handles::handshake::handle_handshake; use crate::handles::handshake::handle_handshake;
...@@ -25,8 +25,12 @@ lazy_static! { ...@@ -25,8 +25,12 @@ lazy_static! {
} }
// 关闭之前绑定的 WebSocket 连接 // 关闭之前绑定的 WebSocket 连接
async fn close_existing_connection(event_sender: &UnboundedSender<Event>, from_id: &str) { async fn close_existing_connection(from_id: &str) {
event_sender.send(Event::CloseConnection(from_id.to_string())).unwrap(); {
// 移除客户端的发送者
let mut senders = CLIENT_SENDERS.write().await;
senders.remove(&from_id);
}
if let Err(e) = remove_this_connection(from_id).await { if let Err(e) = remove_this_connection(from_id).await {
println!("从 Redis 中移除用户信息时出错: {}", e); println!("从 Redis 中移除用户信息时出错: {}", e);
} }
...@@ -75,7 +79,7 @@ pub(crate) async fn handle_client( ...@@ -75,7 +79,7 @@ pub(crate) async fn handle_client(
let from_id_clone = from_id.clone(); let from_id_clone = from_id.clone();
// 检查 Redis 中是否已经存在该 fromId // 检查 Redis 中是否已经存在该 fromId
close_existing_connection(&event_sender, &from_id).await; close_existing_connection(&from_id).await;
// 将该用户的信息插入到 Redis 中 // 将该用户的信息插入到 Redis 中
if let Err(e) = insert_this_connection(&from_id, &params).await { if let Err(e) = insert_this_connection(&from_id, &params).await {
...@@ -86,7 +90,9 @@ pub(crate) async fn handle_client( ...@@ -86,7 +90,9 @@ pub(crate) async fn handle_client(
register_client(from_id.clone(), center_to_client_sender).await; register_client(from_id.clone(), center_to_client_sender).await;
// 发送新连接事件 // 发送新连接事件
event_sender.send(Event::NewConnection(from_id.clone())).unwrap(); event_sender
.send(Event::NewConnection(from_id.clone()))
.unwrap();
let mut last_heartbeat_time = Instant::now(); let mut last_heartbeat_time = Instant::now();
...@@ -107,9 +113,14 @@ pub(crate) async fn handle_client( ...@@ -107,9 +113,14 @@ pub(crate) async fn handle_client(
if let Ok(json_str) = heart_resp(&from_id_clone) { if let Ok(json_str) = heart_resp(&from_id_clone) {
if let Err(e) = sender.send(Message::text(json_str)).await { if let Err(e) = sender.send(Message::text(json_str)).await {
println!("发送心跳信息失败: {}", e); println!("发送心跳信息失败: {}", e);
{
// 移除客户端的发送者
let mut senders = CLIENT_SENDERS.write().await;
senders.remove(&from_id);
}
// 发送关闭连接事件 // 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap(); event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone, &event_sender).await; handle_connection_error(&from_id_clone).await;
break; break;
} }
} }
...@@ -119,8 +130,14 @@ pub(crate) async fn handle_client( ...@@ -119,8 +130,14 @@ pub(crate) async fn handle_client(
if let Err(e) = send_online_users_and_send(&mut sender, &from_id_clone).await { if let Err(e) = send_online_users_and_send(&mut sender, &from_id_clone).await {
println!("处理在线用户列表出错了:{:?}", e); println!("处理在线用户列表出错了:{:?}", e);
// 发送关闭连接事件 // 发送关闭连接事件
{
// 移除客户端的发送者
let mut senders = CLIENT_SENDERS.write().await;
senders.remove(&from_id);
}
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap(); event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone, &event_sender).await; handle_connection_error(&from_id_clone).await;
break; break;
} }
}, },
...@@ -128,8 +145,14 @@ pub(crate) async fn handle_client( ...@@ -128,8 +145,14 @@ pub(crate) async fn handle_client(
if let Err(e) = handle_other_message(&mut sender, &data, &from_id_clone).await { if let Err(e) = handle_other_message(&mut sender, &data, &from_id_clone).await {
println!("Failed to handle other message: {}", e); println!("Failed to handle other message: {}", e);
// 发送关闭连接事件 // 发送关闭连接事件
{
// 移除客户端的发送者
let mut senders = CLIENT_SENDERS.write().await;
senders.remove(&from_id);
}
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap(); event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone, &event_sender).await; handle_connection_error(&from_id_clone).await;
break; break;
} }
} }
...@@ -144,21 +167,33 @@ pub(crate) async fn handle_client( ...@@ -144,21 +167,33 @@ pub(crate) async fn handle_client(
Some(Err(e)) => { Some(Err(e)) => {
println!("接受客户端消息出错: {}", e); println!("接受客户端消息出错: {}", e);
// 发送关闭连接事件 // 发送关闭连接事件
{
// 移除客户端的发送者
let mut senders = CLIENT_SENDERS.write().await;
senders.remove(&from_id);
}
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap(); event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone, &event_sender).await; handle_connection_error(&from_id_clone).await;
break; break;
} }
None => { None => {
println!("客户端断开连接"); println!("客户端断开连接");
// 发送关闭连接事件 // 发送关闭连接事件
{
// 移除客户端的发送者
let mut senders = CLIENT_SENDERS.write().await;
senders.remove(&from_id);
}
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap(); event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone, &event_sender).await; handle_connection_error(&from_id_clone).await;
break; break;
} }
} }
} }
// 处理来自事件中心的消息 // 处理来自事件中心的消息
maybe_msg = center_to_client_receiver.recv() => { maybe_msg = center_to_client_receiver.try_recv() => {
if let Some(msg) = maybe_msg { if let Some(msg) = maybe_msg {
match msg { match msg {
ClientMessage::CmdUpdateOnlineUsers => { ClientMessage::CmdUpdateOnlineUsers => {
...@@ -166,8 +201,14 @@ pub(crate) async fn handle_client( ...@@ -166,8 +201,14 @@ pub(crate) async fn handle_client(
if let Err(e) = send_online_users_and_send(&mut sender, &from_id_clone).await { if let Err(e) = send_online_users_and_send(&mut sender, &from_id_clone).await {
println!("处理在线用户列表出错了:{:?}", e); println!("处理在线用户列表出错了:{:?}", e);
// 发送关闭连接事件 // 发送关闭连接事件
{
// 移除客户端的发送者
let mut senders = CLIENT_SENDERS.write().await;
senders.remove(&from_id);
}
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap(); event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone, &event_sender).await; handle_connection_error(&from_id_clone).await;
break; break;
} }
} }
...@@ -179,19 +220,19 @@ pub(crate) async fn handle_client( ...@@ -179,19 +220,19 @@ pub(crate) async fn handle_client(
_ = time::sleep_until(tokio::time::Instant::from(last_heartbeat_time + Duration::from_secs(20))) => { _ = time::sleep_until(tokio::time::Instant::from(last_heartbeat_time + Duration::from_secs(20))) => {
println!("用户id-{} 20秒内没有发送心跳,挂断连接", from_id_clone); println!("用户id-{} 20秒内没有发送心跳,挂断连接", from_id_clone);
// 发送关闭连接事件 // 发送关闭连接事件
{
// 移除客户端的发送者
let mut senders = CLIENT_SENDERS.write().await;
senders.remove(&from_id);
}
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap(); event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone, &event_sender).await; handle_connection_error(&from_id_clone).await;
break; break;
} }
} }
} }
println!("断开与用户id: {},连接", from_id_clone); println!("断开与用户id: {},连接", from_id_clone);
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
// 从 Redis 中移除该用户的信息
if let Err(e) = remove_this_connection(&from_id_clone).await {
println!("从 Redis 中移除用户信息时出错: {}", e);
}
} }
} else { } else {
println!("无法获取连接参数"); println!("无法获取连接参数");
...@@ -200,8 +241,13 @@ pub(crate) async fn handle_client( ...@@ -200,8 +241,13 @@ pub(crate) async fn handle_client(
Ok(()) Ok(())
} }
async fn send_online_users_and_send(sender: &mut (impl SinkExt<Message, Error = Error> + std::marker::Unpin), from_id: &str) -> Result<(), Error> { async fn send_online_users_and_send(
let messages = send_online_users_resp().await.map_err(|e| Error::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))?; sender: &mut (impl SinkExt<Message, Error = Error> + std::marker::Unpin),
from_id: &str,
) -> Result<(), Error> {
let messages = send_online_users_resp()
.await
.map_err(|e| Error::Io(std::io::Error::new(std::io::ErrorKind::Other, e)))?;
for (_, json) in messages { for (_, json) in messages {
if let Err(e) = sender.send(Message::text(json)).await { if let Err(e) = sender.send(Message::text(json)).await {
println!("发送在线用户列表消息给用户 {} 失败: {}", from_id, e); println!("发送在线用户列表消息给用户 {} 失败: {}", from_id, e);
......
...@@ -3,7 +3,6 @@ use std::collections::HashMap; ...@@ -3,7 +3,6 @@ use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tungstenite::Error;
// 定义事件类型 // 定义事件类型
#[derive(Debug)] #[derive(Debug)]
...@@ -27,9 +26,14 @@ lazy_static! { ...@@ -27,9 +26,14 @@ lazy_static! {
} }
// 注册客户端的发送者 // 注册客户端的发送者
pub async fn register_client(from_id: String, center_to_client_sender: mpsc::Sender<ClientMessage>) { pub async fn register_client(
from_id: String,
center_to_client_sender: mpsc::Sender<ClientMessage>,
) {
let mut senders = CLIENT_SENDERS.write().await; let mut senders = CLIENT_SENDERS.write().await;
println!("注册客户端的发送者数量---》注册前: {:?}", &senders.len());
senders.insert(from_id, center_to_client_sender); senders.insert(from_id, center_to_client_sender);
println!("注册客户端的发送者数量---》注册后: {:?}", &senders.len());
} }
// 处理事件的任务 // 处理事件的任务
...@@ -37,13 +41,12 @@ pub async fn handle_events(mut receiver: mpsc::UnboundedReceiver<Event>) { ...@@ -37,13 +41,12 @@ pub async fn handle_events(mut receiver: mpsc::UnboundedReceiver<Event>) {
while let Some(event) = receiver.recv().await { while let Some(event) = receiver.recv().await {
match event { match event {
Event::NewConnection(from_id) => { Event::NewConnection(from_id) => {
println!("新连接: {}", from_id); println!("有新的连接 用户id {} 更新在线用户列表事件触发", from_id);
notify_all_clients_to_update_online_users().await;
} }
Event::CloseConnection(from_id) => { Event::CloseConnection(from_id) => {
println!("关闭连接: {}", from_id); println!("有关闭的连接 用户id {} 更新在线用户列表事件触发", from_id);
// 移除客户端的发送者 notify_all_clients_to_update_online_users().await;
let mut senders = CLIENT_SENDERS.write().await;
senders.remove(&from_id);
} }
Event::UpdateOnlineUsers => { Event::UpdateOnlineUsers => {
// 这里可以实现其他触发更新在线用户列表的逻辑 // 这里可以实现其他触发更新在线用户列表的逻辑
...@@ -58,9 +61,9 @@ pub async fn handle_events(mut receiver: mpsc::UnboundedReceiver<Event>) { ...@@ -58,9 +61,9 @@ pub async fn handle_events(mut receiver: mpsc::UnboundedReceiver<Event>) {
// 通知所有客户端线程发送 CmdUpdateOnlineUsers 消息 // 通知所有客户端线程发送 CmdUpdateOnlineUsers 消息
async fn notify_all_clients_to_update_online_users() { async fn notify_all_clients_to_update_online_users() {
let senders = CLIENT_SENDERS.read().await; let senders = CLIENT_SENDERS.read().await;
for (_, sender) in senders.iter() { for (from_id, sender) in senders.iter() {
if let Err(e) = sender.send(ClientMessage::CmdUpdateOnlineUsers).await { if let Err(e) = sender.send(ClientMessage::CmdUpdateOnlineUsers).await {
println!("通知客户端更新在线用户列表失败: {:?}", e); println!("通知客户端 {} 更新在线用户列表失败: {:?}", from_id, e);
} }
} }
} }
use crate::events::Event;
use crate::handles::redis::remove_this_connection; use crate::handles::redis::remove_this_connection;
use tokio::sync::mpsc::UnboundedSender;
pub async fn handle_connection_error( pub async fn handle_connection_error(from_id: &str) {
from_id: &str,
event_sender: &UnboundedSender<Event>,
) {
println!("开始处理用户id: {} 的连接错误", from_id); println!("开始处理用户id: {} 的连接错误", from_id);
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id.to_string())).unwrap();
// 从 Redis 中移除该用户的信息 // 从 Redis 中移除该用户的信息
if let Err(e) = remove_this_connection(from_id).await { if let Err(e) = remove_this_connection(from_id).await {
println!("从 Redis 中移除用户信息时出错: {}", e); println!("从 Redis 中移除用户信息时出错: {}", e);
} }
} }
...@@ -7,7 +7,8 @@ pub(crate) fn handle_handshake( ...@@ -7,7 +7,8 @@ pub(crate) fn handle_handshake(
static_ws_pwd: &str, static_ws_pwd: &str,
) -> Result<HashMap<String, String>, String> { ) -> Result<HashMap<String, String>, String> {
println!("新客户端连接: {}", req.uri()); println!("新客户端连接: {}", req.uri());
let connection_params = match crate::utils::utils::get_connection_params(req.uri().to_string()) { let connection_params = match crate::utils::utils::get_connection_params(req.uri().to_string())
{
Ok(p) => p, Ok(p) => p,
Err(e) => { Err(e) => {
let error_msg = format!("缺少重要连接数据段: {}", e); let error_msg = format!("缺少重要连接数据段: {}", e);
......
pub mod close_connection;
pub mod handle_messages; pub mod handle_messages;
pub mod heartbeat;
pub mod handshake; pub mod handshake;
pub mod redis; pub mod heartbeat;
pub mod close_connection;
pub mod online_users_update; pub mod online_users_update;
pub mod redis;
use crate::client::ONLINE_USERS;
use crate::config::config::REDIS_ADDR; use crate::config::config::REDIS_ADDR;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use redis::Client; use redis::Client;
use redis::Commands; use redis::Commands;
use redis_pool::SingleRedisPool; use redis_pool::SingleRedisPool;
use std::collections::HashMap; use std::collections::HashMap;
use crate::client::ONLINE_USERS;
lazy_static! { lazy_static! {
static ref REDIS_POOL: SingleRedisPool = { static ref REDIS_POOL: SingleRedisPool = {
...@@ -49,11 +49,7 @@ pub async fn insert_this_connection( ...@@ -49,11 +49,7 @@ pub async fn insert_this_connection(
// callState,channelID,deviceID,fromID,hasCamera,hasMike,isHost,userCallGroup,fromName // callState,channelID,deviceID,fromID,hasCamera,hasMike,isHost,userCallGroup,fromName
let user_info_str = format!( let user_info_str = format!(
"{},{},{},{},1,1,0,0,{}", "{},{},{},{},1,1,0,0,{}",
"idle", "idle", "", from_id, device_id, from_name
"",
from_id,
device_id,
from_name
); );
if let Err(e) = con.hset::<&str, &str, &str, ()>("onlineUsers", from_id, &user_info_str) { if let Err(e) = con.hset::<&str, &str, &str, ()>("onlineUsers", from_id, &user_info_str) {
......
...@@ -2,10 +2,10 @@ extern crate core; ...@@ -2,10 +2,10 @@ extern crate core;
mod client; mod client;
mod config; mod config;
mod events;
mod handles; mod handles;
mod typing; mod typing;
mod utils; mod utils;
mod events;
use crate::events::handle_events; use crate::events::handle_events;
use client::handle_client; use client::handle_client;
...@@ -24,6 +24,11 @@ async fn main() { ...@@ -24,6 +24,11 @@ async fn main() {
let client_event_sender = event_sender.clone(); let client_event_sender = event_sender.clone();
// 创建一个用于事件中心向客户端发送消息的通道 // 创建一个用于事件中心向客户端发送消息的通道
let (center_to_client_sender, center_to_client_receiver) = mpsc::channel(10); let (center_to_client_sender, center_to_client_receiver) = mpsc::channel(10);
tokio::spawn(handle_client(stream, client_event_sender, center_to_client_sender, center_to_client_receiver)); tokio::spawn(handle_client(
stream,
client_event_sender,
center_to_client_sender,
center_to_client_receiver,
));
} }
} }
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment