Commit 19bee953 by qlintonger xeno

基本消息类型完成,准备测试一下+4.5

parent 6ebcc949
......@@ -15,6 +15,7 @@ use tokio::time;
use tungstenite::{Error, Message};
lazy_static! {
#[derive(Debug)]
pub static ref ONLINE_USERS: DashMap<String, String> = DashMap::new();
}
......@@ -114,7 +115,7 @@ pub(crate) async fn handle_client(
println!("收到客户端心跳消息 {:?}", &data);
last_heartbeat_time = Instant::now();
if let Ok(json_str) = heart_resp(&from_id) {
if let Err(e) = event_sender.clone().send(Event::SendClientMessage((from_id.clone(), connection_time), json_str, false)){
if let Err(e) = event_sender.send(Event::SendClientMessage((from_id.clone(), connection_time), json_str, false)){
println!("处理心跳消息出错了:{:?}", e)
} else {
println!("处理心跳消息成功")
......@@ -123,11 +124,8 @@ pub(crate) async fn handle_client(
},
_ => {
let from_id_clone = from_id.clone();
let event_sender_clone = event_sender.clone();
let connection_time_clone = connection_time.clone();
tokio::spawn(async move {
handle_ws_msg(&data, from_id_clone, event_sender_clone, connection_time_clone).await;
});
handle_ws_msg(&data, from_id_clone, &event_sender, connection_time_clone).await;
}
}
}
......
......@@ -7,7 +7,7 @@ use tokio::sync::mpsc::UnboundedSender;
pub async fn handle_ws_msg(
client_message_data: &ClientMessageData,
from_id: String,
event_sender: UnboundedSender<Event>,
event_sender: &UnboundedSender<Event>,
connection_time: u128,
) {
let msg_type = client_message_data.msg_type.clone();
......
......@@ -22,6 +22,7 @@ pub enum ClientMessage {
}
lazy_static! {
#[derive(Debug)]
pub static ref CLIENT_SENDERS: DashMap<(String, u128), mpsc::UnboundedSender<ClientMessage>> =
DashMap::new();
}
......
......@@ -71,31 +71,38 @@ pub async fn handle_agora_call(
)) {
println!("发送给用户id {} 独立消息失败:{:?}", from_id, e);
}
let user_status = { ONLINE_USERS.get(from_id) };
// 对同一个channelID的组,发送挂断消息
if let Some(user_status) = ONLINE_USERS.get(from_id) {
let mut the_current_user_data: Vec<&str> = user_status.split(',').collect();
if let Some(user_status) = user_status {
// 将获取的字符串用逗号拆分,并存储为 Vec<String>
let mut the_current_user_data: Vec<String> = user_status.split(',').map(|s| s.to_string()).collect();
// 如果当前用户channelID为空,则中断后续操作,并且直接重新赋值即可
if the_current_user_data[1].is_empty() {
the_current_user_data[1] = "";
the_current_user_data[0] = "idle";
the_current_user_data[6] = "0";
the_current_user_data[1] = "".to_string();
the_current_user_data[0] = "idle".to_string();
the_current_user_data[6] = "0".to_string();
ONLINE_USERS.insert(
from_id.to_string(),
the_current_user_data.join(",").to_string(),
the_current_user_data.join(","),
);
if let Err(e) = update_client_redis_data(
from_id,
the_current_user_data.join(",").to_string(),
)
.await
{
println!("更新redis数据失败:{:?}", e);
};
notify_all_clients_to_update_online_users().await;
let from_id_clone = from_id.clone();
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(
&from_id_clone,
the_current_user_data.join(","),
)
.await
{
println!("更新redis数据失败:{:?}", e);
};
});
tokio::spawn(async move {
notify_all_clients_to_update_online_users().await;
});
return;
}
// 拆分toId为其他用户id,并且获取其数据信息
let all_to_hangup_id_vec: Vec<&str> = calling_to_id.split(',').collect();
let all_to_hangup_id_vec: Vec<String> = calling_to_id.split(',').map(|s| s.to_string()).collect();
for to_hangup_id in all_to_hangup_id_vec {
// 取消拒绝接听的线程队列
if let Some(ref_procedure_sender) =
......@@ -107,10 +114,10 @@ pub async fn handle_agora_call(
println!("取消拒绝接听的线程队列成功");
}
}
let the_other_caller = { ONLINE_USERS.get(&to_hangup_id) };
// 获取当前用户状态信息成功后,再进行额外操作
if let Some(the_other_caller_data) = ONLINE_USERS.get(to_hangup_id) {
let mut the_other_caller_data: Vec<&str> =
the_other_caller_data.split(',').collect();
if let Some(the_other_caller_data) = the_other_caller {
let mut the_other_caller_data: Vec<String> = the_other_caller_data.split(',').map(|s| s.to_string()).collect();
// 如果处于同一频道,则发送挂断信息,
if the_other_caller_data[1] == the_current_user_data[1] {
let hangup_message_json = serde_json::json!({
......@@ -125,10 +132,11 @@ pub async fn handle_agora_call(
})
.to_string();
// 找到对应sender,发送CmdHangup消息
let target_sender = CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == to_hangup_id) // 使用 entry.key() 访问键
.map(|entry| entry.key().clone());
let target_sender = {
let mut iter = CLIENT_SENDERS.iter();
iter.find(|entry| entry.key().0 == *calling_to_id)
.map(|entry| entry.key().clone())
};
if let Some(target_sender) = target_sender {
if let Err(e) = event_sender.send(Event::SendClientMessage(
target_sender,
......@@ -137,21 +145,23 @@ pub async fn handle_agora_call(
)) {
println!("发送给用户id {} 独立消息失败:{:?}", to_hangup_id, e);
// 修改对应sender的数据
the_other_caller_data[0] = "idle";
the_other_caller_data[1] = "";
the_other_caller_data[6] = "0";
the_other_caller_data[0] = "idle".to_string();
the_other_caller_data[1] = "".to_string();
the_other_caller_data[6] = "0".to_string();
ONLINE_USERS.insert(
to_hangup_id.to_string(),
the_other_caller_data.join(",").to_string(),
);
if let Err(e) = update_client_redis_data(
to_hangup_id,
the_other_caller_data.join(",").to_string(),
)
.await
{
println!("更新redis数据失败:{:?}", e);
};
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(
&to_hangup_id,
the_other_caller_data.join(",").to_string(),
)
.await
{
println!("更新redis数据失败:{:?}", e);
};
});
}
} else {
println!("未找到对应的sender");
......@@ -160,20 +170,27 @@ pub async fn handle_agora_call(
}
}
// 要求所有在线用户更新数据
the_current_user_data[1] = "";
the_current_user_data[0] = "idle";
the_current_user_data[6] = "0";
the_current_user_data[1] = "".to_string();
the_current_user_data[0] = "idle".to_string();
the_current_user_data[6] = "0".to_string();
ONLINE_USERS.insert(
from_id.to_string(),
the_current_user_data.join(",").to_string(),
);
if let Err(e) =
update_client_redis_data(from_id, the_current_user_data.join(",").to_string())
.await
{
println!("更新redis数据失败:{:?}", e);
};
notify_all_clients_to_update_online_users().await;
let from_id_clone = from_id.to_string();
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(
&from_id_clone,
the_current_user_data.join(",").to_string(),
)
.await
{
println!("更新redis数据失败:{:?}", e);
};
});
tokio::spawn(async move {
notify_all_clients_to_update_online_users().await;
});
}
}
// 对于通话类消息
......@@ -208,8 +225,9 @@ pub async fn handle_agora_call(
.await;
return;
}
let user_status = { ONLINE_USERS.get(from_id) };
// 获取当前用户状态信息成功后,再进行额外操作
if let Some(user_status) = ONLINE_USERS.get(from_id) {
if let Some(user_status) = user_status {
// 将获取的字符串用逗号拆分
let mut the_user_data: Vec<&str> = user_status.split(',').collect();
// 获取当前用户channelId
......@@ -222,7 +240,10 @@ pub async fn handle_agora_call(
let calling_to_id_vec: Vec<&str> = calling_to_id.split(',').collect();
// 储存需要计入CmdRefuse线程用户
let mut refuse_thread_users: Vec<String> = vec![];
println!("当前用户channelId {} 呼叫方id集合是 {:?}", channel_id_now, calling_to_id_vec);
println!(
"当前用户channelId {} 呼叫方id集合是 {:?}",
channel_id_now, calling_to_id_vec
);
for calling_to_id in calling_to_id_vec {
// 1.检查目标用户是否在线
if !ONLINE_USERS.contains_key(calling_to_id) {
......@@ -239,8 +260,9 @@ pub async fn handle_agora_call(
.await;
continue;
}
let user_status = { ONLINE_USERS.get(calling_to_id) };
// 2.检查目标用户是否为空闲状态
if let Some(user_status) = ONLINE_USERS.get(calling_to_id) {
if let Some(user_status) = user_status {
// 将获取的字符串用逗号拆分
let mut the_another_user_data: Vec<&str> = user_status.split(',').collect();
// 如果不为idle,不允许后续操作
......@@ -271,10 +293,12 @@ pub async fn handle_agora_call(
})
.to_string();
// 通知被呼叫方的sender发送数据
let target_sender = CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == calling_to_id) // 使用 entry.key() 访问键
.map(|entry| entry.key().clone()); // 使用 entry.value() 访问值
let target_sender = {
CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == calling_to_id) // 使用 entry.key() 访问键
.map(|entry| entry.key().clone())
}; // 使用 entry.value() 访问值
if let Some(sender) = target_sender {
if let Err(e) = event_sender.send(Event::SendClientMessage(
sender.clone(),
......@@ -284,39 +308,53 @@ pub async fn handle_agora_call(
println!("发送给用户id {:?} 独立消息失败:{:?}", sender, e);
continue;
} else {
println!("发送给用户id {} 独立消息成功 消息内容 {}", calling_to_id, call_message_json);
println!(
"发送给用户id {} 独立消息成功 消息内容 {}",
calling_to_id, call_message_json
);
}
} else {
println!("找不到对应的sender,无法发送客户端消息!");
continue;
}
println!("发送给用户id {} 独立消息成功", from_id);
println!("发送给用户id {} 独立消息成功", calling_to_id);
// 修改对应用户的状态数据
the_another_user_data[0] = "callin";
the_another_user_data[1] = channel_id_now.as_str();
the_another_user_data[6] = "0";
println!("step - 1");
let the_another_user_data_joined = the_another_user_data.join(",");
// 更新状态数据
ONLINE_USERS.insert(
calling_to_id.to_string(),
the_another_user_data_joined.clone(),
);
// 修改redis数据
if let Err(e) = update_client_redis_data(
calling_to_id,
the_another_user_data_joined.clone(),
)
.await
{
println!("更新redis数据失败:{:?} 用户id {}", e, calling_to_id);
} else {
println!("更新redis数据成功");
}
println!("step - 2");
let calling_id_c = calling_to_id.to_string();
tokio::spawn(async move {
// 修改redis数据
if let Err(e) = update_client_redis_data(
&calling_id_c,
the_another_user_data_joined.clone(),
)
.await
{
println!("更新redis数据失败:{:?} 用户id {}", e, calling_id_c);
} else {
println!("更新redis数据成功");
}
});
println!("step - 3");
refuse_thread_users.push(calling_to_id.to_string());
}
}
println!(
"是否通知其他用户成功? {} 然后发送CmdCall?",
refuse_thread_users.len()
);
// 如果成功发送了呼叫信息,则根据当前用户状态发送消息通知
if refuse_thread_users.len() > 0 {
println!("发送给用户id {} 需要发送CmdCall", from_id);
// 如果当前用户本身为idle,则发送CmdCall
if the_user_data[0] == "idle" || the_user_data[0] == "calling" {
let cmd_call_message_json = serde_json::json!({
......@@ -352,45 +390,53 @@ pub async fn handle_agora_call(
&from_id,
)
.await;
// 最后广播用户更新
notify_all_clients_to_update_online_users().await;
tokio::spawn(async move {
// 最后广播用户更新
notify_all_clients_to_update_online_users().await;
});
// 开启定时器线程任务,如果所有用户20s内没有接听,则挂断,发送所有CmdRefuse数据并重置
for user_id in refuse_thread_users {
if let Some(current_user_status_data) = ONLINE_USERS.get(&user_id) {
let current_user_status_data = { ONLINE_USERS.get(&user_id) };
if let Some(current_user_status_data) = current_user_status_data {
// 设置线程任务
let user_id_clone = user_id.clone();
let from_id = from_id.clone();
let event_sender = event_sender.clone();
let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel::<()>();
refuse_procedure_map.insert(user_id, cancel_tx);
let user_data_cloned_current = current_user_status_data.clone();
tokio::spawn(async move {
tokio::select! {
_ = sleep(Duration::from_secs(20)) => {
// 20秒内没有接听,准备挂断通话
let mut current_user_data_vec: Vec<&str> = current_user_status_data.split(',').collect();
let mut current_user_data_vec: Vec<String> = user_data_cloned_current.split(',').map(|s| s.to_string()).collect();
// 提前取出channelID,判断是否还有人在频道里面
let current_channel_id = current_user_data_vec[1];
current_user_data_vec[0] = "idle";
current_user_data_vec[1] = "";
current_user_data_vec[6] = "0";
let current_channel_id = current_user_data_vec[1].clone();
current_user_data_vec[0] = "idle".to_string();
current_user_data_vec[1] = "".to_string();
current_user_data_vec[6] = "0".to_string();
// 更新本地数据
ONLINE_USERS.insert(
user_id_clone.clone(),
current_user_data_vec.join(","),
);
// 更新redis数据
if let Err(e) = update_client_redis_data(
user_id_clone.as_str(),
current_user_data_vec.join(","),
)
.await {
println!("更新redis数据失败:{:?} 用户id {}", e, user_id_clone);
} else {
println!("通知挂断refuse线程,更新redis数据成功");
}
let user_id_clone_for = user_id_clone.clone();
tokio::spawn(async move {
// 更新redis数据
if let Err(e) = update_client_redis_data(
user_id_clone_for.as_str(),
current_user_data_vec.join(","),
)
.await {
println!("更新redis数据失败:{:?} 用户id {}", e, user_id_clone_for);
} else {
println!("通知挂断refuse线程,更新redis数据成功");
}
});
// 对其发送CmdHangup,对发起方发送CmdRefuse
// 发起方的isHost一定是1,找到该用户即可
let host_user_id_now = ONLINE_USERS
let host_user_id_now = {
ONLINE_USERS
.iter()
.find(|entry| {
entry.value().split(',').collect::<Vec<_>>()[1]
......@@ -398,7 +444,8 @@ pub async fn handle_agora_call(
&& entry.value().split(',').collect::<Vec<_>>()[6]
== "1"
})
.map(|entry| entry.key().clone());
.map(|entry| entry.key().clone())
};
// 再找到对应的Sender
let host_user_sender_which = if let Some(host_user_id) = host_user_id_now {
CLIENT_SENDERS
......@@ -453,10 +500,12 @@ pub async fn handle_agora_call(
).await;
}
// 检查剩余频道人员数量
let left_users: Vec<(String, String)> = ONLINE_USERS.iter()
let left_users: Vec<(String, String)> = {
ONLINE_USERS.iter()
.filter(|entry| entry.value().split(',').collect::<Vec<_>>()[1] == current_channel_id)
.map(|entry| (entry.key().clone(), entry.value().clone())) // 克隆键和值
.collect();
.collect()
};
if left_users.len() <= 1 {
let only_left_user = left_users.get(0);
if let Some((user_id, user_data)) = only_left_user {
......@@ -466,8 +515,10 @@ pub async fn handle_agora_call(
if user_is_host {
// 发起方
// 找到发起方的Sender
if let Some(host_user_sender_which) =
CLIENT_SENDERS.iter().find(|entry| entry.key().0 == user_id.to_string()) {
let sender_or_not = {
CLIENT_SENDERS.iter().find(|entry| entry.key().0 == user_id.to_string())
};
if let Some(host_user_sender_which) = sender_or_not {
// 更新状态数据到OnlineUsers
user_data[0] = "idle";
user_data[1] = "";
......@@ -496,8 +547,10 @@ pub async fn handle_agora_call(
}
}
}
// 要求所有用户更新在线人员列表
notify_all_clients_to_update_online_users().await;
tokio::spawn(async move {
// 要求所有用户更新在线人员列表
notify_all_clients_to_update_online_users().await;
});
}
_ = cancel_rx.recv() => {
// 线程被取消
......@@ -515,15 +568,17 @@ pub async fn handle_agora_call(
"Refuse" => {
// 当前用户拒接电话之后,直接给当前用户发送CmdHangup消息
// 主动拒绝之后,也需要取消refuse_procedure_map的线程任务
if let Some(ref_procedure_sender) = refuse_procedure_map.get(from_id) {
let target_sender_or_not = { refuse_procedure_map.get(from_id) };
if let Some(ref_procedure_sender) = target_sender_or_not {
if let Err(e) = ref_procedure_sender.send(()) {
println!("取消拒绝接听的线程队列失败:{:?}", e);
} else {
println!("取消拒绝接听的线程队列成功");
}
}
let user_data = { ONLINE_USERS.get(from_id) };
// 给当前from_id用户发送CmdHangup,表示已经成功拒接了电话,并且修改了状态
if let Some(current_user_data) = ONLINE_USERS.get(from_id) {
if let Some(current_user_data) = user_data {
let mut current_user_data_vec: Vec<&str> = current_user_data.split(',').collect();
let hangup_refuse_message = serde_json::json!({
"msgType": "CmdHangup",
......@@ -542,12 +597,17 @@ pub async fn handle_agora_call(
current_user_data_vec[6] = "0";
let current_user_data_joined = current_user_data_vec.join(",");
ONLINE_USERS.insert(from_id.to_string(), current_user_data_joined.clone());
// 修改redis数据
if let Err(e) = update_client_redis_data(from_id, current_user_data_joined).await {
println!("更新redis数据失败:{:?} 用户id {}", e, from_id);
} else {
println!("更新redis数据成功");
}
let from_id_clone = from_id.to_string();
tokio::spawn(async move {
// 修改redis数据
if let Err(e) =
update_client_redis_data(&from_id_clone, current_user_data_joined).await
{
println!("更新redis数据失败:{:?} 用户id {}", e, from_id_clone);
} else {
println!("更新redis数据成功");
}
});
// 直接给sender发送数据通知
send_inside_message(
&target_sender_which,
......@@ -560,11 +620,13 @@ pub async fn handle_agora_call(
// 注意,拒接电话只能有一个toID
// 在这里,toID的状态数据由客户端维护
// 找到对应toId的sender
if let Some(target_sender) = CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == client_message_data.to_id.to_string())
.map(|entry| entry.key().clone())
{
let sender_found = {
CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == client_message_data.to_id.to_string())
.map(|entry| entry.key().clone())
};
if let Some(target_sender) = sender_found {
// 直接发送CmdHangup数据
let cmd_refuse_message = serde_json::json!({
"msgType": "CmdRefuse",
......@@ -587,13 +649,16 @@ pub async fn handle_agora_call(
} else {
println!("找不到toID对应的sender");
}
// 更新所有用户状态数据
notify_all_clients_to_update_online_users().await;
tokio::spawn(async move {
// 要求所有用户更新在线人员列表
notify_all_clients_to_update_online_users().await;
});
}
// 主持人结束通话
"EndMeeting" => {
let user_found = { ONLINE_USERS.get(from_id) };
// 要求判断是否为主持人,只有主持人可以结束通话
if let Some(current_user_data) = ONLINE_USERS.get(from_id) {
if let Some(current_user_data) = user_found {
let current_user_data_vec: Vec<&str> = current_user_data.split(',').collect();
// 如果不是主持人,不允许发送该消息
if current_user_data_vec[6] != "1" {
......@@ -619,16 +684,22 @@ pub async fn handle_agora_call(
let channel_id = current_user_data_vec[1];
if channel_id != "" {
// 从ONLINE_USERS中过滤筛选出所有与channelID相同的用户id集合
let users_to_notify = ONLINE_USERS
.iter()
.filter(|entry| {
entry.value().split(",").collect::<Vec<_>>()[2] == channel_id
})
.map(|entry| entry.key().to_string())
.collect::<Vec<String>>();
let users_to_notify = {
ONLINE_USERS
.iter()
.filter(|entry| {
entry.value().split(",").collect::<Vec<_>>()[2] == channel_id
})
.map(|entry| entry.key().to_string())
.collect::<Vec<String>>()
};
let clone_for_users_to_notify = users_to_notify.clone();
// 再根据上面的id,找到所有sender,并且修改所有对应ONLINE_USERS数据
for user_id_current_chat in users_to_notify.iter() {
if let Some(current_user_data) = ONLINE_USERS.get(user_id_current_chat)
for user_id_current_chat in users_to_notify {
let user_d = {
ONLINE_USERS.get(&user_id_current_chat)
};
if let Some(current_user_data) = user_d
{
let mut current_user_data_vec: Vec<&str> =
current_user_data.split(',').collect();
......@@ -640,23 +711,25 @@ pub async fn handle_agora_call(
user_id_current_chat.to_string(),
current_user_data_joined.clone(),
);
// 更新对应redis数据
if let Err(e) = update_client_redis_data(
user_id_current_chat.as_str(),
current_user_data_joined,
)
.await
{
println!(
"更新redis数据失败:{:?} 用户id {}",
e, user_id_current_chat
);
} else {
println!("更新redis数据成功");
}
tokio::spawn(async move {
// 更新对应redis数据
if let Err(e) = update_client_redis_data(
user_id_current_chat.as_str(),
current_user_data_joined,
)
.await
{
println!(
"更新redis数据失败:{:?} 用户id {}",
e, user_id_current_chat
);
} else {
println!("更新redis数据成功");
}
});
}
}
for user_id in users_to_notify {
for user_id in clone_for_users_to_notify {
if let Some(target_sender) = CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == user_id)
......@@ -682,16 +755,19 @@ pub async fn handle_agora_call(
.await;
}
}
// 发送完毕之后,要求更新所有用户数据
notify_all_clients_to_update_online_users().await;
tokio::spawn(async move {
// 发送完毕之后,要求更新所有用户数据
notify_all_clients_to_update_online_users().await;
});
}
}
}
}
// 通话过程中挂断
"Hangup" => {
let user_d = { ONLINE_USERS.get(from_id) };
// 挂断通话不是拒接,不需要处理refuse_procedure_map中的线程消息
if let Some(current_user_data) = ONLINE_USERS.get(from_id) {
if let Some(current_user_data) = user_d {
// 直接修改对应数据即可
let mut current_user_data_vec: Vec<&str> = current_user_data.split(',').collect();
let is_current_user_host = current_user_data_vec[6] == "1";
......@@ -702,12 +778,17 @@ pub async fn handle_agora_call(
current_user_data_vec[6] = "0";
let current_user_data_joined = current_user_data_vec.join(",");
ONLINE_USERS.insert(from_id.to_string(), current_user_data_joined.clone());
// 更新redis数据
if let Err(e) = update_client_redis_data(from_id, current_user_data_joined).await {
println!("更新redis数据失败:{:?} 用户id {}", e, from_id);
} else {
println!("更新redis数据成功");
}
let from_id_clone = from_id.to_string();
tokio::spawn(async move {
// 更新redis数据
if let Err(e) =
update_client_redis_data(&from_id_clone, current_user_data_joined).await
{
println!("更新redis数据失败:{:?} 用户id {}", e, from_id_clone);
} else {
println!("更新redis数据成功");
}
});
// 直接向其发送CmdHangup即可
send_inside_message(
&target_sender_which,
......@@ -729,14 +810,16 @@ pub async fn handle_agora_call(
// 如果当前用户是主持人,主持人退出之后,需要在剩下的用户中决定新主持人
if is_current_user_host {
// 过滤出剩下来的channelID同一的在线用户,并且整合为Vec集合
let remaining_users: Vec<String> = ONLINE_USERS
.iter()
.filter(|entry| {
let v_vec: Vec<&str> = entry.value().split(',').collect();
v_vec[1] == current_chatting_channel_id
})
.map(|entry| entry.key().to_string())
.collect();
let remaining_users: Vec<String> = {
ONLINE_USERS
.iter()
.filter(|entry| {
let v_vec: Vec<&str> = entry.value().split(',').collect();
v_vec[1] == current_chatting_channel_id
})
.map(|entry| entry.key().to_string())
.collect()
};
// 没有人参与会议了,直接退出即可
if remaining_users.len() == 0 {
println!("当前频道没有人员,请重新发起通话");
......@@ -746,11 +829,13 @@ pub async fn handle_agora_call(
else {
// 无论如何,都要向他们发送CmdLeave消息
for user_id in remaining_users.iter() {
if let Some(target_sender) = CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == *user_id)
.map(|entry| entry.key().clone())
{
let sender_found = {
CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == *user_id)
.map(|entry| entry.key().clone())
};
if let Some(target_sender) = sender_found {
// 找到这个sender标记符之后,发送消息CmdEndMeeting
let cmd_leave_message = serde_json::json!({
"msgType": "CmdLeave",
......@@ -775,144 +860,165 @@ pub async fn handle_agora_call(
.insert(current_chatting_channel_id.to_string(), cancel_tx);
let channel_id_clone = current_chatting_channel_id.to_string();
let ev_clone = event_sender.clone();
// 开启一个tokio线程,使用async move
tokio::spawn(async move {
let remaining_users_clone = remaining_users.clone();
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(20)) => {
// 20s内如果该频道与会人数仍为1,则对剩下来的用户发送CmdHangup指令
println!("20s内如果该频道与会人数仍为1,则对剩下来的用户发送CmdHangup指令 {:?}", remaining_users);
if let Some(remaining_user_id) = remaining_users.get(0) {
{
if let Some(remain_user_info_data) = ONLINE_USERS.get(remaining_user_id){
// 找到对应Sender
if let Some(target_sender_which) = CLIENT_SENDERS.iter()
.find(|entry| entry.key().0 == *remaining_user_id) {
// 修改状态数据
let mut user_info_data_for_remained = remain_user_info_data.split(',').collect::<Vec<_>>();
user_info_data_for_remained[0] = "idle";
user_info_data_for_remained[1] = "";
user_info_data_for_remained[6] = "0";
let user_info_data_for_remained_joined = user_info_data_for_remained.join(",");
ONLINE_USERS.insert(remaining_user_id.to_string(), user_info_data_for_remained_joined.clone());
if let Err(e) = update_client_redis_data(remaining_user_id, user_info_data_for_remained_joined).await {
println!("更新redis数据失败:{:?} 用户id {}", e, remaining_user_id);
println!("20s内如果该频道与会人数仍为1,则对剩下来的用户发送CmdHangup指令 {:?}", remaining_users_clone);
if let Some(remaining_user_id) = remaining_users_clone.get(0) {
let remaining_user_id_clone = remaining_user_id.clone(); // 克隆 remaining_user_id
let remain_user_d = ONLINE_USERS.get(&remaining_user_id_clone);
if let Some(remain_user_info_data) = remain_user_d {
let sender_d = CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == remaining_user_id_clone)
.map(|entry| entry.key().clone());
if let Some(target_sender_which) = sender_d {
let mut user_info_data_for_remained = remain_user_info_data.split(',').collect::<Vec<&str>>();
user_info_data_for_remained[0] = "idle";
user_info_data_for_remained[1] = "";
user_info_data_for_remained[6] = "0";
let user_info_data_for_remained_joined = user_info_data_for_remained.join(",");
// 插入克隆后的 remaining_user_id_clone
ONLINE_USERS.insert(remaining_user_id_clone.clone(), user_info_data_for_remained_joined.clone());
// 内部 tokio::spawn 使用克隆后的 remaining_user_id_clone
let clone_for_redis = remaining_user_id_clone.clone();
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(&clone_for_redis, user_info_data_for_remained_joined).await {
println!("更新redis数据失败:{:?} 用户id {}", e, clone_for_redis);
} else {
println!("更新redis数据成功");
}
// 发送CmdHangup
send_inside_message(
&target_sender_which.key(),
&ev_clone,
serde_json::json!({
"msgType": "CmdHangup",
"fromID": "0",
"fromName": "Server",
"toID": remaining_user_id,
"msgData": {
"channelId":"",
}
}).to_string(),
&remaining_user_id,
).await;
// 要求所有用户更新数据
});
send_inside_message(
&target_sender_which,
&ev_clone,
serde_json::json!({
"msgType": "CmdHangup",
"fromID": "0",
"fromName": "Server",
"toID": remaining_user_id_clone,
"msgData": {
"channelId": "",
"rtcToken": ""
}
}).to_string(),
&remaining_user_id_clone,
).await;
tokio::spawn(async move {
notify_all_clients_to_update_online_users().await;
}
});
}
}
}
}
_ = cancel_rx.recv() => {
// 如果收到取消信号,则直接结束线程
println!("收到取消信号,有新的人员加入到会议,会议频道号 {} 结束线程", channel_id_clone);
}
}
});
}
// 如果余下用户数量超过2人及以上,则需要判断应该让谁让主持人
// 如果余下用户数量超过2人及以上,则需要判断应该让谁成为主持人
else {
// 分组:1.筛选出id在HOST_ENABLED_ID_SET中的用户,即允许成为主持人的用户,2.筛选出id不在HOST_ENABLED_ID_SET中的用户,即不允许成为主持人的用户
// 注意,这里只需要修改一下状态数据即可,不用单独发送消息
let (allowed_users, disallowed_users): (Vec<_>, Vec<_>) =
remaining_users
.iter()
.partition(|user_id| HOST_ENABLED_ID_SET.contains(user_id.as_str()));
if !allowed_users.is_empty() {
let (allowed_users, disallowed_users): (Vec<String>, Vec<String>) =
remaining_users.into_iter().partition(|user_id| {
HOST_ENABLED_ID_SET.contains(user_id.as_str())
});
let allowed_users_clone = allowed_users.clone(); // 克隆 allowed_users
if !allowed_users_clone.is_empty() {
// 让第一位id的用户成为主持人
if let Some(allowed_user_id) = allowed_users.get(0) {
if let Some(allowed_user_info_data) =
ONLINE_USERS.get(allowed_user_id.as_str())
{
{
// 修改状态数据
let mut user_info_data_for_allowed =
allowed_user_info_data
.split(',')
.collect::<Vec<_>>();
user_info_data_for_allowed[6] = "1";
// 修改online数据信息
let user_info_data_for_allowed_joined =
user_info_data_for_allowed.join(",");
ONLINE_USERS.insert(
allowed_user_id.to_string(),
user_info_data_for_allowed_joined.clone(),
);
if let Some(allowed_user_id) = allowed_users_clone.get(0) {
let allowed_user_id_clone = allowed_user_id.clone(); // 克隆 allowed_user_id
let data_f = {
ONLINE_USERS.get(&allowed_user_id_clone)
};
if let Some(allowed_user_info_data) = data_f {
let mut user_info_data_for_allowed =
allowed_user_info_data
.split(',')
.map(|s| s.to_string())
.collect::<Vec<String>>();
user_info_data_for_allowed[6] = "1".to_string();
// 修改online数据信息
let user_info_data_for_allowed_joined =
user_info_data_for_allowed.join(",");
ONLINE_USERS.insert(
allowed_user_id_clone.clone(),
user_info_data_for_allowed_joined.clone(),
);
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(
allowed_user_id,
user_info_data_for_allowed_joined.clone(),
&allowed_user_id_clone,
user_info_data_for_allowed_joined,
)
.await
.await
{
println!(
"更新redis数据失败:{:?} 用户id {}",
e, allowed_user_id
)
e, allowed_user_id_clone
);
} else {
println!("更新redis数据成功");
}
}
});
}
}
} else if !disallowed_users.is_empty() {
// 让第一位
if let Some(disallowed_user_id) = disallowed_users.get(0) {
// 完全照搬上面的逻辑
if let Some(disallowed_user_info_data) =
ONLINE_USERS.get(disallowed_user_id.as_str())
{
{
let mut current_host_data = disallowed_user_info_data
let disallowed_users_clone = disallowed_users.clone(); // 克隆 disallowed_users
if let Some(disallowed_user_id) = disallowed_users_clone.get(0) {
let disallowed_user_id_clone = disallowed_user_id.clone(); // 克隆 disallowed_user_id
let d_f = {
ONLINE_USERS.get(&disallowed_user_id_clone)
};
if let Some(disallowed_user_info_data) = d_f {
let mut current_host_data =
disallowed_user_info_data
.split(',')
.collect::<Vec<_>>();
current_host_data[6] = "1";
let current_host_data_joined =
current_host_data.join(",");
// 更新ONLINE_USERS数据
ONLINE_USERS.insert(
disallowed_user_id.to_string(),
current_host_data_joined.clone(),
);
.map(|s| s.to_string())
.collect::<Vec<String>>();
current_host_data[6] = "1".to_string();
let current_host_data_joined = current_host_data.join(",");
// 更新ONLINE_USERS数据
ONLINE_USERS.insert(
disallowed_user_id_clone.clone(),
current_host_data_joined.clone(),
);
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(
disallowed_user_id,
current_host_data_joined.clone(),
&disallowed_user_id_clone,
current_host_data_joined,
)
.await
.await
{
println!(
"更新redis数据失败:{:?} 用户id {}",
e, disallowed_user_id
)
e, disallowed_user_id_clone
);
} else {
println!("更新redis数据成功");
}
}
});
}
}
}
}
}
}
// 无论如何都要发送通知在线人员更新
notify_all_clients_to_update_online_users().await;
tokio::spawn(async move {
// 无论如何都要发送通知在线人员更新
notify_all_clients_to_update_online_users().await;
});
}
}
// 接听通话,这个是目前最重要的逻辑部分
......@@ -922,11 +1028,19 @@ pub async fn handle_agora_call(
// 3.修改状态并且广播出去
// 4.决定主持人。一般来说,谁发起了这个频道的人,并且具备成为主持人的资格,那么就直接成为主持人,否则就让第一个加入的人成为主持人。
"Connect" => {
println!("收到客户端Connect消息连接 {} 频道号 {}", &from_id, client_message_data.msg_data.get("channelID").unwrap_or(&serde_json::Value::String("null".to_string())));
println!(
"收到客户端Connect消息连接 {} 频道号 {}",
&from_id,
client_message_data
.msg_data
.get("channelID")
.unwrap_or(&serde_json::Value::String("null".to_string()))
);
if let Some(channel_id) = client_message_data.msg_data.get("channelID") {
// 这里的from_id就是被呼叫方,to_id就是呼叫方
// 首先先取消掉所有的挂断from_id的任务
if let Some(hangup_personnel) = refuse_procedure_map.get(from_id) {
let hangup_p = { refuse_procedure_map.get(from_id) };
if let Some(hangup_personnel) = hangup_p {
if let Err(e) = hangup_personnel.send(()) {
println!("挂断personnel的任务终结失败 {:?}", e)
} else {
......@@ -936,7 +1050,8 @@ pub async fn handle_agora_call(
// 将channel_id转换为字符串,并且进行解析
if let Some(channel_id_str) = channel_id.as_str() {
// 检查一下是否存在挂断channel的任务,如果存在,对齐发送消息,然后关闭
if let Some(hangup_channel_task) = channel_hangup_procedure_map.get(channel_id_str) {
let hangup_p = { channel_hangup_procedure_map.get(channel_id_str) };
if let Some(hangup_channel_task) = hangup_p {
if let Err(e) = hangup_channel_task.send(()) {
println!("挂断channel的任务终结失败 {:?}", e)
} else {
......@@ -944,19 +1059,30 @@ pub async fn handle_agora_call(
}
}
// 如果当前channel已经有主持人,则不做处理
let already_had_host = ONLINE_USERS.iter()
.filter(|entry| entry.value().split(",").collect::<Vec<_>>()[1] ==channel_id_str)
.any(|entry| entry.value().split(",").collect::<Vec<_>>()[6] == "1");
let already_had_host = {
ONLINE_USERS
.iter()
.filter(|entry| {
entry.value().split(",").collect::<Vec<_>>()[1] == channel_id_str
})
.any(|entry| entry.value().split(",").collect::<Vec<_>>()[6] == "1")
};
if already_had_host {
println!("当前channel已经有主持人,直接返回 {}", channel_id_str)
} else {
println!("当前channel没有主持人,开始处理 {}", channel_id_str);
// 首先判断发起方,也就是channel_id用_分割之后的第一个元素,是否具备成为主持人的资格
if let Some(channel_id_first_element) = channel_id_str.split("_").collect::<Vec<_>>().get(0) {
let last_remaining_id_set = channel_id_str.split("_").map(|s| s.to_string()).collect::<Vec<_>>();
if let Some(channel_id_first_element) =
last_remaining_id_set.get(0)
{
if HOST_ENABLED_ID_SET.contains(&channel_id_first_element.to_string()) {
// 具备成为主持人资格,直接修改其对应数据即可
if let Some(host_data) = ONLINE_USERS.get(&channel_id_first_element.to_string()) {
let mut host_data_vec = host_data.split(",").collect::<Vec<_>>();
let online_d =
{ ONLINE_USERS.get(&channel_id_first_element.to_string()) };
if let Some(host_data) = online_d {
let mut host_data_vec =
host_data.split(",").collect::<Vec<_>>();
host_data_vec[6] = "1";
let host_data_joined = host_data_vec.join(",");
// 直接再次更新online_users
......@@ -964,55 +1090,66 @@ pub async fn handle_agora_call(
channel_id_first_element.to_string(),
host_data_joined.clone(),
);
if let Err(e) = update_client_redis_data(
channel_id_first_element,
host_data_joined.clone(),
)
.await
{
println!(
"更新redis数据失败:{:?} 用户id {} 成为主持人",
e, channel_id_first_element
let the_first_for = channel_id_first_element.to_string();
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(
&the_first_for,
host_data_joined.clone(),
)
} else {
println!("更新redis数据成功");
}
.await
{
println!(
"更新redis数据失败:{:?} 用户id {} 成为主持人",
e, the_first_for
)
} else {
println!("更新redis数据成功");
}
});
}
} else {
// 获取所有同channel_id的用户,在ONLINE_USERS中寻找第一个满足条件的id
let all_in_group_chatters_id: Vec<String> = ONLINE_USERS.iter()
.filter(|entry| entry.value().split(",").collect::<Vec<_>>()[1] ==channel_id_str)
.map(|entry| entry.key().clone())
.collect::<Vec<String>>();
let all_in_group_chatters_id: Vec<String> = {
ONLINE_USERS
.iter()
.filter(|entry| {
entry.value().split(",").collect::<Vec<_>>()[1]
== channel_id_str
})
.map(|entry| entry.key().clone())
.collect::<Vec<String>>()
};
// 记录是否找到主持人对应id
let mut is_host_found = false;
// 在all_in_group_chatters_id中,找寻第一个在HOST_ENABLED_ID_SET中的id
for id in all_in_group_chatters_id {
if HOST_ENABLED_ID_SET.contains(&id) {
// 具备成为主持人资格,直接修改其对应数据即可
if let Some(host_data) = ONLINE_USERS.get(&id) {
let mut host_data_vec = host_data.split(",").collect::<Vec<_>>();
let hd = { ONLINE_USERS.get(&id) };
if let Some(host_data) = hd {
let mut host_data_vec =
host_data.split(",").collect::<Vec<_>>();
host_data_vec[6] = "1";
is_host_found = true;
// 更新对应数据
let host_data_joined = host_data_vec.join(",");
ONLINE_USERS.insert(
id.to_string(),
host_data_joined.clone(),
);
if let Err(e) = update_client_redis_data(
&id,
host_data_joined.clone(),
)
.await
{
println!(
"更新redis数据失败:{:?} 用户id {} 成为主持人",
e, id
ONLINE_USERS
.insert(id.to_string(), host_data_joined.clone());
tokio::spawn(async move {
if let Err(e) = update_client_redis_data(
&id,
host_data_joined.clone(),
)
} else {
println!("更新redis数据成功");
}
.await
{
println!(
"更新redis数据失败:{:?} 用户id {} 成为主持人",
e, id
)
} else {
println!("更新redis数据成功");
}
});
break;
}
}
......@@ -1024,16 +1161,16 @@ pub async fn handle_agora_call(
}
}
}
// 最后广播所有用户更新状态
notify_all_clients_to_update_online_users().await;
tokio::spawn(async move {
// 最后广播所有用户更新状态
notify_all_clients_to_update_online_users().await;
});
} else {
println!("客户端Connect消息 缺乏channel-id,拒绝处理消息")
}
}
// 处理静音
"Mute" => {
}
"Mute" => {}
_ => {}
}
}
......@@ -16,18 +16,18 @@ use tokio::sync::mpsc;
#[tokio::main]
async fn main() {
let listener = TcpListener::bind(addr).await.unwrap();
// 创建事件通道
let (event_sender, event_receiver) = mpsc::unbounded_channel();
// 启动事件处理任务
tokio::spawn(handle_events(event_receiver));
while let Ok((stream, _)) = listener.accept().await {
let client_event_sender = event_sender.clone();
// 创建一个用于事件中心向客户端发送消息的通道
let (center_to_client_sender, center_to_client_receiver) = mpsc::unbounded_channel();
tokio::spawn(handle_client(
stream,
client_event_sender,
event_sender.clone(),
center_to_client_sender,
center_to_client_receiver,
));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment