Commit a8e1f401 by qlintonger xeno

cancel-call和call成功,但是出现死锁问题,卡顿在step - 0这里

parent 19bee953
...@@ -28,8 +28,14 @@ pub(crate) async fn handle_client( ...@@ -28,8 +28,14 @@ pub(crate) async fn handle_client(
let must_existed_params = ["deviceID", "fromID", "wsPwd"]; let must_existed_params = ["deviceID", "fromID", "wsPwd"];
let mut connection_params = None; let mut connection_params = None;
let ws_stream = let ws_stream = match handle_websocket_handshake(
match handle_websocket_handshake(stream, &must_existed_params, STATIC_WS_PWD, &mut connection_params).await { stream,
&must_existed_params,
STATIC_WS_PWD,
&mut connection_params,
)
.await
{
Ok(ws) => ws, Ok(ws) => ws,
Err(e) => { Err(e) => {
println!("WebSocket握手失败: {:?}", e); println!("WebSocket握手失败: {:?}", e);
...@@ -83,7 +89,11 @@ pub(crate) async fn handle_client( ...@@ -83,7 +89,11 @@ pub(crate) async fn handle_client(
} }
ClientMessage::SendClientMessage(from_id, client_message, close) => { ClientMessage::SendClientMessage(from_id, client_message, close) => {
let real_user_id = from_id.0; let real_user_id = from_id.0;
println!("消息中心:==> 收到消息 {:?} 发送给 {}", &client_message, real_user_id.clone()); println!(
"消息中心:==> 收到消息 {:?} 发送给 {}",
&client_message,
real_user_id.clone()
);
if let Err(e) = sender.send(Message::text(client_message)).await { if let Err(e) = sender.send(Message::text(client_message)).await {
println!("发送给用户id {} 独立消息失败:{:?}", real_user_id, e); println!("发送给用户id {} 独立消息失败:{:?}", real_user_id, e);
break; break;
...@@ -135,6 +145,8 @@ pub(crate) async fn handle_client( ...@@ -135,6 +145,8 @@ pub(crate) async fn handle_client(
} }
} }
} else { } else {
// 断开连接之后直接移除即可
event_sender.send(Event::CloseConnection((from_id.clone(), connection_time))).unwrap();
break; // 客户端断开连接 break; // 客户端断开连接
} }
} }
...@@ -156,9 +168,13 @@ pub(crate) async fn handle_client( ...@@ -156,9 +168,13 @@ pub(crate) async fn handle_client(
} }
// 发送关闭连接事件 // 发送关闭连接事件
event_sender if let Err(e) =
.send(Event::CloseConnection((from_id.clone(), connection_time))) event_sender.send(Event::CloseConnection((from_id.clone(), connection_time)))
.unwrap(); {
println!("发送关闭消息到事件中心失败! 原因: {:?}", e);
} else {
println!("发送关闭消息到事件中心成功!");
}
// 等待事件中心调度任务结束 // 等待事件中心调度任务结束
if let Err(e) = event_task.await { if let Err(e) = event_task.await {
......
...@@ -75,21 +75,22 @@ pub async fn handle_agora_call( ...@@ -75,21 +75,22 @@ pub async fn handle_agora_call(
// 对同一个channelID的组,发送挂断消息 // 对同一个channelID的组,发送挂断消息
if let Some(user_status) = user_status { if let Some(user_status) = user_status {
// 将获取的字符串用逗号拆分,并存储为 Vec<String> // 将获取的字符串用逗号拆分,并存储为 Vec<String>
let mut the_current_user_data: Vec<String> = user_status.split(',').map(|s| s.to_string()).collect(); let mut the_current_user_data: Vec<String> =
user_status.split(',').map(|s| s.to_string()).collect();
// 如果当前用户channelID为空,则中断后续操作,并且直接重新赋值即可 // 如果当前用户channelID为空,则中断后续操作,并且直接重新赋值即可
if the_current_user_data[1].is_empty() { if the_current_user_data[1].is_empty() {
the_current_user_data[1] = "".to_string(); the_current_user_data[1] = "".to_string();
the_current_user_data[0] = "idle".to_string(); the_current_user_data[0] = "idle".to_string();
the_current_user_data[6] = "0".to_string(); the_current_user_data[6] = "0".to_string();
ONLINE_USERS.insert( let data_joined = the_current_user_data.join(",");
from_id.to_string(), drop(user_status);
the_current_user_data.join(","), drop(the_current_user_data);
); ONLINE_USERS.insert(from_id.to_string(), data_joined.to_string());
let from_id_clone = from_id.clone(); let from_id_clone = from_id.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = update_client_redis_data( if let Err(e) = update_client_redis_data(
&from_id_clone, &from_id_clone,
the_current_user_data.join(","), data_joined,
) )
.await .await
{ {
...@@ -102,7 +103,8 @@ pub async fn handle_agora_call( ...@@ -102,7 +103,8 @@ pub async fn handle_agora_call(
return; return;
} }
// 拆分toId为其他用户id,并且获取其数据信息 // 拆分toId为其他用户id,并且获取其数据信息
let all_to_hangup_id_vec: Vec<String> = calling_to_id.split(',').map(|s| s.to_string()).collect(); let all_to_hangup_id_vec: Vec<String> =
calling_to_id.split(',').map(|s| s.to_string()).collect();
for to_hangup_id in all_to_hangup_id_vec { for to_hangup_id in all_to_hangup_id_vec {
// 取消拒绝接听的线程队列 // 取消拒绝接听的线程队列
if let Some(ref_procedure_sender) = if let Some(ref_procedure_sender) =
...@@ -117,7 +119,11 @@ pub async fn handle_agora_call( ...@@ -117,7 +119,11 @@ pub async fn handle_agora_call(
let the_other_caller = { ONLINE_USERS.get(&to_hangup_id) }; let the_other_caller = { ONLINE_USERS.get(&to_hangup_id) };
// 获取当前用户状态信息成功后,再进行额外操作 // 获取当前用户状态信息成功后,再进行额外操作
if let Some(the_other_caller_data) = the_other_caller { if let Some(the_other_caller_data) = the_other_caller {
let mut the_other_caller_data: Vec<String> = the_other_caller_data.split(',').map(|s| s.to_string()).collect(); let mut the_other_caller_data: Vec<String> = the_other_caller_data
.split(',')
.map(|s| s.to_string())
.collect();
// 提前释放
// 如果处于同一频道,则发送挂断信息, // 如果处于同一频道,则发送挂断信息,
if the_other_caller_data[1] == the_current_user_data[1] { if the_other_caller_data[1] == the_current_user_data[1] {
let hangup_message_json = serde_json::json!({ let hangup_message_json = serde_json::json!({
...@@ -144,18 +150,23 @@ pub async fn handle_agora_call( ...@@ -144,18 +150,23 @@ pub async fn handle_agora_call(
false, false,
)) { )) {
println!("发送给用户id {} 独立消息失败:{:?}", to_hangup_id, e); println!("发送给用户id {} 独立消息失败:{:?}", to_hangup_id, e);
} else {
// 修改对应sender的数据 // 修改对应sender的数据
the_other_caller_data[0] = "idle".to_string(); the_other_caller_data[0] = "idle".to_string();
the_other_caller_data[1] = "".to_string(); the_other_caller_data[1] = "".to_string();
the_other_caller_data[6] = "0".to_string(); the_other_caller_data[6] = "0".to_string();
let all_data_joined = the_other_caller_data.join(",").to_string();
println!("step - 0");
drop(the_other_caller_data);
ONLINE_USERS.insert( ONLINE_USERS.insert(
to_hangup_id.to_string(), to_hangup_id.to_string(),
the_other_caller_data.join(",").to_string(), all_data_joined.to_string(),
); );
println!("step - 1");
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = update_client_redis_data( if let Err(e) = update_client_redis_data(
&to_hangup_id, &to_hangup_id,
the_other_caller_data.join(",").to_string(), all_data_joined,
) )
.await .await
{ {
...@@ -169,19 +180,24 @@ pub async fn handle_agora_call( ...@@ -169,19 +180,24 @@ pub async fn handle_agora_call(
} }
} }
} }
println!("step - 1.5");
// 要求所有在线用户更新数据 // 要求所有在线用户更新数据
the_current_user_data[1] = "".to_string(); the_current_user_data[1] = "".to_string();
the_current_user_data[0] = "idle".to_string(); the_current_user_data[0] = "idle".to_string();
the_current_user_data[6] = "0".to_string(); the_current_user_data[6] = "0".to_string();
let the_current_data_joined = the_current_user_data.join(",").to_string();
drop(the_current_user_data);
println!("step - 1.7");
ONLINE_USERS.insert( ONLINE_USERS.insert(
from_id.to_string(), from_id.to_string(),
the_current_user_data.join(",").to_string(), the_current_data_joined.to_string(),
); );
let from_id_clone = from_id.to_string(); let from_id_clone = from_id.to_string();
println!("step - 2.1");
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = update_client_redis_data( if let Err(e) = update_client_redis_data(
&from_id_clone, &from_id_clone,
the_current_user_data.join(",").to_string(), the_current_data_joined,
) )
.await .await
{ {
...@@ -189,8 +205,10 @@ pub async fn handle_agora_call( ...@@ -189,8 +205,10 @@ pub async fn handle_agora_call(
}; };
}); });
tokio::spawn(async move { tokio::spawn(async move {
println!("更新在线用户列表 Cancel-Call");
notify_all_clients_to_update_online_users().await; notify_all_clients_to_update_online_users().await;
}); });
println!("step - 2.2");
} }
} }
// 对于通话类消息 // 对于通话类消息
...@@ -325,6 +343,7 @@ pub async fn handle_agora_call( ...@@ -325,6 +343,7 @@ pub async fn handle_agora_call(
println!("step - 1"); println!("step - 1");
let the_another_user_data_joined = the_another_user_data.join(","); let the_another_user_data_joined = the_another_user_data.join(",");
// 更新状态数据 // 更新状态数据
drop(user_status);
ONLINE_USERS.insert( ONLINE_USERS.insert(
calling_to_id.to_string(), calling_to_id.to_string(),
the_another_user_data_joined.clone(), the_another_user_data_joined.clone(),
...@@ -374,15 +393,22 @@ pub async fn handle_agora_call( ...@@ -374,15 +393,22 @@ pub async fn handle_agora_call(
the_user_data[6] = "0"; the_user_data[6] = "0";
// 在连接成功之后再设置为主持人,在此之前一律将所有用户的isHost数据段设置为0 // 在连接成功之后再设置为主持人,在此之前一律将所有用户的isHost数据段设置为0
let the_user_data_joined = the_user_data.join(","); let the_user_data_joined = the_user_data.join(",");
drop(user_status);
println!("step - 4");
ONLINE_USERS.insert(from_id.to_string(), the_user_data_joined.clone()); ONLINE_USERS.insert(from_id.to_string(), the_user_data_joined.clone());
let from_id_c = from_id.to_string();
println!("step - 5");
tokio::spawn(async move {
// 修改redis数据 // 修改redis数据
if let Err(e) = if let Err(e) =
update_client_redis_data(from_id, the_user_data_joined).await update_client_redis_data(&from_id_c, the_user_data_joined).await
{ {
println!("更新redis数据失败:{:?} 用户id {}", e, from_id); println!("更新redis数据失败:{:?} 用户id {}", e, from_id_c);
} else { } else {
println!("更新redis数据成功"); println!("更新redis数据成功");
} }
});
println!("step - 6");
send_inside_message( send_inside_message(
&target_sender_which, &target_sender_which,
event_sender, event_sender,
...@@ -579,7 +605,9 @@ pub async fn handle_agora_call( ...@@ -579,7 +605,9 @@ pub async fn handle_agora_call(
let user_data = { ONLINE_USERS.get(from_id) }; let user_data = { ONLINE_USERS.get(from_id) };
// 给当前from_id用户发送CmdHangup,表示已经成功拒接了电话,并且修改了状态 // 给当前from_id用户发送CmdHangup,表示已经成功拒接了电话,并且修改了状态
if let Some(current_user_data) = user_data { if let Some(current_user_data) = user_data {
let mut current_user_data_vec: Vec<&str> = current_user_data.split(',').collect(); let mut current_user_data_vec = current_user_data.split(',')
.map(|s| s.to_string())
.collect::<Vec<String>>();
let hangup_refuse_message = serde_json::json!({ let hangup_refuse_message = serde_json::json!({
"msgType": "CmdHangup", "msgType": "CmdHangup",
"fromID": "0", "fromID": "0",
...@@ -592,10 +620,11 @@ pub async fn handle_agora_call( ...@@ -592,10 +620,11 @@ pub async fn handle_agora_call(
}) })
.to_string(); .to_string();
// 修改状态数据 // 修改状态数据
current_user_data_vec[0] = "idle"; current_user_data_vec[0] = "idle".to_string();
current_user_data_vec[1] = ""; current_user_data_vec[1] = "".to_string();
current_user_data_vec[6] = "0"; current_user_data_vec[6] = "0".to_string();
let current_user_data_joined = current_user_data_vec.join(","); let current_user_data_joined = current_user_data_vec.join(",");
drop(current_user_data_vec);
ONLINE_USERS.insert(from_id.to_string(), current_user_data_joined.clone()); ONLINE_USERS.insert(from_id.to_string(), current_user_data_joined.clone());
let from_id_clone = from_id.to_string(); let from_id_clone = from_id.to_string();
tokio::spawn(async move { tokio::spawn(async move {
...@@ -696,11 +725,8 @@ pub async fn handle_agora_call( ...@@ -696,11 +725,8 @@ pub async fn handle_agora_call(
let clone_for_users_to_notify = users_to_notify.clone(); let clone_for_users_to_notify = users_to_notify.clone();
// 再根据上面的id,找到所有sender,并且修改所有对应ONLINE_USERS数据 // 再根据上面的id,找到所有sender,并且修改所有对应ONLINE_USERS数据
for user_id_current_chat in users_to_notify { for user_id_current_chat in users_to_notify {
let user_d = { let user_d = { ONLINE_USERS.get(&user_id_current_chat) };
ONLINE_USERS.get(&user_id_current_chat) if let Some(current_user_data) = user_d {
};
if let Some(current_user_data) = user_d
{
let mut current_user_data_vec: Vec<&str> = let mut current_user_data_vec: Vec<&str> =
current_user_data.split(',').collect(); current_user_data.split(',').collect();
current_user_data_vec[0] = "idle"; current_user_data_vec[0] = "idle";
...@@ -939,12 +965,9 @@ pub async fn handle_agora_call( ...@@ -939,12 +965,9 @@ pub async fn handle_agora_call(
// 让第一位id的用户成为主持人 // 让第一位id的用户成为主持人
if let Some(allowed_user_id) = allowed_users_clone.get(0) { if let Some(allowed_user_id) = allowed_users_clone.get(0) {
let allowed_user_id_clone = allowed_user_id.clone(); // 克隆 allowed_user_id let allowed_user_id_clone = allowed_user_id.clone(); // 克隆 allowed_user_id
let data_f = { let data_f = { ONLINE_USERS.get(&allowed_user_id_clone) };
ONLINE_USERS.get(&allowed_user_id_clone)
};
if let Some(allowed_user_info_data) = data_f { if let Some(allowed_user_info_data) = data_f {
let mut user_info_data_for_allowed = let mut user_info_data_for_allowed = allowed_user_info_data
allowed_user_info_data
.split(',') .split(',')
.map(|s| s.to_string()) .map(|s| s.to_string())
.collect::<Vec<String>>(); .collect::<Vec<String>>();
...@@ -978,12 +1001,9 @@ pub async fn handle_agora_call( ...@@ -978,12 +1001,9 @@ pub async fn handle_agora_call(
let disallowed_users_clone = disallowed_users.clone(); // 克隆 disallowed_users let disallowed_users_clone = disallowed_users.clone(); // 克隆 disallowed_users
if let Some(disallowed_user_id) = disallowed_users_clone.get(0) { if let Some(disallowed_user_id) = disallowed_users_clone.get(0) {
let disallowed_user_id_clone = disallowed_user_id.clone(); // 克隆 disallowed_user_id let disallowed_user_id_clone = disallowed_user_id.clone(); // 克隆 disallowed_user_id
let d_f = { let d_f = { ONLINE_USERS.get(&disallowed_user_id_clone) };
ONLINE_USERS.get(&disallowed_user_id_clone)
};
if let Some(disallowed_user_info_data) = d_f { if let Some(disallowed_user_info_data) = d_f {
let mut current_host_data = let mut current_host_data = disallowed_user_info_data
disallowed_user_info_data
.split(',') .split(',')
.map(|s| s.to_string()) .map(|s| s.to_string())
.collect::<Vec<String>>(); .collect::<Vec<String>>();
...@@ -1029,14 +1049,23 @@ pub async fn handle_agora_call( ...@@ -1029,14 +1049,23 @@ pub async fn handle_agora_call(
// 4.决定主持人。一般来说,谁发起了这个频道的人,并且具备成为主持人的资格,那么就直接成为主持人,否则就让第一个加入的人成为主持人。 // 4.决定主持人。一般来说,谁发起了这个频道的人,并且具备成为主持人的资格,那么就直接成为主持人,否则就让第一个加入的人成为主持人。
"Connect" => { "Connect" => {
println!( println!(
"收到客户端Connect消息连接 {} 频道号 {}", "收到客户端Connect消息连接 {} 频道信息 {:?}",
&from_id, &from_id, &client_message_data
client_message_data
.msg_data
.get("channelID")
.unwrap_or(&serde_json::Value::String("null".to_string()))
); );
if let Some(channel_id) = client_message_data.msg_data.get("channelID") { // 发送connect消息之时,默认以toID用户的channelID作为channelID
let to_id_user = &client_message_data.to_id;
// 找到对应to_id用户的数据
let to_id_user_data_here = { ONLINE_USERS.get(to_id_user) };
if let Some(to_id_user_data) = to_id_user_data_here {
// 如果存在,则获取这个channel_id
let data_split = to_id_user_data
.split(',')
.map(|s| s.to_string())
.collect::<Vec<String>>();
// 提前释放资源,释放对应锁
drop(to_id_user_data);
let target_channel_id = data_split[1].clone();
if !target_channel_id.is_empty() {
// 这里的from_id就是被呼叫方,to_id就是呼叫方 // 这里的from_id就是被呼叫方,to_id就是呼叫方
// 首先先取消掉所有的挂断from_id的任务 // 首先先取消掉所有的挂断from_id的任务
let hangup_p = { refuse_procedure_map.get(from_id) }; let hangup_p = { refuse_procedure_map.get(from_id) };
...@@ -1047,10 +1076,8 @@ pub async fn handle_agora_call( ...@@ -1047,10 +1076,8 @@ pub async fn handle_agora_call(
println!("挂断personnel的任务已经发送成功") println!("挂断personnel的任务已经发送成功")
} }
} }
// 将channel_id转换为字符串,并且进行解析
if let Some(channel_id_str) = channel_id.as_str() {
// 检查一下是否存在挂断channel的任务,如果存在,对齐发送消息,然后关闭 // 检查一下是否存在挂断channel的任务,如果存在,对齐发送消息,然后关闭
let hangup_p = { channel_hangup_procedure_map.get(channel_id_str) }; let hangup_p = { channel_hangup_procedure_map.get(&target_channel_id) };
if let Some(hangup_channel_task) = hangup_p { if let Some(hangup_channel_task) = hangup_p {
if let Err(e) = hangup_channel_task.send(()) { if let Err(e) = hangup_channel_task.send(()) {
println!("挂断channel的任务终结失败 {:?}", e) println!("挂断channel的任务终结失败 {:?}", e)
...@@ -1063,20 +1090,24 @@ pub async fn handle_agora_call( ...@@ -1063,20 +1090,24 @@ pub async fn handle_agora_call(
ONLINE_USERS ONLINE_USERS
.iter() .iter()
.filter(|entry| { .filter(|entry| {
entry.value().split(",").collect::<Vec<_>>()[1] == channel_id_str entry.value().split(",").collect::<Vec<_>>()[1]
== target_channel_id
}) })
.any(|entry| entry.value().split(",").collect::<Vec<_>>()[6] == "1") .any(|entry| entry.value().split(",").collect::<Vec<_>>()[6] == "1")
}; };
if already_had_host { if already_had_host {
println!("当前channel已经有主持人,直接返回 {}", channel_id_str) println!("当前channel已经有主持人,直接返回 {}", target_channel_id)
} else { } else {
println!("当前channel没有主持人,开始处理 {}", channel_id_str); println!("当前channel没有主持人,开始处理 {}", target_channel_id);
// 首先判断发起方,也就是channel_id用_分割之后的第一个元素,是否具备成为主持人的资格 // 首先判断发起方,也就是channel_id用_分割之后的第一个元素,是否具备成为主持人的资格
let last_remaining_id_set = channel_id_str.split("_").map(|s| s.to_string()).collect::<Vec<_>>(); let last_remaining_id_set = target_channel_id
if let Some(channel_id_first_element) = .split("_")
last_remaining_id_set.get(0) .map(|s| s.to_string())
.collect::<Vec<_>>();
if let Some(channel_id_first_element) = last_remaining_id_set.get(0) {
if HOST_ENABLED_ID_SET
.contains(&channel_id_first_element.to_string())
{ {
if HOST_ENABLED_ID_SET.contains(&channel_id_first_element.to_string()) {
// 具备成为主持人资格,直接修改其对应数据即可 // 具备成为主持人资格,直接修改其对应数据即可
let online_d = let online_d =
{ ONLINE_USERS.get(&channel_id_first_element.to_string()) }; { ONLINE_USERS.get(&channel_id_first_element.to_string()) };
...@@ -1114,7 +1145,7 @@ pub async fn handle_agora_call( ...@@ -1114,7 +1145,7 @@ pub async fn handle_agora_call(
.iter() .iter()
.filter(|entry| { .filter(|entry| {
entry.value().split(",").collect::<Vec<_>>()[1] entry.value().split(",").collect::<Vec<_>>()[1]
== channel_id_str == target_channel_id
}) })
.map(|entry| entry.key().clone()) .map(|entry| entry.key().clone())
.collect::<Vec<String>>() .collect::<Vec<String>>()
...@@ -1133,8 +1164,10 @@ pub async fn handle_agora_call( ...@@ -1133,8 +1164,10 @@ pub async fn handle_agora_call(
is_host_found = true; is_host_found = true;
// 更新对应数据 // 更新对应数据
let host_data_joined = host_data_vec.join(","); let host_data_joined = host_data_vec.join(",");
ONLINE_USERS ONLINE_USERS.insert(
.insert(id.to_string(), host_data_joined.clone()); id.to_string(),
host_data_joined.clone(),
);
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = update_client_redis_data( if let Err(e) = update_client_redis_data(
&id, &id,
...@@ -1155,8 +1188,7 @@ pub async fn handle_agora_call( ...@@ -1155,8 +1188,7 @@ pub async fn handle_agora_call(
} }
} }
if !is_host_found { if !is_host_found {
println!("当前channel没有主持人,并且没有找到合适的主持人,直接派遣第一个id即可 {}", channel_id_str); println!("当前channel没有主持人,并且没有找到合适的主持人,直接派遣第一个id即可 {}", target_channel_id);
}
} }
} }
} }
...@@ -1166,7 +1198,36 @@ pub async fn handle_agora_call( ...@@ -1166,7 +1198,36 @@ pub async fn handle_agora_call(
notify_all_clients_to_update_online_users().await; notify_all_clients_to_update_online_users().await;
}); });
} else { } else {
println!("客户端Connect消息 缺乏channel-id,拒绝处理消息") if let Err(e) = event_sender.send(Event::SendClientMessage(
target_sender_which.clone(),
serde_json::json!({
"msgType": "Error",
"msgData": "对方数据出现异常,缺少channelID",
"fromID" : "0",
"fromName": "Server",
"toID": from_id
})
.to_string(),
false,
)) {
println!("发送给用户id {} 独立消息失败:{:?}", from_id, e);
}
}
} else {
if let Err(e) = event_sender.send(Event::SendClientMessage(
target_sender_which.clone(),
serde_json::json!({
"msgType": "Error",
"msgData": "必须传递to_id",
"fromID" : "0",
"fromName": "Server",
"toID": from_id
})
.to_string(),
false,
)) {
println!("发送给用户id {} 独立消息失败:{:?}", from_id, e);
}
} }
} }
// 处理静音 // 处理静音
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment