Commit 14a4c852 by qlintonger xeno

CmdEndMeeting以及CmdConnect完毕!CmdHangup完毕,准备处理线程自动挂断任务

parent 88a41408
......@@ -88,11 +88,7 @@ pub async fn handle_agora_call(
ONLINE_USERS.insert(from_id.to_string(), data_joined.to_string());
let from_id_clone = from_id.clone();
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(
&from_id_clone,
data_joined,
)
.await
if let Err(e) = update_client_redis_data(&from_id_clone, data_joined).await
{
println!("更新redis数据失败:{:?}", e);
};
......@@ -156,7 +152,8 @@ pub async fn handle_agora_call(
the_other_caller_data[0] = "idle".to_string();
the_other_caller_data[1] = "".to_string();
the_other_caller_data[6] = "0".to_string();
let all_data_joined = the_other_caller_data.join(",").to_string();
let all_data_joined =
the_other_caller_data.join(",").to_string();
println!("step - 0");
drop(the_other_caller_data);
ONLINE_USERS.insert(
......@@ -165,10 +162,8 @@ pub async fn handle_agora_call(
);
println!("step - 1");
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(
&to_hangup_id,
all_data_joined,
)
if let Err(e) =
update_client_redis_data(&to_hangup_id, all_data_joined)
.await
{
println!("更新redis数据失败:{:?}", e);
......@@ -190,18 +185,12 @@ pub async fn handle_agora_call(
drop(user_status);
drop(the_current_user_data);
println!("step - 1.7");
ONLINE_USERS.insert(
from_id.to_string(),
the_current_data_joined.to_string(),
);
ONLINE_USERS.insert(from_id.to_string(), the_current_data_joined.to_string());
let from_id_clone = from_id.to_string();
println!("step - 2.1");
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(
&from_id_clone,
the_current_data_joined,
)
.await
if let Err(e) =
update_client_redis_data(&from_id_clone, the_current_data_joined).await
{
println!("更新redis数据失败:{:?}", e);
};
......@@ -390,7 +379,11 @@ pub async fn handle_agora_call(
})
.to_string();
// 更新状态数据
the_user_data[0] = "calling";
the_user_data[0] = if the_user_data[0] == "calling" {
"calling"
} else {
"callout"
};
the_user_data[1] = channel_id_now.as_str();
the_user_data[6] = "0";
// 在连接成功之后再设置为主持人,在此之前一律将所有用户的isHost数据段设置为0
......@@ -436,6 +429,7 @@ pub async fn handle_agora_call(
tokio::spawn(async move {
tokio::select! {
_ = sleep(Duration::from_secs(20)) => {
println!("20秒内没有接听,准备挂断通话");
// 20秒内没有接听,准备挂断通话
let mut current_user_data_vec: Vec<String> = user_data_cloned_current.split(',').map(|s| s.to_string()).collect();
// 提前取出channelID,判断是否还有人在频道里面
......@@ -443,11 +437,13 @@ pub async fn handle_agora_call(
current_user_data_vec[0] = "idle".to_string();
current_user_data_vec[1] = "".to_string();
current_user_data_vec[6] = "0".to_string();
println!("step R-H 1");
// 更新本地数据
ONLINE_USERS.insert(
user_id_clone.clone(),
current_user_data_vec.join(","),
);
println!("step R-H 2");
let user_id_clone_for = user_id_clone.clone();
tokio::spawn(async move {
// 更新redis数据
......@@ -461,6 +457,7 @@ pub async fn handle_agora_call(
println!("通知挂断refuse线程,更新redis数据成功");
}
});
println!("step R-H 3");
// 对其发送CmdHangup,对发起方发送CmdRefuse
// 发起方的isHost一定是1,找到该用户即可
let host_user_id_now = {
......@@ -474,6 +471,7 @@ pub async fn handle_agora_call(
})
.map(|entry| entry.key().clone())
};
println!("step R-H 4");
// 再找到对应的Sender
let host_user_sender_which = if let Some(host_user_id) = host_user_id_now {
CLIENT_SENDERS
......@@ -483,6 +481,7 @@ pub async fn handle_agora_call(
} else {
None
};
println!("step R-H 5");
if let Some(host_user_sender_which) = host_user_sender_which {
// 对其发送消息
send_inside_message(
......@@ -502,6 +501,7 @@ pub async fn handle_agora_call(
)
.await;
}
println!("step R-H 6");
// 向当前用户Sender发送CmdHangup要求其挂断
let hangup_user_sender_which = if let Some(hangup_user_sender_which) =
CLIENT_SENDERS.iter().find(|entry| entry.key().0 == user_id_clone) {
......@@ -509,6 +509,7 @@ pub async fn handle_agora_call(
} else {
None
};
println!("step R-H 7");
if let Some(hangup_user_sender_which) = hangup_user_sender_which {
// 对其发送消息
send_inside_message(
......@@ -527,6 +528,7 @@ pub async fn handle_agora_call(
&user_id_clone.to_string(),
).await;
}
println!("step R-H 8");
// 检查剩余频道人员数量
let left_users: Vec<(String, String)> = {
ONLINE_USERS.iter()
......@@ -534,15 +536,18 @@ pub async fn handle_agora_call(
.map(|entry| (entry.key().clone(), entry.value().clone())) // 克隆键和值
.collect()
};
println!("step R-H 9");
if left_users.len() <= 1 {
let only_left_user = left_users.get(0);
if let Some((user_id, user_data)) = only_left_user {
let mut user_data = user_data.split(',').collect::<Vec<&str>>();
{
println!("step R-H 10");
let user_is_host = user_data[1] == "1";
if user_is_host {
// 发起方
// 找到发起方的Sender
println!("step R-H 10-1");
let sender_or_not = {
CLIENT_SENDERS.iter().find(|entry| entry.key().0 == user_id.to_string())
};
......@@ -551,13 +556,20 @@ pub async fn handle_agora_call(
user_data[0] = "idle";
user_data[1] = "";
user_data[6] = "0";
println!("step R-H 10-2");
ONLINE_USERS.insert(user_id.to_string(), user_data.join(","));
println!("step R-H 10-3");
let user_data_j = user_data.join(",");
let uid = user_id.clone();
tokio::spawn(async move {
// 更新redis数据
if let Err(e) = update_client_redis_data(&user_id.to_string(), user_data.join(",")).await {
if let Err(e) = update_client_redis_data(&uid, user_data_j).await {
println!("只有一人在会议中,更新redis数据失败: {}", e)
} else {
println!("只有一人在会议中,更新redis数据成功")
}
});
println!("step R-H 10-4");
// 对其发送CmdCancelCall
send_inside_message(
host_user_sender_which.key(),
......@@ -575,6 +587,7 @@ pub async fn handle_agora_call(
}
}
}
println!("step R-H 11");
tokio::spawn(async move {
// 要求所有用户更新在线人员列表
notify_all_clients_to_update_online_users().await;
......@@ -610,7 +623,8 @@ pub async fn handle_agora_call(
let mut current_channel_id = "".to_string();
// 给当前from_id用户发送CmdHangup,表示已经成功拒接了电话,并且修改了状态
if let Some(ref current_user_data) = user_data {
let mut current_user_data_vec = current_user_data.split(',')
let mut current_user_data_vec = current_user_data
.split(',')
.map(|s| s.to_string())
.collect::<Vec<String>>();
let hangup_refuse_message = serde_json::json!({
......@@ -696,7 +710,8 @@ pub async fn handle_agora_call(
println!("step - Refuse - 7");
// 检查同频道下是不是目前只有1人在线
let current_all_chatters = {
ONLINE_USERS.iter()
ONLINE_USERS
.iter()
.filter(|entry| entry.value().split(',').nth(1).unwrap() == current_channel_id)
.map(|entry| entry.key().to_string())
.collect::<Vec<String>>()
......@@ -770,12 +785,15 @@ pub async fn handle_agora_call(
}
// 主持人结束通话
"EndMeeting" => {
println!("step - EndMeeting - 1 收到挂断会议通知!");
let user_found = { ONLINE_USERS.get(from_id) };
// 要求判断是否为主持人,只有主持人可以结束通话
if let Some(current_user_data) = user_found {
if let Some(ref current_user_data) = user_found {
let current_user_data_vec: Vec<&str> = current_user_data.split(',').collect();
// 如果不是主持人,不允许发送该消息
println!("step - EndMeeting - 1.1 判断是否为主持人");
if current_user_data_vec[6] != "1" {
println!("step - EndMeeting - 1.2 不是主持人");
// 发送错误信息
let error_message = serde_json::json!({
"msgType": "Error",
......@@ -792,22 +810,29 @@ pub async fn handle_agora_call(
&from_id,
)
.await;
println!("step - EndMeeting - 1.3 不是主持人消息发送完毕");
drop(current_user_data_vec);
drop(user_found);
} else {
// 如果是主持人,则向所有人发送CmdEndMeeting消息,并且清理所有数据
// 提取出目前channel_id
let channel_id = current_user_data_vec[1];
println!("step - EndMeeting - 1.4 判断是主持人");
let channel_id = current_user_data_vec[1].to_string();
if channel_id != "" {
// 从ONLINE_USERS中过滤筛选出所有与channelID相同的用户id集合
println!("step - EndMeeting - 1.5 判断channel_id");
drop(user_found);
let users_to_notify = {
ONLINE_USERS
.iter()
.filter(|entry| {
entry.value().split(",").collect::<Vec<_>>()[2] == channel_id
entry.value().split(",").collect::<Vec<_>>()[1] == channel_id
})
.map(|entry| entry.key().to_string())
.collect::<Vec<String>>()
};
let clone_for_users_to_notify = users_to_notify.clone();
println!("step - EndMeeting - 1.6 判断channel_id 遍历循环");
// 再根据上面的id,找到所有sender,并且修改所有对应ONLINE_USERS数据
for user_id_current_chat in users_to_notify {
let user_d = { ONLINE_USERS.get(&user_id_current_chat) };
......@@ -818,10 +843,14 @@ pub async fn handle_agora_call(
current_user_data_vec[1] = "";
current_user_data_vec[6] = "0";
let current_user_data_joined = current_user_data_vec.join(",");
println!("step - EndMeeting - 1.7 判断channel_id 遍历循环 1");
drop(current_user_data_vec);
drop(current_user_data);
ONLINE_USERS.insert(
user_id_current_chat.to_string(),
current_user_data_joined.clone(),
);
println!("step - EndMeeting - 1.8 判断channel_id 遍历循环 2");
tokio::spawn(async move {
// 更新对应redis数据
if let Err(e) = update_client_redis_data(
......@@ -838,8 +867,10 @@ pub async fn handle_agora_call(
println!("更新redis数据成功");
}
});
println!("玄幻完毕");
}
}
println!("step - EndMeeting - 1.9 发送消息 遍历循环 3");
for user_id in clone_for_users_to_notify {
if let Some(target_sender) = CLIENT_SENDERS
.iter()
......@@ -847,6 +878,7 @@ pub async fn handle_agora_call(
.map(|entry| entry.key().clone())
{
// 找到这个sender标记符之后,发送消息CmdEndMeeting
println!("step - EndMeeting 消息发送给 {:?}", &(target_sender));
send_inside_message(
&target_sender,
event_sender,
......@@ -1051,7 +1083,7 @@ pub async fn handle_agora_call(
if let Some(allowed_user_id) = allowed_users_clone.get(0) {
let allowed_user_id_clone = allowed_user_id.clone(); // 克隆 allowed_user_id
let data_f = { ONLINE_USERS.get(&allowed_user_id_clone) };
if let Some(allowed_user_info_data) = data_f {
if let Some(ref allowed_user_info_data) = data_f {
let mut user_info_data_for_allowed = allowed_user_info_data
.split(',')
.map(|s| s.to_string())
......@@ -1060,6 +1092,7 @@ pub async fn handle_agora_call(
// 修改online数据信息
let user_info_data_for_allowed_joined =
user_info_data_for_allowed.join(",");
drop(data_f);
ONLINE_USERS.insert(
allowed_user_id_clone.clone(),
user_info_data_for_allowed_joined.clone(),
......@@ -1087,13 +1120,14 @@ pub async fn handle_agora_call(
if let Some(disallowed_user_id) = disallowed_users_clone.get(0) {
let disallowed_user_id_clone = disallowed_user_id.clone(); // 克隆 disallowed_user_id
let d_f = { ONLINE_USERS.get(&disallowed_user_id_clone) };
if let Some(disallowed_user_info_data) = d_f {
if let Some(ref disallowed_user_info_data) = d_f {
let mut current_host_data = disallowed_user_info_data
.split(',')
.map(|s| s.to_string())
.collect::<Vec<String>>();
current_host_data[6] = "1".to_string();
let current_host_data_joined = current_host_data.join(",");
drop(d_f);
// 更新ONLINE_USERS数据
ONLINE_USERS.insert(
disallowed_user_id_clone.clone(),
......@@ -1140,6 +1174,7 @@ pub async fn handle_agora_call(
// 发送connect消息之时,默认以toID用户的channelID作为channelID
let to_id_user = &client_message_data.to_id;
// 找到对应to_id用户的数据
println!("Step Connect-1");
let to_id_user_data_here = { ONLINE_USERS.get(to_id_user) };
if let Some(to_id_user_data) = to_id_user_data_here {
// 如果存在,则获取这个channel_id
......@@ -1148,11 +1183,13 @@ pub async fn handle_agora_call(
.map(|s| s.to_string())
.collect::<Vec<String>>();
// 提前释放资源,释放对应锁
println!("Step Connect-1.1");
drop(to_id_user_data);
let target_channel_id = data_split[1].clone();
if !target_channel_id.is_empty() {
// 这里的from_id就是被呼叫方,to_id就是呼叫方
// 首先先取消掉所有的挂断from_id的任务
println!("Step Connect-1.2");
let hangup_p = { refuse_procedure_map.get(from_id) };
if let Some(hangup_personnel) = hangup_p {
if let Err(e) = hangup_personnel.send(()) {
......@@ -1161,6 +1198,7 @@ pub async fn handle_agora_call(
println!("挂断personnel的任务已经发送成功")
}
}
println!("Step Connect-1.3");
// 检查一下是否存在挂断channel的任务,如果存在,对齐发送消息,然后关闭
let hangup_p = { channel_hangup_procedure_map.get(&target_channel_id) };
if let Some(hangup_channel_task) = hangup_p {
......@@ -1170,42 +1208,48 @@ pub async fn handle_agora_call(
println!("挂断channel的任务已经发送成功")
}
}
println!("Step Connect-1.4");
// 如果当前channel已经有主持人,则不做处理
let already_had_host = {
ONLINE_USERS
.iter()
.filter(|entry| {
entry.value().split(",").collect::<Vec<_>>()[1]
== target_channel_id
entry.value().split(",").collect::<Vec<_>>()[1] == target_channel_id
})
.any(|entry| entry.value().split(",").collect::<Vec<_>>()[6] == "1")
};
println!("Step Connect-1.5");
if already_had_host {
println!("Step Connect-1.6");
println!("当前channel已经有主持人,直接返回 {}", target_channel_id)
} else {
println!("当前channel没有主持人,开始处理 {}", target_channel_id);
// 首先判断发起方,也就是channel_id用_分割之后的第一个元素,是否具备成为主持人的资格
println!("Step Connect-1.8");
let last_remaining_id_set = target_channel_id
.split("_")
.map(|s| s.to_string())
.collect::<Vec<_>>();
if let Some(channel_id_first_element) = last_remaining_id_set.get(0) {
if HOST_ENABLED_ID_SET
.contains(&channel_id_first_element.to_string())
{
println!("Step Connect-2");
if HOST_ENABLED_ID_SET.contains(&channel_id_first_element.to_string()) {
// 具备成为主持人资格,直接修改其对应数据即可
let online_d =
{ ONLINE_USERS.get(&channel_id_first_element.to_string()) };
if let Some(host_data) = online_d {
if let Some(ref host_data) = online_d {
let mut host_data_vec =
host_data.split(",").collect::<Vec<_>>();
host_data_vec[6] = "1";
let host_data_joined = host_data_vec.join(",");
println!("Step Connect-3");
drop(online_d);
println!("Step Connect-4");
// 直接再次更新online_users
ONLINE_USERS.insert(
channel_id_first_element.to_string(),
host_data_joined.clone(),
);
println!("Step Connect-5");
let the_first_for = channel_id_first_element.to_string();
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(
......@@ -1222,9 +1266,12 @@ pub async fn handle_agora_call(
println!("更新redis数据成功");
}
});
println!("Step Connect-6");
}
} else {
println!("Step Connect-7");
// 获取所有同channel_id的用户,在ONLINE_USERS中寻找第一个满足条件的id
// 记录是否找到主持人对应id
let all_in_group_chatters_id: Vec<String> = {
ONLINE_USERS
.iter()
......@@ -1235,46 +1282,99 @@ pub async fn handle_agora_call(
.map(|entry| entry.key().clone())
.collect::<Vec<String>>()
};
// 记录是否找到主持人对应id
let mut is_host_found = false;
// 在all_in_group_chatters_id中,找寻第一个在HOST_ENABLED_ID_SET中的id
for id in all_in_group_chatters_id {
if HOST_ENABLED_ID_SET.contains(&id) {
// 具备成为主持人资格,直接修改其对应数据即可
let hd = { ONLINE_USERS.get(&id) };
if let Some(host_data) = hd {
let first_host_id = all_in_group_chatters_id.iter()
.find(|id| HOST_ENABLED_ID_SET.contains(&id.to_string()));
let mut id = "";
if let Some(target_id) = first_host_id {
id = target_id;
} else {
// 若是空的,则直接取值第一个即可
if let Some(first_element) =
all_in_group_chatters_id.first()
{
id = first_element;
}
}
println!("Step Connect-7.5 找到具备成为主持人的host-id");
let hd = { ONLINE_USERS.get(id) };
if let Some(ref host_data) = hd {
let mut host_data_vec =
host_data.split(",").collect::<Vec<_>>();
host_data_vec[6] = "1";
is_host_found = true;
println!("Step Connect-8");
// 更新对应数据
let host_data_joined = host_data_vec.join(",");
ONLINE_USERS.insert(
id.to_string(),
host_data_joined.clone(),
);
println!("Step Connect-9");
drop(hd);
ONLINE_USERS
.insert(id.to_string(), host_data_joined.clone());
println!("Step Connect-9.1");
let host_id_clone = id.to_string();
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(
&id,
&host_id_clone,
host_data_joined.clone(),
)
.await
{
println!(
"更新redis数据失败:{:?} 用户id {} 成为主持人",
e, id
e, host_id_clone
)
} else {
println!("更新redis数据成功");
}
});
break;
println!("Step Connect-10");
}
}
}
if !is_host_found {
println!("当前channel没有主持人,并且没有找到合适的主持人,直接派遣第一个id即可 {}", target_channel_id);
}
// 设置主持人之后,需要对双方发送CmdConnect消息
println!("Step Connect-11");
// 注意,用户接听之后,是要对同channel_id下的所有其余状态不为calling的用户发送
let all_not_calling_users = {
ONLINE_USERS
.iter()
.filter(|entry| {
let entry_split = entry.value().split(",").collect::<Vec<_>>();
entry_split[1] == target_channel_id && entry_split[0] != "calling"
})
.map(|entry| entry.key().clone())
.collect::<Vec<String>>()
};
println!("Step Connect-11.5 当前所有channel用户 {:?}和channelID {}", all_not_calling_users,target_channel_id.to_string());
// 遍历上述结构,然后针对他们均发送CmdConnect消息
for user_id in all_not_calling_users {
// 找到对应sender
let target_sender_which = {
CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == user_id)
.map(|entry| entry.key().clone())
};
if let Some(target_sender_which) = target_sender_which {
let json_str = serde_json::json!({
"msgType": "CmdConnect",
"msgData": {
"channelID": target_channel_id,
"rtcToken": ""
},
"fromID" : "0",
"fromName": "Server",
"toID": user_id
})
.to_string();
// 发送消息
if let Err(e) = event_sender.send(Event::SendClientMessage(
target_sender_which.clone(),
json_str,
false,
)) {
println!("发送给用户id {} 独立消息失败:{:?}", user_id, e);
} else {
println!("发送给用户id {} 独立消息成功 CmdConnect", user_id);
}
}
}
......@@ -1282,6 +1382,7 @@ pub async fn handle_agora_call(
// 最后广播所有用户更新状态
notify_all_clients_to_update_online_users().await;
});
println!("Step Connect-11");
} else {
if let Err(e) = event_sender.send(Event::SendClientMessage(
target_sender_which.clone(),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment