Commit 701a290a by qlintonger xeno

尝试对接声网有关操作以及音视频+1.15

parent 1ebd2716
......@@ -3,7 +3,16 @@ use crate::events::{notify_all_clients_to_update_online_users, Event, CLIENT_SEN
use crate::handles::redis::update_client_redis_data;
use crate::typing::message_typed::ClientMessageData;
use crate::utils::json_utils::get_current_timestamp;
use dashmap::DashMap;
use lazy_static::lazy_static;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::sync::mpsc::UnboundedSender;
use tokio::time::sleep;
lazy_static! {
pub static ref refuse_procedure_map: DashMap<String, UnboundedSender<()>> = DashMap::new();
}
async fn send_inside_message(
target_sender: &(String, u128),
......@@ -84,6 +93,16 @@ pub async fn handle_agora_call(
// 拆分toId为其他用户id,并且获取其数据信息
let all_to_hangup_id_vec: Vec<&str> = calling_to_id.split(',').collect();
for to_hangup_id in all_to_hangup_id_vec {
// 取消拒绝接听的线程队列
if let Some(ref_procedure_sender) =
refuse_procedure_map.get(&to_hangup_id.to_string())
{
if let Err(e) = ref_procedure_sender.send(()) {
println!("取消拒绝接听的线程队列失败:{:?}", e);
} else {
println!("取消拒绝接听的线程队列成功");
}
}
// 获取当前用户状态信息成功后,再进行额外操作
if let Some(the_other_caller_data) = ONLINE_USERS.get(to_hangup_id) {
let mut the_other_caller_data: Vec<&str> =
......@@ -166,7 +185,8 @@ pub async fn handle_agora_call(
r#"{{"msgType": "Error", "fromID": "0", "fromName": "Server", "msgData": "请指定呼叫对象", "toID": "{}"}}"#,
&from_id
);
send_inside_message(&target_sender_which, &event_sender, error_json, &from_id).await;
send_inside_message(&target_sender_which, &event_sender, error_json, &from_id)
.await;
return;
}
// 不允许自己跟自己打电话
......@@ -175,7 +195,13 @@ pub async fn handle_agora_call(
r#"{{"msgType": "Error", "fromID": "0", "fromName": "Server", "msgData": "不能给自己打电话", "toID": "{}"}}"#,
&from_id
);
send_inside_message(&target_sender_which, &event_sender, error_self_json, &from_id).await;
send_inside_message(
&target_sender_which,
&event_sender,
error_self_json,
&from_id,
)
.await;
return;
}
// 获取当前用户状态信息成功后,再进行额外操作
......@@ -190,8 +216,8 @@ pub async fn handle_agora_call(
};
// 注意一下,这里的to_id有可能是多个,需要做一下过滤处理
let calling_to_id_vec: Vec<&str> = calling_to_id.split(',').collect();
// 计数成功呼叫个数
let mut success_full_send_count = 0;
// 储存需要计入CmdRefuse线程用户
let mut refuse_thread_users: Vec<String> = vec![];
for calling_to_id in calling_to_id_vec {
// 1.检查目标用户是否在线
if !ONLINE_USERS.contains_key(calling_to_id) {
......@@ -199,7 +225,13 @@ pub async fn handle_agora_call(
r#"{{"msgType":"Error","fromID":"0","fromName":"Server","msgData":"对方不在线","toID":"{}"}}"#,
&client_message_data.to_id
);
send_inside_message(&target_sender_which, &event_sender, json_string, &from_id).await;
send_inside_message(
&target_sender_which,
&event_sender,
json_string,
&from_id,
)
.await;
continue;
}
// 2.检查目标用户是否为空闲状态
......@@ -212,7 +244,13 @@ pub async fn handle_agora_call(
r#"{{"msgType":"Error","fromID":"0","fromName":"Server","msgData":"对方正在呼叫中","toID":"{}"}}"#,
from_id
);
send_inside_message(&target_sender_which, &event_sender, json_string, &from_id).await;
send_inside_message(
&target_sender_which,
&event_sender,
json_string,
&from_id,
)
.await;
continue;
}
// 3.如果为空闲状态,准备发送Call类型消息
......@@ -267,11 +305,11 @@ pub async fn handle_agora_call(
} else {
println!("更新redis数据成功");
}
success_full_send_count += 1;
refuse_thread_users.push(calling_to_id.to_string());
}
}
// 如果成功发送了呼叫信息,则根据当前用户状态发送消息通知
if success_full_send_count > 0 {
if refuse_thread_users.len() > 0 {
// 如果当前用户本身为idle,则发送CmdCall
if the_user_data[0] == "idle" || the_user_data[0] == "calling" {
let cmd_call_message_json = serde_json::json!({
......@@ -288,20 +326,8 @@ pub async fn handle_agora_call(
// 更新状态数据
the_user_data[0] = "calling";
the_user_data[1] = channel_id_now.as_str();
// 判断是否应当被设置为主持人
// 若同channelID内的所有数据中没有一个isHost为0,则设置当前用户isHost为1,否则为0
if ONLINE_USERS
.iter()
.filter(|entry| {
entry.value().split(',').collect::<Vec<_>>()[1]
== channel_id_now
})
.any(|entry| entry.value().split(',').collect::<Vec<_>>()[6] == "0")
{
the_user_data[6] ="1";
} else {
the_user_data[6] ="0";
}
the_user_data[6] = "0";
// 在连接成功之后再设置为主持人,在此之前一律将所有用户的isHost数据段设置为0
let the_user_data_joined = the_user_data.join(",");
ONLINE_USERS.insert(from_id.to_string(), the_user_data_joined.clone());
// 修改redis数据
......@@ -321,8 +347,159 @@ pub async fn handle_agora_call(
.await;
// 最后广播用户更新
notify_all_clients_to_update_online_users().await;
// 开启定时器线程任务,如果目标用户20s内没有接听,则挂断,并且重置所有数据
// 开启定时器线程任务,如果所有用户20s内没有接听,则挂断,发送所有CmdRefuse数据并重置
for user_id in refuse_thread_users {
if let Some(current_user_status_data) = ONLINE_USERS.get(&user_id) {
// 设置线程任务
let user_id_clone = user_id.clone();
let from_id = from_id.clone();
let event_sender = event_sender.clone();
let (cancel_tx, mut cancel_rx) = mpsc::unbounded_channel::<()>();
refuse_procedure_map.insert(user_id, cancel_tx);
tokio::spawn(async move {
tokio::select! {
_ = sleep(Duration::from_secs(20)) => {
// 20秒内没有接听,准备挂断通话
let mut current_user_data_vec: Vec<&str> = current_user_status_data.split(',').collect();
// 提前取出channelID,判断是否还有人在频道里面
let current_channel_id = current_user_data_vec[1];
current_user_data_vec[0] = "idle";
current_user_data_vec[1] = "";
current_user_data_vec[6] = "0";
// 更新本地数据
ONLINE_USERS.insert(
user_id_clone.clone(),
current_user_data_vec.join(","),
);
// 更新redis数据
if let Err(e) = update_client_redis_data(
user_id_clone.as_str(),
current_user_data_vec.join(","),
)
.await {
println!("更新redis数据失败:{:?} 用户id {}", e, user_id_clone);
} else {
println!("通知挂断refuse线程,更新redis数据成功");
}
// 对其发送CmdHangup,对发起方发送CmdRefuse
// 发起方的isHost一定是1,找到该用户即可
let host_user_id_now = ONLINE_USERS
.iter()
.find(|entry| {
entry.value().split(',').collect::<Vec<_>>()[1]
== current_channel_id
&& entry.value().split(',').collect::<Vec<_>>()[6]
== "1"
})
.map(|entry| entry.key().clone());
// 再找到对应的Sender
let host_user_sender_which = if let Some(host_user_id) = host_user_id_now {
CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == host_user_id)
.map(|entry| entry.key().clone())
} else {
None
};
if let Some(host_user_sender_which) = host_user_sender_which {
// 对其发送消息
send_inside_message(
&host_user_sender_which,
&event_sender,
serde_json::json!({
"msgType": "CmdRefuse",
"fromID": user_id_clone,
"fromName": "Unknown",
"toID": &host_user_sender_which.0,
"msgData": {
"channelId": "",
"rtcToken": ""
}
}).to_string(),
&host_user_sender_which.0,
)
.await;
}
// 向当前用户Sender发送CmdHangup要求其挂断
let hangup_user_sender_which = if let Some(hangup_user_sender_which) =
CLIENT_SENDERS.iter().find(|entry| entry.key().0 == user_id_clone) {
Some(hangup_user_sender_which.key().clone())
} else {
None
};
if let Some(hangup_user_sender_which) = hangup_user_sender_which {
// 对其发送消息
send_inside_message(
&hangup_user_sender_which,
&event_sender,
serde_json::json!({
"msgType": "CmdHangup",
"fromID": "0",
"fromName": "Server",
"toID": from_id,
"msgData": {
"channelId": "",
"rtcToken": ""
}
}).to_string(),
&user_id_clone.to_string(),
).await;
}
// 检查剩余频道人员数量
let left_users: Vec<(String, String)> = ONLINE_USERS.iter()
.filter(|entry| entry.value().split(',').collect::<Vec<_>>()[1] == current_channel_id)
.map(|entry| (entry.key().clone(), entry.value().clone())) // 克隆键和值
.collect();
if left_users.len() <= 1 {
let only_left_user = left_users.get(0);
if let Some((user_id, user_data)) = only_left_user {
let mut user_data = user_data.split(',').collect::<Vec<&str>>();
{
let user_is_host = user_data[1] == "1";
if user_is_host {
// 发起方
// 找到发起方的Sender
if let Some(host_user_sender_which) =
CLIENT_SENDERS.iter().find(|entry| entry.key().0 == user_id.to_string()) {
// 更新状态数据到OnlineUsers
user_data[0] = "idle";
user_data[1] = "";
user_data[6] = "0";
ONLINE_USERS.insert(user_id.to_string(), user_data.join(","));
// 更新redis数据
if let Err(e) = update_client_redis_data(&user_id.to_string(), user_data.join(",")).await {
println!("只有一人在会议中,更新redis数据失败: {}", e)
} else {
println!("只有一人在会议中,更新redis数据成功")
}
// 对其发送CmdCancelCall
send_inside_message(
host_user_sender_which.key(),
&event_sender,
serde_json::json!({
"msgType": "CmdCancelCall",
"fromID": "0",
"fromName": "Server",
"toID": host_user_sender_which.key().0,
"msgData": {}
}).to_string(), user_id,
).await;
}
}
}
}
}
// 要求所有用户更新在线人员列表
notify_all_clients_to_update_online_users().await;
}
_ = cancel_rx.recv() => {
// 线程被取消
println!("用户已接听或者主动挂断 {}", user_id_clone);
}
}
});
}
}
}
}
}
......@@ -330,11 +507,19 @@ pub async fn handle_agora_call(
// 拒接电话
"Refuse" => {
// 当前用户拒接电话之后,直接给当前用户发送CmdHangup消息
// 给toId用户发送CmdRefuse,表示对方拒接了电话
// 主动拒绝之后,也需要取消refuse_procedure_map的线程任务
if let Some(ref_procedure_sender) = refuse_procedure_map.get(from_id) {
if let Err(e) = ref_procedure_sender.send(()) {
println!("取消拒绝接听的线程队列失败:{:?}", e);
} else {
println!("取消拒绝接听的线程队列成功");
}
}
// 给当前from_id用户发送CmdHangup,表示已经成功拒接了电话,并且修改了状态
if let Some(current_user_data) = ONLINE_USERS.get(from_id) {
let mut current_user_data_vec: Vec<&str> = current_user_data.split(',').collect();
let hangup_refuse_message = serde_json::json!({
"msgType": "CmdRefuse",
"msgType": "CmdHangup",
"fromID": "0",
"fromName": "Server",
"toID": from_id,
......@@ -347,12 +532,11 @@ pub async fn handle_agora_call(
// 修改状态数据
current_user_data_vec[0] = "idle";
current_user_data_vec[1] = "";
current_user_data_vec[6] = "0";
let current_user_data_joined = current_user_data_vec.join(",");
ONLINE_USERS.insert(from_id.to_string(), current_user_data_joined.clone());
// 修改redis数据
if let Err(e) =
update_client_redis_data(from_id, current_user_data_joined).await
{
if let Err(e) = update_client_redis_data(from_id, current_user_data_joined).await {
println!("更新redis数据失败:{:?} 用户id {}", e, from_id);
} else {
println!("更新redis数据成功");
......@@ -372,7 +556,8 @@ pub async fn handle_agora_call(
if let Some(target_sender) = CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == client_message_data.to_id.to_string())
.map(|entry| entry.key().clone()) {
.map(|entry| entry.key().clone())
{
// 直接发送CmdHangup数据
let cmd_refuse_message = serde_json::json!({
"msgType": "CmdRefuse",
......@@ -389,8 +574,9 @@ pub async fn handle_agora_call(
&target_sender,
event_sender,
cmd_refuse_message,
&client_message_data.to_id
).await;
&client_message_data.to_id,
)
.await;
} else {
println!("找不到toID对应的sender");
}
......@@ -401,7 +587,7 @@ pub async fn handle_agora_call(
"EndMeeting" => {
// 要求判断是否为主持人,只有主持人可以结束通话
if let Some(current_user_data) = ONLINE_USERS.get(from_id) {
let mut current_user_data_vec: Vec<&str> = current_user_data.split(',').collect();
let current_user_data_vec: Vec<&str> = current_user_data.split(',').collect();
// 如果不是主持人,不允许发送该消息
if current_user_data_vec[6] != "1" {
// 发送错误信息
......@@ -417,13 +603,133 @@ pub async fn handle_agora_call(
&target_sender_which,
event_sender,
error_message,
&from_id
).await;
&from_id,
)
.await;
} else {
// 如果是主持人,则向所有人发送CmdEndMeeting消息,并且清理所有数据
// 提取出目前channel_id
let channel_id = current_user_data_vec[1];
if channel_id != "" {
// 从ONLINE_USERS中过滤筛选出所有与channelID相同的用户id集合
let users_to_notify = ONLINE_USERS
.iter()
.filter(|entry| {
entry.value().split(",").collect::<Vec<_>>()[2] == channel_id
})
.map(|entry| entry.key().to_string())
.collect::<Vec<String>>();
// 再根据上面的id,找到所有sender,并且修改所有对应ONLINE_USERS数据
for user_id_current_chat in users_to_notify.iter() {
if let Some(current_user_data) = ONLINE_USERS.get(user_id_current_chat)
{
let mut current_user_data_vec: Vec<&str> =
current_user_data.split(',').collect();
current_user_data_vec[0] = "idle";
current_user_data_vec[1] = "";
current_user_data_vec[6] = "0";
let current_user_data_joined = current_user_data_vec.join(",");
ONLINE_USERS.insert(
user_id_current_chat.to_string(),
current_user_data_joined.clone(),
);
// 更新对应redis数据
if let Err(e) = update_client_redis_data(
user_id_current_chat.as_str(),
current_user_data_joined,
)
.await
{
println!(
"更新redis数据失败:{:?} 用户id {}",
e, user_id_current_chat
);
} else {
println!("更新redis数据成功");
}
}
}
for user_id in users_to_notify {
if let Some(target_sender) = CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == user_id)
.map(|entry| entry.key().clone())
{
// 找到这个sender标记符之后,发送消息CmdEndMeeting
send_inside_message(
&target_sender,
event_sender,
serde_json::json!({
"msgType": "CmdEndMeeting",
"fromID": "0",
"fromName": "Server",
"toID": user_id,
"msgData": {
"channelId": "",
"rtcToken": ""
}
})
.to_string(),
&user_id,
)
.await;
}
}
// 发送完毕之后,要求更新所有用户数据
notify_all_clients_to_update_online_users().await;
}
}
}
}
// 通话过程中挂断
"Hangup" => {
// 挂断通话不是拒接,不需要处理refuse_procedure_map中的线程消息
if let Some(current_user_data) = ONLINE_USERS.get(from_id) {
// 直接修改对应数据即可
let mut current_user_data_vec: Vec<&str> = current_user_data.split(',').collect();
current_user_data_vec[0] = "idle";
current_user_data_vec[1] = "";
let is_current_user_host = current_user_data_vec[6] == "1";
// 无论是否是主持人,都需要发送对应消息,更新数据以及同步redis
current_user_data_vec[6] = "0";
let current_user_data_joined = current_user_data_vec.join(",");
ONLINE_USERS.insert(from_id.to_string(), current_user_data_joined.clone());
// 更新redis数据
if let Err(e) = update_client_redis_data(from_id, current_user_data_joined).await {
println!("更新redis数据失败:{:?} 用户id {}", e, from_id);
} else {
println!("更新redis数据成功");
}
// 直接向其发送CmdHangup即可
send_inside_message(
&target_sender_which,
event_sender,
serde_json::json!({
"msgType": "CmdHangup",
"fromID": "0",
"fromName": "Server",
"toID": from_id,
"msgData": {
"channelId": "",
"rtcToken": ""
}
})
.to_string(),
&from_id,
)
.await;
// 判断一下当前用户是不是主持人
if is_current_user_host {
}
// 无论如何都要发送通知在线人员更新
notify_all_clients_to_update_online_users().await;
}
}
// 接听通话,这个是目前最重要的逻辑部分
"Connect" => {
}
_ => {}
}
}
use std::collections::HashMap;
use dashmap::DashSet;
use lazy_static::lazy_static;
use tokio::net::TcpStream;
use tokio_tungstenite::{accept_hdr_async, WebSocketStream};
use tungstenite::handshake::client::Request;
use tungstenite::handshake::server::Response;
// 存储可以为host的id集合
// 存储hasMike以及hasCamera
lazy_static! {
pub static ref HOST_ENABLED_ID_SET: DashSet<String> = DashSet::new();
}
// 提取出来的处理握手的函数
pub(crate) fn handle_handshake(
req: &tungstenite::handshake::server::Request,
......@@ -38,6 +46,12 @@ pub(crate) fn handle_handshake(
return Err("wsPwd不正确!".to_string());
}
// 存储是否为host
if connection_params.get("enable_host").is_some() {
println!("当前id允许可以成为host! {}", connection_params.get("fromID").unwrap());
HOST_ENABLED_ID_SET.insert(connection_params.get("fromID").unwrap().to_string());
}
println!("握手成功!");
Ok(connection_params)
......
......@@ -54,12 +54,21 @@ pub async fn insert_this_connection(
let device_id = params.get("deviceID").cloned().unwrap_or("".to_string());
let from_name = params.get("fromName").cloned().unwrap_or("".to_string());
let has_mike = if let Some(has_mike) = params.get("hasMike"){
has_mike.to_string()
} else {
"0".to_string()
};
let has_camera = if let Some(has_camera) = params.get("hasCamera"){
has_camera.to_string()
} else {
"0".to_string()
};
// 按照结构体OnlineUserMessage的格式构造用户信息字符串,用逗号拼接即可
// callState,channelID,deviceID,fromID,hasCamera,hasMike,isHost,userCallGroup,fromName
let user_info_str = format!(
"{},{},{},{},1,1,0,0,{}",
"idle", "", from_id, device_id, from_name
"{},{},{},{},{},{},0,0,{}",
"idle", "", from_id, device_id, has_camera, has_mike,from_name
);
if let Err(e) = con.hset::<&str, &str, &str, ()>("onlineUsers", from_id, &user_info_str) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment