Commit 1b2984f7 by qlintonger xeno

修改挂断呼叫逻辑,增添命令行参数以及环境变量适配+1

parent e93a18f7
...@@ -12,8 +12,7 @@ use tokio::time::sleep; ...@@ -12,8 +12,7 @@ use tokio::time::sleep;
lazy_static! { lazy_static! {
pub static ref refuse_procedure_map: DashMap<String, UnboundedSender<()>> = DashMap::new(); pub static ref refuse_procedure_map: DashMap<String, UnboundedSender<()>> = DashMap::new();
pub static ref channel_hangup_procedure_map: DashMap<String, UnboundedSender<()>> = pub static ref channel_hangup_procedure_map: DashMap<String, UnboundedSender<()>> = DashMap::new();
DashMap::new();
} }
// 用户状态字段的索引 // 用户状态字段的索引
...@@ -37,15 +36,7 @@ fn update_user_status(user_id: &str, status: &str, channel_id: &str, is_host: bo ...@@ -37,15 +36,7 @@ fn update_user_status(user_id: &str, status: &str, channel_id: &str, is_host: bo
let mut user_data = ONLINE_USERS let mut user_data = ONLINE_USERS
.get(user_id) .get(user_id)
.map(|v| v.split(',').map(String::from).collect()) .map(|v| v.split(',').map(String::from).collect())
.unwrap_or(vec![ .unwrap_or(vec!["idle".into(), "".into(), "".into(), "".into(), "".into(), "".into(), "0".into()]);
"idle".into(),
"".into(),
"".into(),
"".into(),
"".into(),
"".into(),
"0".into(),
]);
user_data[STATUS_IDX] = status.into(); user_data[STATUS_IDX] = status.into();
user_data[CHANNEL_IDX] = channel_id.into(); user_data[CHANNEL_IDX] = channel_id.into();
user_data[HOST_IDX] = if is_host { "1".into() } else { "0".into() }; user_data[HOST_IDX] = if is_host { "1".into() } else { "0".into() };
...@@ -62,11 +53,7 @@ async fn send_inside_message( ...@@ -62,11 +53,7 @@ async fn send_inside_message(
from_id: &String, from_id: &String,
) { ) {
println!("发送给用户id {} 的消息 {}", from_id, json_message); println!("发送给用户id {} 的消息 {}", from_id, json_message);
if let Err(e) = event_sender.send(Event::SendClientMessage( if let Err(e) = event_sender.send(Event::SendClientMessage(target_sender.clone(), json_message, false)) {
target_sender.clone(),
json_message,
false,
)) {
println!("发送给用户id {} 独立消息失败:{:?}", from_id, e); println!("发送给用户id {} 独立消息失败:{:?}", from_id, e);
} else { } else {
println!("发送给用户id {} 独立消息成功", from_id); println!("发送给用户id {} 独立消息成功", from_id);
...@@ -86,10 +73,8 @@ async fn send_error_message( ...@@ -86,10 +73,8 @@ async fn send_error_message(
"fromID": "0", "fromID": "0",
"fromName": "Server", "fromName": "Server",
"toID": from_id "toID": from_id
}) }).to_string();
.to_string(); if let Err(e) = event_sender.send(Event::SendClientMessage(target_sender.clone(), json, false)) {
if let Err(e) = event_sender.send(Event::SendClientMessage(target_sender.clone(), json, false))
{
println!("发送给用户id {} 独立消息失败:{:?}", from_id, e); println!("发送给用户id {} 独立消息失败:{:?}", from_id, e);
} else { } else {
println!("发送给用户id {} 独立消息成功", from_id); println!("发送给用户id {} 独立消息成功", from_id);
...@@ -116,65 +101,40 @@ pub async fn handle_agora_call( ...@@ -116,65 +101,40 @@ pub async fn handle_agora_call(
match client_message_data.msg_type.as_str() { match client_message_data.msg_type.as_str() {
"CancelCall" => { "CancelCall" => {
let calling_to_id = &client_message_data.to_id; let calling_to_id = &client_message_data.to_id;
println!( println!("收到客户端取消呼叫 取消呼叫组: {} 呼叫方id {}", calling_to_id, from_id);
"收到客户端取消呼叫 取消呼叫组: {} 呼叫方id {}", send_inside_message(&target_sender_which, event_sender, serde_json::json!({
calling_to_id, from_id "msgType": "CmdCancelCall",
); "fromID": "0",
send_inside_message( "fromName": "Server",
&target_sender_which, "toID": from_id,
event_sender, "msgData": {"channelId": "", "rtcToken": ""}
serde_json::json!({ }).to_string(), from_id).await;
"msgType": "CmdCancelCall",
"fromID": "0",
"fromName": "Server",
"toID": from_id,
"msgData": {"channelId": "", "rtcToken": ""}
})
.to_string(),
from_id,
)
.await;
if let Some(user_status) = ONLINE_USERS.get(from_id).map(|v| v.clone()) { if let Some(user_status) = ONLINE_USERS.get(from_id).map(|v| v.clone()) {
let user_data = user_status.split(',').map(String::from).collect::<Vec<_>>(); let user_data = user_status.split(',').map(String::from).collect::<Vec<_>>();
if user_data[CHANNEL_IDX].is_empty() { if user_data[CHANNEL_IDX].is_empty() {
let joined = update_user_status(from_id, "idle", "", false); let joined = update_user_status(from_id, "idle", "", false);
update_redis_async(from_id.clone(), joined); update_redis_async(from_id.clone(), joined);
tokio::spawn(async move { tokio::spawn(async move { notify_all_clients_to_update_online_users().await; });
notify_all_clients_to_update_online_users().await;
});
return; return;
} }
let to_hangup_ids: Vec<String> = let to_hangup_ids: Vec<String> = calling_to_id.split(',').map(String::from).collect();
calling_to_id.split(',').map(String::from).collect();
for to_id in to_hangup_ids { for to_id in to_hangup_ids {
if let Some(sender) = refuse_procedure_map.get(&to_id) { if let Some(sender) = refuse_procedure_map.get(&to_id) {
sender.send(()).ok(); sender.send(()).ok();
} }
if let Some(other_data) = ONLINE_USERS.get(&to_id).map(|v| v.clone()) { if let Some(other_data) = ONLINE_USERS.get(&to_id).map(|v| v.clone()) {
let other_data_vec = let other_data_vec = other_data.split(',').map(String::from).collect::<Vec<_>>();
other_data.split(',').map(String::from).collect::<Vec<_>>();
if other_data_vec[CHANNEL_IDX] == user_data[CHANNEL_IDX] { if other_data_vec[CHANNEL_IDX] == user_data[CHANNEL_IDX] {
if let Some(target_sender) = CLIENT_SENDERS if let Some(target_sender) = CLIENT_SENDERS.iter().find(|entry| entry.key().0 == *calling_to_id).map(|entry| entry.key().clone()) {
.iter() send_inside_message(&target_sender, event_sender, serde_json::json!({
.find(|entry| entry.key().0 == *calling_to_id) "msgType": "CmdHangup",
.map(|entry| entry.key().clone()) "fromID": "0",
{ "fromName": "Server",
send_inside_message( "toID": to_id,
&target_sender, "msgData": {"channelId": "", "rtcToken": ""}
event_sender, }).to_string(), &to_id).await;
serde_json::json!({
"msgType": "CmdHangup",
"fromID": "0",
"fromName": "Server",
"toID": to_id,
"msgData": {"channelId": "", "rtcToken": ""}
})
.to_string(),
&to_id,
)
.await;
let joined = update_user_status(&to_id, "idle", "", false); let joined = update_user_status(&to_id, "idle", "", false);
update_redis_async(to_id, joined); update_redis_async(to_id, joined);
} else { } else {
...@@ -185,36 +145,19 @@ pub async fn handle_agora_call( ...@@ -185,36 +145,19 @@ pub async fn handle_agora_call(
} }
let joined = update_user_status(from_id, "idle", "", false); let joined = update_user_status(from_id, "idle", "", false);
update_redis_async(from_id.clone(), joined); update_redis_async(from_id.clone(), joined);
tokio::spawn(async move { tokio::spawn(async move { notify_all_clients_to_update_online_users().await; });
notify_all_clients_to_update_online_users().await;
});
} }
} }
"Call" => { "Call" => {
let calling_to_id = &client_message_data.to_id; let calling_to_id = &client_message_data.to_id;
println!( println!("收到客户端呼叫 呼叫方id {} 呼叫组id {}", from_id, calling_to_id);
"收到客户端呼叫 呼叫方id {} 呼叫组id {}",
from_id, calling_to_id
);
if calling_to_id.is_empty() { if calling_to_id.is_empty() {
send_error_message( send_error_message(&target_sender_which, event_sender, from_id, "请指定呼叫对象").await;
&target_sender_which,
event_sender,
from_id,
"请指定呼叫对象",
)
.await;
return; return;
} }
if calling_to_id == from_id { if calling_to_id == from_id {
send_error_message( send_error_message(&target_sender_which, event_sender, from_id, "不能给自己打电话").await;
&target_sender_which,
event_sender,
from_id,
"不能给自己打电话",
)
.await;
return; return;
} }
...@@ -225,55 +168,29 @@ pub async fn handle_agora_call( ...@@ -225,55 +168,29 @@ pub async fn handle_agora_call(
} else { } else {
user_data[CHANNEL_IDX].clone() user_data[CHANNEL_IDX].clone()
}; };
println!( println!("当前用户channelId {} 呼叫方id集合是 {:?}", channel_id, calling_to_id.split(',').collect::<Vec<_>>());
"当前用户channelId {} 呼叫方id集合是 {:?}", if let Some(sender) = channel_hangup_procedure_map.get(&channel_id) { sender.send(()).ok(); }
channel_id,
calling_to_id.split(',').collect::<Vec<_>>()
);
let mut refuse_users = Vec::new(); let mut refuse_users = Vec::new();
for to_id in calling_to_id.split(',') { for to_id in calling_to_id.split(',') {
if !ONLINE_USERS.contains_key(to_id) { if !ONLINE_USERS.contains_key(to_id) {
send_error_message( send_error_message(&target_sender_which, event_sender, from_id, "对方不在线").await;
&target_sender_which,
event_sender,
from_id,
"对方不在线",
)
.await;
continue; continue;
} }
if let Some(to_status) = ONLINE_USERS.get(to_id).map(|v| v.clone()) { if let Some(to_status) = ONLINE_USERS.get(to_id).map(|v| v.clone()) {
let to_data = to_status.split(',').map(String::from).collect::<Vec<_>>(); let to_data = to_status.split(',').map(String::from).collect::<Vec<_>>();
if to_data[STATUS_IDX] != "idle" { if to_data[STATUS_IDX] != "idle" {
send_error_message( send_error_message(&target_sender_which, event_sender, from_id, "对方正在呼叫中").await;
&target_sender_which,
event_sender,
from_id,
"对方正在呼叫中",
)
.await;
continue; continue;
} }
if let Some(sender) = CLIENT_SENDERS if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == to_id).map(|e| e.key().clone()) {
.iter() send_inside_message(&sender, event_sender, serde_json::json!({
.find(|e| e.key().0 == to_id) "msgType": "Call",
.map(|e| e.key().clone()) "fromID": from_id,
{ "fromName": client_message_data.from_name,
send_inside_message( "toID": to_id,
&sender, "msgData": {"channelId": channel_id, "rtcToken": ""}
event_sender, }).to_string(), &to_id.to_string()).await;
serde_json::json!({
"msgType": "Call",
"fromID": from_id,
"fromName": client_message_data.from_name,
"toID": to_id,
"msgData": {"channelId": channel_id, "rtcToken": ""}
})
.to_string(),
&to_id.to_string(),
)
.await;
let joined = update_user_status(to_id, "callin", &channel_id, false); let joined = update_user_status(to_id, "callin", &channel_id, false);
update_redis_async(to_id.to_string(), joined); update_redis_async(to_id.to_string(), joined);
refuse_users.push(to_id.to_string()); refuse_users.push(to_id.to_string());
...@@ -284,30 +201,17 @@ pub async fn handle_agora_call( ...@@ -284,30 +201,17 @@ pub async fn handle_agora_call(
} }
if !refuse_users.is_empty() { if !refuse_users.is_empty() {
let status = if user_data[STATUS_IDX] == "calling" { let status = if user_data[STATUS_IDX] == "calling" { "calling" } else { "callout" };
"calling" send_inside_message(&target_sender_which, event_sender, serde_json::json!({
} else { "msgType": "CmdCall",
"callout" "fromID": calling_to_id,
}; "fromName": client_message_data.from_name,
send_inside_message( "toID": from_id,
&target_sender_which, "msgData": {"channelId": channel_id, "rtcToken": ""}
event_sender, }).to_string(), from_id).await;
serde_json::json!({
"msgType": "CmdCall",
"fromID": calling_to_id,
"fromName": client_message_data.from_name,
"toID": from_id,
"msgData": {"channelId": channel_id, "rtcToken": ""}
})
.to_string(),
from_id,
)
.await;
let joined = update_user_status(from_id, status, &channel_id, false); let joined = update_user_status(from_id, status, &channel_id, false);
update_redis_async(from_id.clone(), joined); update_redis_async(from_id.clone(), joined);
tokio::spawn(async move { tokio::spawn(async move { notify_all_clients_to_update_online_users().await; });
notify_all_clients_to_update_online_users().await;
});
for user_id in refuse_users { for user_id in refuse_users {
let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel::<()>(); let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel::<()>();
...@@ -375,72 +279,41 @@ pub async fn handle_agora_call( ...@@ -375,72 +279,41 @@ pub async fn handle_agora_call(
let channel_id = user_data_vec[CHANNEL_IDX].clone(); let channel_id = user_data_vec[CHANNEL_IDX].clone();
let joined = update_user_status(from_id, "idle", "", false); let joined = update_user_status(from_id, "idle", "", false);
update_redis_async(from_id.clone(), joined); update_redis_async(from_id.clone(), joined);
send_inside_message( send_inside_message(&target_sender_which, event_sender, serde_json::json!({
&target_sender_which, "msgType": "CmdHangup",
event_sender, "fromID": "0",
serde_json::json!({ "fromName": "Server",
"msgType": "CmdHangup", "toID": from_id,
"fromID": "0", "msgData": {"channelId": "", "rtcToken": ""}
}).to_string(), from_id).await;
if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == client_message_data.to_id).map(|e| e.key().clone()) {
send_inside_message(&sender, event_sender, serde_json::json!({
"msgType": "CmdRefuse",
"fromID": from_id,
"fromName": "Server", "fromName": "Server",
"toID": from_id, "toID": client_message_data.to_id,
"msgData": {"channelId": "", "rtcToken": ""} "msgData": {"channelId": "", "rtcToken": ""}
}) }).to_string(), &client_message_data.to_id).await;
.to_string(),
from_id,
)
.await;
if let Some(sender) = CLIENT_SENDERS
.iter()
.find(|e| e.key().0 == client_message_data.to_id)
.map(|e| e.key().clone())
{
send_inside_message(
&sender,
event_sender,
serde_json::json!({
"msgType": "CmdRefuse",
"fromID": from_id,
"fromName": "Server",
"toID": client_message_data.to_id,
"msgData": {"channelId": "", "rtcToken": ""}
})
.to_string(),
&client_message_data.to_id,
)
.await;
} else { } else {
println!("找不到toID对应的sender"); println!("找不到toID对应的sender");
} }
let chatters = get_users_by_channel(&channel_id); let chatters = get_users_by_channel(&channel_id);
if chatters.len() == 1 { if chatters.len() == 1 {
if let Some(sender) = CLIENT_SENDERS if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == chatters[0]).map(|e| e.key().clone()) {
.iter()
.find(|e| e.key().0 == chatters[0])
.map(|e| e.key().clone())
{
let joined = update_user_status(&chatters[0], "idle", "", false); let joined = update_user_status(&chatters[0], "idle", "", false);
update_redis_async(chatters[0].clone(), joined); update_redis_async(chatters[0].clone(), joined);
send_inside_message( send_inside_message(&sender, event_sender, serde_json::json!({
&sender, "msgType": "CmdCancelCall",
event_sender, "fromID": "0",
serde_json::json!({ "fromName": "Server",
"msgType": "CmdCancelCall", "toID": chatters[0],
"fromID": "0", "msgData": {"channelId": "", "rtcToken": ""}
"fromName": "Server", }).to_string(), &chatters[0]).await;
"toID": chatters[0],
"msgData": {"channelId": "", "rtcToken": ""}
})
.to_string(),
&chatters[0],
)
.await;
} }
} }
tokio::spawn(async move { tokio::spawn(async move { notify_all_clients_to_update_online_users().await; });
notify_all_clients_to_update_online_users().await;
});
} }
} }
...@@ -449,13 +322,7 @@ pub async fn handle_agora_call( ...@@ -449,13 +322,7 @@ pub async fn handle_agora_call(
if let Some(user_data) = ONLINE_USERS.get(from_id).map(|v| v.clone()) { if let Some(user_data) = ONLINE_USERS.get(from_id).map(|v| v.clone()) {
let user_data_vec = user_data.split(',').map(String::from).collect::<Vec<_>>(); let user_data_vec = user_data.split(',').map(String::from).collect::<Vec<_>>();
if user_data_vec[HOST_IDX] != "1" { if user_data_vec[HOST_IDX] != "1" {
send_error_message( send_error_message(&target_sender_which, event_sender, from_id, "只有主持人可以结束会议!").await;
&target_sender_which,
event_sender,
from_id,
"只有主持人可以结束会议!",
)
.await;
} else { } else {
let channel_id = user_data_vec[CHANNEL_IDX].clone(); let channel_id = user_data_vec[CHANNEL_IDX].clone();
if !channel_id.is_empty() { if !channel_id.is_empty() {
...@@ -463,30 +330,17 @@ pub async fn handle_agora_call( ...@@ -463,30 +330,17 @@ pub async fn handle_agora_call(
for user_id in &users { for user_id in &users {
let joined = update_user_status(user_id, "idle", "", false); let joined = update_user_status(user_id, "idle", "", false);
update_redis_async(user_id.clone(), joined); update_redis_async(user_id.clone(), joined);
if let Some(sender) = CLIENT_SENDERS if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == *user_id).map(|e| e.key().clone()) {
.iter() send_inside_message(&sender, event_sender, serde_json::json!({
.find(|e| e.key().0 == *user_id) "msgType": "CmdEndMeeting",
.map(|e| e.key().clone()) "fromID": "0",
{ "fromName": "Server",
send_inside_message( "toID": user_id,
&sender, "msgData": {"channelId": "", "rtcToken": ""}
event_sender, }).to_string(), user_id).await;
serde_json::json!({
"msgType": "CmdEndMeeting",
"fromID": "0",
"fromName": "Server",
"toID": user_id,
"msgData": {"channelId": "", "rtcToken": ""}
})
.to_string(),
user_id,
)
.await;
} }
} }
tokio::spawn(async move { tokio::spawn(async move { notify_all_clients_to_update_online_users().await; });
notify_all_clients_to_update_online_users().await;
});
} }
} }
} }
...@@ -499,230 +353,115 @@ pub async fn handle_agora_call( ...@@ -499,230 +353,115 @@ pub async fn handle_agora_call(
let channel_id = user_data_vec[CHANNEL_IDX].clone(); let channel_id = user_data_vec[CHANNEL_IDX].clone();
let joined = update_user_status(from_id, "idle", "", false); let joined = update_user_status(from_id, "idle", "", false);
update_redis_async(from_id.clone(), joined); update_redis_async(from_id.clone(), joined);
send_inside_message( send_inside_message(&target_sender_which, event_sender, serde_json::json!({
&target_sender_which, "msgType": "CmdHangup",
event_sender, "fromID": "0",
serde_json::json!({ "fromName": "Server",
"msgType": "CmdHangup", "toID": from_id,
"fromID": "0", "msgData": {"channelId": "", "rtcToken": ""}
"fromName": "Server", }).to_string(), from_id).await;
"toID": from_id,
"msgData": {"channelId": "", "rtcToken": ""}
})
.to_string(),
from_id,
)
.await;
let remaining_users = get_users_by_channel(&channel_id); let remaining_users = get_users_by_channel(&channel_id);
if remaining_users.is_empty() { if remaining_users.is_empty() {
println!("当前频道没有人员,请重新发起通话"); println!("当前频道没有人员,请重新发起通话");
} else { } else {
for user_id in &remaining_users { for user_id in &remaining_users {
if let Some(sender) = CLIENT_SENDERS if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == *user_id).map(|e| e.key().clone()) {
.iter() send_inside_message(&sender, event_sender, serde_json::json!({
.find(|e| e.key().0 == *user_id) "msgType": "CmdLeave",
.map(|e| e.key().clone()) "fromID": from_id,
{ "fromName": "Unknown",
send_inside_message( "toID": user_id,
&sender, "msgData": {}
event_sender, }).to_string(), user_id).await;
serde_json::json!({
"msgType": "CmdLeave",
"fromID": from_id,
"fromName": "Unknown",
"toID": user_id,
"msgData": {}
})
.to_string(),
user_id,
)
.await;
} }
} }
if remaining_users.len() == 1 { if remaining_users.len() == 1 {
let user_id = remaining_users[0].clone();
let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel::<()>(); let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel::<()>();
channel_hangup_procedure_map.insert(channel_id.clone(), cancel_tx); channel_hangup_procedure_map.insert(channel_id.clone(), cancel_tx);
let event_sender_clone = event_sender.clone(); let event_sender_clone = event_sender.clone();
let channel_id_clone = channel_id.clone(); let channel_id_clone = channel_id.clone();
tokio::spawn(async move { tokio::spawn(async move {
let start_time = tokio::time::Instant::now();
let mut interval = tokio::time::interval(Duration::from_secs(1));
let mut should_cancel = false;
tokio::select! { tokio::select! {
_ = async { _ = sleep(Duration::from_secs(15)) => {
loop { println!("20s内如果该频道与会人数仍为1,则对剩下来的用户发送CmdHangup指令 {}", user_id);
interval.tick().await; if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == user_id).map(|e| e.key().clone()) {
// 检查频道中是否存在 callin 状态的用户 let joined = update_user_status(&user_id, "idle", "", false);
let has_callin = ONLINE_USERS.iter() update_redis_async(user_id.clone(), joined);
.filter(|e| e.value().split(',').nth(CHANNEL_IDX).unwrap_or("") == channel_id_clone) send_inside_message(&sender, &event_sender_clone, serde_json::json!({
.any(|e| e.value().split(',').nth(STATUS_IDX).unwrap_or("") == "callin"); "msgType": "CmdHangup",
"fromID": "0",
if has_callin { "fromName": "Server",
println!("检测到频道 {} 有用户处于呼叫状态,取消挂断任务", channel_id_clone); "toID": user_id,
should_cancel = true; "msgData": {"channelId": "", "rtcToken": ""}
break; }).to_string(), &user_id).await;
} tokio::spawn(async move { notify_all_clients_to_update_online_users().await; });
if start_time.elapsed().as_secs() >= 15 {
break;
}
}
if !should_cancel {
// 最终检查:只需要确认当前频道人数是否为1
let current_users = get_users_by_channel(&channel_id_clone);
if current_users.len() == 1 {
if let Some(user_id) = current_users.first() {
if let Some(sender) = CLIENT_SENDERS.iter()
.find(|e| e.key().0 == *user_id)
.map(|e| e.key().clone())
{
let joined = update_user_status(user_id, "idle", "", false);
update_redis_async(user_id.clone(), joined);
send_inside_message(&sender, &event_sender_clone, serde_json::json!({
"msgType": "CmdHangup",
"fromID": "0",
"fromName": "Server",
"toID": user_id,
"msgData": {"channelId": "", "rtcToken": ""}
}).to_string(), user_id).await;
tokio::spawn(async move { notify_all_clients_to_update_online_users().await; });
}
}
}
} }
} => {} }
_ = cancel_rx.recv() => println!("收到取消信号,有新的人员加入到会议,会议频道号 {} 结束线程", channel_id_clone), _ = cancel_rx.recv() => println!("收到取消信号,有新的人员加入到会议,会议频道号 {} 结束线程", channel_id_clone),
} }
}); });
} else { } else {
let (allowed, disallowed): (Vec<_>, Vec<_>) = remaining_users let (allowed, disallowed): (Vec<_>, Vec<_>) = remaining_users.into_iter().partition(|id| HOST_ENABLED_ID_SET.contains(id.as_str()));
.into_iter()
.partition(|id| HOST_ENABLED_ID_SET.contains(id.as_str()));
let new_host = allowed.into_iter().next().or(disallowed.into_iter().next()); let new_host = allowed.into_iter().next().or(disallowed.into_iter().next());
if let Some(host_id) = new_host { if let Some(host_id) = new_host {
let joined = update_user_status( let joined = update_user_status(&host_id, &ONLINE_USERS.get(&host_id).map(|v| v.split(',').next().unwrap_or("idle").to_string()).unwrap_or("idle".into()), &channel_id, true);
&host_id,
&ONLINE_USERS
.get(&host_id)
.map(|v| v.split(',').next().unwrap_or("idle").to_string())
.unwrap_or("idle".into()),
&channel_id,
true,
);
update_redis_async(host_id, joined); update_redis_async(host_id, joined);
} }
} }
} }
tokio::spawn(async move { tokio::spawn(async move { notify_all_clients_to_update_online_users().await; });
notify_all_clients_to_update_online_users().await;
});
} }
} }
"Connect" => { "Connect" => {
println!( println!("收到客户端Connect消息连接 {} 频道信息 {:?}", from_id, client_message_data);
"收到客户端Connect消息连接 {} 频道信息 {:?}",
from_id, client_message_data
);
let to_id = &client_message_data.to_id; let to_id = &client_message_data.to_id;
if let Some(to_data) = ONLINE_USERS.get(to_id).map(|v| v.clone()) { if let Some(to_data) = ONLINE_USERS.get(to_id).map(|v| v.clone()) {
let data_split = to_data.split(',').map(String::from).collect::<Vec<_>>(); let data_split = to_data.split(',').map(String::from).collect::<Vec<_>>();
let channel_id = data_split[CHANNEL_IDX].clone(); let channel_id = data_split[CHANNEL_IDX].clone();
if channel_id.is_empty() { if channel_id.is_empty() {
send_error_message( send_error_message(&target_sender_which, event_sender, from_id, "对方数据出现异常,缺少channelID").await;
&target_sender_which,
event_sender,
from_id,
"对方数据出现异常,缺少channelID",
)
.await;
return; return;
} }
if let Some(sender) = refuse_procedure_map.get(from_id) { if let Some(sender) = refuse_procedure_map.get(from_id) { sender.send(()).ok(); }
sender.send(()).ok(); if let Some(sender) = channel_hangup_procedure_map.get(&channel_id) { sender.send(()).ok(); }
}
if let Some(sender) = channel_hangup_procedure_map.get(&channel_id) {
sender.send(()).ok();
}
if !ONLINE_USERS.iter().any(|e| { if !ONLINE_USERS.iter().any(|e| e.value().split(',').nth(CHANNEL_IDX).unwrap_or("") == channel_id && e.value().split(',').nth(HOST_IDX).unwrap_or("") == "1") {
e.value().split(',').nth(CHANNEL_IDX).unwrap_or("") == channel_id
&& e.value().split(',').nth(HOST_IDX).unwrap_or("") == "1"
}) {
let initiator = channel_id.split('_').next().unwrap_or(""); let initiator = channel_id.split('_').next().unwrap_or("");
let host_id = if HOST_ENABLED_ID_SET.contains(initiator) { let host_id = if HOST_ENABLED_ID_SET.contains(initiator) {
initiator.to_string() initiator.to_string()
} else { } else {
ONLINE_USERS ONLINE_USERS.iter()
.iter() .filter(|e| e.value().split(',').nth(CHANNEL_IDX).unwrap_or("") == channel_id)
.filter(|e| {
e.value().split(',').nth(CHANNEL_IDX).unwrap_or("") == channel_id
})
.map(|e| e.key().clone()) .map(|e| e.key().clone())
.find(|id| HOST_ENABLED_ID_SET.contains(id.as_str())) .find(|id| HOST_ENABLED_ID_SET.contains(id.as_str()))
.unwrap_or_else(|| { .unwrap_or_else(|| ONLINE_USERS.iter().find(|e| e.value().split(',').nth(CHANNEL_IDX).unwrap_or("") == channel_id).map(|e| e.key().clone()).unwrap_or_default())
ONLINE_USERS
.iter()
.find(|e| {
e.value().split(',').nth(CHANNEL_IDX).unwrap_or("")
== channel_id
})
.map(|e| e.key().clone())
.unwrap_or_default()
})
}; };
let joined = update_user_status( let joined = update_user_status(&host_id, &ONLINE_USERS.get(&host_id).map(|v| v.split(',').next().unwrap_or("idle").to_string()).unwrap_or("idle".into()), &channel_id, true);
&host_id,
&ONLINE_USERS
.get(&host_id)
.map(|v| v.split(',').next().unwrap_or("idle").to_string())
.unwrap_or("idle".into()),
&channel_id,
true,
);
update_redis_async(host_id, joined); update_redis_async(host_id, joined);
} }
let users = ONLINE_USERS let users = ONLINE_USERS.iter()
.iter() .filter(|e| e.value().split(',').nth(CHANNEL_IDX).unwrap_or("") == channel_id && e.value().split(',').nth(STATUS_IDX).unwrap_or("") != "calling")
.filter(|e| {
e.value().split(',').nth(CHANNEL_IDX).unwrap_or("") == channel_id
&& e.value().split(',').nth(STATUS_IDX).unwrap_or("") != "calling"
})
.map(|e| e.key().clone()) .map(|e| e.key().clone())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
for user_id in users { for user_id in users {
if let Some(sender) = CLIENT_SENDERS if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == user_id).map(|e| e.key().clone()) {
.iter() send_inside_message(&sender, event_sender, serde_json::json!({
.find(|e| e.key().0 == user_id) "msgType": "CmdConnect",
.map(|e| e.key().clone()) "msgData": {"channelID": channel_id, "rtcToken": ""},
{ "fromID": "0",
send_inside_message( "fromName": "Server",
&sender, "toID": user_id
event_sender, }).to_string(), &user_id).await;
serde_json::json!({
"msgType": "CmdConnect",
"msgData": {"channelID": channel_id, "rtcToken": ""},
"fromID": "0",
"fromName": "Server",
"toID": user_id
})
.to_string(),
&user_id,
)
.await;
} }
} }
tokio::spawn(async move { tokio::spawn(async move { notify_all_clients_to_update_online_users().await; });
notify_all_clients_to_update_online_users().await;
});
} else { } else {
send_error_message(&target_sender_which, event_sender, from_id, "必须传递to_id") send_error_message(&target_sender_which, event_sender, from_id, "必须传递to_id").await;
.await;
} }
} }
...@@ -731,60 +470,34 @@ pub async fn handle_agora_call( ...@@ -731,60 +470,34 @@ pub async fn handle_agora_call(
if let Some(user_data) = ONLINE_USERS.get(from_id).map(|v| v.clone()) { if let Some(user_data) = ONLINE_USERS.get(from_id).map(|v| v.clone()) {
let user_data_vec = user_data.split(',').map(String::from).collect::<Vec<_>>(); let user_data_vec = user_data.split(',').map(String::from).collect::<Vec<_>>();
if user_data_vec[HOST_IDX] != "1" { if user_data_vec[HOST_IDX] != "1" {
send_error_message( send_error_message(&target_sender_which, event_sender, from_id, "只有主持人才能踢出用户").await;
&target_sender_which,
event_sender,
from_id,
"只有主持人才能踢出用户",
)
.await;
} else { } else {
let channel_id = user_data_vec[CHANNEL_IDX].clone(); let channel_id = user_data_vec[CHANNEL_IDX].clone();
let users = get_users_by_channel(&channel_id); let users = get_users_by_channel(&channel_id);
for user_id in users { for user_id in users {
if let Some(sender) = CLIENT_SENDERS if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == user_id).map(|e| e.key().clone()) {
.iter()
.find(|e| e.key().0 == user_id)
.map(|e| e.key().clone())
{
if user_id == client_message_data.to_id { if user_id == client_message_data.to_id {
let joined = update_user_status(&user_id, "idle", "", false); let joined = update_user_status(&user_id, "idle", "", false);
update_redis_async(user_id.clone(), joined); update_redis_async(user_id.clone(), joined);
send_inside_message( send_inside_message(&sender, event_sender, serde_json::json!({
&sender, "msgType": "CmdHangup",
event_sender, "msgData": {"channelID": channel_id, "rtcToken": ""},
serde_json::json!({ "fromID": "0",
"msgType": "CmdHangup", "fromName": "Server",
"msgData": {"channelID": channel_id, "rtcToken": ""}, "toID": user_id
"fromID": "0", }).to_string(), &user_id).await;
"fromName": "Server",
"toID": user_id
})
.to_string(),
&user_id,
)
.await;
} else { } else {
send_inside_message( send_inside_message(&sender, event_sender, serde_json::json!({
&sender, "msgType": "CmdKickOut",
event_sender, "msgData": {"channelID": channel_id, "rtcToken": ""},
serde_json::json!({ "toID": client_message_data.to_id,
"msgType": "CmdKickOut", "fromName": "Server",
"msgData": {"channelID": channel_id, "rtcToken": ""}, "fromID": "0"
"toID": client_message_data.to_id, }).to_string(), &user_id).await;
"fromName": "Server",
"fromID": "0"
})
.to_string(),
&user_id,
)
.await;
} }
} }
} }
tokio::spawn(async move { tokio::spawn(async move { notify_all_clients_to_update_online_users().await; });
notify_all_clients_to_update_online_users().await;
});
} }
} }
} }
...@@ -793,20 +506,13 @@ pub async fn handle_agora_call( ...@@ -793,20 +506,13 @@ pub async fn handle_agora_call(
println!("step - Mute - 1 收到静音通知"); println!("step - Mute - 1 收到静音通知");
let to_id = &client_message_data.to_id; let to_id = &client_message_data.to_id;
if to_id.is_empty() { if to_id.is_empty() {
send_error_message(&target_sender_which, event_sender, from_id, "必须传递to_id") send_error_message(&target_sender_which, event_sender, from_id, "必须传递to_id").await;
.await;
return; return;
} }
if let Some(user_data) = ONLINE_USERS.get(from_id).map(|v| v.clone()) { if let Some(user_data) = ONLINE_USERS.get(from_id).map(|v| v.clone()) {
let user_data_vec = user_data.split(',').map(String::from).collect::<Vec<_>>(); let user_data_vec = user_data.split(',').map(String::from).collect::<Vec<_>>();
if user_data_vec[HOST_IDX] != "1" { if user_data_vec[HOST_IDX] != "1" {
send_error_message( send_error_message(&target_sender_which, event_sender, from_id, "您不是主持人,无法静音他人").await;
&target_sender_which,
event_sender,
from_id,
"您不是主持人,无法静音他人",
)
.await;
return; return;
} }
let channel_id = user_data_vec[CHANNEL_IDX].clone(); let channel_id = user_data_vec[CHANNEL_IDX].clone();
...@@ -817,16 +523,10 @@ pub async fn handle_agora_call( ...@@ -817,16 +523,10 @@ pub async fn handle_agora_call(
"fromID": to_id, "fromID": to_id,
"fromName": "Server", "fromName": "Server",
"toID": to_id "toID": to_id
}) }).to_string();
.to_string();
for user_id in users { for user_id in users {
if let Some(sender) = CLIENT_SENDERS if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == user_id).map(|e| e.key().clone()) {
.iter() send_inside_message(&sender, event_sender, json_str.clone(), &user_id).await;
.find(|e| e.key().0 == user_id)
.map(|e| e.key().clone())
{
send_inside_message(&sender, event_sender, json_str.clone(), &user_id)
.await;
} }
} }
} }
...@@ -837,37 +537,20 @@ pub async fn handle_agora_call( ...@@ -837,37 +537,20 @@ pub async fn handle_agora_call(
if let Some(user_data) = ONLINE_USERS.get(from_id).map(|v| v.clone()) { if let Some(user_data) = ONLINE_USERS.get(from_id).map(|v| v.clone()) {
let user_data_vec = user_data.split(',').map(String::from).collect::<Vec<_>>(); let user_data_vec = user_data.split(',').map(String::from).collect::<Vec<_>>();
if user_data_vec[HOST_IDX] != "1" { if user_data_vec[HOST_IDX] != "1" {
send_error_message( send_error_message(&target_sender_which, event_sender, from_id, "只有主持人才有操作权限").await;
&target_sender_which,
event_sender,
from_id,
"只有主持人才有操作权限",
)
.await;
return; return;
} }
let channel_id = user_data_vec[CHANNEL_IDX].clone(); let channel_id = user_data_vec[CHANNEL_IDX].clone();
let users = get_users_by_channel(&channel_id); let users = get_users_by_channel(&channel_id);
for user_id in users { for user_id in users {
if let Some(sender) = CLIENT_SENDERS if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == user_id).map(|e| e.key().clone()) {
.iter() send_inside_message(&sender, event_sender, serde_json::json!({
.find(|e| e.key().0 == user_id) "msgType": "CmdMuteAll",
.map(|e| e.key().clone()) "msgData": {"channelID": channel_id, "rtcToken": ""},
{ "fromID": "0",
send_inside_message( "fromName": "Server",
&sender, "toID": user_id
event_sender, }).to_string(), &user_id).await;
serde_json::json!({
"msgType": "CmdMuteAll",
"msgData": {"channelID": channel_id, "rtcToken": ""},
"fromID": "0",
"fromName": "Server",
"toID": user_id
})
.to_string(),
&user_id,
)
.await;
} }
} }
} }
...@@ -880,25 +563,14 @@ pub async fn handle_agora_call( ...@@ -880,25 +563,14 @@ pub async fn handle_agora_call(
let channel_id = user_data_vec[CHANNEL_IDX].clone(); let channel_id = user_data_vec[CHANNEL_IDX].clone();
let users = get_users_by_channel(&channel_id); let users = get_users_by_channel(&channel_id);
for user_id in users { for user_id in users {
if let Some(sender) = CLIENT_SENDERS if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == user_id).map(|e| e.key().clone()) {
.iter() send_inside_message(&sender, event_sender, serde_json::json!({
.find(|e| e.key().0 == user_id) "msgType": client_message_data.msg_type.as_str(),
.map(|e| e.key().clone()) "msgData": {},
{ "fromID": from_id,
send_inside_message( "fromName": "Unknown",
&sender, "toID": user_id
event_sender, }).to_string(), &user_id).await;
serde_json::json!({
"msgType": client_message_data.msg_type.as_str(),
"msgData": {},
"fromID": from_id,
"fromName": "Unknown",
"toID": user_id
})
.to_string(),
&user_id,
)
.await;
} }
} }
} }
...@@ -906,4 +578,4 @@ pub async fn handle_agora_call( ...@@ -906,4 +578,4 @@ pub async fn handle_agora_call(
_ => {} _ => {}
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment