Commit 3fde40ca by qlintonger xeno

再次更新+1

parent 1b2984f7
......@@ -59,9 +59,10 @@ pub(crate) async fn handle_client(
register_client((from_id.clone(), connection_time), center_to_client_sender).await;
// 发送新连接事件
event_sender
.send(Event::NewConnection(from_id.clone(), params))
.unwrap();
match event_sender.send(Event::NewConnection(from_id.clone(), params)) {
Ok(_) => println!("新连接事件发送成功"),
Err(e) => println!("发送新连接事件时出错: {:?}", e),
}
println!("用户 {} 已连接", from_id);
......@@ -100,7 +101,10 @@ pub(crate) async fn handle_client(
}
if close {
// 通知外层循环关闭
close_tx.send(true).unwrap();
match close_tx.send(true) {
Ok(_) => println!("关闭信号发送成功"),
Err(e) => println!("发送关闭信号时出错: {:?}", e),
}
println!("发送给用户id {} 要求关闭连接", real_user_id);
break;
}
......@@ -146,7 +150,10 @@ pub(crate) async fn handle_client(
}
} else {
// 断开连接之后直接移除即可
event_sender.send(Event::CloseConnection((from_id.clone(), connection_time))).unwrap();
match event_sender.send(Event::CloseConnection((from_id.clone(), connection_time))) {
Ok(_) => println!("关闭连接事件发送成功"),
Err(e) => println!("发送关闭连接事件时出错: {:?}", e),
}
break; // 客户端断开连接
}
}
......@@ -154,7 +161,10 @@ pub(crate) async fn handle_client(
_ = time::sleep_until(tokio::time::Instant::from(last_heartbeat_time + Duration::from_secs(20))) => {
println!("用户id-{} 20秒内没有发送心跳,挂断连接", from_id);
// 发送关闭连接事件
event_sender.send(Event::CloseConnection((from_id.clone(), connection_time))).unwrap();
match event_sender.send(Event::CloseConnection((from_id.clone(), connection_time))) {
Ok(_) => println!("关闭连接事件发送成功"),
Err(e) => println!("发送关闭连接事件时出错: {:?}", e),
}
break;
}
// 监听关闭通知
......
......@@ -5,6 +5,8 @@ use lazy_static::lazy_static;
use serde_json::Value::Null;
use std::collections::HashMap;
use tokio::sync::mpsc;
use urlencoding::decode;
use crate::client::ONLINE_USERS;
// 定义事件类型
#[derive(Debug)]
......@@ -67,9 +69,15 @@ pub async fn close_existing_connection(from_id: &(String, u128), close_old: bool
println!("通知客户端 {:?} 关闭连接成功", key);
}
}
{
ONLINE_USERS.remove(&key.0.clone());
println!("成功从全局变量中移除用户id: {:?} 的信息", from_id);
}
tokio::spawn(async move {
if let Err(e) = remove_this_connection(&key.0).await {
println!("从 Redis 中移除用户信息时出错: {}", e);
}
});
}
} else {
// 关闭当前连接
......@@ -87,12 +95,50 @@ pub async fn close_existing_connection(from_id: &(String, u128), close_old: bool
println!("通知客户端 {:?} 关闭连接成功", from_id);
}
}
if let Err(e) = remove_this_connection(real_user_id).await {
{
ONLINE_USERS.remove(&real_user_id.clone());
println!("成功从全局变量中移除用户id: {:?} 的信息", from_id);
}
let user_id_clone = real_user_id.clone();
tokio::spawn(async move{
if let Err(e) = remove_this_connection(user_id_clone.as_str()).await {
println!("从 Redis 中移除用户信息时出错: {}", e);
}
});
}
}
// 创建连接参数集合
fn create_connection_params(from_id: &str, params: &HashMap<String, String>,) -> String {
let device_id = params.get("deviceID").cloned().unwrap_or("".to_string());
let from_name = params.get("fromName").cloned().unwrap_or("".to_string());
let has_mike = if let Some(has_mike) = params.get("hasMike") {
has_mike.to_string()
} else {
"0".to_string()
};
let has_camera = if let Some(has_camera) = params.get("hasCamera") {
has_camera.to_string()
} else {
"0".to_string()
};
let user_call_group = if let Some(user_call_group) = params.get("userCallGroup") {
user_call_group.to_string()
} else {
"0".to_string()
};
let from_name_decoded = match decode(from_name.as_str()) {
Ok(decoded) => decoded.to_string(),
Err(_) => from_name,
};
// 按照结构体OnlineUserMessage的格式构造用户信息字符串,用逗号拼接即可
// callState,channelID,deviceID,fromID,hasCamera,hasMike,isHost,userCallGroup,fromName
format!(
"{},{},{},{},{},{},0,{},{}",
"idle", "", device_id, from_id, has_camera, has_mike, user_call_group, from_name_decoded
)
}
// 处理事件的任务
pub async fn handle_events(mut receiver: mpsc::UnboundedReceiver<Event>) {
while let Some(event) = receiver.recv().await {
......@@ -100,9 +146,19 @@ pub async fn handle_events(mut receiver: mpsc::UnboundedReceiver<Event>) {
Event::NewConnection(from_id, params) => {
println!("有新的连接 用户id {} 更新在线用户列表事件触发", from_id);
// 将该用户的信息插入到 Redis 中
if let Err(e) = insert_this_connection(&from_id, &params).await {
let connection_p = create_connection_params(
&from_id, &params
);
{
// 获取写锁以进行写操作
ONLINE_USERS.insert(from_id.to_string(), connection_p.clone());
println!("成功将用户id: {} 的信息插入到 ONLINE_USERS 中", from_id);
}
tokio::spawn(async move {
if let Err(e) = insert_this_connection(&from_id, connection_p).await {
println!("将用户信息插入到 Redis 中时出错: {}", e);
}
});
notify_all_clients_to_update_online_users().await;
}
Event::CloseConnection(from_id) => {
......
......@@ -90,6 +90,29 @@ fn get_users_by_channel(channel_id: &str) -> Vec<String> {
.collect()
}
fn hangup_last_user(user_id: String, event_sender_clone: UnboundedSender<Event>, channel_id_clone: String, mut cancel_rx: mpsc::UnboundedReceiver<()>) {
tokio::spawn(async move {
tokio::select! {
_ = sleep(Duration::from_secs(15)) => {
println!("20s内如果该频道与会人数仍为1,则对剩下来的用户发送CmdHangup指令 {}", user_id);
if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == user_id).map(|e| e.key().clone()) {
let joined = update_user_status(&user_id, "idle", "", false);
update_redis_async(user_id.clone(), joined);
send_inside_message(&sender, &event_sender_clone, serde_json::json!({
"msgType": "CmdHangup",
"fromID": "0",
"fromName": "Server",
"toID": user_id,
"msgData": {"channelId": "", "rtcToken": ""}
}).to_string(), &user_id).await;
tokio::spawn(async move { notify_all_clients_to_update_online_users().await; });
}
}
_ = cancel_rx.recv() => println!("收到取消信号,有新的人员加入到会议,会议频道号 {} 结束线程", channel_id_clone),
}
});
}
pub async fn handle_agora_call(
client_message_data: &ClientMessageData,
from_id: &String,
......@@ -377,31 +400,15 @@ pub async fn handle_agora_call(
}
}
if remaining_users.len() == 1 {
// 判断时候已经存在挂断任务
let user_id = remaining_users[0].clone();
let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel::<()>();
if !channel_hangup_procedure_map.contains_key(&channel_id) {
let (cancel_tx, cancel_rx) = mpsc::unbounded_channel::<()>();
channel_hangup_procedure_map.insert(channel_id.clone(), cancel_tx);
let event_sender_clone = event_sender.clone();
let channel_id_clone = channel_id.clone();
tokio::spawn(async move {
tokio::select! {
_ = sleep(Duration::from_secs(15)) => {
println!("20s内如果该频道与会人数仍为1,则对剩下来的用户发送CmdHangup指令 {}", user_id);
if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == user_id).map(|e| e.key().clone()) {
let joined = update_user_status(&user_id, "idle", "", false);
update_redis_async(user_id.clone(), joined);
send_inside_message(&sender, &event_sender_clone, serde_json::json!({
"msgType": "CmdHangup",
"fromID": "0",
"fromName": "Server",
"toID": user_id,
"msgData": {"channelId": "", "rtcToken": ""}
}).to_string(), &user_id).await;
tokio::spawn(async move { notify_all_clients_to_update_online_users().await; });
}
}
_ = cancel_rx.recv() => println!("收到取消信号,有新的人员加入到会议,会议频道号 {} 结束线程", channel_id_clone),
hangup_last_user(user_id, event_sender_clone, channel_id_clone, cancel_rx);
}
});
} else {
let (allowed, disallowed): (Vec<_>, Vec<_>) = remaining_users.into_iter().partition(|id| HOST_ENABLED_ID_SET.contains(id.as_str()));
let new_host = allowed.into_iter().next().or(disallowed.into_iter().next());
......@@ -474,9 +481,12 @@ pub async fn handle_agora_call(
} else {
let channel_id = user_data_vec[CHANNEL_IDX].clone();
let users = get_users_by_channel(&channel_id);
for user_id in users {
if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == user_id).map(|e| e.key().clone()) {
if user_id == client_message_data.to_id {
println!("step - KickOut - 2 获取到频道 {:?} 的所有在线用户 {:?}", channel_id, users);
// 存储最后一位的用户
let mut last_user_id = String::new();
for user_id in users.iter() {
if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == *user_id).map(|e| e.key().clone()) {
if *user_id == client_message_data.to_id {
let joined = update_user_status(&user_id, "idle", "", false);
update_redis_async(user_id.clone(), joined);
send_inside_message(&sender, event_sender, serde_json::json!({
......@@ -487,16 +497,27 @@ pub async fn handle_agora_call(
"toID": user_id
}).to_string(), &user_id).await;
} else {
last_user_id = user_id.clone();
send_inside_message(&sender, event_sender, serde_json::json!({
"msgType": "CmdKickOut",
"msgData": {"channelID": channel_id, "rtcToken": ""},
"toID": client_message_data.to_id,
"toID": &user_id,
"fromName": "Server",
"fromID": "0"
}).to_string(), &user_id).await;
}
}
}
// 判断是否只留下一人
println!("step - KickOut - 3 判断是否只留下一人 {}", users.len());
if users.len() <= 2 {
if !channel_hangup_procedure_map.contains_key(&channel_id) {
// 创建channel挂断
let (cancel_tx, cancel_rx) = mpsc::unbounded_channel::<()>();
channel_hangup_procedure_map.insert(channel_id.clone(), cancel_tx);
hangup_last_user(last_user_id, event_sender.clone(), channel_id, cancel_rx);
}
}
tokio::spawn(async move { notify_all_clients_to_update_online_users().await; });
}
}
......
use crate::client::ONLINE_USERS;
use crate::config::config::REDIS_ADDR;
use lazy_static::lazy_static;
use redis::Client;
use redis::Commands;
use redis_pool::SingleRedisPool;
use std::collections::HashMap;
use urlencoding::decode;
lazy_static! {
static ref REDIS_POOL: SingleRedisPool = {
......@@ -16,14 +13,13 @@ lazy_static! {
// 从 Redis 的 onlineUsers 集合中移除当前用户的信息
pub async fn remove_this_connection(from_id: &str) -> Result<(), redis::RedisError> {
{
ONLINE_USERS.remove(from_id);
println!("成功从全局变量中移除用户id: {} 的信息", from_id);
}
let mut con = REDIS_POOL
.get_connection()
.expect("Failed to get Redis connection");
.map_err(|e| {
println!("Failed to get Redis connection: {}", e);
e
})?;
if let Err(e) = con.hdel::<&str, &str, ()>("onlineUsers", from_id) {
println!("从 Redis 中的 onlineUsers 哈希表删除用户信息时出错: {}", e);
......@@ -39,7 +35,10 @@ pub async fn update_client_redis_data(
// 修改对应用户redis数据
let mut con = REDIS_POOL
.get_connection()
.expect("Failed to get Redis connection");
.map_err(|e| {
println!("Failed to get Redis connection: {}", e);
e
})?;
if let Err(e) = con.hset::<&str, &str, &str, ()>("onlineUsers", from_id, &data_str) {
println!("修改 Redis 中的 onlineUsers 哈希表时出错: {}", e);
return Err(e);
......@@ -50,46 +49,15 @@ pub async fn update_client_redis_data(
// 将当前用户的信息插入到 Redis 的 onlineUsers 集合中
pub async fn insert_this_connection(
from_id: &str,
params: &HashMap<String, String>,
user_info_str: String,
) -> Result<(), redis::RedisError> {
let device_id = params.get("deviceID").cloned().unwrap_or("".to_string());
let from_name = params.get("fromName").cloned().unwrap_or("".to_string());
let has_mike = if let Some(has_mike) = params.get("hasMike") {
has_mike.to_string()
} else {
"0".to_string()
};
let has_camera = if let Some(has_camera) = params.get("hasCamera") {
has_camera.to_string()
} else {
"0".to_string()
};
let user_call_group = if let Some(user_call_group) = params.get("userCallGroup") {
user_call_group.to_string()
} else {
"0".to_string()
};
let from_name_decoded = match decode(from_name.as_str()) {
Ok(decoded) => decoded.to_string(),
Err(_) => from_name,
};
// 按照结构体OnlineUserMessage的格式构造用户信息字符串,用逗号拼接即可
// callState,channelID,deviceID,fromID,hasCamera,hasMike,isHost,userCallGroup,fromName
let user_info_str = format!(
"{},{},{},{},{},{},0,{},{}",
"idle", "", device_id, from_id, has_camera, has_mike, user_call_group, from_name_decoded
);
{
// 获取写锁以进行写操作
ONLINE_USERS.insert(from_id.to_string(), user_info_str.clone());
println!("成功将用户id: {} 的信息插入到 ONLINE_USERS 中", from_id);
}
let mut con = REDIS_POOL
.get_connection()
.expect("Failed to get Redis connection");
.map_err(|e| {
println!("Failed to get Redis connection: {}", e);
e
})?;
if let Err(e) = con.hset::<&str, &str, &str, ()>("onlineUsers", from_id, &user_info_str) {
println!(
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment