Commit 1996b323 by qlintonger xeno

尝试使用事件机制来实现多线程沟通+1.55

parent 1b49c3a6
use crate::config::config::STATIC_WS_PWD; use crate::config::config::STATIC_WS_PWD;
use crate::events::{Event, register_client, ClientMessage}; use crate::events::{register_client, ClientMessage, Event};
use crate::handles::close_connection::handle_connection_error; use crate::handles::close_connection::handle_connection_error;
use crate::handles::handle_messages::handle_other_message; use crate::handles::handle_messages::handle_other_message;
use crate::handles::handshake::handle_handshake; use crate::handles::handshake::handle_handshake;
...@@ -12,8 +12,8 @@ use lazy_static::lazy_static; ...@@ -12,8 +12,8 @@ use lazy_static::lazy_static;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::sync::mpsc::{UnboundedSender, Sender, Receiver}; use tokio::sync::mpsc::{Receiver, Sender, UnboundedSender};
use tokio::sync::{mpsc, RwLock as AsyncRwLock}; use tokio::sync::RwLock as AsyncRwLock;
use tokio::time; use tokio::time;
use tokio_tungstenite::accept_hdr_async; use tokio_tungstenite::accept_hdr_async;
use tungstenite::handshake::server::{Request, Response}; use tungstenite::handshake::server::{Request, Response};
...@@ -157,19 +157,12 @@ pub(crate) async fn handle_client( ...@@ -157,19 +157,12 @@ pub(crate) async fn handle_client(
} }
} }
} }
// 处理心跳超时
_ = time::sleep_until(tokio::time::Instant::from(last_heartbeat_time + Duration::from_secs(20))) => {
println!("用户id-{} 20秒内没有发送心跳,挂断连接", from_id_clone);
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone, &event_sender).await;
break;
}
// 处理来自事件中心的消息 // 处理来自事件中心的消息
maybe_msg = center_to_client_receiver.recv() => { maybe_msg = center_to_client_receiver.recv() => {
if let Some(msg) = maybe_msg { if let Some(msg) = maybe_msg {
match msg { match msg {
ClientMessage::CmdUpdateOnlineUsers => { ClientMessage::CmdUpdateOnlineUsers => {
println!("消息中心:==> 收到 CmdUpdateOnlineUsers 消息");
if let Err(e) = send_online_users_and_send(&mut sender, &from_id_clone).await { if let Err(e) = send_online_users_and_send(&mut sender, &from_id_clone).await {
println!("处理在线用户列表出错了:{:?}", e); println!("处理在线用户列表出错了:{:?}", e);
// 发送关闭连接事件 // 发送关闭连接事件
...@@ -182,6 +175,14 @@ pub(crate) async fn handle_client( ...@@ -182,6 +175,14 @@ pub(crate) async fn handle_client(
} }
} }
} }
// 处理心跳超时
_ = time::sleep_until(tokio::time::Instant::from(last_heartbeat_time + Duration::from_secs(20))) => {
println!("用户id-{} 20秒内没有发送心跳,挂断连接", from_id_clone);
// 发送关闭连接事件
event_sender.send(Event::CloseConnection(from_id_clone.clone())).unwrap();
handle_connection_error(&from_id_clone, &event_sender).await;
break;
}
} }
} }
println!("断开与用户id: {},连接", from_id_clone); println!("断开与用户id: {},连接", from_id_clone);
...@@ -207,5 +208,6 @@ async fn send_online_users_and_send(sender: &mut (impl SinkExt<Message, Error = ...@@ -207,5 +208,6 @@ async fn send_online_users_and_send(sender: &mut (impl SinkExt<Message, Error =
return Err(e); return Err(e);
} }
} }
println!("发送在线用户列表消息给用户 {} 成功", from_id);
Ok(()) Ok(())
} }
\ No newline at end of file
use futures::SinkExt;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
...@@ -6,12 +5,6 @@ use tokio::sync::mpsc; ...@@ -6,12 +5,6 @@ use tokio::sync::mpsc;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use tungstenite::Error; use tungstenite::Error;
// 假设的用户详细信息结构体
#[derive(Debug)]
pub struct UserDetails {
pub username: String,
}
// 定义事件类型 // 定义事件类型
#[derive(Debug)] #[derive(Debug)]
pub enum Event { pub enum Event {
...@@ -62,17 +55,6 @@ pub async fn handle_events(mut receiver: mpsc::UnboundedReceiver<Event>) { ...@@ -62,17 +55,6 @@ pub async fn handle_events(mut receiver: mpsc::UnboundedReceiver<Event>) {
} }
} }
// 向指定客户端发送消息
async fn send_message_to_client(from_id: &str, message: String) -> Result<(), Error> {
let senders = CLIENT_SENDERS.read().await;
if let Some(sender) = senders.get(from_id) {
if let Err(e) = sender.send(ClientMessage::SendMessage(message)).await {
return Err(Error::Io(std::io::Error::new(std::io::ErrorKind::Other, e)));
}
}
Ok(())
}
// 通知所有客户端线程发送 CmdUpdateOnlineUsers 消息 // 通知所有客户端线程发送 CmdUpdateOnlineUsers 消息
async fn notify_all_clients_to_update_online_users() { async fn notify_all_clients_to_update_online_users() {
let senders = CLIENT_SENDERS.read().await; let senders = CLIENT_SENDERS.read().await;
......
pub mod used_typed;
pub mod message_typed; pub mod message_typed;
use tokio_tungstenite::WebSocketStream;
use tungstenite::Message;
// 自定义结构体来存储发送器和接收器
#[derive(Debug)]
pub struct Connection {
pub(crate) sender: futures::stream::SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>,
pub(crate) receiver: futures::stream::SplitStream<WebSocketStream<tokio::net::TcpStream>>,
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment