Commit 0849b979 by qlintonger xeno

redis-int

parent 2cb248b0
...@@ -6,7 +6,10 @@ edition = "2021" ...@@ -6,7 +6,10 @@ edition = "2021"
[dependencies] [dependencies]
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
tokio = { version = "1.42.0", features = ["rt", "rt-multi-thread", "macros", "time"] } tokio = { version = "1.42.0", features = ["full"] }
tokio-tungstenite = "0.26.1" tokio-tungstenite = "0.26.1"
tungstenite = "0.26.1" tungstenite = "0.26.1"
futures = "0.3.31" futures = "0.3.31"
\ No newline at end of file redis = "0.28.2"
redis_pool = "0.7.0"
lazy_static = "1.4"
\ No newline at end of file
...@@ -4,11 +4,66 @@ use crate::json_utils::{make_common_resp, parse_message}; ...@@ -4,11 +4,66 @@ use crate::json_utils::{make_common_resp, parse_message};
use crate::utils; use crate::utils;
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::{Arc};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use tokio::time; use tokio::time;
use tokio_tungstenite::accept_hdr_async; use tokio_tungstenite::{accept_hdr_async, WebSocketStream};
use tungstenite::handshake::server::{Request, Response}; use tungstenite::handshake::server::{Request, Response};
use tungstenite::{Error, Message}; use tungstenite::{Error, Message};
use tokio::sync::Mutex;
use lazy_static::lazy_static;
use redis::{Client};
use redis_pool::{ SingleRedisPool};
use redis::Commands; // 新增导入
lazy_static! {
static ref REDIS_POOL: SingleRedisPool = {
let client = Client::open("redis://127.0.0.1:6379/").expect("Failed to connect to Redis");
redis_pool::RedisPool::new(client, 16, Some(512))
};
}
// 自定义结构体来存储发送器和接收器
struct Connection {
sender: futures::stream::SplitSink<WebSocketStream<tokio::net::TcpStream>, Message>,
receiver: futures::stream::SplitStream<WebSocketStream<tokio::net::TcpStream>>,
}
// 全局连接映射,存储 fromId 到 Connection 的映射
type ConnectionMap = Arc<Mutex<HashMap<String, Connection>>>;
lazy_static! {
static ref CONNECTIONS: ConnectionMap = Arc::new(Mutex::new(HashMap::new()));
}
// 关闭之前绑定的 WebSocket 连接
async fn close_existing_connection(from_id: &str) {
let mut connections = CONNECTIONS.lock();
if let Some(mut old_connection) = connections.await.remove(from_id) {
// 尝试优雅地关闭旧连接
if let Err(e) = old_connection.sender.close().await {
println!("关闭旧的 WebSocket 发送器时出错: {}", e);
}
println!("关闭旧的 WebSocket 连接: {}", from_id);
}
}
// 更新 Redis 中 connected 数组
async fn update_connected_redis() {
let connections = CONNECTIONS.lock().await;
let from_ids: Vec<String> = connections.keys().cloned().collect();
let mut con = REDIS_POOL.get_connection().expect("Failed to get Redis connection");
// 先清空原有的列表
if let Err(e) = con.del::<_, ()>("connected") {
println!("Failed to delete old connected list in Redis: {}", e);
}
// 将每个 fromId 依次添加到列表中
for from_id in from_ids {
if let Err(e) = con.rpush::<_, _, ()>("connected", from_id) {
println!("Failed to add fromId to connected list in Redis: {}", e);
}
}
}
// 提取出来的处理握手的函数 // 提取出来的处理握手的函数
fn handle_handshake( fn handle_handshake(
...@@ -70,14 +125,31 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E ...@@ -70,14 +125,31 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
}; };
// 将 WebSocketStream 拆分为发送器和接收器 // 将 WebSocketStream 拆分为发送器和接收器
let (mut sender, mut receiver) = ws_stream.split(); let (sender, receiver) = ws_stream.split();
if let Some(params) = connection_params { if let Some(params) = connection_params {
if let Some(from_id) = params.get("fromId") { if let Some(from_id) = params.get("fromId") {
let from_id = from_id.clone(); let from_id = from_id.clone();
// 从 Redis 连接池获取连接
let mut con = REDIS_POOL.get_connection().expect("Failed to get Redis connection");
// 检查 Redis 中是否已经存在该 fromId
close_existing_connection(&from_id).await;
// 将新连接添加到全局连接映射
CONNECTIONS.lock().await.insert(from_id.clone(), Connection { sender, receiver });
// 更新 Redis 中的 connected 数组
update_connected_redis().await;
let mut last_heartbeat_time = Instant::now(); let mut last_heartbeat_time = Instant::now();
loop { loop {
let mut connections = CONNECTIONS.lock().await;
let current_connection = connections.get_mut(&from_id).unwrap();
let receiver = &mut current_connection.receiver;
let sender = &mut current_connection.sender;
tokio::select! { tokio::select! {
// 处理消息接收 // 处理消息接收
maybe_msg = receiver.next() => { maybe_msg = receiver.next() => {
...@@ -96,7 +168,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E ...@@ -96,7 +168,7 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
} }
}, },
_ => { _ => {
handle_other_message(&mut sender, &data).await?; handle_other_message(sender, &data).await?;
} }
} }
} }
...@@ -125,6 +197,10 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E ...@@ -125,6 +197,10 @@ pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), E
} }
println!("断开与用户id: {},连接", from_id); println!("断开与用户id: {},连接", from_id);
// 从全局连接映射中移除该连接
CONNECTIONS.lock().await.remove(&from_id);
// 更新 Redis 中的 connected 数组
update_connected_redis().await;
} }
} else { } else {
println!("无法获取连接参数"); println!("无法获取连接参数");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment