Commit 2579a50d by qlintonger xeno

尝试对接声网有关操作以及音视频+1

parent 1bc4703d
use crate::config::config::STATIC_WS_PWD;
use crate::deport::handle_ws_msg;
use crate::events::{register_client, ClientMessage, Event};
use crate::handles::handshake::handle_handshake;
use crate::handles::handshake::handle_websocket_handshake;
use crate::handles::heartbeat::heart_resp;
use crate::handles::online_users_update::send_online_users_and_send;
use crate::utils::json_utils::parse_message;
......@@ -12,8 +12,6 @@ use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc::{Receiver, Sender, UnboundedSender};
use tokio::sync::watch;
use tokio::time;
use tokio_tungstenite::accept_hdr_async;
use tungstenite::handshake::server::{Request, Response};
use tungstenite::{Error, Message};
lazy_static! {
......@@ -29,28 +27,11 @@ pub(crate) async fn handle_client(
let must_existed_params = ["deviceID", "fromID", "wsPwd"];
let mut connection_params = None;
let ws_stream = match accept_hdr_async(stream, |req: &Request, resp| {
match handle_handshake(req, &must_existed_params, STATIC_WS_PWD) {
Ok(params) => {
connection_params = Some(params);
Ok(resp)
}
Err(error_msg) => {
println!("{}", error_msg);
let error_resp = Response::builder()
.status(400)
.header("Content-Type", "text/plain")
.body(Some(error_msg))
.unwrap();
Err(error_resp)
}
}
})
.await
{
let ws_stream =
match handle_websocket_handshake(stream, &must_existed_params, STATIC_WS_PWD, &mut connection_params).await {
Ok(ws) => ws,
Err(e) => {
println!("WebSocket握手失败: {}", e);
println!("WebSocket握手失败: {:?}", e);
return Ok(());
}
};
......@@ -99,11 +80,7 @@ pub(crate) async fn handle_client(
break;
}
}
ClientMessage::SendClientMessage(
from_id,
client_message,
close
) => {
ClientMessage::SendClientMessage(from_id, client_message, close) => {
let real_user_id = from_id.0;
if let Err(e) = sender.send(Message::text(client_message)).await {
println!("发送给用户id {} 独立消息失败:{:?}", real_user_id, e);
......@@ -113,7 +90,7 @@ pub(crate) async fn handle_client(
// 通知外层循环关闭
close_tx.send(true).unwrap();
println!("发送给用户id {} 要求关闭连接", real_user_id);
break
break;
}
}
}
......@@ -144,7 +121,7 @@ pub(crate) async fn handle_client(
let event_sender_clone = event_sender.clone();
let connection_time_clone = connection_time.clone();
tokio::spawn(async move {
handle_ws_msg(data.msg_type, from_id_clone, event_sender_clone, connection_time_clone).await;
handle_ws_msg(&data, from_id_clone, event_sender_clone, connection_time_clone).await;
});
}
}
......
use crate::events::Event;
use crate::handles::handle_agora_call::handle_agora_call;
use crate::handles::online_users_update::{online_messages, send_online_users_resp};
use crate::typing::message_typed::ClientMessageData;
use tokio::sync::mpsc::UnboundedSender;
use crate::handles::handle_agora_call::handle_agora_call;
pub async fn handle_ws_msg(
msg_type: String,
client_message_data: &ClientMessageData,
from_id: String,
event_sender: UnboundedSender<Event>,
connection_time: u128,
) {
let msg_type = client_message_data.msg_type.clone();
match msg_type.as_str() {
"GetCompanyUserList" => {
println!("收到客户端获取在线用户列表 {}", &from_id);
......@@ -30,9 +32,18 @@ pub async fn handle_ws_msg(
}
}
}
"Call" | "CancelCall" => {
handle_agora_call(&msg_type, &from_id, &event_sender, &connection_time).await;
// 通话类消息直接托管给对应句柄即可
"Call" | "CancelCall" | "Refuse" | "EndMeeting" | "Hangup" | "Connect" | "Mute"
| "MuteAll" | "KickOut" | "MuteSelf" | "UnMuteSelf" => {
handle_agora_call(
&client_message_data,
&from_id,
&event_sender,
&connection_time,
)
.await;
}
// 针对其余消息类型,直接根据fromID -> toID规则发送即可
_ => {}
}
}
......@@ -125,7 +125,7 @@ pub async fn handle_events(mut receiver: mpsc::UnboundedReceiver<Event>) {
}
// 通知所有客户端线程发送 CmdUpdateOnlineUsers 消息
async fn notify_all_clients_to_update_online_users() {
pub async fn notify_all_clients_to_update_online_users() {
println!(
"尝试发送在新用户更新消息体,目前总人数为:{}",
CLIENT_SENDERS.len()
......
use crate::events::Event;
use crate::client::ONLINE_USERS;
use crate::events::{notify_all_clients_to_update_online_users, Event, CLIENT_SENDERS};
use crate::handles::redis::update_client_redis_data;
use crate::typing::message_typed::ClientMessageData;
use crate::utils::json_utils::get_current_timestamp;
use tokio::sync::mpsc::UnboundedSender;
async fn send_inside_message(
target_sender: &(String, u128),
event_sender: &UnboundedSender<Event>,
json_message: String,
from_id: &String,
) {
if let Err(e) = event_sender.send(Event::SendClientMessage(
target_sender.clone(),
json_message,
false,
)) {
println!("发送给用户id {} 独立消息失败:{:?}", from_id, e);
} else {
println!("发送给用户id {} 独立消息成功", from_id);
}
}
pub async fn handle_agora_call(
msg_type: &String,
client_message_data: &ClientMessageData,
from_id: &String,
event_sender: &UnboundedSender<Event>,
connection_time: &u128,
) {
let target_sender_which = (from_id.to_string(), connection_time.clone());
// 对于非复杂性数据,不要使用serde_json,直接用format!宏即可
match client_message_data.msg_type.as_str() {
// 取消呼叫
"CancelCall" => {
let calling_to_id = &client_message_data.to_id;
println!(
"收到客户端取消呼叫 取消呼叫组: {} 呼叫方id {}",
calling_to_id,
from_id.to_string()
);
// 直接发送CmdHangup消息给当前用户
let hangup_message_json = serde_json::json!({
"msgType": "CmdHangup",
"fromID": "0",
"fromName": "Server",
"toID": from_id,
"msgData": {
"channelId": "",
"rtcToken": ""
}
})
.to_string();
if let Err(e) = event_sender.send(Event::SendClientMessage(
target_sender_which.clone(),
hangup_message_json,
false,
)) {
println!("发送给用户id {} 独立消息失败:{:?}", from_id, e);
}
// 对同一个channelID的组,发送挂断消息
if let Some(user_status) = ONLINE_USERS.get(from_id) {
let mut the_current_user_data: Vec<&str> = user_status.split(',').collect();
// 如果当前用户channelID为空,则中断后续操作,并且直接重新赋值即可
if the_current_user_data[1].is_empty() {
the_current_user_data[1] = "";
the_current_user_data[0] = "idle";
the_current_user_data[6] = "0";
ONLINE_USERS.insert(
from_id.to_string(),
the_current_user_data.join(",").to_string(),
);
if let Err(e) = update_client_redis_data(
from_id,
the_current_user_data.join(",").to_string(),
)
.await
{
println!("更新redis数据失败:{:?}", e);
};
notify_all_clients_to_update_online_users().await;
return;
}
// 拆分toId为其他用户id,并且获取其数据信息
let all_to_hangup_id_vec: Vec<&str> = calling_to_id.split(',').collect();
for to_hangup_id in all_to_hangup_id_vec {
// 获取当前用户状态信息成功后,再进行额外操作
if let Some(the_other_caller_data) = ONLINE_USERS.get(to_hangup_id) {
let mut the_other_caller_data: Vec<&str> =
the_other_caller_data.split(',').collect();
// 如果处于同一频道,则发送挂断信息,
if the_other_caller_data[1] == the_current_user_data[1] {
let hangup_message_json = serde_json::json!({
"msgType": "CmdHangup",
"fromID": "0",
"fromName": "Server",
"toID": to_hangup_id,
"msgData": {
"channelId": "",
"rtcToken": ""
}
})
.to_string();
// 找到对应sender,发送CmdHangup消息
let target_sender = CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == to_hangup_id) // 使用 entry.key() 访问键
.map(|entry| entry.key().clone());
if let Some(target_sender) = target_sender {
if let Err(e) = event_sender.send(Event::SendClientMessage(
target_sender,
hangup_message_json,
false,
)) {
println!("发送给用户id {} 独立消息失败:{:?}", to_hangup_id, e);
// 修改对应sender的数据
the_other_caller_data[0] = "idle";
the_other_caller_data[1] = "";
the_other_caller_data[6] = "0";
ONLINE_USERS.insert(
to_hangup_id.to_string(),
the_other_caller_data.join(",").to_string(),
);
if let Err(e) = update_client_redis_data(
to_hangup_id,
the_other_caller_data.join(",").to_string(),
)
.await
{
println!("更新redis数据失败:{:?}", e);
};
}
} else {
println!("未找到对应的sender");
}
}
}
}
// 要求所有在线用户更新数据
the_current_user_data[1] = "";
the_current_user_data[0] = "idle";
the_current_user_data[6] = "0";
ONLINE_USERS.insert(
from_id.to_string(),
the_current_user_data.join(",").to_string(),
);
if let Err(e) =
update_client_redis_data(from_id, the_current_user_data.join(",").to_string())
.await
{
println!("更新redis数据失败:{:?}", e);
};
notify_all_clients_to_update_online_users().await;
}
}
// 对于通话类消息
"Call" => {
let calling_to_id = &client_message_data.to_id;
println!(
"收到客户端呼叫 呼叫方id {} 呼叫组id {}",
from_id, calling_to_id
);
// 必须要指定呼叫对象
if calling_to_id.is_empty() {
let error_json = format!(
r#"{{"msgType": "Error", "fromID": "0", "fromName": "Server", "msgData": "请指定呼叫对象", "toID": "{}"}}"#,
&from_id
);
send_inside_message(&target_sender_which, &event_sender, error_json, &from_id).await;
return;
}
// 不允许自己跟自己打电话
if calling_to_id.eq(from_id) {
let error_self_json = format!(
r#"{{"msgType": "Error", "fromID": "0", "fromName": "Server", "msgData": "不能给自己打电话", "toID": "{}"}}"#,
&from_id
);
send_inside_message(&target_sender_which, &event_sender, error_self_json, &from_id).await;
return;
}
// 获取当前用户状态信息成功后,再进行额外操作
if let Some(user_status) = ONLINE_USERS.get(from_id) {
// 将获取的字符串用逗号拆分
let mut the_user_data: Vec<&str> = user_status.split(',').collect();
// 获取当前用户channelId
let channel_id_now = if the_user_data[1].is_empty() {
format!("{}-{}", from_id, get_current_timestamp())
} else {
the_user_data[1].to_string()
};
// 注意一下,这里的to_id有可能是多个,需要做一下过滤处理
let calling_to_id_vec: Vec<&str> = calling_to_id.split(',').collect();
// 计数成功呼叫个数
let mut success_full_send_count = 0;
for calling_to_id in calling_to_id_vec {
// 1.检查目标用户是否在线
if !ONLINE_USERS.contains_key(calling_to_id) {
let json_string = format!(
r#"{{"msgType":"Error","fromID":"0","fromName":"Server","msgData":"对方不在线","toID":"{}"}}"#,
&client_message_data.to_id
);
send_inside_message(&target_sender_which, &event_sender, json_string, &from_id).await;
continue;
}
// 2.检查目标用户是否为空闲状态
if let Some(user_status) = ONLINE_USERS.get(calling_to_id) {
// 将获取的字符串用逗号拆分
let mut the_another_user_data: Vec<&str> = user_status.split(',').collect();
// 如果不为idle,不允许后续操作
if the_another_user_data[0] != "idle" {
let json_string = format!(
r#"{{"msgType":"Error","fromID":"0","fromName":"Server","msgData":"对方正在呼叫中","toID":"{}"}}"#,
from_id
);
send_inside_message(&target_sender_which, &event_sender, json_string, &from_id).await;
continue;
}
// 3.如果为空闲状态,准备发送Call类型消息
let call_message_json = serde_json::json!({
"msgType": "Call",
"fromID": from_id,
"fromName": client_message_data.from_name,
"toID": calling_to_id,
"msgData": {
"channelId": channel_id_now,
"rtcToken": ""
}
})
.to_string();
// 通知被呼叫方的sender发送数据
let target_sender = CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == calling_to_id) // 使用 entry.key() 访问键
.map(|entry| entry.key().clone()); // 使用 entry.value() 访问值
if let Some(sender) = target_sender {
if let Err(e) = event_sender.send(Event::SendClientMessage(
sender.clone(),
call_message_json,
false,
)) {
println!("发送给用户id {:?} 独立消息失败:{:?}", sender, e);
continue;
}
} else {
println!("找不到对应的sender,无法发送客户端消息!");
continue;
}
println!("发送给用户id {} 独立消息成功", from_id);
// 修改对应用户的状态数据
the_another_user_data[0] = "callin";
the_another_user_data[1] = channel_id_now.as_str();
the_another_user_data[6] = "0";
let the_another_user_data_joined = the_another_user_data.join(",");
// 更新状态数据
ONLINE_USERS.insert(
calling_to_id.to_string(),
the_another_user_data_joined.clone(),
);
// 修改redis数据
if let Err(e) = update_client_redis_data(
calling_to_id,
the_another_user_data_joined.clone(),
)
.await
{
println!("更新redis数据失败:{:?} 用户id {}", e, calling_to_id);
} else {
println!("更新redis数据成功");
}
success_full_send_count += 1;
}
}
// 如果成功发送了呼叫信息,则根据当前用户状态发送消息通知
if success_full_send_count > 0 {
// 如果当前用户本身为idle,则发送CmdCall
if the_user_data[0] == "idle" || the_user_data[0] == "calling" {
let cmd_call_message_json = serde_json::json!({
"msgType": "CmdCall",
"fromID": calling_to_id,
"fromName": client_message_data.from_name,
"toID": from_id,
"msgData": {
"channelId": channel_id_now,
"rtcToken": ""
}
})
.to_string();
// 更新状态数据
the_user_data[0] = "calling";
the_user_data[1] = channel_id_now.as_str();
// 判断是否应当被设置为主持人
// 若同channelID内的所有数据中没有一个isHost为0,则设置当前用户isHost为1,否则为0
if ONLINE_USERS
.iter()
.filter(|entry| {
entry.value().split(',').collect::<Vec<_>>()[1]
== channel_id_now
})
.any(|entry| entry.value().split(',').collect::<Vec<_>>()[6] == "0")
{
the_user_data[6] ="1";
} else {
the_user_data[6] ="0";
}
let the_user_data_joined = the_user_data.join(",");
ONLINE_USERS.insert(from_id.to_string(), the_user_data_joined.clone());
// 修改redis数据
if let Err(e) =
update_client_redis_data(from_id, the_user_data_joined).await
{
println!("更新redis数据失败:{:?} 用户id {}", e, from_id);
} else {
println!("更新redis数据成功");
}
send_inside_message(
&target_sender_which,
event_sender,
cmd_call_message_json,
&from_id,
)
.await;
// 最后广播用户更新
notify_all_clients_to_update_online_users().await;
// 开启定时器线程任务,如果目标用户20s内没有接听,则挂断,并且重置所有数据
}
}
}
}
// 拒接电话
"Refuse" => {
// 当前用户拒接电话之后,直接给当前用户发送CmdHangup消息
// 给toId用户发送CmdRefuse,表示对方拒接了电话
if let Some(current_user_data) = ONLINE_USERS.get(&from_id) {
let mut current_user_data_vec: Vec<&str> = current_user_data.split(',').collect();
let hangup_refuse_message = serde_json::json!({
"msgType": "CmdRefuse",
"fromID": "0",
"fromName": "Server",
"toID": from_id,
"msgData": {
"channelId": "",
"rtcToken": ""
}
})
.to_string();
// 修改状态数据
current_user_data_vec[0] = "idle";
current_user_data_vec[1] = "";
let current_user_data_joined = current_user_data_vec.join(",");
ONLINE_USERS.insert(from_id.to_string(), current_user_data_joined.clone());
// 修改redis数据
if let Err(e) =
update_client_redis_data(from_id, current_user_data_joined).await
{
println!("更新redis数据失败:{:?} 用户id {}", e, from_id);
} else {
println!("更新redis数据成功");
}
// 直接给sender发送数据通知
send_inside_message(
&target_sender_which,
event_sender,
hangup_refuse_message,
&from_id,
)
.await;
}
// 注意,拒接电话只能有一个toID
// 在这里,toID的状态数据由客户端维护
// 找到对应toId的sender
if let Some(target_sender) = CLIENT_SENDERS
.iter()
.find(|entry| entry.key().0 == client_message_data.to_id.to_string())
.map(|entry| entry.key().clone()) {
// 直接发送CmdHangup数据
let cmd_refuse_message = serde_json::json!({
"msgType": "CmdRefuse",
"fromID": client_message_data.to_id,
"fromName": "Server",
"toID": from_id,
"msgData": {
"channelId": "",
"rtcToken": ""
}
})
.to_string();
send_inside_message(
&target_sender,
event_sender,
cmd_refuse_message,
&client_message_data.to_id
).await;
} else {
println!("找不到toID对应的sender");
}
// 更新所有用户状态数据
notify_all_clients_to_update_online_users().await;
}
// 主持人结束通话
"EndMeeting" => {
// 要求判断是否为主持人,只有主持人可以结束通话
if let Some(current_user_data) = ONLINE_USERS.get(&from_id) {
let mut current_user_data_vec: Vec<&str> = current_user_data.split(',').collect();
// 如果不是主持人,不允许发送该消息
if current_user_data_vec[6] != "1" {
// 发送错误信息
let error_message = serde_json::json!({
"msgType": "Error",
"fromID": "0",
"fromName": "Server",
"toID": from_id,
"msgData": "只有主持人可以结束会议!"
})
.to_string();
send_inside_message(
&target_sender_which,
event_sender,
error_message,
&from_id
).await;
} else {
// 如果是主持人,则向所有人发送CmdEndMeeting消息,并且清理所有数据
}
}
}
_ => {}
}
}
use std::collections::HashMap;
use tokio::net::TcpStream;
use tokio_tungstenite::{accept_hdr_async, WebSocketStream};
use tungstenite::handshake::client::Request;
use tungstenite::handshake::server::Response;
// 提取出来的处理握手的函数
pub(crate) fn handle_handshake(
......@@ -38,3 +42,39 @@ pub(crate) fn handle_handshake(
Ok(connection_params)
}
pub async fn handle_websocket_handshake(
stream: TcpStream,
must_existed_params: &[&str],
static_ws_pwd: &str,
connection_params: &mut Option<HashMap<String, String>>
) -> Result<WebSocketStream<TcpStream>, ()> {
let ws_stream = match accept_hdr_async(stream, |req: &Request, resp| {
match handle_handshake(req, must_existed_params, static_ws_pwd) {
Ok(params) => {
*connection_params = Some(params);
Ok(resp)
}
Err(error_msg) => {
println!("{}", error_msg);
let error_resp = Response::builder()
.status(400)
.header("Content-Type", "text/plain")
.body(Some(error_msg))
.unwrap();
Err(error_resp)
}
}
})
.await
{
Ok(ws) => ws,
Err(e) => {
println!("WebSocket握手失败: {}", e);
return Err(());
}
};
Ok(ws_stream)
}
......@@ -31,6 +31,18 @@ pub async fn remove_this_connection(from_id: &str) -> Result<(), redis::RedisErr
Ok(())
}
pub async fn update_client_redis_data(from_id: &str, data_str: String) -> Result<(), redis::RedisError> {
// 修改对应用户redis数据
let mut con = REDIS_POOL
.get_connection()
.expect("Failed to get Redis connection");
if let Err(e) = con.hset::<&str, &str, &str, ()>("onlineUsers", from_id, &data_str) {
println!("修改 Redis 中的 onlineUsers 哈希表时出错: {}", e);
return Err(e);
}
Ok(())
}
// 将当前用户的信息插入到 Redis 的 onlineUsers 集合中
pub async fn insert_this_connection(
from_id: &str,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment