Commit e93a18f7 by qlintonger xeno

修改挂断呼叫逻辑,增添命令行参数以及环境变量适配

parent c633ebeb
...@@ -12,7 +12,8 @@ use tokio::time::sleep; ...@@ -12,7 +12,8 @@ use tokio::time::sleep;
lazy_static! { lazy_static! {
pub static ref refuse_procedure_map: DashMap<String, UnboundedSender<()>> = DashMap::new(); pub static ref refuse_procedure_map: DashMap<String, UnboundedSender<()>> = DashMap::new();
pub static ref channel_hangup_procedure_map: DashMap<String, UnboundedSender<()>> = DashMap::new(); pub static ref channel_hangup_procedure_map: DashMap<String, UnboundedSender<()>> =
DashMap::new();
} }
// 用户状态字段的索引 // 用户状态字段的索引
...@@ -36,7 +37,15 @@ fn update_user_status(user_id: &str, status: &str, channel_id: &str, is_host: bo ...@@ -36,7 +37,15 @@ fn update_user_status(user_id: &str, status: &str, channel_id: &str, is_host: bo
let mut user_data = ONLINE_USERS let mut user_data = ONLINE_USERS
.get(user_id) .get(user_id)
.map(|v| v.split(',').map(String::from).collect()) .map(|v| v.split(',').map(String::from).collect())
.unwrap_or(vec!["idle".into(), "".into(), "".into(), "".into(), "".into(), "".into(), "0".into()]); .unwrap_or(vec![
"idle".into(),
"".into(),
"".into(),
"".into(),
"".into(),
"".into(),
"0".into(),
]);
user_data[STATUS_IDX] = status.into(); user_data[STATUS_IDX] = status.into();
user_data[CHANNEL_IDX] = channel_id.into(); user_data[CHANNEL_IDX] = channel_id.into();
user_data[HOST_IDX] = if is_host { "1".into() } else { "0".into() }; user_data[HOST_IDX] = if is_host { "1".into() } else { "0".into() };
...@@ -53,7 +62,11 @@ async fn send_inside_message( ...@@ -53,7 +62,11 @@ async fn send_inside_message(
from_id: &String, from_id: &String,
) { ) {
println!("发送给用户id {} 的消息 {}", from_id, json_message); println!("发送给用户id {} 的消息 {}", from_id, json_message);
if let Err(e) = event_sender.send(Event::SendClientMessage(target_sender.clone(), json_message, false)) { if let Err(e) = event_sender.send(Event::SendClientMessage(
target_sender.clone(),
json_message,
false,
)) {
println!("发送给用户id {} 独立消息失败:{:?}", from_id, e); println!("发送给用户id {} 独立消息失败:{:?}", from_id, e);
} else { } else {
println!("发送给用户id {} 独立消息成功", from_id); println!("发送给用户id {} 独立消息成功", from_id);
...@@ -73,8 +86,10 @@ async fn send_error_message( ...@@ -73,8 +86,10 @@ async fn send_error_message(
"fromID": "0", "fromID": "0",
"fromName": "Server", "fromName": "Server",
"toID": from_id "toID": from_id
}).to_string(); })
if let Err(e) = event_sender.send(Event::SendClientMessage(target_sender.clone(), json, false)) { .to_string();
if let Err(e) = event_sender.send(Event::SendClientMessage(target_sender.clone(), json, false))
{
println!("发送给用户id {} 独立消息失败:{:?}", from_id, e); println!("发送给用户id {} 独立消息失败:{:?}", from_id, e);
} else { } else {
println!("发送给用户id {} 独立消息成功", from_id); println!("发送给用户id {} 独立消息成功", from_id);
...@@ -101,40 +116,65 @@ pub async fn handle_agora_call( ...@@ -101,40 +116,65 @@ pub async fn handle_agora_call(
match client_message_data.msg_type.as_str() { match client_message_data.msg_type.as_str() {
"CancelCall" => { "CancelCall" => {
let calling_to_id = &client_message_data.to_id; let calling_to_id = &client_message_data.to_id;
println!("收到客户端取消呼叫 取消呼叫组: {} 呼叫方id {}", calling_to_id, from_id); println!(
send_inside_message(&target_sender_which, event_sender, serde_json::json!({ "收到客户端取消呼叫 取消呼叫组: {} 呼叫方id {}",
"msgType": "CmdCancelCall", calling_to_id, from_id
"fromID": "0", );
"fromName": "Server", send_inside_message(
"toID": from_id, &target_sender_which,
"msgData": {"channelId": "", "rtcToken": ""} event_sender,
}).to_string(), from_id).await; serde_json::json!({
"msgType": "CmdCancelCall",
"fromID": "0",
"fromName": "Server",
"toID": from_id,
"msgData": {"channelId": "", "rtcToken": ""}
})
.to_string(),
from_id,
)
.await;
if let Some(user_status) = ONLINE_USERS.get(from_id).map(|v| v.clone()) { if let Some(user_status) = ONLINE_USERS.get(from_id).map(|v| v.clone()) {
let user_data = user_status.split(',').map(String::from).collect::<Vec<_>>(); let user_data = user_status.split(',').map(String::from).collect::<Vec<_>>();
if user_data[CHANNEL_IDX].is_empty() { if user_data[CHANNEL_IDX].is_empty() {
let joined = update_user_status(from_id, "idle", "", false); let joined = update_user_status(from_id, "idle", "", false);
update_redis_async(from_id.clone(), joined); update_redis_async(from_id.clone(), joined);
tokio::spawn(async move { notify_all_clients_to_update_online_users().await; }); tokio::spawn(async move {
notify_all_clients_to_update_online_users().await;
});
return; return;
} }
let to_hangup_ids: Vec<String> = calling_to_id.split(',').map(String::from).collect(); let to_hangup_ids: Vec<String> =
calling_to_id.split(',').map(String::from).collect();
for to_id in to_hangup_ids { for to_id in to_hangup_ids {
if let Some(sender) = refuse_procedure_map.get(&to_id) { if let Some(sender) = refuse_procedure_map.get(&to_id) {
sender.send(()).ok(); sender.send(()).ok();
} }
if let Some(other_data) = ONLINE_USERS.get(&to_id).map(|v| v.clone()) { if let Some(other_data) = ONLINE_USERS.get(&to_id).map(|v| v.clone()) {
let other_data_vec = other_data.split(',').map(String::from).collect::<Vec<_>>(); let other_data_vec =
other_data.split(',').map(String::from).collect::<Vec<_>>();
if other_data_vec[CHANNEL_IDX] == user_data[CHANNEL_IDX] { if other_data_vec[CHANNEL_IDX] == user_data[CHANNEL_IDX] {
if let Some(target_sender) = CLIENT_SENDERS.iter().find(|entry| entry.key().0 == *calling_to_id).map(|entry| entry.key().clone()) { if let Some(target_sender) = CLIENT_SENDERS
send_inside_message(&target_sender, event_sender, serde_json::json!({ .iter()
"msgType": "CmdHangup", .find(|entry| entry.key().0 == *calling_to_id)
"fromID": "0", .map(|entry| entry.key().clone())
"fromName": "Server", {
"toID": to_id, send_inside_message(
"msgData": {"channelId": "", "rtcToken": ""} &target_sender,
}).to_string(), &to_id).await; event_sender,
serde_json::json!({
"msgType": "CmdHangup",
"fromID": "0",
"fromName": "Server",
"toID": to_id,
"msgData": {"channelId": "", "rtcToken": ""}
})
.to_string(),
&to_id,
)
.await;
let joined = update_user_status(&to_id, "idle", "", false); let joined = update_user_status(&to_id, "idle", "", false);
update_redis_async(to_id, joined); update_redis_async(to_id, joined);
} else { } else {
...@@ -145,19 +185,36 @@ pub async fn handle_agora_call( ...@@ -145,19 +185,36 @@ pub async fn handle_agora_call(
} }
let joined = update_user_status(from_id, "idle", "", false); let joined = update_user_status(from_id, "idle", "", false);
update_redis_async(from_id.clone(), joined); update_redis_async(from_id.clone(), joined);
tokio::spawn(async move { notify_all_clients_to_update_online_users().await; }); tokio::spawn(async move {
notify_all_clients_to_update_online_users().await;
});
} }
} }
"Call" => { "Call" => {
let calling_to_id = &client_message_data.to_id; let calling_to_id = &client_message_data.to_id;
println!("收到客户端呼叫 呼叫方id {} 呼叫组id {}", from_id, calling_to_id); println!(
"收到客户端呼叫 呼叫方id {} 呼叫组id {}",
from_id, calling_to_id
);
if calling_to_id.is_empty() { if calling_to_id.is_empty() {
send_error_message(&target_sender_which, event_sender, from_id, "请指定呼叫对象").await; send_error_message(
&target_sender_which,
event_sender,
from_id,
"请指定呼叫对象",
)
.await;
return; return;
} }
if calling_to_id == from_id { if calling_to_id == from_id {
send_error_message(&target_sender_which, event_sender, from_id, "不能给自己打电话").await; send_error_message(
&target_sender_which,
event_sender,
from_id,
"不能给自己打电话",
)
.await;
return; return;
} }
...@@ -168,28 +225,55 @@ pub async fn handle_agora_call( ...@@ -168,28 +225,55 @@ pub async fn handle_agora_call(
} else { } else {
user_data[CHANNEL_IDX].clone() user_data[CHANNEL_IDX].clone()
}; };
println!("当前用户channelId {} 呼叫方id集合是 {:?}", channel_id, calling_to_id.split(',').collect::<Vec<_>>()); println!(
"当前用户channelId {} 呼叫方id集合是 {:?}",
channel_id,
calling_to_id.split(',').collect::<Vec<_>>()
);
let mut refuse_users = Vec::new(); let mut refuse_users = Vec::new();
for to_id in calling_to_id.split(',') { for to_id in calling_to_id.split(',') {
if !ONLINE_USERS.contains_key(to_id) { if !ONLINE_USERS.contains_key(to_id) {
send_error_message(&target_sender_which, event_sender, from_id, "对方不在线").await; send_error_message(
&target_sender_which,
event_sender,
from_id,
"对方不在线",
)
.await;
continue; continue;
} }
if let Some(to_status) = ONLINE_USERS.get(to_id).map(|v| v.clone()) { if let Some(to_status) = ONLINE_USERS.get(to_id).map(|v| v.clone()) {
let to_data = to_status.split(',').map(String::from).collect::<Vec<_>>(); let to_data = to_status.split(',').map(String::from).collect::<Vec<_>>();
if to_data[STATUS_IDX] != "idle" { if to_data[STATUS_IDX] != "idle" {
send_error_message(&target_sender_which, event_sender, from_id, "对方正在呼叫中").await; send_error_message(
&target_sender_which,
event_sender,
from_id,
"对方正在呼叫中",
)
.await;
continue; continue;
} }
if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == to_id).map(|e| e.key().clone()) { if let Some(sender) = CLIENT_SENDERS
send_inside_message(&sender, event_sender, serde_json::json!({ .iter()
"msgType": "Call", .find(|e| e.key().0 == to_id)
"fromID": from_id, .map(|e| e.key().clone())
"fromName": client_message_data.from_name, {
"toID": to_id, send_inside_message(
"msgData": {"channelId": channel_id, "rtcToken": ""} &sender,
}).to_string(), &to_id.to_string()).await; event_sender,
serde_json::json!({
"msgType": "Call",
"fromID": from_id,
"fromName": client_message_data.from_name,
"toID": to_id,
"msgData": {"channelId": channel_id, "rtcToken": ""}
})
.to_string(),
&to_id.to_string(),
)
.await;
let joined = update_user_status(to_id, "callin", &channel_id, false); let joined = update_user_status(to_id, "callin", &channel_id, false);
update_redis_async(to_id.to_string(), joined); update_redis_async(to_id.to_string(), joined);
refuse_users.push(to_id.to_string()); refuse_users.push(to_id.to_string());
...@@ -200,17 +284,30 @@ pub async fn handle_agora_call( ...@@ -200,17 +284,30 @@ pub async fn handle_agora_call(
} }
if !refuse_users.is_empty() { if !refuse_users.is_empty() {
let status = if user_data[STATUS_IDX] == "calling" { "calling" } else { "callout" }; let status = if user_data[STATUS_IDX] == "calling" {
send_inside_message(&target_sender_which, event_sender, serde_json::json!({ "calling"
"msgType": "CmdCall", } else {
"fromID": calling_to_id, "callout"
"fromName": client_message_data.from_name, };
"toID": from_id, send_inside_message(
"msgData": {"channelId": channel_id, "rtcToken": ""} &target_sender_which,
}).to_string(), from_id).await; event_sender,
serde_json::json!({
"msgType": "CmdCall",
"fromID": calling_to_id,
"fromName": client_message_data.from_name,
"toID": from_id,
"msgData": {"channelId": channel_id, "rtcToken": ""}
})
.to_string(),
from_id,
)
.await;
let joined = update_user_status(from_id, status, &channel_id, false); let joined = update_user_status(from_id, status, &channel_id, false);
update_redis_async(from_id.clone(), joined); update_redis_async(from_id.clone(), joined);
tokio::spawn(async move { notify_all_clients_to_update_online_users().await; }); tokio::spawn(async move {
notify_all_clients_to_update_online_users().await;
});
for user_id in refuse_users { for user_id in refuse_users {
let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel::<()>(); let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel::<()>();
...@@ -278,41 +375,72 @@ pub async fn handle_agora_call( ...@@ -278,41 +375,72 @@ pub async fn handle_agora_call(
let channel_id = user_data_vec[CHANNEL_IDX].clone(); let channel_id = user_data_vec[CHANNEL_IDX].clone();
let joined = update_user_status(from_id, "idle", "", false); let joined = update_user_status(from_id, "idle", "", false);
update_redis_async(from_id.clone(), joined); update_redis_async(from_id.clone(), joined);
send_inside_message(&target_sender_which, event_sender, serde_json::json!({ send_inside_message(
"msgType": "CmdHangup", &target_sender_which,
"fromID": "0", event_sender,
"fromName": "Server", serde_json::json!({
"toID": from_id, "msgType": "CmdHangup",
"msgData": {"channelId": "", "rtcToken": ""} "fromID": "0",
}).to_string(), from_id).await;
if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == client_message_data.to_id).map(|e| e.key().clone()) {
send_inside_message(&sender, event_sender, serde_json::json!({
"msgType": "CmdRefuse",
"fromID": from_id,
"fromName": "Server", "fromName": "Server",
"toID": client_message_data.to_id, "toID": from_id,
"msgData": {"channelId": "", "rtcToken": ""} "msgData": {"channelId": "", "rtcToken": ""}
}).to_string(), &client_message_data.to_id).await; })
.to_string(),
from_id,
)
.await;
if let Some(sender) = CLIENT_SENDERS
.iter()
.find(|e| e.key().0 == client_message_data.to_id)
.map(|e| e.key().clone())
{
send_inside_message(
&sender,
event_sender,
serde_json::json!({
"msgType": "CmdRefuse",
"fromID": from_id,
"fromName": "Server",
"toID": client_message_data.to_id,
"msgData": {"channelId": "", "rtcToken": ""}
})
.to_string(),
&client_message_data.to_id,
)
.await;
} else { } else {
println!("找不到toID对应的sender"); println!("找不到toID对应的sender");
} }
let chatters = get_users_by_channel(&channel_id); let chatters = get_users_by_channel(&channel_id);
if chatters.len() == 1 { if chatters.len() == 1 {
if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == chatters[0]).map(|e| e.key().clone()) { if let Some(sender) = CLIENT_SENDERS
.iter()
.find(|e| e.key().0 == chatters[0])
.map(|e| e.key().clone())
{
let joined = update_user_status(&chatters[0], "idle", "", false); let joined = update_user_status(&chatters[0], "idle", "", false);
update_redis_async(chatters[0].clone(), joined); update_redis_async(chatters[0].clone(), joined);
send_inside_message(&sender, event_sender, serde_json::json!({ send_inside_message(
"msgType": "CmdCancelCall", &sender,
"fromID": "0", event_sender,
"fromName": "Server", serde_json::json!({
"toID": chatters[0], "msgType": "CmdCancelCall",
"msgData": {"channelId": "", "rtcToken": ""} "fromID": "0",
}).to_string(), &chatters[0]).await; "fromName": "Server",
"toID": chatters[0],
"msgData": {"channelId": "", "rtcToken": ""}
})
.to_string(),
&chatters[0],
)
.await;
} }
} }
tokio::spawn(async move { notify_all_clients_to_update_online_users().await; }); tokio::spawn(async move {
notify_all_clients_to_update_online_users().await;
});
} }
} }
...@@ -321,7 +449,13 @@ pub async fn handle_agora_call( ...@@ -321,7 +449,13 @@ pub async fn handle_agora_call(
if let Some(user_data) = ONLINE_USERS.get(from_id).map(|v| v.clone()) { if let Some(user_data) = ONLINE_USERS.get(from_id).map(|v| v.clone()) {
let user_data_vec = user_data.split(',').map(String::from).collect::<Vec<_>>(); let user_data_vec = user_data.split(',').map(String::from).collect::<Vec<_>>();
if user_data_vec[HOST_IDX] != "1" { if user_data_vec[HOST_IDX] != "1" {
send_error_message(&target_sender_which, event_sender, from_id, "只有主持人可以结束会议!").await; send_error_message(
&target_sender_which,
event_sender,
from_id,
"只有主持人可以结束会议!",
)
.await;
} else { } else {
let channel_id = user_data_vec[CHANNEL_IDX].clone(); let channel_id = user_data_vec[CHANNEL_IDX].clone();
if !channel_id.is_empty() { if !channel_id.is_empty() {
...@@ -329,17 +463,30 @@ pub async fn handle_agora_call( ...@@ -329,17 +463,30 @@ pub async fn handle_agora_call(
for user_id in &users { for user_id in &users {
let joined = update_user_status(user_id, "idle", "", false); let joined = update_user_status(user_id, "idle", "", false);
update_redis_async(user_id.clone(), joined); update_redis_async(user_id.clone(), joined);
if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == *user_id).map(|e| e.key().clone()) { if let Some(sender) = CLIENT_SENDERS
send_inside_message(&sender, event_sender, serde_json::json!({ .iter()
"msgType": "CmdEndMeeting", .find(|e| e.key().0 == *user_id)
"fromID": "0", .map(|e| e.key().clone())
"fromName": "Server", {
"toID": user_id, send_inside_message(
"msgData": {"channelId": "", "rtcToken": ""} &sender,
}).to_string(), user_id).await; event_sender,
serde_json::json!({
"msgType": "CmdEndMeeting",
"fromID": "0",
"fromName": "Server",
"toID": user_id,
"msgData": {"channelId": "", "rtcToken": ""}
})
.to_string(),
user_id,
)
.await;
} }
} }
tokio::spawn(async move { notify_all_clients_to_update_online_users().await; }); tokio::spawn(async move {
notify_all_clients_to_update_online_users().await;
});
} }
} }
} }
...@@ -352,115 +499,230 @@ pub async fn handle_agora_call( ...@@ -352,115 +499,230 @@ pub async fn handle_agora_call(
let channel_id = user_data_vec[CHANNEL_IDX].clone(); let channel_id = user_data_vec[CHANNEL_IDX].clone();
let joined = update_user_status(from_id, "idle", "", false); let joined = update_user_status(from_id, "idle", "", false);
update_redis_async(from_id.clone(), joined); update_redis_async(from_id.clone(), joined);
send_inside_message(&target_sender_which, event_sender, serde_json::json!({ send_inside_message(
"msgType": "CmdHangup", &target_sender_which,
"fromID": "0", event_sender,
"fromName": "Server", serde_json::json!({
"toID": from_id, "msgType": "CmdHangup",
"msgData": {"channelId": "", "rtcToken": ""} "fromID": "0",
}).to_string(), from_id).await; "fromName": "Server",
"toID": from_id,
"msgData": {"channelId": "", "rtcToken": ""}
})
.to_string(),
from_id,
)
.await;
let remaining_users = get_users_by_channel(&channel_id); let remaining_users = get_users_by_channel(&channel_id);
if remaining_users.is_empty() { if remaining_users.is_empty() {
println!("当前频道没有人员,请重新发起通话"); println!("当前频道没有人员,请重新发起通话");
} else { } else {
for user_id in &remaining_users { for user_id in &remaining_users {
if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == *user_id).map(|e| e.key().clone()) { if let Some(sender) = CLIENT_SENDERS
send_inside_message(&sender, event_sender, serde_json::json!({ .iter()
"msgType": "CmdLeave", .find(|e| e.key().0 == *user_id)
"fromID": from_id, .map(|e| e.key().clone())
"fromName": "Unknown", {
"toID": user_id, send_inside_message(
"msgData": {} &sender,
}).to_string(), user_id).await; event_sender,
serde_json::json!({
"msgType": "CmdLeave",
"fromID": from_id,
"fromName": "Unknown",
"toID": user_id,
"msgData": {}
})
.to_string(),
user_id,
)
.await;
} }
} }
if remaining_users.len() == 1 { if remaining_users.len() == 1 {
let user_id = remaining_users[0].clone();
let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel::<()>(); let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel::<()>();
channel_hangup_procedure_map.insert(channel_id.clone(), cancel_tx); channel_hangup_procedure_map.insert(channel_id.clone(), cancel_tx);
let event_sender_clone = event_sender.clone(); let event_sender_clone = event_sender.clone();
let channel_id_clone = channel_id.clone(); let channel_id_clone = channel_id.clone();
tokio::spawn(async move { tokio::spawn(async move {
let start_time = tokio::time::Instant::now();
let mut interval = tokio::time::interval(Duration::from_secs(1));
let mut should_cancel = false;
tokio::select! { tokio::select! {
_ = sleep(Duration::from_secs(15)) => { _ = async {
println!("20s内如果该频道与会人数仍为1,则对剩下来的用户发送CmdHangup指令 {}", user_id); loop {
if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == user_id).map(|e| e.key().clone()) { interval.tick().await;
let joined = update_user_status(&user_id, "idle", "", false); // 检查频道中是否存在 callin 状态的用户
update_redis_async(user_id.clone(), joined); let has_callin = ONLINE_USERS.iter()
send_inside_message(&sender, &event_sender_clone, serde_json::json!({ .filter(|e| e.value().split(',').nth(CHANNEL_IDX).unwrap_or("") == channel_id_clone)
"msgType": "CmdHangup", .any(|e| e.value().split(',').nth(STATUS_IDX).unwrap_or("") == "callin");
"fromID": "0",
"fromName": "Server", if has_callin {
"toID": user_id, println!("检测到频道 {} 有用户处于呼叫状态,取消挂断任务", channel_id_clone);
"msgData": {"channelId": "", "rtcToken": ""} should_cancel = true;
}).to_string(), &user_id).await; break;
tokio::spawn(async move { notify_all_clients_to_update_online_users().await; }); }
if start_time.elapsed().as_secs() >= 15 {
break;
}
} }
}
if !should_cancel {
// 最终检查:只需要确认当前频道人数是否为1
let current_users = get_users_by_channel(&channel_id_clone);
if current_users.len() == 1 {
if let Some(user_id) = current_users.first() {
if let Some(sender) = CLIENT_SENDERS.iter()
.find(|e| e.key().0 == *user_id)
.map(|e| e.key().clone())
{
let joined = update_user_status(user_id, "idle", "", false);
update_redis_async(user_id.clone(), joined);
send_inside_message(&sender, &event_sender_clone, serde_json::json!({
"msgType": "CmdHangup",
"fromID": "0",
"fromName": "Server",
"toID": user_id,
"msgData": {"channelId": "", "rtcToken": ""}
}).to_string(), user_id).await;
tokio::spawn(async move { notify_all_clients_to_update_online_users().await; });
}
}
}
}
} => {}
_ = cancel_rx.recv() => println!("收到取消信号,有新的人员加入到会议,会议频道号 {} 结束线程", channel_id_clone), _ = cancel_rx.recv() => println!("收到取消信号,有新的人员加入到会议,会议频道号 {} 结束线程", channel_id_clone),
} }
}); });
} else { } else {
let (allowed, disallowed): (Vec<_>, Vec<_>) = remaining_users.into_iter().partition(|id| HOST_ENABLED_ID_SET.contains(id.as_str())); let (allowed, disallowed): (Vec<_>, Vec<_>) = remaining_users
.into_iter()
.partition(|id| HOST_ENABLED_ID_SET.contains(id.as_str()));
let new_host = allowed.into_iter().next().or(disallowed.into_iter().next()); let new_host = allowed.into_iter().next().or(disallowed.into_iter().next());
if let Some(host_id) = new_host { if let Some(host_id) = new_host {
let joined = update_user_status(&host_id, &ONLINE_USERS.get(&host_id).map(|v| v.split(',').next().unwrap_or("idle").to_string()).unwrap_or("idle".into()), &channel_id, true); let joined = update_user_status(
&host_id,
&ONLINE_USERS
.get(&host_id)
.map(|v| v.split(',').next().unwrap_or("idle").to_string())
.unwrap_or("idle".into()),
&channel_id,
true,
);
update_redis_async(host_id, joined); update_redis_async(host_id, joined);
} }
} }
} }
tokio::spawn(async move { notify_all_clients_to_update_online_users().await; }); tokio::spawn(async move {
notify_all_clients_to_update_online_users().await;
});
} }
} }
"Connect" => { "Connect" => {
println!("收到客户端Connect消息连接 {} 频道信息 {:?}", from_id, client_message_data); println!(
"收到客户端Connect消息连接 {} 频道信息 {:?}",
from_id, client_message_data
);
let to_id = &client_message_data.to_id; let to_id = &client_message_data.to_id;
if let Some(to_data) = ONLINE_USERS.get(to_id).map(|v| v.clone()) { if let Some(to_data) = ONLINE_USERS.get(to_id).map(|v| v.clone()) {
let data_split = to_data.split(',').map(String::from).collect::<Vec<_>>(); let data_split = to_data.split(',').map(String::from).collect::<Vec<_>>();
let channel_id = data_split[CHANNEL_IDX].clone(); let channel_id = data_split[CHANNEL_IDX].clone();
if channel_id.is_empty() { if channel_id.is_empty() {
send_error_message(&target_sender_which, event_sender, from_id, "对方数据出现异常,缺少channelID").await; send_error_message(
&target_sender_which,
event_sender,
from_id,
"对方数据出现异常,缺少channelID",
)
.await;
return; return;
} }
if let Some(sender) = refuse_procedure_map.get(from_id) { sender.send(()).ok(); } if let Some(sender) = refuse_procedure_map.get(from_id) {
if let Some(sender) = channel_hangup_procedure_map.get(&channel_id) { sender.send(()).ok(); } sender.send(()).ok();
}
if let Some(sender) = channel_hangup_procedure_map.get(&channel_id) {
sender.send(()).ok();
}
if !ONLINE_USERS.iter().any(|e| e.value().split(',').nth(CHANNEL_IDX).unwrap_or("") == channel_id && e.value().split(',').nth(HOST_IDX).unwrap_or("") == "1") { if !ONLINE_USERS.iter().any(|e| {
e.value().split(',').nth(CHANNEL_IDX).unwrap_or("") == channel_id
&& e.value().split(',').nth(HOST_IDX).unwrap_or("") == "1"
}) {
let initiator = channel_id.split('_').next().unwrap_or(""); let initiator = channel_id.split('_').next().unwrap_or("");
let host_id = if HOST_ENABLED_ID_SET.contains(initiator) { let host_id = if HOST_ENABLED_ID_SET.contains(initiator) {
initiator.to_string() initiator.to_string()
} else { } else {
ONLINE_USERS.iter() ONLINE_USERS
.filter(|e| e.value().split(',').nth(CHANNEL_IDX).unwrap_or("") == channel_id) .iter()
.filter(|e| {
e.value().split(',').nth(CHANNEL_IDX).unwrap_or("") == channel_id
})
.map(|e| e.key().clone()) .map(|e| e.key().clone())
.find(|id| HOST_ENABLED_ID_SET.contains(id.as_str())) .find(|id| HOST_ENABLED_ID_SET.contains(id.as_str()))
.unwrap_or_else(|| ONLINE_USERS.iter().find(|e| e.value().split(',').nth(CHANNEL_IDX).unwrap_or("") == channel_id).map(|e| e.key().clone()).unwrap_or_default()) .unwrap_or_else(|| {
ONLINE_USERS
.iter()
.find(|e| {
e.value().split(',').nth(CHANNEL_IDX).unwrap_or("")
== channel_id
})
.map(|e| e.key().clone())
.unwrap_or_default()
})
}; };
let joined = update_user_status(&host_id, &ONLINE_USERS.get(&host_id).map(|v| v.split(',').next().unwrap_or("idle").to_string()).unwrap_or("idle".into()), &channel_id, true); let joined = update_user_status(
&host_id,
&ONLINE_USERS
.get(&host_id)
.map(|v| v.split(',').next().unwrap_or("idle").to_string())
.unwrap_or("idle".into()),
&channel_id,
true,
);
update_redis_async(host_id, joined); update_redis_async(host_id, joined);
} }
let users = ONLINE_USERS.iter() let users = ONLINE_USERS
.filter(|e| e.value().split(',').nth(CHANNEL_IDX).unwrap_or("") == channel_id && e.value().split(',').nth(STATUS_IDX).unwrap_or("") != "calling") .iter()
.filter(|e| {
e.value().split(',').nth(CHANNEL_IDX).unwrap_or("") == channel_id
&& e.value().split(',').nth(STATUS_IDX).unwrap_or("") != "calling"
})
.map(|e| e.key().clone()) .map(|e| e.key().clone())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
for user_id in users { for user_id in users {
if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == user_id).map(|e| e.key().clone()) { if let Some(sender) = CLIENT_SENDERS
send_inside_message(&sender, event_sender, serde_json::json!({ .iter()
"msgType": "CmdConnect", .find(|e| e.key().0 == user_id)
"msgData": {"channelID": channel_id, "rtcToken": ""}, .map(|e| e.key().clone())
"fromID": "0", {
"fromName": "Server", send_inside_message(
"toID": user_id &sender,
}).to_string(), &user_id).await; event_sender,
serde_json::json!({
"msgType": "CmdConnect",
"msgData": {"channelID": channel_id, "rtcToken": ""},
"fromID": "0",
"fromName": "Server",
"toID": user_id
})
.to_string(),
&user_id,
)
.await;
} }
} }
tokio::spawn(async move { notify_all_clients_to_update_online_users().await; }); tokio::spawn(async move {
notify_all_clients_to_update_online_users().await;
});
} else { } else {
send_error_message(&target_sender_which, event_sender, from_id, "必须传递to_id").await; send_error_message(&target_sender_which, event_sender, from_id, "必须传递to_id")
.await;
} }
} }
...@@ -469,34 +731,60 @@ pub async fn handle_agora_call( ...@@ -469,34 +731,60 @@ pub async fn handle_agora_call(
if let Some(user_data) = ONLINE_USERS.get(from_id).map(|v| v.clone()) { if let Some(user_data) = ONLINE_USERS.get(from_id).map(|v| v.clone()) {
let user_data_vec = user_data.split(',').map(String::from).collect::<Vec<_>>(); let user_data_vec = user_data.split(',').map(String::from).collect::<Vec<_>>();
if user_data_vec[HOST_IDX] != "1" { if user_data_vec[HOST_IDX] != "1" {
send_error_message(&target_sender_which, event_sender, from_id, "只有主持人才能踢出用户").await; send_error_message(
&target_sender_which,
event_sender,
from_id,
"只有主持人才能踢出用户",
)
.await;
} else { } else {
let channel_id = user_data_vec[CHANNEL_IDX].clone(); let channel_id = user_data_vec[CHANNEL_IDX].clone();
let users = get_users_by_channel(&channel_id); let users = get_users_by_channel(&channel_id);
for user_id in users { for user_id in users {
if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == user_id).map(|e| e.key().clone()) { if let Some(sender) = CLIENT_SENDERS
.iter()
.find(|e| e.key().0 == user_id)
.map(|e| e.key().clone())
{
if user_id == client_message_data.to_id { if user_id == client_message_data.to_id {
let joined = update_user_status(&user_id, "idle", "", false); let joined = update_user_status(&user_id, "idle", "", false);
update_redis_async(user_id.clone(), joined); update_redis_async(user_id.clone(), joined);
send_inside_message(&sender, event_sender, serde_json::json!({ send_inside_message(
"msgType": "CmdHangup", &sender,
"msgData": {"channelID": channel_id, "rtcToken": ""}, event_sender,
"fromID": "0", serde_json::json!({
"fromName": "Server", "msgType": "CmdHangup",
"toID": user_id "msgData": {"channelID": channel_id, "rtcToken": ""},
}).to_string(), &user_id).await; "fromID": "0",
"fromName": "Server",
"toID": user_id
})
.to_string(),
&user_id,
)
.await;
} else { } else {
send_inside_message(&sender, event_sender, serde_json::json!({ send_inside_message(
"msgType": "CmdKickOut", &sender,
"msgData": {"channelID": channel_id, "rtcToken": ""}, event_sender,
"toID": client_message_data.to_id, serde_json::json!({
"fromName": "Server", "msgType": "CmdKickOut",
"fromID": "0" "msgData": {"channelID": channel_id, "rtcToken": ""},
}).to_string(), &user_id).await; "toID": client_message_data.to_id,
"fromName": "Server",
"fromID": "0"
})
.to_string(),
&user_id,
)
.await;
} }
} }
} }
tokio::spawn(async move { notify_all_clients_to_update_online_users().await; }); tokio::spawn(async move {
notify_all_clients_to_update_online_users().await;
});
} }
} }
} }
...@@ -505,13 +793,20 @@ pub async fn handle_agora_call( ...@@ -505,13 +793,20 @@ pub async fn handle_agora_call(
println!("step - Mute - 1 收到静音通知"); println!("step - Mute - 1 收到静音通知");
let to_id = &client_message_data.to_id; let to_id = &client_message_data.to_id;
if to_id.is_empty() { if to_id.is_empty() {
send_error_message(&target_sender_which, event_sender, from_id, "必须传递to_id").await; send_error_message(&target_sender_which, event_sender, from_id, "必须传递to_id")
.await;
return; return;
} }
if let Some(user_data) = ONLINE_USERS.get(from_id).map(|v| v.clone()) { if let Some(user_data) = ONLINE_USERS.get(from_id).map(|v| v.clone()) {
let user_data_vec = user_data.split(',').map(String::from).collect::<Vec<_>>(); let user_data_vec = user_data.split(',').map(String::from).collect::<Vec<_>>();
if user_data_vec[HOST_IDX] != "1" { if user_data_vec[HOST_IDX] != "1" {
send_error_message(&target_sender_which, event_sender, from_id, "您不是主持人,无法静音他人").await; send_error_message(
&target_sender_which,
event_sender,
from_id,
"您不是主持人,无法静音他人",
)
.await;
return; return;
} }
let channel_id = user_data_vec[CHANNEL_IDX].clone(); let channel_id = user_data_vec[CHANNEL_IDX].clone();
...@@ -522,10 +817,16 @@ pub async fn handle_agora_call( ...@@ -522,10 +817,16 @@ pub async fn handle_agora_call(
"fromID": to_id, "fromID": to_id,
"fromName": "Server", "fromName": "Server",
"toID": to_id "toID": to_id
}).to_string(); })
.to_string();
for user_id in users { for user_id in users {
if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == user_id).map(|e| e.key().clone()) { if let Some(sender) = CLIENT_SENDERS
send_inside_message(&sender, event_sender, json_str.clone(), &user_id).await; .iter()
.find(|e| e.key().0 == user_id)
.map(|e| e.key().clone())
{
send_inside_message(&sender, event_sender, json_str.clone(), &user_id)
.await;
} }
} }
} }
...@@ -536,20 +837,37 @@ pub async fn handle_agora_call( ...@@ -536,20 +837,37 @@ pub async fn handle_agora_call(
if let Some(user_data) = ONLINE_USERS.get(from_id).map(|v| v.clone()) { if let Some(user_data) = ONLINE_USERS.get(from_id).map(|v| v.clone()) {
let user_data_vec = user_data.split(',').map(String::from).collect::<Vec<_>>(); let user_data_vec = user_data.split(',').map(String::from).collect::<Vec<_>>();
if user_data_vec[HOST_IDX] != "1" { if user_data_vec[HOST_IDX] != "1" {
send_error_message(&target_sender_which, event_sender, from_id, "只有主持人才有操作权限").await; send_error_message(
&target_sender_which,
event_sender,
from_id,
"只有主持人才有操作权限",
)
.await;
return; return;
} }
let channel_id = user_data_vec[CHANNEL_IDX].clone(); let channel_id = user_data_vec[CHANNEL_IDX].clone();
let users = get_users_by_channel(&channel_id); let users = get_users_by_channel(&channel_id);
for user_id in users { for user_id in users {
if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == user_id).map(|e| e.key().clone()) { if let Some(sender) = CLIENT_SENDERS
send_inside_message(&sender, event_sender, serde_json::json!({ .iter()
"msgType": "CmdMuteAll", .find(|e| e.key().0 == user_id)
"msgData": {"channelID": channel_id, "rtcToken": ""}, .map(|e| e.key().clone())
"fromID": "0", {
"fromName": "Server", send_inside_message(
"toID": user_id &sender,
}).to_string(), &user_id).await; event_sender,
serde_json::json!({
"msgType": "CmdMuteAll",
"msgData": {"channelID": channel_id, "rtcToken": ""},
"fromID": "0",
"fromName": "Server",
"toID": user_id
})
.to_string(),
&user_id,
)
.await;
} }
} }
} }
...@@ -562,14 +880,25 @@ pub async fn handle_agora_call( ...@@ -562,14 +880,25 @@ pub async fn handle_agora_call(
let channel_id = user_data_vec[CHANNEL_IDX].clone(); let channel_id = user_data_vec[CHANNEL_IDX].clone();
let users = get_users_by_channel(&channel_id); let users = get_users_by_channel(&channel_id);
for user_id in users { for user_id in users {
if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == user_id).map(|e| e.key().clone()) { if let Some(sender) = CLIENT_SENDERS
send_inside_message(&sender, event_sender, serde_json::json!({ .iter()
"msgType": client_message_data.msg_type.as_str(), .find(|e| e.key().0 == user_id)
"msgData": {}, .map(|e| e.key().clone())
"fromID": from_id, {
"fromName": "Unknown", send_inside_message(
"toID": user_id &sender,
}).to_string(), &user_id).await; event_sender,
serde_json::json!({
"msgType": client_message_data.msg_type.as_str(),
"msgData": {},
"fromID": from_id,
"fromName": "Unknown",
"toID": user_id
})
.to_string(),
&user_id,
)
.await;
} }
} }
} }
...@@ -577,4 +906,4 @@ pub async fn handle_agora_call( ...@@ -577,4 +906,4 @@ pub async fn handle_agora_call(
_ => {} _ => {}
} }
} }
\ No newline at end of file
...@@ -10,14 +10,22 @@ mod deport; ...@@ -10,14 +10,22 @@ mod deport;
use crate::events::handle_events; use crate::events::handle_events;
use client::handle_client; use client::handle_client;
use config::config::STATIC_ADDR as addr;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use utils::utils::get_addr;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let listener = TcpListener::bind(addr).await.unwrap(); let bind_addr = match get_addr() {
Ok(other_addr) => other_addr,
Err(err) => {
eprintln!("Error: {}", err);
std::process::exit(1);
}
};
let listener = TcpListener::bind(bind_addr).await.unwrap();
// 创建事件通道 // 创建事件通道
let (event_sender, event_receiver) = mpsc::unbounded_channel(); let (event_sender, event_receiver) = mpsc::unbounded_channel();
// 启动事件处理任务 // 启动事件处理任务
......
use std::collections::HashMap; use std::collections::HashMap;
use std::env;
use std::net::SocketAddr;
use crate::config::config::STATIC_ADDR;
pub(crate) fn get_connection_params( pub(crate) fn get_connection_params(
connection_url: String, connection_url: String,
...@@ -17,3 +20,27 @@ pub(crate) fn get_connection_params( ...@@ -17,3 +20,27 @@ pub(crate) fn get_connection_params(
Ok(HashMap::new()) Ok(HashMap::new())
} }
} }
// 获取地址的逻辑封装
pub fn get_addr() -> Result<String, Box<dyn std::error::Error>> {
// 1. 处理命令行参数
let mut args = env::args().skip(1);
if let Some(pos) = args.position(|arg| arg == "--addr") {
// 获取下一个参数值
return args.nth(pos)
.ok_or("--addr requires a value".into())
.and_then(|addr| {
addr.parse::<SocketAddr>()?;
Ok(addr)
});
}
// 2. 处理环境变量
if let Ok(env_addr) = env::var("RS_WS_ADDR") {
env_addr.parse::<SocketAddr>()?;
return Ok(env_addr);
}
// 3. 返回默认值
Ok(STATIC_ADDR.to_string())
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment