Commit 9302eff9 by qlintonger xeno

准备发送online-user-list

parent 4f7d3b9c
use crate::config::config::STATIC_WS_PWD; use crate::config::config::STATIC_WS_PWD;
use crate::handles::close_connection::handle_connection_error;
use crate::handles::handle_messages::handle_other_message; use crate::handles::handle_messages::handle_other_message;
use crate::handles::heartbeat::handle_heartbeat; use crate::handles::handshake::handle_handshake;
use crate::handles::heartbeat::{handle_heartbeat, heart_resp};
use crate::handles::redis::{insert_this_connection, remove_this_connection};
use crate::typing::used_typed::{Connection, ConnectionMap, TaskMap}; use crate::typing::used_typed::{Connection, ConnectionMap, TaskMap};
use crate::utils::json_utils::{make_common_resp, parse_message}; use crate::utils::json_utils::parse_message;
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::sync::Mutex as AsyncMutex; use tokio::sync::RwLock as AsyncRwLock;
use tokio::time; use tokio::time;
use tokio_tungstenite::accept_hdr_async; use tokio_tungstenite::accept_hdr_async;
use tungstenite::handshake::server::{Request, Response}; use tungstenite::handshake::server::{Request, Response};
use tungstenite::{Error, Message}; use tungstenite::{Error, Message};
use crate::handles::close_connection::handle_connection_error;
use crate::handles::handshake::handle_handshake;
use crate::handles::redis::{insert_this_connection, remove_this_connection};
lazy_static! { lazy_static! {
static ref CONNECTIONS: ConnectionMap = Arc::new(AsyncMutex::new(HashMap::new())); pub static ref CONNECTIONS: ConnectionMap = Arc::new(AsyncRwLock::new(HashMap::new()));
static ref TASKS: TaskMap = Arc::new(Mutex::new(HashMap::new())); static ref TASKS: TaskMap = Arc::new(RwLock::new(HashMap::new()));
}
lazy_static! {
pub static ref ONLINE_USERS: Arc<AsyncRwLock<HashMap<String, String>>> = Arc::new(AsyncRwLock::new(HashMap::new()));
} }
// 关闭之前绑定的 WebSocket 连接并取消对应的任务 // 关闭之前绑定的 WebSocket 连接并取消对应的任务
async fn close_existing_connection(from_id: &str) { async fn close_existing_connection(from_id: &str) {
let task_to_abort = { let task_to_abort = {
let mut tasks = TASKS.lock().unwrap(); let mut tasks = TASKS.write().unwrap();
tasks.remove(from_id) tasks.remove(from_id)
}; };
...@@ -34,7 +37,7 @@ async fn close_existing_connection(from_id: &str) { ...@@ -34,7 +37,7 @@ async fn close_existing_connection(from_id: &str) {
} }
let old_connection = { let old_connection = {
let mut connections = CONNECTIONS.lock().await; let mut connections = CONNECTIONS.write().await;
let already_done = connections.get(&from_id.to_string()); let already_done = connections.get(&from_id.to_string());
println!( println!(
"关闭之前绑定的 WebSocket 连接: {} {:?}", "关闭之前绑定的 WebSocket 连接: {} {:?}",
...@@ -57,7 +60,6 @@ async fn close_existing_connection(from_id: &str) { ...@@ -57,7 +60,6 @@ async fn close_existing_connection(from_id: &str) {
} }
} }
pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), Error> { pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), Error> {
let must_existed_params = ["deviceId", "fromId", "wsPwd"]; let must_existed_params = ["deviceId", "fromId", "wsPwd"];
let mut connection_params = None; let mut connection_params = None;
...@@ -100,8 +102,8 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E ...@@ -100,8 +102,8 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
// 将新连接添加到全局连接映射 // 将新连接添加到全局连接映射
{ {
let mut connections = CONNECTIONS.lock().await; let mut connections = CONNECTIONS.write().await;
connections.insert(from_id.clone(), Connection { sender, receiver }); connections.insert(from_id.clone(), Connection { sender, receiver, from_id: from_id.to_string() });
} }
// 将该用户的信息插入到 Redis 中 // 将该用户的信息插入到 Redis 中
...@@ -113,7 +115,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E ...@@ -113,7 +115,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
let mut last_heartbeat_time = Instant::now(); let mut last_heartbeat_time = Instant::now();
loop { loop {
let current_connection = { let current_connection = {
let mut connections = CONNECTIONS.lock().await; let mut connections = CONNECTIONS.write().await;
connections.remove(&from_id_clone) // 移除连接,将其从 map 中取出 connections.remove(&from_id_clone) // 移除连接,将其从 map 中取出
}; };
if let Some(connection) = current_connection { if let Some(connection) = current_connection {
...@@ -132,7 +134,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E ...@@ -132,7 +134,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
"Heart" => { "Heart" => {
println!("收到客户端心跳消息 {:?}", &data); println!("收到客户端心跳消息 {:?}", &data);
handle_heartbeat(&mut last_heartbeat_time); handle_heartbeat(&mut last_heartbeat_time);
if let Ok(json_str) = make_common_resp(Default::default(), "Heart") { if let Ok(json_str) = heart_resp(&from_id_clone) {
if let Err(e) = sender_ref.send(Message::text(json_str)).await { if let Err(e) = sender_ref.send(Message::text(json_str)).await {
println!("发送心跳信息失败: {}", e); println!("发送心跳信息失败: {}", e);
handle_connection_error(&from_id_clone, &CONNECTIONS, &TASKS).await; handle_connection_error(&from_id_clone, &CONNECTIONS, &TASKS).await;
...@@ -141,7 +143,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E ...@@ -141,7 +143,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
} }
}, },
_ => { _ => {
if let Err(e) = handle_other_message(&mut sender_ref, &data).await { if let Err(e) = handle_other_message(&mut sender_ref, &data, &from_id_clone).await {
println!("Failed to handle other message: {}", e); println!("Failed to handle other message: {}", e);
handle_connection_error(&from_id_clone, &CONNECTIONS, &TASKS).await; handle_connection_error(&from_id_clone, &CONNECTIONS, &TASKS).await;
break; break;
...@@ -175,12 +177,13 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E ...@@ -175,12 +177,13 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
} }
// 处理完一轮后,若连接还未断开,将其重新放回 CONNECTIONS // 处理完一轮后,若连接还未断开,将其重新放回 CONNECTIONS
{ {
let mut connections = CONNECTIONS.lock().await; let mut connections = CONNECTIONS.write().await;
connections.insert( connections.insert(
from_id_clone.clone(), from_id_clone.clone(),
Connection { Connection {
sender: sender_ref, sender: sender_ref,
receiver: receiver_ref, receiver: receiver_ref,
from_id: from_id_clone.to_string(),
}, },
); );
} }
...@@ -191,7 +194,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E ...@@ -191,7 +194,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
println!("断开与用户id: {},连接", from_id_clone); // 使用克隆后的 from_id println!("断开与用户id: {},连接", from_id_clone); // 使用克隆后的 from_id
// 从全局连接映射中移除该连接 // 从全局连接映射中移除该连接
{ {
let mut connections = CONNECTIONS.lock().await; let mut connections = CONNECTIONS.write().await;
connections.remove(&from_id_clone); // 使用克隆后的 from_id connections.remove(&from_id_clone); // 使用克隆后的 from_id
} }
// 从 Redis 中移除该用户的信息 // 从 Redis 中移除该用户的信息
...@@ -201,7 +204,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E ...@@ -201,7 +204,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
}); });
// 将任务句柄存储到全局任务映射中 // 将任务句柄存储到全局任务映射中
let mut tasks = TASKS.lock().unwrap(); let mut tasks = TASKS.write().unwrap();
tasks.insert(from_id, task); // 使用原始的 from_id tasks.insert(from_id, task); // 使用原始的 from_id
} }
} else { } else {
......
use crate::handles::redis::remove_this_connection; use crate::handles::redis::remove_this_connection;
use crate::typing::used_typed::{ConnectionMap, TaskMap}; use crate::typing::used_typed::{ConnectionMap, TaskMap};
use tokio::sync::RwLockWriteGuard;
pub async fn handle_connection_error( pub async fn handle_connection_error(
from_id: &str, from_id: &str,
...@@ -9,7 +10,7 @@ pub async fn handle_connection_error( ...@@ -9,7 +10,7 @@ pub async fn handle_connection_error(
println!("开始处理用户id: {} 的连接错误", from_id); println!("开始处理用户id: {} 的连接错误", from_id);
// 从全局连接映射中移除该连接 // 从全局连接映射中移除该连接
let removed = { let removed = {
let mut connections = connections.lock().await; let mut connections: RwLockWriteGuard<'_, _> = connections.write().await;
let result = connections.remove(from_id).is_some(); let result = connections.remove(from_id).is_some();
drop(connections); // 提前释放锁 drop(connections); // 提前释放锁
result result
...@@ -31,7 +32,7 @@ pub async fn handle_connection_error( ...@@ -31,7 +32,7 @@ pub async fn handle_connection_error(
// 取消对应的任务 // 取消对应的任务
{ {
let mut tasks = tasks.lock().unwrap(); let mut tasks = tasks.write().unwrap();
match tasks.remove(from_id) { match tasks.remove(from_id) {
Some(task) => { Some(task) => {
task.abort(); task.abort();
......
use crate::utils::json_utils::make_common_resp; use crate::typing::message_typed::ClientMessageData;
use futures::SinkExt; use futures::SinkExt;
use tungstenite::{Error, Message}; use tungstenite::{Error, Message};
use crate::typing::message_typed::MessageData;
// 处理其他类型消息 // 处理其他类型消息
pub(crate) async fn handle_other_message( pub(crate) async fn handle_other_message(
// 增加 + std::marker::Unpin 限制 // 增加 + std::marker::Unpin 限制
sender: &mut (impl SinkExt<Message, Error = Error> + Unpin), sender: &mut (impl SinkExt<Message, Error = Error> + Unpin),
data: &MessageData, data: &ClientMessageData,
from_id: &str,
) -> Result<(), Error> { ) -> Result<(), Error> {
println!("收到客户端消息: {:?}", data); println!("收到客户端消息: {:?}", data);
if let Ok(json_str) = make_common_resp(Default::default(), "Echo") {
sender.send(Message::text(json_str)).await?;
}
Ok(()) Ok(())
} }
use crate::utils::json_utils;
use serde::{Deserialize, Serialize};
use std::time::Instant; use std::time::Instant;
pub fn handle_heartbeat(last_time: &mut Instant) { pub fn handle_heartbeat(last_time: &mut Instant) {
*last_time = Instant::now(); *last_time = Instant::now();
} }
// 定义服务器心跳数据消息结构
#[derive(Serialize, Deserialize, Debug)]
pub struct HeartMessage {
#[serde(rename = "msgType")]
pub msg_type: String,
#[serde(rename = "fromId")]
pub from_id: String,
#[serde(rename = "fromName")]
pub from_name: String,
#[serde(rename = "toId")]
pub to_id: String,
#[serde(rename = "msgData")]
pub msg_data: String,
#[serde(rename = "time")]
pub time: u128,
}
pub fn heart_resp(to_id: &str) -> serde_json::Result<String> {
serde_json::to_string(&HeartMessage {
msg_type: "Heart".to_string(),
from_id: "0".to_string(),
from_name: "Server".to_string(),
msg_data: "".to_string(),
to_id: to_id.to_string(),
time: json_utils::get_current_timestamp(),
})
}
...@@ -3,3 +3,4 @@ pub mod heartbeat; ...@@ -3,3 +3,4 @@ pub mod heartbeat;
pub mod handshake; pub mod handshake;
pub mod redis; pub mod redis;
pub mod close_connection; pub mod close_connection;
pub mod online_users_update;
\ No newline at end of file
use crate::client::{CONNECTIONS, ONLINE_USERS};
use crate::utils::json_utils::get_current_timestamp;
use futures::SinkExt;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tungstenite::{Error, Message};
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct OnlineUserMessage {
#[serde(rename = "callState")]
pub call_state: String,
#[serde(rename = "channelID")]
pub channel_id: String,
#[serde(rename = "deviceID")]
pub device_id: String,
#[serde(rename = "fromID")]
pub from_id: String,
#[serde(rename = "hasCamera")]
pub has_camera: String,
#[serde(rename = "hasMike")]
pub has_mike: String,
#[serde(rename = "isHost")]
pub is_host: String,
#[serde(rename = "userCallGroup")]
pub user_call_group: String,
#[serde(rename = "fromName")]
pub from_name: String,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct OnlineUsersMessage {
#[serde(rename = "msgType")]
pub msg_type: String,
#[serde(rename = "fromId")]
pub from_id: String,
#[serde(rename = "fromName")]
pub from_name: String,
#[serde(rename = "toId")]
pub to_id: String,
#[serde(rename = "msgData")]
#[serde(serialize_with = "serialize_arc_vec")]
#[serde(deserialize_with = "deserialize_arc_vec")]
pub msg_data: Arc<Vec<OnlineUserMessage>>,
#[serde(rename = "time")]
pub time: u128,
}
fn serialize_arc_vec<S>(data: &Arc<Vec<OnlineUserMessage>>, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
(*data).serialize(serializer)
}
fn deserialize_arc_vec<'de, D>(deserializer: D) -> Result<Arc<Vec<OnlineUserMessage>>, D::Error>
where
D: serde::Deserializer<'de>,
{
let data = Vec::<OnlineUserMessage>::deserialize(deserializer)?;
Ok(Arc::new(data))
}
pub async fn send_multiple_messages() -> Result<(), Error> {
let mut msg_data = Vec::new();
{
let online_users = ONLINE_USERS.read().await;
for (_, user_info_str) in online_users.iter() {
let parts: Vec<&str> = user_info_str.split(',').collect();
if parts.len() == 9 {
let user_msg = OnlineUserMessage {
call_state: parts[0].to_string(),
channel_id: parts[1].to_string(),
device_id: parts[2].to_string(),
from_id: parts[3].to_string(),
has_camera: parts[4].to_string(),
has_mike: parts[5].to_string(),
is_host: parts[6].to_string(),
user_call_group: parts[7].to_string(),
from_name: parts[8].to_string(),
};
msg_data.push(user_msg);
}
}
}
let msg_data = Arc::new(msg_data);
let mut connections = CONNECTIONS.write().await;
for (_, connection) in connections.iter_mut() {
let to_id = connection.from_id.to_string();
let message = OnlineUsersMessage {
msg_type: "GetOnlineUserList".to_string(),
from_id: "0".to_string(),
from_name: "Server".to_string(),
to_id,
msg_data: msg_data.clone(),
time: get_current_timestamp(),
};
let json_result = serde_json::to_string(&message);
if let Err(e) = json_result {
println!("序列化消息给用户 {} 失败: {}", connection.from_id, e);
continue;
}
let json = json_result.unwrap();
if let Err(e) = connection.sender.send(Message::text(json)).await {
println!("发送消息给用户 {} 失败: {}", connection.from_id, e);
}
}
Ok(())
}
\ No newline at end of file
...@@ -4,6 +4,7 @@ use redis::Client; ...@@ -4,6 +4,7 @@ use redis::Client;
use redis::Commands; use redis::Commands;
use redis_pool::SingleRedisPool; use redis_pool::SingleRedisPool;
use std::collections::HashMap; use std::collections::HashMap;
use crate::client::ONLINE_USERS;
lazy_static! { lazy_static! {
static ref REDIS_POOL: SingleRedisPool = { static ref REDIS_POOL: SingleRedisPool = {
...@@ -23,7 +24,12 @@ pub async fn remove_this_connection(from_id: &str) -> Result<(), redis::RedisErr ...@@ -23,7 +24,12 @@ pub async fn remove_this_connection(from_id: &str) -> Result<(), redis::RedisErr
return Err(e); return Err(e);
} }
println!("成功从 Redis 中移除用户id: {} 的信息", from_id); {
// 获取写锁以进行写操作
let mut online_users = ONLINE_USERS.write().await;
online_users.remove(from_id);
println!("成功从全局变量中移除用户id: {} 的信息", from_id);
}
Ok(()) Ok(())
} }
...@@ -39,6 +45,8 @@ pub async fn insert_this_connection( ...@@ -39,6 +45,8 @@ pub async fn insert_this_connection(
let device_id = params.get("deviceId").cloned().unwrap_or_default(); let device_id = params.get("deviceId").cloned().unwrap_or_default();
let from_name = params.get("fromName").cloned().unwrap_or_default(); let from_name = params.get("fromName").cloned().unwrap_or_default();
// 按照结构体OnlineUserMessage的格式构造用户信息字符串,用逗号拼接即可
// callState,channelID,deviceID,fromID,hasCamera,hasMike,isHost,userCallGroup,fromName
let user_info_str = format!( let user_info_str = format!(
"{},{},{},1,1,0,0,{}", "{},{},{},1,1,0,0,{}",
"idle", "idle",
...@@ -55,6 +63,11 @@ pub async fn insert_this_connection( ...@@ -55,6 +63,11 @@ pub async fn insert_this_connection(
return Err(e); return Err(e);
} }
{
// 获取写锁以进行写操作
let mut online_users = ONLINE_USERS.write().await;
online_users.insert(from_id.to_string(), user_info_str);
println!("成功将用户id: {} 的信息插入到 Redis 中", from_id); println!("成功将用户id: {} 的信息插入到 Redis 中", from_id);
}
Ok(()) Ok(())
} }
\ No newline at end of file
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug)]
pub struct OnlineUserMessage {
#[serde(rename = "callState")]
pub call_state: String,
#[serde(rename = "channelID")]
pub channel_id: String,
#[serde(rename = "deviceID")]
pub device_id: String,
#[serde(rename = "fromID")]
pub from_id: String,
#[serde(rename = "hasCamera")]
pub has_camera: String,
#[serde(rename = "hasMike")]
pub has_mike: String,
#[serde(rename = "isHost")]
pub is_host: String,
#[serde(rename = "userCallGroup")]
pub user_call_group: String,
#[serde(rename = "fromName")]
pub from_name: String,
}
// 定义消息结构 // 定义消息结构
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub struct MessageData { pub struct ClientMessageData {
#[serde(rename = "msgType")] #[serde(rename = "msgType")]
pub msg_type: String, pub msg_type: String,
#[serde(rename = "fromId")] #[serde(rename = "fromId")]
......
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, RwLock};
use tokio::sync::Mutex as AsyncMutex; use tokio::sync::RwLock as AsyncRwLock;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::WebSocketStream;
use tungstenite::Message; use tungstenite::Message;
...@@ -10,9 +10,8 @@ use tungstenite::Message; ...@@ -10,9 +10,8 @@ use tungstenite::Message;
pub struct Connection { pub struct Connection {
pub(crate) sender: futures::stream::SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>, pub(crate) sender: futures::stream::SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>,
pub(crate) receiver: futures::stream::SplitStream<WebSocketStream<tokio::net::TcpStream>>, pub(crate) receiver: futures::stream::SplitStream<WebSocketStream<tokio::net::TcpStream>>,
pub(crate) from_id: String,
} }
// 全局连接映射,存储 fromId 到 Connection 的映射 pub type ConnectionMap = Arc<AsyncRwLock<HashMap<String, Connection>>>;
pub type ConnectionMap = Arc<AsyncMutex<HashMap<String, Connection>>>; pub type TaskMap = Arc<RwLock<HashMap<String, JoinHandle<()>>>>;
// 全局任务映射,存储 fromId 到 JoinHandle 的映射 \ No newline at end of file
pub type TaskMap = Arc<Mutex<HashMap<String, JoinHandle<()>>>>;
use crate::typing::message_typed::ClientMessageData;
use serde_json::Result; use serde_json::Result;
use crate::typing::message_typed::MessageData; use std::time::{SystemTime, UNIX_EPOCH};
pub fn make_common_resp(value: serde_json::Value, msg_type: &str) -> Result<String> { pub fn get_current_timestamp() -> u128 {
serialize_message(&MessageData { let now = SystemTime::now();
msg_type: msg_type.to_string(), let duration_since_epoch = now.duration_since(UNIX_EPOCH).expect("时间戳计算出错");
from_id: "0".to_string(), duration_since_epoch.as_millis()
from_name: "Server".to_string(),
msg_data: value,
})
} }
// 解析 JSON 消息 // 解析 JSON 消息
pub fn parse_message(json_str: &str) -> Result<MessageData> { pub fn parse_message(json_str: &str) -> Result<ClientMessageData> {
serde_json::from_str(json_str) serde_json::from_str(json_str)
} }
// 序列化消息为 JSON 字符串
pub fn serialize_message(msg: &MessageData) -> Result<String> {
serde_json::to_string(msg)
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment