Commit f1a8eadf by qlintonger xeno

尝试优化+3,Grok3真厉害!

parent 03226c7f
...@@ -7,16 +7,45 @@ use crate::utils::json_utils::get_current_timestamp; ...@@ -7,16 +7,45 @@ use crate::utils::json_utils::get_current_timestamp;
use dashmap::DashMap; use dashmap::DashMap;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use std::time::Duration; use std::time::Duration;
use tokio::sync::mpsc; use tokio::sync::mpsc::{self, UnboundedSender};
use tokio::sync::mpsc::UnboundedSender;
use tokio::time::sleep; use tokio::time::sleep;
lazy_static! { lazy_static! {
pub static ref refuse_procedure_map: DashMap<String, UnboundedSender<()>> = DashMap::new(); pub static ref refuse_procedure_map: DashMap<String, UnboundedSender<()>> = DashMap::new();
pub static ref channel_hangup_procedure_map: DashMap<String, UnboundedSender<()>> = pub static ref channel_hangup_procedure_map: DashMap<String, UnboundedSender<()>> = DashMap::new();
DashMap::new();
} }
// 用户状态字段的索引
const STATUS_IDX: usize = 0;
const CHANNEL_IDX: usize = 1;
const HOST_IDX: usize = 6;
// 异步更新 Redis 数据
fn update_redis_async(user_id: String, data: String) {
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(&user_id, data).await {
println!("更新redis数据失败:{:?} 用户id {}", e, user_id);
} else {
println!("更新redis数据成功");
}
});
}
// 更新用户状态并同步到 Redis 和 ONLINE_USERS
fn update_user_status(user_id: &str, status: &str, channel_id: &str, is_host: bool) -> String {
let mut user_data = ONLINE_USERS
.get(user_id)
.map(|v| v.split(',').map(String::from).collect())
.unwrap_or(vec!["idle".into(), "".into(), "".into(), "".into(), "".into(), "".into(), "0".into()]);
user_data[STATUS_IDX] = status.into();
user_data[CHANNEL_IDX] = channel_id.into();
user_data[HOST_IDX] = if is_host { "1".into() } else { "0".into() };
let joined = user_data.join(",");
ONLINE_USERS.insert(user_id.to_string(), joined.clone());
joined
}
// 发送消息的通用函数
async fn send_inside_message( async fn send_inside_message(
target_sender: &(String, u128), target_sender: &(String, u128),
event_sender: &UnboundedSender<Event>, event_sender: &UnboundedSender<Event>,
...@@ -24,17 +53,43 @@ async fn send_inside_message( ...@@ -24,17 +53,43 @@ async fn send_inside_message(
from_id: &String, from_id: &String,
) { ) {
println!("发送给用户id {} 的消息 {}", from_id, json_message); println!("发送给用户id {} 的消息 {}", from_id, json_message);
if let Err(e) = event_sender.send(Event::SendClientMessage( if let Err(e) = event_sender.send(Event::SendClientMessage(target_sender.clone(), json_message, false)) {
target_sender.clone(), println!("发送给用户id {} 独立消息失败:{:?}", from_id, e);
json_message, } else {
false, println!("发送给用户id {} 独立消息成功", from_id);
)) { }
}
// 发送错误消息
async fn send_error_message(
target_sender: &(String, u128),
event_sender: &UnboundedSender<Event>,
from_id: &String,
error_msg: &str,
) {
let json = serde_json::json!({
"msgType": "Error",
"msgData": error_msg,
"fromID": "0",
"fromName": "Server",
"toID": from_id
}).to_string();
if let Err(e) = event_sender.send(Event::SendClientMessage(target_sender.clone(), json, false)) {
println!("发送给用户id {} 独立消息失败:{:?}", from_id, e); println!("发送给用户id {} 独立消息失败:{:?}", from_id, e);
} else { } else {
println!("发送给用户id {} 独立消息成功", from_id); println!("发送给用户id {} 独立消息成功", from_id);
} }
} }
// 获取指定 channel_id 的用户 ID 列表
fn get_users_by_channel(channel_id: &str) -> Vec<String> {
ONLINE_USERS
.iter()
.filter(|e| e.value().split(',').nth(CHANNEL_IDX).unwrap_or("") == channel_id)
.map(|e| e.key().clone())
.collect()
}
pub async fn handle_agora_call( pub async fn handle_agora_call(
client_message_data: &ClientMessageData, client_message_data: &ClientMessageData,
from_id: &String, from_id: &String,
...@@ -42,1331 +97,484 @@ pub async fn handle_agora_call( ...@@ -42,1331 +97,484 @@ pub async fn handle_agora_call(
connection_time: &u128, connection_time: &u128,
) { ) {
let target_sender_which = (from_id.to_string(), connection_time.clone()); let target_sender_which = (from_id.to_string(), connection_time.clone());
match client_message_data.msg_type.as_str() { match client_message_data.msg_type.as_str() {
"CancelCall" => { "CancelCall" => {
let calling_to_id = &client_message_data.to_id; let calling_to_id = &client_message_data.to_id;
println!( println!("收到客户端取消呼叫 取消呼叫组: {} 呼叫方id {}", calling_to_id, from_id);
"收到客户端取消呼叫 取消呼叫组: {} 呼叫方id {}", send_inside_message(&target_sender_which, event_sender, serde_json::json!({
calling_to_id,
from_id.to_string()
);
let hangup_message_json = serde_json::json!({
"msgType": "CmdCancelCall", "msgType": "CmdCancelCall",
"fromID": "0", "fromID": "0",
"fromName": "Server", "fromName": "Server",
"toID": from_id, "toID": from_id,
"msgData": { "msgData": {"channelId": "", "rtcToken": ""}
"channelId": "", }).to_string(), from_id).await;
"rtcToken": ""
} if let Some(user_status) = ONLINE_USERS.get(from_id).map(|v| v.clone()) {
}) let user_data = user_status.split(',').map(String::from).collect::<Vec<_>>();
.to_string(); if user_data[CHANNEL_IDX].is_empty() {
if let Err(e) = event_sender.send(Event::SendClientMessage( let joined = update_user_status(from_id, "idle", "", false);
target_sender_which.clone(), update_redis_async(from_id.clone(), joined);
hangup_message_json, tokio::spawn(async move { notify_all_clients_to_update_online_users().await; });
false,
)) {
println!("发送给用户id {} 独立消息失败:{:?}", from_id, e);
}
let user_status = ONLINE_USERS
.get(from_id)
.map(|v| v.split(',').map(|s| s.to_string()).collect::<Vec<_>>());
if let Some(mut the_current_user_data) = user_status {
if the_current_user_data[1].is_empty() {
the_current_user_data[1] = "".to_string();
the_current_user_data[0] = "idle".to_string();
the_current_user_data[6] = "0".to_string();
let data_joined = the_current_user_data.join(",");
ONLINE_USERS.insert(from_id.to_string(), data_joined.clone());
let from_id_clone = from_id.clone();
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(&from_id_clone, data_joined).await {
println!("更新redis数据失败:{:?}", e);
}
});
tokio::spawn(async move {
notify_all_clients_to_update_online_users().await;
});
return; return;
} }
let all_to_hangup_id_vec: Vec<String> =
calling_to_id.split(',').map(|s| s.to_string()).collect(); let to_hangup_ids: Vec<String> = calling_to_id.split(',').map(String::from).collect();
for to_hangup_id in all_to_hangup_id_vec { for to_id in to_hangup_ids {
if let Some(ref_procedure_sender) = refuse_procedure_map.get(&to_hangup_id) { if let Some(sender) = refuse_procedure_map.get(&to_id) {
if let Err(e) = ref_procedure_sender.send(()) { sender.send(()).ok();
println!("取消拒绝接听的线程队列失败:{:?}", e); }
} else { if let Some(other_data) = ONLINE_USERS.get(&to_id).map(|v| v.clone()) {
println!("取消拒绝接听的线程队列成功"); let other_data_vec = other_data.split(',').map(String::from).collect::<Vec<_>>();
} if other_data_vec[CHANNEL_IDX] == user_data[CHANNEL_IDX] {
} if let Some(target_sender) = CLIENT_SENDERS.iter().find(|entry| entry.key().0 == *calling_to_id).map(|entry| entry.key().clone()) {
let the_other_caller_data = ONLINE_USERS send_inside_message(&target_sender, event_sender, serde_json::json!({
.get(&to_hangup_id)
.map(|v| v.split(',').map(|s| s.to_string()).collect::<Vec<_>>());
if let Some(mut the_other_caller_data) = the_other_caller_data {
if the_other_caller_data[1] == the_current_user_data[1] {
let hangup_message_json = serde_json::json!({
"msgType": "CmdHangup", "msgType": "CmdHangup",
"fromID": "0", "fromID": "0",
"fromName": "Server", "fromName": "Server",
"toID": to_hangup_id, "toID": to_id,
"msgData": { "msgData": {"channelId": "", "rtcToken": ""}
"channelId": "", }).to_string(), &to_id).await;
"rtcToken": "" let joined = update_user_status(&to_id, "idle", "", false);
} update_redis_async(to_id, joined);
})
.to_string();
let target_sender = CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == *calling_to_id)
.map(|entry| entry.key().clone());
if let Some(target_sender) = target_sender {
if let Err(e) = event_sender.send(Event::SendClientMessage(
target_sender,
hangup_message_json,
false,
)) {
println!("发送给用户id {} 独立消息失败:{:?}", to_hangup_id, e);
} else {
the_other_caller_data[0] = "idle".to_string();
the_other_caller_data[1] = "".to_string();
the_other_caller_data[6] = "0".to_string();
let all_data_joined = the_other_caller_data.join(",");
println!("step - 0");
ONLINE_USERS.insert(to_hangup_id.clone(), all_data_joined.clone());
println!("step - 1");
tokio::spawn(async move {
if let Err(e) =
update_client_redis_data(&to_hangup_id, all_data_joined)
.await
{
println!("更新redis数据失败:{:?}", e);
}
});
}
} else { } else {
println!("未找到对应的sender"); println!("未找到对应的sender");
} }
} }
} }
} }
println!("step - 1.5"); let joined = update_user_status(from_id, "idle", "", false);
the_current_user_data[1] = "".to_string(); update_redis_async(from_id.clone(), joined);
the_current_user_data[0] = "idle".to_string(); tokio::spawn(async move { notify_all_clients_to_update_online_users().await; });
the_current_user_data[6] = "0".to_string();
let the_current_data_joined = the_current_user_data.join(",");
println!("step - 1.7");
ONLINE_USERS.insert(from_id.to_string(), the_current_data_joined.clone());
let from_id_clone = from_id.clone();
println!("step - 2.1");
tokio::spawn(async move {
if let Err(e) =
update_client_redis_data(&from_id_clone, the_current_data_joined).await
{
println!("更新redis数据失败:{:?}", e);
}
});
tokio::spawn(async move {
println!("更新在线用户列表 Cancel-Call");
notify_all_clients_to_update_online_users().await;
});
println!("step - 2.2");
} }
} }
"Call" => { "Call" => {
let calling_to_id = &client_message_data.to_id; let calling_to_id = &client_message_data.to_id;
println!( println!("收到客户端呼叫 呼叫方id {} 呼叫组id {}", from_id, calling_to_id);
"收到客户端呼叫 呼叫方id {} 呼叫组id {}",
from_id, calling_to_id
);
if calling_to_id.is_empty() { if calling_to_id.is_empty() {
let error_json = format!( send_error_message(&target_sender_which, event_sender, from_id, "请指定呼叫对象").await;
r#"{{"msgType": "Error", "fromID": "0", "fromName": "Server", "msgData": "请指定呼叫对象", "toID": "{}"}}"#,
from_id
);
send_inside_message(&target_sender_which, event_sender, error_json, from_id).await;
return; return;
} }
if calling_to_id.eq(from_id) { if calling_to_id == from_id {
let error_self_json = format!( send_error_message(&target_sender_which, event_sender, from_id, "不能给自己打电话").await;
r#"{{"msgType": "Error", "fromID": "0", "fromName": "Server", "msgData": "不能给自己打电话", "toID": "{}"}}"#,
from_id
);
send_inside_message(&target_sender_which, event_sender, error_self_json, from_id)
.await;
return; return;
} }
let user_status = ONLINE_USERS
.get(from_id) if let Some(user_status) = ONLINE_USERS.get(from_id).map(|v| v.clone()) {
.map(|v| v.split(',').map(|s| s.to_string()).collect::<Vec<_>>()); let user_data = user_status.split(',').map(String::from).collect::<Vec<_>>();
if let Some(mut the_user_data) = user_status { let channel_id = if user_data[CHANNEL_IDX].is_empty() {
let channel_id_now = if the_user_data[1].is_empty() {
format!("{}-{}", from_id, get_current_timestamp()) format!("{}-{}", from_id, get_current_timestamp())
} else { } else {
the_user_data[1].to_string() user_data[CHANNEL_IDX].clone()
}; };
let calling_to_id_vec: Vec<&str> = calling_to_id.split(',').collect(); println!("当前用户channelId {} 呼叫方id集合是 {:?}", channel_id, calling_to_id.split(',').collect::<Vec<_>>());
let mut refuse_thread_users: Vec<String> = vec![];
println!( let mut refuse_users = Vec::new();
"当前用户channelId {} 呼叫方id集合是 {:?}", for to_id in calling_to_id.split(',') {
channel_id_now, calling_to_id_vec if !ONLINE_USERS.contains_key(to_id) {
); send_error_message(&target_sender_which, event_sender, from_id, "对方不在线").await;
for calling_to_id in calling_to_id_vec {
if !ONLINE_USERS.contains_key(calling_to_id) {
let json_string = format!(
r#"{{"msgType":"Error","fromID":"0","fromName":"Server","msgData":"对方不在线","toID":"{}"}}"#,
client_message_data.to_id
);
send_inside_message(&target_sender_which, event_sender, json_string, from_id)
.await;
continue; continue;
} }
let user_status = ONLINE_USERS if let Some(to_status) = ONLINE_USERS.get(to_id).map(|v| v.clone()) {
.get(calling_to_id) let to_data = to_status.split(',').map(String::from).collect::<Vec<_>>();
.map(|v| v.split(',').map(|s| s.to_string()).collect::<Vec<_>>()); if to_data[STATUS_IDX] != "idle" {
if let Some(mut the_another_user_data) = user_status { send_error_message(&target_sender_which, event_sender, from_id, "对方正在呼叫中").await;
if the_another_user_data[0] != "idle" {
let json_string = format!(
r#"{{"msgType":"Error","fromID":"0","fromName":"Server","msgData":"对方正在呼叫中","toID":"{}"}}"#,
from_id
);
send_inside_message(&target_sender_which, event_sender, json_string, from_id)
.await;
continue; continue;
} }
let call_message_json = serde_json::json!({ if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == to_id).map(|e| e.key().clone()) {
send_inside_message(&sender, event_sender, serde_json::json!({
"msgType": "Call", "msgType": "Call",
"fromID": from_id, "fromID": from_id,
"fromName": client_message_data.from_name, "fromName": client_message_data.from_name,
"toID": calling_to_id, "toID": to_id,
"msgData": { "msgData": {"channelId": channel_id, "rtcToken": ""}
"channelId": channel_id_now, }).to_string(), &to_id.to_string()).await;
"rtcToken": "" let joined = update_user_status(to_id, "callin", &channel_id, false);
} update_redis_async(to_id.to_string(), joined);
}) refuse_users.push(to_id.to_string());
.to_string();
let target_sender = CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == *calling_to_id)
.map(|entry| entry.key().clone());
if let Some(sender) = target_sender {
if let Err(e) = event_sender.send(Event::SendClientMessage(
sender.clone(),
call_message_json.clone(),
false,
)) {
println!("发送给用户id {:?} 独立消息失败:{:?}", sender, e);
continue;
} else {
println!(
"发送给用户id {} 独立消息成功 消息内容 {}",
calling_to_id, call_message_json
);
}
} else { } else {
println!("找不到对应的sender,无法发送客户端消息!"); println!("找不到对应的sender,无法发送客户端消息!");
continue;
} }
println!("发送给用户id {} 独立消息成功", calling_to_id);
the_another_user_data[0] = "callin".to_string();
the_another_user_data[1] = channel_id_now.clone();
the_another_user_data[6] = "0".to_string();
println!("step - 1");
let the_another_user_data_joined = the_another_user_data.join(",");
ONLINE_USERS.insert(calling_to_id.to_string(), the_another_user_data_joined.clone());
println!("step - 2");
let calling_id_c = calling_to_id.to_string();
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(
&calling_id_c,
the_another_user_data_joined,
)
.await
{
println!("更新redis数据失败:{:?} 用户id {}", e, calling_id_c);
} else {
println!("更新redis数据成功");
} }
}); }
println!("step - 3");
refuse_thread_users.push(calling_to_id.to_string()); if !refuse_users.is_empty() {
} let status = if user_data[STATUS_IDX] == "calling" { "calling" } else { "callout" };
} send_inside_message(&target_sender_which, event_sender, serde_json::json!({
println!(
"是否通知其他用户成功? {} 然后发送CmdCall?",
refuse_thread_users.len()
);
if refuse_thread_users.len() > 0 {
println!("发送给用户id {} 需要发送CmdCall", from_id);
if the_user_data[0] == "idle" || the_user_data[0] == "calling" {
let cmd_call_message_json = serde_json::json!({
"msgType": "CmdCall", "msgType": "CmdCall",
"fromID": calling_to_id, "fromID": calling_to_id,
"fromName": client_message_data.from_name, "fromName": client_message_data.from_name,
"toID": from_id, "toID": from_id,
"msgData": { "msgData": {"channelId": channel_id, "rtcToken": ""}
"channelId": channel_id_now, }).to_string(), from_id).await;
"rtcToken": "" let joined = update_user_status(from_id, status, &channel_id, false);
} update_redis_async(from_id.clone(), joined);
}) tokio::spawn(async move { notify_all_clients_to_update_online_users().await; });
.to_string();
the_user_data[0] = if the_user_data[0] == "calling" { for user_id in refuse_users {
"calling".to_string()
} else {
"callout".to_string()
};
the_user_data[1] = channel_id_now.clone();
the_user_data[6] = "0".to_string();
let the_user_data_joined = the_user_data.join(",");
println!("step - 4");
ONLINE_USERS.insert(from_id.to_string(), the_user_data_joined.clone());
let from_id_c = from_id.clone();
println!("step - 5");
tokio::spawn(async move {
if let Err(e) =
update_client_redis_data(&from_id_c, the_user_data_joined).await
{
println!("更新redis数据失败:{:?} 用户id {}", e, from_id_c);
} else {
println!("更新redis数据成功");
}
});
println!("step - 6");
send_inside_message(
&target_sender_which,
event_sender,
cmd_call_message_json,
from_id,
)
.await;
tokio::spawn(async move {
notify_all_clients_to_update_online_users().await;
});
println!(
"step - 7 开启定时挂断线程refuse {}",
refuse_thread_users.len()
);
for user_id in refuse_thread_users {
let event_sender_clone = event_sender.clone();
let user_id_clone = user_id.clone();
let from_id_clone = from_id.clone();
let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel::<()>(); let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel::<()>();
refuse_procedure_map.insert(user_id.clone(), cancel_tx); refuse_procedure_map.insert(user_id.clone(), cancel_tx);
println!("step -8 Refuse-Call auto"); let event_sender_clone = event_sender.clone();
let from_id_clone = from_id.clone();
let channel_id_clone = channel_id.clone();
tokio::spawn(async move { tokio::spawn(async move {
tokio::select! { tokio::select! {
_ = sleep(Duration::from_secs(15)) => { _ = sleep(Duration::from_secs(15)) => {
println!("20秒内没有接听,准备挂断通话"); println!("20秒内没有接听,准备挂断通话");
let user_data = ONLINE_USERS.get(&user_id_clone).map(|v| v.split(',').map(|s| s.to_string()).collect::<Vec<_>>()); let joined = update_user_status(&user_id, "idle", "", false);
if let Some(mut current_user_data_vec) = user_data { update_redis_async(user_id.clone(), joined);
let current_channel_id = current_user_data_vec[1].clone(); if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == user_id).map(|e| e.key().clone()) {
current_user_data_vec[0] = "idle".to_string(); send_inside_message(&sender, &event_sender_clone, serde_json::json!({
current_user_data_vec[1] = "".to_string();
current_user_data_vec[6] = "0".to_string();
println!("step R-H 1");
ONLINE_USERS.insert(user_id_clone.clone(), current_user_data_vec.join(","));
println!("step R-H 2");
let user_id_clone_for = user_id_clone.clone();
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(
&user_id_clone_for,
current_user_data_vec.join(","),
)
.await {
println!("更新redis数据失败:{:?} 用户id {}", e, user_id_clone_for);
} else {
println!("通知挂断refuse线程,更新redis数据成功");
}
});
println!("step R-H 3");
let host_user_sender_which = CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == from_id_clone)
.map(|entry| entry.key().clone());
println!("step R-H 4");
if let Some(host_user_sender_which) = host_user_sender_which {
send_inside_message(
&host_user_sender_which,
&event_sender_clone,
serde_json::json!({
"msgType": "CmdRefuse",
"fromID": user_id_clone,
"fromName": "Unknown",
"toID": host_user_sender_which.0,
"msgData": {
"channelId": "",
"rtcToken": ""
}
}).to_string(),
&host_user_sender_which.0,
)
.await;
}
println!("step R-H 5");
let hangup_user_sender_which = CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == user_id_clone)
.map(|entry| entry.key().clone());
println!("step R-H 6");
if let Some(hangup_user_sender_which) = hangup_user_sender_which {
send_inside_message(
&hangup_user_sender_which,
&event_sender_clone,
serde_json::json!({
"msgType": "CmdHangup", "msgType": "CmdHangup",
"fromID": "0", "fromID": "0",
"fromName": "Server", "fromName": "Server",
"toID": user_id_clone, "toID": user_id,
"msgData": { "msgData": {"channelId": "", "rtcToken": ""}
"channelId": "", }).to_string(), &user_id).await;
"rtcToken": ""
}
}).to_string(),
&user_id_clone,
)
.await;
}
println!("step R-H 7");
let left_users = ONLINE_USERS
.iter()
.filter(|entry| entry.value().split(',').collect::<Vec<_>>()[1] == current_channel_id)
.map(|entry| (entry.key().clone(), entry.value().clone()))
.collect::<Vec<_>>();
println!("step R-H 8");
if left_users.len() <= 1 {
if let Some((user_id, user_data)) = left_users.get(0) {
let mut user_data_vec = user_data.split(',').map(|s| s.to_string()).collect::<Vec<_>>();
let sender_or_not = CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == *user_id)
.map(|entry| entry.key().clone());
println!("step R-H 9");
if let Some(host_user_sender_which) = sender_or_not {
user_data_vec[0] = "idle".to_string();
user_data_vec[1] = "".to_string();
user_data_vec[6] = "0".to_string();
println!("step R-H 10");
ONLINE_USERS.insert(user_id.clone(), user_data_vec.join(","));
println!("step R-H 11");
let user_data_j = user_data_vec.join(",");
let uid = user_id.clone();
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(&uid, user_data_j).await {
println!("只有一人在会议中,更新redis数据失败: {}", e);
} else {
println!("只有一人在会议中,更新redis数据成功");
} }
}); if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == from_id_clone).map(|e| e.key().clone()) {
println!("step R-H 12"); send_inside_message(&sender, &event_sender_clone, serde_json::json!({
send_inside_message( "msgType": "CmdRefuse",
&host_user_sender_which, "fromID": user_id,
&event_sender_clone, "fromName": "Unknown",
serde_json::json!({ "toID": from_id_clone,
"msgData": {"channelId": "", "rtcToken": ""}
}).to_string(), &from_id_clone).await;
}
let left_users = get_users_by_channel(&channel_id_clone);
if left_users.len() <= 1 {
if let Some(user_id) = left_users.get(0) {
if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == *user_id).map(|e| e.key().clone()) {
let joined = update_user_status(user_id, "idle", "", false);
update_redis_async(user_id.clone(), joined);
send_inside_message(&sender, &event_sender_clone, serde_json::json!({
"msgType": "CmdHangup", "msgType": "CmdHangup",
"fromID": "0", "fromID": "0",
"fromName": "Server", "fromName": "Server",
"toID": host_user_sender_which.0, "toID": user_id,
"msgData": {} "msgData": {}
}).to_string(), }).to_string(), user_id).await;
user_id,
)
.await;
}
}
} }
println!("step R-H 13");
tokio::spawn(async move {
notify_all_clients_to_update_online_users().await;
});
} }
} }
_ = cancel_rx.recv() => { tokio::spawn(async move { notify_all_clients_to_update_online_users().await; });
println!("用户已接听或者主动挂断或者通话被取消 {}", user_id_clone);
} }
_ = cancel_rx.recv() => println!("用户已接听或挂断 {}", user_id),
} }
}); });
} }
} }
} }
} }
}
"Refuse" => { "Refuse" => {
println!("用户拒接电话 {}", from_id); println!("用户拒接电话 {}", from_id);
if let Some(ref_procedure_sender) = refuse_procedure_map.get(from_id) { if let Some(sender) = refuse_procedure_map.get(from_id) {
if let Err(e) = ref_procedure_sender.send(()) { sender.send(()).ok();
println!("取消拒绝接听的线程队列失败:{:?}", e); }
} else { if let Some(user_data) = ONLINE_USERS.get(from_id).map(|v| v.clone()) {
println!("取消拒绝接听的线程队列成功"); let user_data_vec = user_data.split(',').map(String::from).collect::<Vec<_>>();
} let channel_id = user_data_vec[CHANNEL_IDX].clone();
} let joined = update_user_status(from_id, "idle", "", false);
let user_data = ONLINE_USERS update_redis_async(from_id.clone(), joined);
.get(from_id) send_inside_message(&target_sender_which, event_sender, serde_json::json!({
.map(|v| v.split(',').map(|s| s.to_string()).collect::<Vec<_>>());
let mut current_channel_id = String::new();
if let Some(mut current_user_data_vec) = user_data {
let hangup_refuse_message = serde_json::json!({
"msgType": "CmdHangup", "msgType": "CmdHangup",
"fromID": "0", "fromID": "0",
"fromName": "Server", "fromName": "Server",
"toID": from_id, "toID": from_id,
"msgData": { "msgData": {"channelId": "", "rtcToken": ""}
"channelId": "", }).to_string(), from_id).await;
"rtcToken": ""
} if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == client_message_data.to_id).map(|e| e.key().clone()) {
}) send_inside_message(&sender, event_sender, serde_json::json!({
.to_string();
println!("step - Refuse - 0");
current_channel_id = current_user_data_vec[1].clone();
current_user_data_vec[0] = "idle".to_string();
current_user_data_vec[1] = "".to_string();
current_user_data_vec[6] = "0".to_string();
println!("step - Refuse - 1");
let current_user_data_joined = current_user_data_vec.join(",");
println!("step - Refuse - 2");
ONLINE_USERS.insert(from_id.to_string(), current_user_data_joined.clone());
let from_id_clone = from_id.clone();
println!("step - Refuse - 3");
tokio::spawn(async move {
if let Err(e) =
update_client_redis_data(&from_id_clone, current_user_data_joined).await
{
println!("更新redis数据失败:{:?} 用户id {}", e, from_id_clone);
} else {
println!("更新redis数据成功");
}
});
println!("step - Refuse - 4");
send_inside_message(
&target_sender_which,
event_sender,
hangup_refuse_message,
from_id,
)
.await;
}
println!("step - Refuse - 5");
let sender_found = CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == client_message_data.to_id)
.map(|entry| entry.key().clone());
println!("step - Refuse - 6");
if let Some(target_sender) = sender_found {
let cmd_refuse_message = serde_json::json!({
"msgType": "CmdRefuse", "msgType": "CmdRefuse",
"fromID": from_id, "fromID": from_id,
"fromName": "Server", "fromName": "Server",
"toID": client_message_data.to_id, "toID": client_message_data.to_id,
"msgData": { "msgData": {"channelId": "", "rtcToken": ""}
"channelId": "", }).to_string(), &client_message_data.to_id).await;
"rtcToken": ""
}
})
.to_string();
send_inside_message(
&target_sender,
event_sender,
cmd_refuse_message,
&client_message_data.to_id,
)
.await;
} else { } else {
println!("找不到toID对应的sender"); println!("找不到toID对应的sender");
} }
println!("step - Refuse - 7");
let current_all_chatters = ONLINE_USERS let chatters = get_users_by_channel(&channel_id);
.iter() if chatters.len() == 1 {
.filter(|entry| entry.value().split(',').nth(1).unwrap_or("") == current_channel_id) if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == chatters[0]).map(|e| e.key().clone()) {
.map(|entry| entry.key().clone()) let joined = update_user_status(&chatters[0], "idle", "", false);
.collect::<Vec<_>>(); update_redis_async(chatters[0].clone(), joined);
if current_all_chatters.len() == 1 { send_inside_message(&sender, event_sender, serde_json::json!({
let sender_found = CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == current_all_chatters[0])
.map(|entry| entry.key().clone());
if let Some(sender) = sender_found {
println!("step - Refuse - 7.75 - alone");
let cmd_cancel_call_message = serde_json::json!({
"msgType": "CmdCancelCall", "msgType": "CmdCancelCall",
"fromID": "0", "fromID": "0",
"fromName": "Server", "fromName": "Server",
"toID": current_all_chatters[0], "toID": chatters[0],
"msgData": { "msgData": {"channelId": "", "rtcToken": ""}
"channelId": "", }).to_string(), &chatters[0]).await;
"rtcToken": ""
}
})
.to_string();
send_inside_message(
&sender,
event_sender,
cmd_cancel_call_message,
&current_all_chatters[0],
)
.await;
println!("step - Refuse - 7.755 - alone");
let target_id = current_all_chatters[0].clone();
let target_ud = ONLINE_USERS
.get(&target_id)
.map(|v| v.split(',').map(|s| s.to_string()).collect::<Vec<_>>());
if let Some(mut target_ud_vec) = target_ud {
target_ud_vec[0] = "idle".to_string();
target_ud_vec[1] = "".to_string();
target_ud_vec[6] = "0".to_string();
let target_ud_joined = target_ud_vec.join(",");
println!("step - Refuse - 7.7599 - alone");
ONLINE_USERS.insert(target_id.clone(), target_ud_joined.clone());
println!("step - Refuse - 7.759999 - alone");
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(&target_id, target_ud_joined).await {
println!("更新redis数据失败:{:?} 用户id {}", e, target_id);
} else {
println!("更新redis数据成功");
} }
});
} }
tokio::spawn(async move { notify_all_clients_to_update_online_users().await; });
} }
} }
println!("step - Refuse - 7.8");
tokio::spawn(async move {
notify_all_clients_to_update_online_users().await;
});
println!("step - Refuse - 8");
}
"EndMeeting" => { "EndMeeting" => {
println!("step - EndMeeting - 1 收到挂断会议通知!"); println!("step - EndMeeting - 1 收到挂断会议通知!");
let user_found = ONLINE_USERS if let Some(user_data) = ONLINE_USERS.get(from_id).map(|v| v.clone()) {
.get(from_id) let user_data_vec = user_data.split(',').map(String::from).collect::<Vec<_>>();
.map(|v| v.split(',').map(|s| s.to_string()).collect::<Vec<_>>()); if user_data_vec[HOST_IDX] != "1" {
if let Some(current_user_data_vec) = user_found { send_error_message(&target_sender_which, event_sender, from_id, "只有主持人可以结束会议!").await;
println!("step - EndMeeting - 1.1 判断是否为主持人");
if current_user_data_vec[6] != "1" {
println!("step - EndMeeting - 1.2 不是主持人");
let error_message = serde_json::json!({
"msgType": "Error",
"fromID": "0",
"fromName": "Server",
"toID": from_id,
"msgData": "只有主持人可以结束会议!"
})
.to_string();
send_inside_message(&target_sender_which, event_sender, error_message, from_id)
.await;
println!("step - EndMeeting - 1.3 不是主持人消息发送完毕");
} else { } else {
println!("step - EndMeeting - 1.4 判断是主持人"); let channel_id = user_data_vec[CHANNEL_IDX].clone();
let channel_id = current_user_data_vec[1].clone();
if !channel_id.is_empty() { if !channel_id.is_empty() {
println!("step - EndMeeting - 1.5 判断channel_id"); let users = get_users_by_channel(&channel_id);
let users_to_notify = ONLINE_USERS for user_id in &users {
.iter() let joined = update_user_status(user_id, "idle", "", false);
.filter(|entry| entry.value().split(",").collect::<Vec<_>>()[1] == channel_id) update_redis_async(user_id.clone(), joined);
.map(|entry| entry.key().clone()) if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == *user_id).map(|e| e.key().clone()) {
.collect::<Vec<_>>(); send_inside_message(&sender, event_sender, serde_json::json!({
let clone_for_users_to_notify = users_to_notify.clone();
println!("step - EndMeeting - 1.6 判断channel_id 遍历循环");
for user_id in users_to_notify {
let user_data = ONLINE_USERS
.get(&user_id)
.map(|v| v.split(',').map(|s| s.to_string()).collect::<Vec<_>>());
if let Some(mut current_user_data_vec) = user_data {
current_user_data_vec[0] = "idle".to_string();
current_user_data_vec[1] = "".to_string();
current_user_data_vec[6] = "0".to_string();
let current_user_data_joined = current_user_data_vec.join(",");
println!("step - EndMeeting - 1.7 判断channel_id 遍历循环 1");
ONLINE_USERS.insert(user_id.clone(), current_user_data_joined.clone());
println!("step - EndMeeting - 1.8 判断channel_id 遍历循环 2");
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(&user_id, current_user_data_joined).await {
println!("更新redis数据失败:{:?} 用户id {}", e, user_id);
} else {
println!("更新redis数据成功");
}
});
println!("玄幻完毕");
}
}
println!("step - EndMeeting - 1.9 发送消息 遍历循环 3");
for user_id in clone_for_users_to_notify {
let target_sender = CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == user_id)
.map(|entry| entry.key().clone());
if let Some(target_sender) = target_sender {
println!("step - EndMeeting 消息发送给 {:?}", &target_sender);
send_inside_message(
&target_sender,
event_sender,
serde_json::json!({
"msgType": "CmdEndMeeting", "msgType": "CmdEndMeeting",
"fromID": "0", "fromID": "0",
"fromName": "Server", "fromName": "Server",
"toID": user_id, "toID": user_id,
"msgData": { "msgData": {"channelId": "", "rtcToken": ""}
"channelId": "", }).to_string(), user_id).await;
"rtcToken": ""
}
})
.to_string(),
&user_id,
)
.await;
} }
} }
tokio::spawn(async move { tokio::spawn(async move { notify_all_clients_to_update_online_users().await; });
notify_all_clients_to_update_online_users().await;
});
} }
} }
} }
} }
"Hangup" => { "Hangup" => {
println!("step - hangup -1 接收到hangup消息"); println!("step - hangup -1 接收到hangup消息");
let user_data = ONLINE_USERS if let Some(user_data) = ONLINE_USERS.get(from_id).map(|v| v.clone()) {
.get(from_id) let user_data_vec = user_data.split(',').map(String::from).collect::<Vec<_>>();
.map(|v| v.split(',').map(|s| s.to_string()).collect::<Vec<_>>()); let channel_id = user_data_vec[CHANNEL_IDX].clone();
if let Some(mut current_user_data_vec) = user_data { let joined = update_user_status(from_id, "idle", "", false);
let current_chatting_channel_id = current_user_data_vec[1].clone(); update_redis_async(from_id.clone(), joined);
println!("step - hangup -2 修改数据信息"); send_inside_message(&target_sender_which, event_sender, serde_json::json!({
current_user_data_vec[0] = "idle".to_string();
current_user_data_vec[1] = "".to_string();
current_user_data_vec[6] = "0".to_string();
let current_user_data_joined = current_user_data_vec.join(",");
println!("step - hangup -3 获取当前用户数据");
ONLINE_USERS.insert(from_id.to_string(), current_user_data_joined.clone());
println!("step - hangup -4 更新数据");
let from_id_clone = from_id.clone();
tokio::spawn(async move {
if let Err(e) =
update_client_redis_data(&from_id_clone, current_user_data_joined).await
{
println!("更新redis数据失败:{:?} 用户id {}", e, from_id_clone);
} else {
println!("更新redis数据成功");
}
});
send_inside_message(
&target_sender_which,
event_sender,
serde_json::json!({
"msgType": "CmdHangup", "msgType": "CmdHangup",
"fromID": "0", "fromID": "0",
"fromName": "Server", "fromName": "Server",
"toID": from_id, "toID": from_id,
"msgData": { "msgData": {"channelId": "", "rtcToken": ""}
"channelId": "", }).to_string(), from_id).await;
"rtcToken": ""
} let remaining_users = get_users_by_channel(&channel_id);
})
.to_string(),
from_id,
)
.await;
println!("step - hangup -5 发送消息");
println!("step - hangup -6 发送消息 当前用户是主持人,需要更新数据");
let remaining_users = ONLINE_USERS
.iter()
.filter(|entry| entry.value().split(',').collect::<Vec<_>>()[1] == current_chatting_channel_id)
.map(|entry| entry.key().clone())
.collect::<Vec<_>>();
if remaining_users.is_empty() { if remaining_users.is_empty() {
println!("当前频道没有人员,请重新发起通话"); println!("当前频道没有人员,请重新发起通话");
} else { } else {
println!("准备发送CmdLeave消息");
for user_id in &remaining_users { for user_id in &remaining_users {
let sender_found = CLIENT_SENDERS if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == *user_id).map(|e| e.key().clone()) {
.iter() send_inside_message(&sender, event_sender, serde_json::json!({
.find(|entry| entry.key().0 == *user_id)
.map(|entry| entry.key().clone());
if let Some(target_sender) = sender_found {
let cmd_leave_message = serde_json::json!({
"msgType": "CmdLeave", "msgType": "CmdLeave",
"fromID": from_id, "fromID": from_id,
"fromName": "Unknown", "fromName": "Unknown",
"toID": user_id, "toID": user_id,
"msgData": {} "msgData": {}
}) }).to_string(), user_id).await;
.to_string(); }
send_inside_message( }
&target_sender,
event_sender,
cmd_leave_message,
user_id,
)
.await;
}
}
println!("判断是否仅为1人,需要自动挂断");
if remaining_users.len() == 1 { if remaining_users.len() == 1 {
let channel_id_clone = current_chatting_channel_id.clone(); let user_id = remaining_users[0].clone();
let ev_clone = event_sender.clone();
let remaining_user_id = remaining_users[0].clone();
let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel::<()>(); let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel::<()>();
channel_hangup_procedure_map.insert(current_chatting_channel_id.clone(), cancel_tx); channel_hangup_procedure_map.insert(channel_id.clone(), cancel_tx);
let event_sender_clone = event_sender.clone();
let channel_id_clone = channel_id.clone();
tokio::spawn(async move { tokio::spawn(async move {
tokio::select! { tokio::select! {
_ = sleep(Duration::from_secs(15)) => { _ = sleep(Duration::from_secs(15)) => {
println!("20s内如果该频道与会人数仍为1,则对剩下来的用户发送CmdHangup指令 {:?}", remaining_user_id); println!("20s内如果该频道与会人数仍为1,则对剩下来的用户发送CmdHangup指令 {}", user_id);
let remain_user_data = ONLINE_USERS.get(&remaining_user_id).map(|v| v.split(',').map(|s| s.to_string()).collect::<Vec<_>>()); if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == user_id).map(|e| e.key().clone()) {
if let Some(mut user_info_data_for_remained) = remain_user_data { let joined = update_user_status(&user_id, "idle", "", false);
let sender_d = CLIENT_SENDERS update_redis_async(user_id.clone(), joined);
.iter() send_inside_message(&sender, &event_sender_clone, serde_json::json!({
.find(|entry| entry.key().0 == remaining_user_id)
.map(|entry| entry.key().clone());
if let Some(target_sender_which) = sender_d {
user_info_data_for_remained[0] = "idle".to_string();
user_info_data_for_remained[1] = "".to_string();
user_info_data_for_remained[6] = "0".to_string();
let user_info_data_for_remained_joined = user_info_data_for_remained.join(",");
println!("准备更新剩余一人数据!");
ONLINE_USERS.insert(remaining_user_id.clone(), user_info_data_for_remained_joined.clone());
println!("更新剩余一人数据成功!");
let clone_for_redis = remaining_user_id.clone();
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(&clone_for_redis, user_info_data_for_remained_joined).await {
println!("更新redis数据失败:{:?} 用户id {}", e, clone_for_redis);
} else {
println!("更新redis数据成功");
}
});
send_inside_message(
&target_sender_which,
&ev_clone,
serde_json::json!({
"msgType": "CmdHangup", "msgType": "CmdHangup",
"fromID": "0", "fromID": "0",
"fromName": "Server", "fromName": "Server",
"toID": remaining_user_id, "toID": user_id,
"msgData": { "msgData": {"channelId": "", "rtcToken": ""}
"channelId": "", }).to_string(), &user_id).await;
"rtcToken": "" tokio::spawn(async move { notify_all_clients_to_update_online_users().await; });
}
}).to_string(),
&remaining_user_id,
)
.await;
tokio::spawn(async move {
notify_all_clients_to_update_online_users().await;
});
}
}
}
_ = cancel_rx.recv() => {
println!("收到取消信号,有新的人员加入到会议,会议频道号 {} 结束线程", channel_id_clone);
} }
} }
}); _ = cancel_rx.recv() => println!("收到取消信号,有新的人员加入到会议,会议频道号 {} 结束线程", channel_id_clone),
} else {
let (allowed_users, disallowed_users): (Vec<String>, Vec<String>) =
remaining_users.into_iter().partition(|user_id| HOST_ENABLED_ID_SET.contains(user_id.as_str()));
let allowed_users_clone = allowed_users.clone();
if !allowed_users_clone.is_empty() {
if let Some(allowed_user_id) = allowed_users_clone.get(0) {
let allowed_user_data = ONLINE_USERS.get(allowed_user_id).map(|v| v.split(',').map(|s| s.to_string()).collect::<Vec<_>>());
if let Some(mut user_info_data_for_allowed) = allowed_user_data {
user_info_data_for_allowed[6] = "1".to_string();
let user_info_data_for_allowed_joined = user_info_data_for_allowed.join(",");
ONLINE_USERS.insert(allowed_user_id.clone(), user_info_data_for_allowed_joined.clone());
let allowed_user_id_clone = allowed_user_id.clone();
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(&allowed_user_id_clone, user_info_data_for_allowed_joined).await {
println!("更新redis数据失败:{:?} 用户id {}", e, allowed_user_id_clone);
} else {
println!("更新redis数据成功");
} }
}); });
}
}
} else if !disallowed_users.is_empty() {
if let Some(disallowed_user_id) = disallowed_users.get(0) {
let disallowed_user_data = ONLINE_USERS.get(disallowed_user_id).map(|v| v.split(',').map(|s| s.to_string()).collect::<Vec<_>>());
if let Some(mut current_host_data) = disallowed_user_data {
current_host_data[6] = "1".to_string();
let current_host_data_joined = current_host_data.join(",");
ONLINE_USERS.insert(disallowed_user_id.clone(), current_host_data_joined.clone());
let disallowed_user_id_clone = disallowed_user_id.clone();
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(&disallowed_user_id_clone, current_host_data_joined).await {
println!("更新redis数据失败:{:?} 用户id {}", e, disallowed_user_id_clone);
} else { } else {
println!("更新redis数据成功"); let (allowed, disallowed): (Vec<_>, Vec<_>) = remaining_users.into_iter().partition(|id| HOST_ENABLED_ID_SET.contains(id.as_str()));
} let new_host = allowed.into_iter().next().or(disallowed.into_iter().next());
}); if let Some(host_id) = new_host {
} let joined = update_user_status(&host_id, &ONLINE_USERS.get(&host_id).map(|v| v.split(',').next().unwrap_or("idle").to_string()).unwrap_or("idle".into()), &channel_id, true);
update_redis_async(host_id, joined);
} }
} }
} }
} tokio::spawn(async move { notify_all_clients_to_update_online_users().await; });
tokio::spawn(async move {
notify_all_clients_to_update_online_users().await;
});
} }
} }
"Connect" => { "Connect" => {
println!( println!("收到客户端Connect消息连接 {} 频道信息 {:?}", from_id, client_message_data);
"收到客户端Connect消息连接 {} 频道信息 {:?}", let to_id = &client_message_data.to_id;
from_id, client_message_data if let Some(to_data) = ONLINE_USERS.get(to_id).map(|v| v.clone()) {
); let data_split = to_data.split(',').map(String::from).collect::<Vec<_>>();
let to_id_user = &client_message_data.to_id; let channel_id = data_split[CHANNEL_IDX].clone();
println!("Step Connect-1"); if channel_id.is_empty() {
let to_id_user_data = ONLINE_USERS send_error_message(&target_sender_which, event_sender, from_id, "对方数据出现异常,缺少channelID").await;
.get(to_id_user) return;
.map(|v| v.split(',').map(|s| s.to_string()).collect::<Vec<_>>());
if let Some(data_split) = to_id_user_data {
println!("Step Connect-1.1");
let target_channel_id = data_split[1].clone();
if !target_channel_id.is_empty() {
println!("Step Connect-1.2");
if let Some(hangup_personnel) = refuse_procedure_map.get(from_id) {
if let Err(e) = hangup_personnel.send(()) {
println!("挂断personnel的任务终结失败 {:?}", e);
} else {
println!("挂断personnel的任务已经发送成功");
}
}
println!("Step Connect-1.3");
if let Some(hangup_channel_task) = channel_hangup_procedure_map.get(&target_channel_id) {
if let Err(e) = hangup_channel_task.send(()) {
println!("挂断channel的任务终结失败 {:?}", e);
} else {
println!("挂断channel的任务已经发送成功");
}
}
println!("Step Connect-1.4");
let already_had_host = ONLINE_USERS
.iter()
.filter(|entry| entry.value().split(",").collect::<Vec<_>>()[1] == target_channel_id)
.any(|entry| entry.value().split(",").collect::<Vec<_>>()[6] == "1");
println!("Step Connect-1.5");
if already_had_host {
println!("Step Connect-1.6");
println!("当前channel已经有主持人,直接返回 {}", target_channel_id);
} else {
println!("当前channel没有主持人,开始处理 {}", target_channel_id);
println!("Step Connect-1.8");
let last_remaining_id_set = target_channel_id.split("_").map(|s| s.to_string()).collect::<Vec<_>>();
if let Some(channel_id_first_element) = last_remaining_id_set.get(0) {
println!("Step Connect-2");
if HOST_ENABLED_ID_SET.contains(channel_id_first_element.as_str()) {
let host_data = ONLINE_USERS.get(channel_id_first_element).map(|v| v.split(",").map(|s| s.to_string()).collect::<Vec<_>>());
if let Some(mut host_data_vec) = host_data {
host_data_vec[6] = "1".to_string();
let host_data_joined = host_data_vec.join(",");
println!("Step Connect-3");
println!("Step Connect-4");
ONLINE_USERS.insert(channel_id_first_element.clone(), host_data_joined.clone());
println!("Step Connect-5");
let the_first_for = channel_id_first_element.clone();
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(&the_first_for, host_data_joined).await {
println!("更新redis数据失败:{:?} 用户id {} 成为主持人", e, the_first_for);
} else {
println!("更新redis数据成功");
}
});
println!("Step Connect-6");
}
} else {
println!("Step Connect-7");
let all_in_group_chatters_id = ONLINE_USERS
.iter()
.filter(|entry| entry.value().split(",").collect::<Vec<_>>()[1] == target_channel_id)
.map(|entry| entry.key().clone())
.collect::<Vec<_>>();
let first_host_id = all_in_group_chatters_id
.iter()
.find(|id| HOST_ENABLED_ID_SET.contains(id.as_str()));
let mut id = String::new();
if let Some(target_id) = first_host_id {
id = target_id.clone();
} else if let Some(first_element) = all_in_group_chatters_id.first() {
id = first_element.clone();
}
println!("Step Connect-7.5 找到具备成为主持人的host-id");
let host_data = ONLINE_USERS.get(&id).map(|v| v.split(",").map(|s| s.to_string()).collect::<Vec<_>>());
if let Some(mut host_data_vec) = host_data {
host_data_vec[6] = "1".to_string();
println!("Step Connect-8");
let host_data_joined = host_data_vec.join(",");
println!("Step Connect-9");
ONLINE_USERS.insert(id.clone(), host_data_joined.clone());
println!("Step Connect-9.1");
let host_id_clone = id.clone();
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(&host_id_clone, host_data_joined).await {
println!("更新redis数据失败:{:?} 用户id {} 成为主持人", e, host_id_clone);
} else {
println!("更新redis数据成功");
}
});
println!("Step Connect-10");
}
}
} }
if let Some(sender) = refuse_procedure_map.get(from_id) { sender.send(()).ok(); }
if let Some(sender) = channel_hangup_procedure_map.get(&channel_id) { sender.send(()).ok(); }
if !ONLINE_USERS.iter().any(|e| e.value().split(',').nth(CHANNEL_IDX).unwrap_or("") == channel_id && e.value().split(',').nth(HOST_IDX).unwrap_or("") == "1") {
let initiator = channel_id.split('_').next().unwrap_or("");
let host_id = if HOST_ENABLED_ID_SET.contains(initiator) {
initiator.to_string()
} else {
ONLINE_USERS.iter()
.filter(|e| e.value().split(',').nth(CHANNEL_IDX).unwrap_or("") == channel_id)
.map(|e| e.key().clone())
.find(|id| HOST_ENABLED_ID_SET.contains(id.as_str()))
.unwrap_or_else(|| ONLINE_USERS.iter().find(|e| e.value().split(',').nth(CHANNEL_IDX).unwrap_or("") == channel_id).map(|e| e.key().clone()).unwrap_or_default())
};
let joined = update_user_status(&host_id, &ONLINE_USERS.get(&host_id).map(|v| v.split(',').next().unwrap_or("idle").to_string()).unwrap_or("idle".into()), &channel_id, true);
update_redis_async(host_id, joined);
} }
println!("Step Connect-11");
let all_not_calling_users = ONLINE_USERS let users = ONLINE_USERS.iter()
.iter() .filter(|e| e.value().split(',').nth(CHANNEL_IDX).unwrap_or("") == channel_id && e.value().split(',').nth(STATUS_IDX).unwrap_or("") != "calling")
.filter(|entry| { .map(|e| e.key().clone())
let entry_split = entry.value().split(",").collect::<Vec<_>>();
entry_split[1] == target_channel_id && entry_split[0] != "calling"
})
.map(|entry| entry.key().clone())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
println!( for user_id in users {
"Step Connect-11.5 当前所有channel用户 {:?}和channelID {}", if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == user_id).map(|e| e.key().clone()) {
all_not_calling_users, target_channel_id send_inside_message(&sender, event_sender, serde_json::json!({
);
for user_id in all_not_calling_users {
let target_sender_which = CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == user_id)
.map(|entry| entry.key().clone());
if let Some(target_sender_which) = target_sender_which {
let json_str = serde_json::json!({
"msgType": "CmdConnect", "msgType": "CmdConnect",
"msgData": { "msgData": {"channelID": channel_id, "rtcToken": ""},
"channelID": target_channel_id,
"rtcToken": ""
},
"fromID": "0", "fromID": "0",
"fromName": "Server", "fromName": "Server",
"toID": user_id "toID": user_id
}) }).to_string(), &user_id).await;
.to_string();
if let Err(e) = event_sender.send(Event::SendClientMessage(
target_sender_which.clone(),
json_str,
false,
)) {
println!("发送给用户id {} 独立消息失败:{:?}", user_id, e);
} else {
println!("发送给用户id {} 独立消息成功 CmdConnect", user_id);
}
}
}
tokio::spawn(async move {
notify_all_clients_to_update_online_users().await;
});
println!("Step Connect-11");
} else {
if let Err(e) = event_sender.send(Event::SendClientMessage(
target_sender_which.clone(),
serde_json::json!({
"msgType": "Error",
"msgData": "对方数据出现异常,缺少channelID",
"fromID": "0",
"fromName": "Server",
"toID": from_id
})
.to_string(),
false,
)) {
println!("发送给用户id {} 独立消息失败:{:?}", from_id, e);
} }
} }
tokio::spawn(async move { notify_all_clients_to_update_online_users().await; });
} else { } else {
if let Err(e) = event_sender.send(Event::SendClientMessage( send_error_message(&target_sender_which, event_sender, from_id, "必须传递to_id").await;
target_sender_which.clone(),
serde_json::json!({
"msgType": "Error",
"msgData": "必须传递to_id",
"fromID": "0",
"fromName": "Server",
"toID": from_id
})
.to_string(),
false,
)) {
println!("发送给用户id {} 独立消息失败:{:?}", from_id, e);
}
} }
} }
"KickOut" => { "KickOut" => {
println!("step - KickOut - 1 收到踢出通知"); println!("step - KickOut - 1 收到踢出通知");
let user_found = ONLINE_USERS if let Some(user_data) = ONLINE_USERS.get(from_id).map(|v| v.clone()) {
.get(from_id) let user_data_vec = user_data.split(',').map(String::from).collect::<Vec<_>>();
.map(|v| v.split(",").map(|s| s.to_string()).collect::<Vec<_>>()); if user_data_vec[HOST_IDX] != "1" {
if let Some(user_split) = user_found { send_error_message(&target_sender_which, event_sender, from_id, "只有主持人才能踢出用户").await;
if user_split[6] == "1" { } else {
let current_cid = user_split[1].clone(); let channel_id = user_data_vec[CHANNEL_IDX].clone();
println!("step - KickOut - 2 释放资源"); let users = get_users_by_channel(&channel_id);
let current_chatters = ONLINE_USERS for user_id in users {
.iter() if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == user_id).map(|e| e.key().clone()) {
.filter(|entry| entry.value().split(",").collect::<Vec<_>>()[1] == current_cid)
.map(|entry| entry.key().clone())
.collect::<Vec<_>>();
println!("step - KickOut - 3 获取同channelId的人");
for user_id in current_chatters {
let target_sender_which = CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == user_id)
.map(|entry| entry.key().clone());
if let Some(target_sender_which) = target_sender_which {
if user_id == client_message_data.to_id { if user_id == client_message_data.to_id {
println!("step - KickOut 4 --踢出对应消息的人"); let joined = update_user_status(&user_id, "idle", "", false);
let kicked_user_data = ONLINE_USERS.get(&user_id).map(|v| v.split(",").map(|s| s.to_string()).collect::<Vec<_>>()); update_redis_async(user_id.clone(), joined);
if let Some(mut kicked_user_data) = kicked_user_data { send_inside_message(&sender, event_sender, serde_json::json!({
kicked_user_data[0] = "idle".to_string();
kicked_user_data[1] = "".to_string();
kicked_user_data[6] = "0".to_string();
let data_joined = kicked_user_data.join(",");
println!("准备修改资源");
ONLINE_USERS.insert(user_id.clone(), data_joined.clone());
println!("修改资源成功!");
let cloned_user_id = user_id.clone();
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(&cloned_user_id, data_joined).await {
println!("修改redis数据失败:{:?}", e);
} else {
println!("修改redis数据成功!");
}
});
if let Err(e) = event_sender.send(Event::SendClientMessage(
target_sender_which.clone(),
serde_json::json!({
"msgType": "CmdHangup", "msgType": "CmdHangup",
"msgData": { "msgData": {"channelID": channel_id, "rtcToken": ""},
"channelID": current_cid,
"rtcToken": ""
},
"fromID": "0", "fromID": "0",
"fromName": "Server", "fromName": "Server",
"toID": user_id "toID": user_id
}) }).to_string(), &user_id).await;
.to_string(),
false,
)) {
println!("发送给用户id {} 独立消息失败:{:?}", user_id, e);
} else { } else {
println!("发送给用户id {} 独立消息成功 CmdHangup", user_id); send_inside_message(&sender, event_sender, serde_json::json!({
}
}
} else {
if let Err(e) = event_sender.send(Event::SendClientMessage(
target_sender_which.clone(),
serde_json::json!({
"msgType": "CmdKickOut", "msgType": "CmdKickOut",
"msgData": { "msgData": {"channelID": channel_id, "rtcToken": ""},
"channelID": current_cid,
"rtcToken": ""
},
"toID": client_message_data.to_id, "toID": client_message_data.to_id,
"fromName": "Server", "fromName": "Server",
"fromID": "0" "fromID": "0"
}) }).to_string(), &user_id).await;
.to_string(),
false,
)) {
println!("发送给用户id {} 独立消息失败:{:?}", user_id, e);
} else {
println!("发送给用户id {} 独立消息成功 CmdHangup", user_id);
} }
} }
} }
} tokio::spawn(async move { notify_all_clients_to_update_online_users().await; });
tokio::spawn(async move {
notify_all_clients_to_update_online_users().await;
});
} else {
println!("step - KickOut - 2 不是主持人");
if let Err(e) = event_sender.send(Event::SendClientMessage(
target_sender_which.clone(),
serde_json::json!({
"msgType": "Error",
"msgData": "只有主持人才能踢出用户",
"fromID": "0",
"fromName": "Server",
"toID": from_id
})
.to_string(),
false,
)) {
println!("发送给用户id {} 独立消息失败:{:?}", from_id, e);
} else {
println!("发送给用户id {} 独立消息成功 Error", from_id);
}
} }
} }
} }
"Mute" => { "Mute" => {
println!("step - Mute - 1 收到静音通知"); println!("step - Mute - 1 收到静音通知");
let to_id = client_message_data.to_id.clone(); let to_id = &client_message_data.to_id;
if to_id.is_empty() { if to_id.is_empty() {
if let Err(e) = event_sender.send(Event::SendClientMessage( send_error_message(&target_sender_which, event_sender, from_id, "必须传递to_id").await;
target_sender_which.clone(), return;
serde_json::json!({
"msgType": "Error",
"msgData": "必须传递to_id",
"fromID": "0",
"fromName": "Server",
"toID": from_id
})
.to_string(),
false,
)) {
println!("发送给用户id {} 独立消息失败:{:?}", from_id, e);
} else {
println!("发送给用户id {} 独立消息成功", from_id);
} }
} else { if let Some(user_data) = ONLINE_USERS.get(from_id).map(|v| v.clone()) {
let user_found = ONLINE_USERS let user_data_vec = user_data.split(',').map(String::from).collect::<Vec<_>>();
.get(from_id) if user_data_vec[HOST_IDX] != "1" {
.map(|v| v.split(",").map(|s| s.to_string()).collect::<Vec<_>>()); send_error_message(&target_sender_which, event_sender, from_id, "您不是主持人,无法静音他人").await;
if let Some(user_data_now) = user_found { return;
if user_data_now[6] == "1" { }
let channel_id = user_data_now[1].clone(); let channel_id = user_data_vec[CHANNEL_IDX].clone();
let all_chatters_id = ONLINE_USERS let users = get_users_by_channel(&channel_id);
.iter()
.filter(|entry| entry.value().split(",").collect::<Vec<_>>()[1] == channel_id)
.map(|entry| entry.key().clone())
.collect::<Vec<_>>();
let json_str = serde_json::json!({ let json_str = serde_json::json!({
"msgType": "CmdMute", "msgType": "CmdMute",
"msgData": {}, "msgData": {},
"fromID": to_id, "fromID": to_id,
"fromName": "Server", "fromName": "Server",
"toID": to_id "toID": to_id
}) }).to_string();
.to_string(); for user_id in users {
for user_id in all_chatters_id { if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == user_id).map(|e| e.key().clone()) {
let target_sender_which = CLIENT_SENDERS send_inside_message(&sender, event_sender, json_str.clone(), &user_id).await;
.iter()
.find(|entry| entry.key().0 == user_id)
.map(|entry| entry.key().clone());
if let Some(target_sender_which) = target_sender_which {
if let Err(e) = event_sender.send(Event::SendClientMessage(
target_sender_which.clone(),
json_str.clone(),
false,
)) {
println!("发送给用户id {} 独立消息失败:{:?}", user_id, e);
} else {
println!("发送给用户id {} 独立消息成功", user_id);
}
}
}
} else {
if let Err(e) = event_sender.send(Event::SendClientMessage(
target_sender_which.clone(),
serde_json::json!({
"msgType": "Error",
"msgData": "您不是主持人,无法静音他人",
"fromID": "0",
"fromName": "Server",
"toID": from_id
})
.to_string(),
false,
)) {
println!("发送给用户id {} 独立消息失败:{:?}", from_id, e);
} else {
println!("发送给用户id {} 独立消息成功", from_id);
}
} }
} }
} }
} }
"MuteAll" => { "MuteAll" => {
println!("step - MuteAll - 1 收到静音全部通知"); println!("step - MuteAll - 1 收到静音全部通知");
let user_found = ONLINE_USERS if let Some(user_data) = ONLINE_USERS.get(from_id).map(|v| v.clone()) {
.get(from_id) let user_data_vec = user_data.split(',').map(String::from).collect::<Vec<_>>();
.map(|v| v.split(",").map(|s| s.to_string()).collect::<Vec<_>>()); if user_data_vec[HOST_IDX] != "1" {
if let Some(current_user_data) = user_found { send_error_message(&target_sender_which, event_sender, from_id, "只有主持人才有操作权限").await;
println!("step - MuteAll -2 拆解数据"); return;
if current_user_data[6] == "1" { }
println!("step - MuteAll -3 静音全部可以,具备权限"); let channel_id = user_data_vec[CHANNEL_IDX].clone();
let current_channel_id = current_user_data[1].clone(); let users = get_users_by_channel(&channel_id);
let all_users_in_channel = ONLINE_USERS for user_id in users {
.iter() if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == user_id).map(|e| e.key().clone()) {
.filter(|entry| entry.value().split(",").collect::<Vec<_>>()[1] == current_channel_id) send_inside_message(&sender, event_sender, serde_json::json!({
.map(|entry| entry.key().clone())
.collect::<Vec<_>>();
println!("step - MuteAll -4 筛选出所有channelId下的所有用户");
for user_id in all_users_in_channel {
let target_sender_which = CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == user_id)
.map(|entry| entry.key().clone());
if let Some(target_sender_which) = target_sender_which {
let json_str = serde_json::json!({
"msgType": "CmdMuteAll", "msgType": "CmdMuteAll",
"msgData": { "msgData": {"channelID": channel_id, "rtcToken": ""},
"channelID": current_channel_id,
"rtcToken": ""
},
"fromID": "0", "fromID": "0",
"fromName": "Server", "fromName": "Server",
"toID": user_id "toID": user_id
}) }).to_string(), &user_id).await;
.to_string();
if let Err(e) = event_sender.send(Event::SendClientMessage(
target_sender_which.clone(),
json_str,
false,
)) {
println!("发送给用户id {} 独立消息失败:{:?}", user_id, e);
} else {
println!("发送给用户id {} 独立消息成功 CmdMuteAll", user_id);
}
}
}
} else {
if let Err(e) = event_sender.send(Event::SendClientMessage(
target_sender_which.clone(),
serde_json::json!({
"msgType": "Error",
"msgData": "只有主持人才有操作权限",
"fromID": "0",
"fromName": "Server",
"toID": from_id
})
.to_string(),
false,
)) {
println!("发送给用户id {} 独立消息失败:{:?}", from_id, e);
} else {
println!("发送给用户id {} 独立消息成功", from_id);
} }
} }
} }
} }
"MuteSelf" | "UnMuteSelf" => { "MuteSelf" | "UnMuteSelf" => {
println!("step - MuteSelf-UN - 1 收到开关静音自己通知"); println!("step - MuteSelf-UN - 1 收到开关静音自己通知");
let user_found = ONLINE_USERS if let Some(user_data) = ONLINE_USERS.get(from_id).map(|v| v.clone()) {
.get(from_id) let user_data_vec = user_data.split(',').map(String::from).collect::<Vec<_>>();
.map(|v| v.split(",").map(|s| s.to_string()).collect::<Vec<_>>()); let channel_id = user_data_vec[CHANNEL_IDX].clone();
if let Some(data_split) = user_found { let users = get_users_by_channel(&channel_id);
let channel_id = data_split[1].clone(); for user_id in users {
let chatting_id_set = ONLINE_USERS if let Some(sender) = CLIENT_SENDERS.iter().find(|e| e.key().0 == user_id).map(|e| e.key().clone()) {
.iter() send_inside_message(&sender, event_sender, serde_json::json!({
.filter(|entry| entry.value().split(",").collect::<Vec<_>>()[1] == channel_id)
.map(|entry| entry.key().clone())
.collect::<Vec<_>>();
println!("step -2 -Mute-orNot 准备广播消息");
for chatting_id in chatting_id_set {
let target_sender_which = CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == chatting_id)
.map(|entry| entry.key().clone());
if let Some(target_sender_which) = target_sender_which {
if let Err(e) = event_sender.send(Event::SendClientMessage(
target_sender_which.clone(),
serde_json::json!({
"msgType": client_message_data.msg_type.as_str(), "msgType": client_message_data.msg_type.as_str(),
"msgData": {}, "msgData": {},
"fromID": from_id, "fromID": from_id,
"fromName": "Unknown", "fromName": "Unknown",
"toID": chatting_id "toID": user_id
}) }).to_string(), &user_id).await;
.to_string(),
false,
)) {
println!("发送给用户id {} 独立消息失败:{:?}", chatting_id, e);
}
} }
} }
} }
} }
_ => {} _ => {}
} }
} }
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment