Commit 5328a85f by qlintonger xeno

基本消息类型完成,准备测试一下

parent 701a290a
......@@ -34,7 +34,7 @@ pub async fn handle_ws_msg(
}
// 通话类消息直接托管给对应句柄即可
"Call" | "CancelCall" | "Refuse" | "EndMeeting" | "Hangup" | "Connect" | "Mute"
| "MuteAll" | "KickOut" | "MuteSelf" | "UnMuteSelf" => {
| "MuteAll" | "KickOut" | "MuteSelf" => {
handle_agora_call(
&client_message_data,
&from_id,
......
use crate::client::ONLINE_USERS;
use crate::events::{notify_all_clients_to_update_online_users, Event, CLIENT_SENDERS};
use crate::handles::handshake::HOST_ENABLED_ID_SET;
use crate::handles::redis::update_client_redis_data;
use crate::typing::message_typed::ClientMessageData;
use crate::utils::json_utils::get_current_timestamp;
......@@ -12,6 +13,8 @@ use tokio::time::sleep;
lazy_static! {
pub static ref refuse_procedure_map: DashMap<String, UnboundedSender<()>> = DashMap::new();
pub static ref channel_hangup_procedure_map: DashMap<String, UnboundedSender<()>> =
DashMap::new();
}
async fn send_inside_message(
......@@ -687,9 +690,10 @@ pub async fn handle_agora_call(
if let Some(current_user_data) = ONLINE_USERS.get(from_id) {
// 直接修改对应数据即可
let mut current_user_data_vec: Vec<&str> = current_user_data.split(',').collect();
let is_current_user_host = current_user_data_vec[6] == "1";
let current_chatting_channel_id = current_user_data_vec[1];
current_user_data_vec[0] = "idle";
current_user_data_vec[1] = "";
let is_current_user_host = current_user_data_vec[6] == "1";
// 无论是否是主持人,都需要发送对应消息,更新数据以及同步redis
current_user_data_vec[6] = "0";
let current_user_data_joined = current_user_data_vec.join(",");
......@@ -718,16 +722,312 @@ pub async fn handle_agora_call(
&from_id,
)
.await;
// 判断一下当前用户是不是主持人
// 如果当前用户是主持人,主持人退出之后,需要在剩下的用户中决定新主持人
if is_current_user_host {
// 过滤出剩下来的channelID同一的在线用户,并且整合为Vec集合
let remaining_users: Vec<String> = ONLINE_USERS
.iter()
.filter(|entry| {
let v_vec: Vec<&str> = entry.value().split(',').collect();
v_vec[1] == current_chatting_channel_id
})
.map(|entry| entry.key().to_string())
.collect();
// 没有人参与会议了,直接退出即可
if remaining_users.len() == 0 {
println!("当前频道没有人员,请重新发起通话");
}
// 如果余留下的用户数量仅为1,则开启线程,20s内如果该频道与会人数仍为1,则对剩下来的用户发送CmdHangup指令
// 如果大于2人,则需要判断主持人
else {
// 无论如何,都要向他们发送CmdLeave消息
for user_id in remaining_users.iter() {
if let Some(target_sender) = CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == *user_id)
.map(|entry| entry.key().clone())
{
// 找到这个sender标记符之后,发送消息CmdEndMeeting
let cmd_leave_message = serde_json::json!({
"msgType": "CmdLeave",
"fromID": &from_id,
"fromName": "Unknown",
"toID": user_id,
"msgData": {}
})
.to_string();
send_inside_message(
&target_sender,
event_sender,
cmd_leave_message,
user_id,
)
.await;
}
}
if remaining_users.len() == 1 {
let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel::<()>();
channel_hangup_procedure_map
.insert(current_chatting_channel_id.to_string(), cancel_tx);
let channel_id_clone = current_chatting_channel_id.to_string();
let ev_clone = event_sender.clone();
// 开启一个tokio线程,使用async move
tokio::spawn(async move {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(20)) => {
// 20s内如果该频道与会人数仍为1,则对剩下来的用户发送CmdHangup指令
println!("20s内如果该频道与会人数仍为1,则对剩下来的用户发送CmdHangup指令 {:?}", remaining_users);
if let Some(remaining_user_id) = remaining_users.get(0) {
{
if let Some(remain_user_info_data) = ONLINE_USERS.get(remaining_user_id){
// 找到对应Sender
if let Some(target_sender_which) = CLIENT_SENDERS.iter()
.find(|entry| entry.key().0 == *remaining_user_id) {
// 修改状态数据
let mut user_info_data_for_remained = remain_user_info_data.split(',').collect::<Vec<_>>();
user_info_data_for_remained[0] = "idle";
user_info_data_for_remained[1] = "";
user_info_data_for_remained[6] = "0";
let user_info_data_for_remained_joined = user_info_data_for_remained.join(",");
ONLINE_USERS.insert(remaining_user_id.to_string(), user_info_data_for_remained_joined.clone());
if let Err(e) = update_client_redis_data(remaining_user_id, user_info_data_for_remained_joined).await {
println!("更新redis数据失败:{:?} 用户id {}", e, remaining_user_id);
} else {
println!("更新redis数据成功");
}
// 发送CmdHangup
send_inside_message(
&target_sender_which.key(),
&ev_clone,
serde_json::json!({
"msgType": "CmdHangup",
"fromID": "0",
"fromName": "Server",
"toID": remaining_user_id,
"msgData": {
"channelId":"",
}
}).to_string(),
&remaining_user_id,
).await;
// 要求所有用户更新数据
notify_all_clients_to_update_online_users().await;
}
}
}
}
}
_ = cancel_rx.recv() => {
// 如果收到取消信号,则直接结束线程
println!("收到取消信号,有新的人员加入到会议,会议频道号 {} 结束线程", channel_id_clone);
}
}
});
}
// 如果余下用户数量超过2人及以上,则需要判断应该让谁让主持人
else {
// 分组:1.筛选出id在HOST_ENABLED_ID_SET中的用户,即允许成为主持人的用户,2.筛选出id不在HOST_ENABLED_ID_SET中的用户,即不允许成为主持人的用户
// 注意,这里只需要修改一下状态数据即可,不用单独发送消息
let (allowed_users, disallowed_users): (Vec<_>, Vec<_>) =
remaining_users
.iter()
.partition(|user_id| HOST_ENABLED_ID_SET.contains(user_id.as_str()));
if !allowed_users.is_empty() {
// 让第一位id的用户成为主持人
if let Some(allowed_user_id) = allowed_users.get(0) {
if let Some(allowed_user_info_data) =
ONLINE_USERS.get(allowed_user_id.as_str())
{
{
// 修改状态数据
let mut user_info_data_for_allowed =
allowed_user_info_data
.split(',')
.collect::<Vec<_>>();
user_info_data_for_allowed[6] = "1";
// 修改online数据信息
let user_info_data_for_allowed_joined =
user_info_data_for_allowed.join(",");
ONLINE_USERS.insert(
allowed_user_id.to_string(),
user_info_data_for_allowed_joined.clone(),
);
if let Err(e) = update_client_redis_data(
allowed_user_id,
user_info_data_for_allowed_joined.clone(),
)
.await
{
println!(
"更新redis数据失败:{:?} 用户id {}",
e, allowed_user_id
)
} else {
println!("更新redis数据成功");
}
}
}
}
} else if !disallowed_users.is_empty() {
// 让第一位
if let Some(disallowed_user_id) = disallowed_users.get(0) {
// 完全照搬上面的逻辑
if let Some(disallowed_user_info_data) =
ONLINE_USERS.get(disallowed_user_id.as_str())
{
{
let mut current_host_data = disallowed_user_info_data
.split(',')
.collect::<Vec<_>>();
current_host_data[6] = "1";
let current_host_data_joined =
current_host_data.join(",");
// 更新ONLINE_USERS数据
ONLINE_USERS.insert(
disallowed_user_id.to_string(),
current_host_data_joined.clone(),
);
if let Err(e) = update_client_redis_data(
disallowed_user_id,
current_host_data_joined.clone(),
)
.await
{
println!(
"更新redis数据失败:{:?} 用户id {}",
e, disallowed_user_id
)
} else {
println!("更新redis数据成功");
}
}
}
}
}
}
}
}
// 无论如何都要发送通知在线人员更新
notify_all_clients_to_update_online_users().await;
}
}
// 接听通话,这个是目前最重要的逻辑部分
// 在这里要处理如下事项
// 1.取消20s内挂断某个用户的线程;
// 2.若加入的频道号之前只有一个用户,那么目前频道是超过2人的,那么要取消20s内退出的线程;
// 3.修改状态并且广播出去
// 4.决定主持人。一般来说,谁发起了这个频道的人,并且具备成为主持人的资格,那么就直接成为主持人,否则就让第一个加入的人成为主持人。
"Connect" => {
println!("收到客户端Connect消息连接 {} 频道号 {}", &from_id, client_message_data.msg_data.get("channelID").unwrap_or(&serde_json::Value::String("null".to_string())));
if let Some(channel_id) = client_message_data.msg_data.get("channelID") {
// 这里的from_id就是被呼叫方,to_id就是呼叫方
// 首先先取消掉所有的挂断from_id的任务
if let Some(hangup_personnel) = refuse_procedure_map.get(from_id) {
if let Err(e) = hangup_personnel.send(()) {
println!("挂断personnel的任务终结失败 {:?}", e)
} else {
println!("挂断personnel的任务已经发送成功")
}
}
// 将channel_id转换为字符串,并且进行解析
if let Some(channel_id_str) = channel_id.as_str() {
// 检查一下是否存在挂断channel的任务,如果存在,对齐发送消息,然后关闭
if let Some(hangup_channel_task) = channel_hangup_procedure_map.get(channel_id_str) {
if let Err(e) = hangup_channel_task.send(()) {
println!("挂断channel的任务终结失败 {:?}", e)
} else {
println!("挂断channel的任务已经发送成功")
}
}
// 如果当前channel已经有主持人,则不做处理
let already_had_host = ONLINE_USERS.iter()
.filter(|entry| entry.value().split(",").collect::<Vec<_>>()[1] ==channel_id_str)
.any(|entry| entry.value().split(",").collect::<Vec<_>>()[6] == "1");
if already_had_host {
println!("当前channel已经有主持人,直接返回 {}", channel_id_str)
} else {
println!("当前channel没有主持人,开始处理 {}", channel_id_str);
// 首先判断发起方,也就是channel_id用_分割之后的第一个元素,是否具备成为主持人的资格
if let Some(channel_id_first_element) = channel_id_str.split("_").collect::<Vec<_>>().get(0) {
if HOST_ENABLED_ID_SET.contains(&channel_id_first_element.to_string()) {
// 具备成为主持人资格,直接修改其对应数据即可
if let Some(host_data) = ONLINE_USERS.get(&channel_id_first_element.to_string()) {
let mut host_data_vec = host_data.split(",").collect::<Vec<_>>();
host_data_vec[6] = "1";
let host_data_joined = host_data_vec.join(",");
// 直接再次更新online_users
ONLINE_USERS.insert(
channel_id_first_element.to_string(),
host_data_joined.clone(),
);
if let Err(e) = update_client_redis_data(
channel_id_first_element,
host_data_joined.clone(),
)
.await
{
println!(
"更新redis数据失败:{:?} 用户id {} 成为主持人",
e, channel_id_first_element
)
} else {
println!("更新redis数据成功");
}
}
} else {
// 获取所有同channel_id的用户,在ONLINE_USERS中寻找第一个满足条件的id
let all_in_group_chatters_id: Vec<String> = ONLINE_USERS.iter()
.filter(|entry| entry.value().split(",").collect::<Vec<_>>()[1] ==channel_id_str)
.map(|entry| entry.key().clone())
.collect::<Vec<String>>();
// 记录是否找到主持人对应id
let mut is_host_found = false;
// 在all_in_group_chatters_id中,找寻第一个在HOST_ENABLED_ID_SET中的id
for id in all_in_group_chatters_id {
if HOST_ENABLED_ID_SET.contains(&id) {
// 具备成为主持人资格,直接修改其对应数据即可
if let Some(host_data) = ONLINE_USERS.get(&id) {
let mut host_data_vec = host_data.split(",").collect::<Vec<_>>();
host_data_vec[6] = "1";
is_host_found = true;
// 更新对应数据
let host_data_joined = host_data_vec.join(",");
ONLINE_USERS.insert(
id.to_string(),
host_data_joined.clone(),
);
if let Err(e) = update_client_redis_data(
&id,
host_data_joined.clone(),
)
.await
{
println!(
"更新redis数据失败:{:?} 用户id {} 成为主持人",
e, id
)
} else {
println!("更新redis数据成功");
}
break;
}
}
}
if !is_host_found {
println!("当前channel没有主持人,并且没有找到合适的主持人,直接派遣第一个id即可 {}", channel_id_str);
}
}
}
}
}
// 最后广播所有用户更新状态
notify_all_clients_to_update_online_users().await;
} else {
println!("客户端Connect消息 缺乏channel-id,拒绝处理消息")
}
}
// 处理静音
"Mute" => {
}
_ => {}
......
......@@ -31,7 +31,10 @@ pub async fn remove_this_connection(from_id: &str) -> Result<(), redis::RedisErr
Ok(())
}
pub async fn update_client_redis_data(from_id: &str, data_str: String) -> Result<(), redis::RedisError> {
pub async fn update_client_redis_data(
from_id: &str,
data_str: String,
) -> Result<(), redis::RedisError> {
// 修改对应用户redis数据
let mut con = REDIS_POOL
.get_connection()
......@@ -54,21 +57,26 @@ pub async fn insert_this_connection(
let device_id = params.get("deviceID").cloned().unwrap_or("".to_string());
let from_name = params.get("fromName").cloned().unwrap_or("".to_string());
let has_mike = if let Some(has_mike) = params.get("hasMike"){
let has_mike = if let Some(has_mike) = params.get("hasMike") {
has_mike.to_string()
} else {
"0".to_string()
};
let has_camera = if let Some(has_camera) = params.get("hasCamera"){
let has_camera = if let Some(has_camera) = params.get("hasCamera") {
has_camera.to_string()
} else {
"0".to_string()
};
let user_call_group = if let Some(user_call_group) = params.get("userCallGroup") {
user_call_group.to_string()
} else {
"0".to_string()
};
// 按照结构体OnlineUserMessage的格式构造用户信息字符串,用逗号拼接即可
// callState,channelID,deviceID,fromID,hasCamera,hasMike,isHost,userCallGroup,fromName
let user_info_str = format!(
"{},{},{},{},{},{},0,0,{}",
"idle", "", from_id, device_id, has_camera, has_mike,from_name
"{},{},{},{},{},{},0,{},{}",
"idle", "", from_id, device_id, has_camera, has_mike, user_call_group, from_name
);
if let Err(e) = con.hset::<&str, &str, &str, ()>("onlineUsers", from_id, &user_info_str) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment