Commit 4f7d3b9c by qlintonger xeno

redis处理存储onlineUsers以及分包

parent 8fd6a07a
use crate::config::config::{REDIS_ADDR, STATIC_WS_PWD};
use crate::config::config::STATIC_WS_PWD;
use crate::handles::handle_messages::handle_other_message;
use crate::handles::heartbeat::handle_heartbeat;
use crate::typing::used_typed::{Connection, ConnectionMap, TaskMap};
use crate::utils::json_utils::{make_common_resp, parse_message};
use crate::utils::utils::get_connection_params;
use futures::{SinkExt, StreamExt};
use lazy_static::lazy_static;
use redis::Client;
use redis::Commands;
use redis_pool::SingleRedisPool;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
......@@ -17,13 +13,9 @@ use tokio::time;
use tokio_tungstenite::accept_hdr_async;
use tungstenite::handshake::server::{Request, Response};
use tungstenite::{Error, Message};
lazy_static! {
static ref REDIS_POOL: SingleRedisPool = {
let client = Client::open(REDIS_ADDR).expect("Failed to connect to Redis");
redis_pool::RedisPool::new(client, 16, Some(512))
};
}
use crate::handles::close_connection::handle_connection_error;
use crate::handles::handshake::handle_handshake;
use crate::handles::redis::{insert_this_connection, remove_this_connection};
lazy_static! {
static ref CONNECTIONS: ConnectionMap = Arc::new(AsyncMutex::new(HashMap::new()));
......@@ -58,129 +50,20 @@ async fn close_existing_connection(from_id: &str) {
}
println!("关闭旧的 WebSocket 连接: {}", from_id);
}
}
// 更新 update_connected_redis 函数,使其返回 Result 类型
async fn update_connected_redis() -> Result<(), redis::RedisError> {
let from_ids: Vec<String>;
{
let connections = CONNECTIONS.lock().await;
from_ids = connections.keys().cloned().collect();
}
println!("当前全局连接映射中的 from_id 数量: {}", from_ids.len());
let mut con = REDIS_POOL
.get_connection()
.expect("Failed to get Redis connection");
// 先清空集合
println!("开始清空 Redis 中的 connected 集合");
if let Err(e) = con.del::<_, ()>("connected") {
println!("清空 Redis 中的 connected 集合时出错: {}", e);
return Err(e);
}
// 将 from_ids 一次性批量添加到集合中
if !from_ids.is_empty() {
println!("开始将 from_id 批量添加到 Redis 中的 connected 集合");
if let Err(e) = con.sadd::<_, _, ()>("connected", from_ids.as_slice()) {
println!(
"将 from_id 批量添加到 Redis 中的 connected 集合时出错: {}",
e
);
return Err(e);
}
}
println!("成功更新 Redis 中的 connected 集合");
Ok(())
}
// 提取出来的处理握手的函数
fn handle_handshake(
req: &Request,
must_existed_params: &[&str],
) -> Result<HashMap<String, String>, String> {
println!("新客户端连接: {}", req.uri());
let connection_params = match get_connection_params(req.uri().to_string()) {
Ok(p) => p,
Err(e) => {
let error_msg = format!("缺少重要连接数据段: {}", e);
println!("{}", error_msg);
return Err(error_msg);
}
};
let not_existed = must_existed_params
.iter()
.find(|param| !connection_params.contains_key(&param.to_string()));
if let Some(param) = not_existed {
let error_msg = format!("缺少重要连接数据段: {}", param);
println!("{}", error_msg);
return Err(error_msg);
}
if connection_params.get("wsPwd").unwrap() != (STATIC_WS_PWD) {
println!("wsPwd不正确!");
return Err("wsPwd不正确!".to_string());
// 从 Redis 中移除该用户的信息
if let Err(e) = remove_this_connection(from_id).await {
println!("从 Redis 中移除用户信息时出错: {}", e);
}
Ok(connection_params)
}
async fn handle_connection_error(from_id: &str) {
println!("开始处理用户id: {} 的连接错误", from_id);
// 从全局连接映射中移除该连接
let removed = {
let mut connections = CONNECTIONS.lock().await;
let result = connections.remove(from_id).is_some();
drop(connections); // 提前释放锁
result
};
println!(
"是否成功从全局连接映射中移除用户id: {},结果: {}",
from_id, removed
);
// 更新 Redis 中的 connected 集合
if removed {
match update_connected_redis().await {
Ok(_) => println!(
"成功更新 Redis 中的 connected 集合,移除用户id: {}",
from_id
),
Err(e) => println!(
"更新 Redis 中的 connected 集合时出错,用户id: {},错误信息: {}",
from_id, e
),
}
}
println!("开始尝试关闭用户id: {} 的 redis 连接", from_id);
// 取消对应的任务
{
let mut tasks = TASKS.lock().unwrap();
match tasks.remove(from_id) {
Some(task) => {
task.abort();
println!("成功取消用户id: {} 的任务", from_id);
}
None => println!("未在全局任务映射中找到用户id: {} 的任务", from_id),
}
}
println!("断开与用户id: {} 的连接并完成清理操作", from_id);
}
pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), Error> {
let must_existed_params = ["deviceId", "fromId", "wsPwd"];
let mut connection_params = None;
let ws_stream = match accept_hdr_async(stream, |req: &Request, resp| {
match handle_handshake(req, &must_existed_params) {
match handle_handshake(req, &must_existed_params, STATIC_WS_PWD) {
Ok(params) => {
connection_params = Some(params);
Ok(resp)
......@@ -221,8 +104,10 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
connections.insert(from_id.clone(), Connection { sender, receiver });
}
// 更新 Redis 中的 connected 集合
let _ = update_connected_redis().await;
// 将该用户的信息插入到 Redis 中
if let Err(e) = insert_this_connection(&from_id, &params).await {
println!("将用户信息插入到 Redis 中时出错: {}", e);
}
let task = tokio::spawn(async move {
let mut last_heartbeat_time = Instant::now();
......@@ -231,7 +116,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
let mut connections = CONNECTIONS.lock().await;
connections.remove(&from_id_clone) // 移除连接,将其从 map 中取出
};
if let Some(mut connection) = current_connection {
if let Some(connection) = current_connection {
let (mut receiver_ref, mut sender_ref) =
(connection.receiver, connection.sender);
tokio::select! {
......@@ -250,7 +135,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
if let Ok(json_str) = make_common_resp(Default::default(), "Heart") {
if let Err(e) = sender_ref.send(Message::text(json_str)).await {
println!("发送心跳信息失败: {}", e);
handle_connection_error(&from_id_clone).await;
handle_connection_error(&from_id_clone, &CONNECTIONS, &TASKS).await;
break;
}
}
......@@ -258,7 +143,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
_ => {
if let Err(e) = handle_other_message(&mut sender_ref, &data).await {
println!("Failed to handle other message: {}", e);
handle_connection_error(&from_id_clone).await;
handle_connection_error(&from_id_clone, &CONNECTIONS, &TASKS).await;
break;
}
}
......@@ -272,12 +157,12 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
}
Some(Err(e)) => {
println!("接受客户端消息出错: {}", e);
handle_connection_error(&from_id_clone).await;
handle_connection_error(&from_id_clone, &CONNECTIONS, &TASKS).await;
break;
}
None => {
println!("客户端断开连接");
handle_connection_error(&from_id_clone).await;
handle_connection_error(&from_id_clone, &CONNECTIONS, &TASKS).await;
break;
}
}
......@@ -309,8 +194,10 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
let mut connections = CONNECTIONS.lock().await;
connections.remove(&from_id_clone); // 使用克隆后的 from_id
}
// 更新 Redis 中的 connected 集合
let _ = update_connected_redis().await;
// 从 Redis 中移除该用户的信息
if let Err(e) = remove_this_connection(&from_id_clone).await {
println!("从 Redis 中移除用户信息时出错: {}", e);
}
});
// 将任务句柄存储到全局任务映射中
......
use crate::handles::redis::remove_this_connection;
use crate::typing::used_typed::{ConnectionMap, TaskMap};
pub async fn handle_connection_error(
from_id: &str,
connections: &ConnectionMap,
tasks: &TaskMap,
) {
println!("开始处理用户id: {} 的连接错误", from_id);
// 从全局连接映射中移除该连接
let removed = {
let mut connections = connections.lock().await;
let result = connections.remove(from_id).is_some();
drop(connections); // 提前释放锁
result
};
println!(
"是否成功从全局连接映射中移除用户id: {},结果: {}",
from_id, removed
);
// 从 Redis 中移除该用户的信息
if removed {
if let Err(e) = remove_this_connection(from_id).await {
println!("从 Redis 中移除用户信息时出错: {}", e);
}
}
println!("开始尝试关闭用户id: {} 的 redis 连接", from_id);
// 取消对应的任务
{
let mut tasks = tasks.lock().unwrap();
match tasks.remove(from_id) {
Some(task) => {
task.abort();
println!("成功取消用户id: {} 的任务", from_id);
}
None => println!("未在全局任务映射中找到用户id: {} 的任务", from_id),
}
}
println!("断开与用户id: {} 的连接并完成清理操作", from_id);
}
\ No newline at end of file
use crate::utils::json_utils::{make_common_resp, MessageData};
use crate::utils::json_utils::make_common_resp;
use futures::SinkExt;
use tungstenite::{Error, Message};
use crate::typing::message_typed::MessageData;
// 处理其他类型消息
pub(crate) async fn handle_other_message(
......@@ -9,7 +10,7 @@ pub(crate) async fn handle_other_message(
data: &MessageData,
) -> Result<(), Error> {
println!("收到客户端消息: {:?}", data);
if let Ok(json_str) = make_common_resp(Default::default(), "") {
if let Ok(json_str) = make_common_resp(Default::default(), "Echo") {
sender.send(Message::text(json_str)).await?;
}
Ok(())
......
use std::collections::HashMap;
// 提取出来的处理握手的函数
pub(crate) fn handle_handshake(
req: &tungstenite::handshake::server::Request,
must_existed_params: &[&str],
static_ws_pwd: &str,
) -> Result<HashMap<String, String>, String> {
println!("新客户端连接: {}", req.uri());
let connection_params = match crate::utils::utils::get_connection_params(req.uri().to_string()) {
Ok(p) => p,
Err(e) => {
let error_msg = format!("缺少重要连接数据段: {}", e);
println!("{}", error_msg);
return Err(error_msg);
}
};
let not_existed = must_existed_params
.iter()
.find(|param| !connection_params.contains_key(&param.to_string()));
if let Some(param) = not_existed {
let error_msg = format!("缺少重要连接数据段: {}", param);
println!("{}", error_msg);
return Err(error_msg);
}
if connection_params.get("wsPwd").unwrap() != static_ws_pwd {
println!("wsPwd不正确!");
return Err("wsPwd不正确!".to_string());
}
Ok(connection_params)
}
\ No newline at end of file
pub mod handle_messages;
pub mod heartbeat;
pub mod handshake;
pub mod redis;
pub mod close_connection;
\ No newline at end of file
use crate::config::config::REDIS_ADDR;
use lazy_static::lazy_static;
use redis::Client;
use redis::Commands;
use redis_pool::SingleRedisPool;
use std::collections::HashMap;
lazy_static! {
static ref REDIS_POOL: SingleRedisPool = {
let client = Client::open(REDIS_ADDR).expect("Failed to connect to Redis");
redis_pool::RedisPool::new(client, 16, Some(512))
};
}
// 从 Redis 的 onlineUsers 集合中移除当前用户的信息
pub async fn remove_this_connection(from_id: &str) -> Result<(), redis::RedisError> {
let mut con = REDIS_POOL
.get_connection()
.expect("Failed to get Redis connection");
if let Err(e) = con.hdel::<&str, &str, ()>("onlineUsers", from_id) {
println!("从 Redis 中的 onlineUsers 哈希表删除用户信息时出错: {}", e);
return Err(e);
}
println!("成功从 Redis 中移除用户id: {} 的信息", from_id);
Ok(())
}
// 将当前用户的信息插入到 Redis 的 onlineUsers 集合中
pub async fn insert_this_connection(
from_id: &str,
params: &HashMap<String, String>,
) -> Result<(), redis::RedisError> {
let mut con = REDIS_POOL
.get_connection()
.expect("Failed to get Redis connection");
let device_id = params.get("deviceId").cloned().unwrap_or_default();
let from_name = params.get("fromName").cloned().unwrap_or_default();
let user_info_str = format!(
"{},{},{},1,1,0,0,{}",
"idle",
"",
device_id,
from_name
);
if let Err(e) = con.hset::<&str, &str, &str, ()>("onlineUsers", from_id, &user_info_str) {
println!(
"将用户信息添加到 Redis 中的 onlineUsers 哈希表时出错: {}",
e
);
return Err(e);
}
println!("成功将用户id: {} 的信息插入到 Redis 中", from_id);
Ok(())
}
\ No newline at end of file
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug)]
pub struct OnlineUserMessage {
#[serde(rename = "callState")]
pub call_state: String,
#[serde(rename = "channelID")]
pub channel_id: String,
#[serde(rename = "deviceID")]
pub device_id: String,
#[serde(rename = "fromID")]
pub from_id: String,
#[serde(rename = "hasCamera")]
pub has_camera: String,
#[serde(rename = "hasMike")]
pub has_mike: String,
#[serde(rename = "isHost")]
pub is_host: String,
#[serde(rename = "userCallGroup")]
pub user_call_group: String,
#[serde(rename = "fromName")]
pub from_name: String,
}
// 定义消息结构
#[derive(Serialize, Deserialize, Debug)]
pub struct MessageData {
#[serde(rename = "msgType")]
pub msg_type: String,
#[serde(rename = "fromId")]
pub from_id: String,
#[serde(rename = "fromName")]
pub from_name: String,
#[serde(rename = "msgData")]
pub msg_data: serde_json::Value,
}
\ No newline at end of file
pub mod used_typed;
pub mod message_typed;
use serde::{Deserialize, Serialize};
use serde_json::Result;
// 定义消息结构
#[derive(Serialize, Deserialize, Debug)]
pub struct MessageData {
#[serde(rename = "msgType")]
pub msg_type: String,
#[serde(rename = "fromId")]
pub from_id: String,
#[serde(rename = "fromName")]
pub from_name: String,
#[serde(rename = "msgData")]
pub msg_data: serde_json::Value,
}
use crate::typing::message_typed::MessageData;
pub fn make_common_resp(value: serde_json::Value, msg_type: &str) -> Result<String> {
serialize_message(&MessageData {
......
import redis
# 连接到 Redis 服务器
r = redis.Redis(host='localhost', port=6379, db=0)
# 哈希表的键名
hash_key = 'onlineUsers'
# 使用 hgetall 方法获取哈希表的所有字段和值
result = r.hgetall(hash_key)
# 遍历结果
for field, value in result.items():
print(f"Field: {field.decode('utf-8')}, Value: {value.decode('utf-8')}")
\ No newline at end of file
......@@ -7,7 +7,7 @@ import json
# 服务器地址
SERVER_URL = "ws://localhost:12345"
# 客户端数量
CLIENT_COUNT = 200
CLIENT_COUNT = 10
# wsPWD
WS_PWD = "Q8kFm5LzJ2Ab"
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment