Commit 6ebcc949 by qlintonger xeno

基本消息类型完成,准备测试一下+4

parent 5328a85f
...@@ -9,7 +9,7 @@ use dashmap::DashMap; ...@@ -9,7 +9,7 @@ use dashmap::DashMap;
use futures::{SinkExt, StreamExt}; use futures::{SinkExt, StreamExt};
use lazy_static::lazy_static; use lazy_static::lazy_static;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH}; use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use tokio::sync::mpsc::{Receiver, Sender, UnboundedSender}; use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
use tokio::sync::watch; use tokio::sync::watch;
use tokio::time; use tokio::time;
use tungstenite::{Error, Message}; use tungstenite::{Error, Message};
...@@ -21,8 +21,8 @@ lazy_static! { ...@@ -21,8 +21,8 @@ lazy_static! {
pub(crate) async fn handle_client( pub(crate) async fn handle_client(
stream: tokio::net::TcpStream, stream: tokio::net::TcpStream,
event_sender: UnboundedSender<Event>, event_sender: UnboundedSender<Event>,
center_to_client_sender: Sender<ClientMessage>, center_to_client_sender: UnboundedSender<ClientMessage>,
mut center_to_client_receiver: Receiver<ClientMessage>, mut center_to_client_receiver: UnboundedReceiver<ClientMessage>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let must_existed_params = ["deviceID", "fromID", "wsPwd"]; let must_existed_params = ["deviceID", "fromID", "wsPwd"];
let mut connection_params = None; let mut connection_params = None;
...@@ -82,6 +82,7 @@ pub(crate) async fn handle_client( ...@@ -82,6 +82,7 @@ pub(crate) async fn handle_client(
} }
ClientMessage::SendClientMessage(from_id, client_message, close) => { ClientMessage::SendClientMessage(from_id, client_message, close) => {
let real_user_id = from_id.0; let real_user_id = from_id.0;
println!("消息中心:==> 收到消息 {:?} 发送给 {}", &client_message, real_user_id.clone());
if let Err(e) = sender.send(Message::text(client_message)).await { if let Err(e) = sender.send(Message::text(client_message)).await {
println!("发送给用户id {} 独立消息失败:{:?}", real_user_id, e); println!("发送给用户id {} 独立消息失败:{:?}", real_user_id, e);
break; break;
...@@ -113,7 +114,11 @@ pub(crate) async fn handle_client( ...@@ -113,7 +114,11 @@ pub(crate) async fn handle_client(
println!("收到客户端心跳消息 {:?}", &data); println!("收到客户端心跳消息 {:?}", &data);
last_heartbeat_time = Instant::now(); last_heartbeat_time = Instant::now();
if let Ok(json_str) = heart_resp(&from_id) { if let Ok(json_str) = heart_resp(&from_id) {
event_sender.clone().send(Event::SendClientMessage((from_id.clone(), connection_time), json_str, false)).unwrap(); if let Err(e) = event_sender.clone().send(Event::SendClientMessage((from_id.clone(), connection_time), json_str, false)){
println!("处理心跳消息出错了:{:?}", e)
} else {
println!("处理心跳消息成功")
}
} }
}, },
_ => { _ => {
......
...@@ -22,14 +22,14 @@ pub enum ClientMessage { ...@@ -22,14 +22,14 @@ pub enum ClientMessage {
} }
lazy_static! { lazy_static! {
pub static ref CLIENT_SENDERS: DashMap<(String, u128), mpsc::Sender<ClientMessage>> = pub static ref CLIENT_SENDERS: DashMap<(String, u128), mpsc::UnboundedSender<ClientMessage>> =
DashMap::new(); DashMap::new();
} }
// 注册客户端的发送者 // 注册客户端的发送者
pub async fn register_client( pub async fn register_client(
from_id: (String, u128), from_id: (String, u128),
center_to_client_sender: mpsc::Sender<ClientMessage>, center_to_client_sender: mpsc::UnboundedSender<ClientMessage>,
) { ) {
close_existing_connection(&from_id, true).await; close_existing_connection(&from_id, true).await;
println!("注册用户 {:?} 前数量 {}", &from_id, CLIENT_SENDERS.len()); println!("注册用户 {:?} 前数量 {}", &from_id, CLIENT_SENDERS.len());
...@@ -60,7 +60,7 @@ pub async fn close_existing_connection(from_id: &(String, u128), close_old: bool ...@@ -60,7 +60,7 @@ pub async fn close_existing_connection(from_id: &(String, u128), close_old: bool
msg_data: Null, msg_data: Null,
to_id: key.0.clone(), to_id: key.0.clone(),
}; };
if let Err(e) = sender.send(ClientMessage::SendClientMessage(key.clone(), serde_json::to_string(&other_login_msg).unwrap(), true)).await { if let Err(e) = sender.send(ClientMessage::SendClientMessage(key.clone(), serde_json::to_string(&other_login_msg).unwrap(), true)) {
println!("通知客户端 {:?} 关闭连接失败: {:?}", key, e); println!("通知客户端 {:?} 关闭连接失败: {:?}", key, e);
} else { } else {
println!("通知客户端 {:?} 关闭连接成功", key); println!("通知客户端 {:?} 关闭连接成功", key);
...@@ -80,7 +80,7 @@ pub async fn close_existing_connection(from_id: &(String, u128), close_old: bool ...@@ -80,7 +80,7 @@ pub async fn close_existing_connection(from_id: &(String, u128), close_old: bool
msg_data: Null, msg_data: Null,
to_id: real_user_id.clone(), to_id: real_user_id.clone(),
}; };
if let Err(e) = sender.send(ClientMessage::SendClientMessage(from_id.clone(), serde_json::to_string(&other_login_msg).unwrap(), true)).await { if let Err(e) = sender.send(ClientMessage::SendClientMessage(from_id.clone(), serde_json::to_string(&other_login_msg).unwrap(), true)) {
println!("通知客户端 {:?} 关闭连接失败: {:?}", from_id, e); println!("通知客户端 {:?} 关闭连接失败: {:?}", from_id, e);
} else { } else {
println!("通知客户端 {:?} 关闭连接成功", from_id); println!("通知客户端 {:?} 关闭连接成功", from_id);
...@@ -110,10 +110,12 @@ pub async fn handle_events(mut receiver: mpsc::UnboundedReceiver<Event>) { ...@@ -110,10 +110,12 @@ pub async fn handle_events(mut receiver: mpsc::UnboundedReceiver<Event>) {
notify_all_clients_to_update_online_users().await; notify_all_clients_to_update_online_users().await;
} }
Event::SendClientMessage(from_id, msg, close) => { Event::SendClientMessage(from_id, msg, close) => {
println!("事件中心代发消息 ===>> 发送消息 {:?} 到客户端 {:?}", msg, from_id);
let target_sender = CLIENT_SENDERS.get(&from_id).unwrap(); let target_sender = CLIENT_SENDERS.get(&from_id).unwrap();
println!("成功获取到sender标记");
if let Err(e) = target_sender if let Err(e) = target_sender
.send(ClientMessage::SendClientMessage(from_id, msg, close)) .send(ClientMessage::SendClientMessage(from_id, msg, close))
.await
{ {
println!("通知对应sender发送ws消息失败 {}", e); println!("通知对应sender发送ws消息失败 {}", e);
} else { } else {
...@@ -131,9 +133,9 @@ pub async fn notify_all_clients_to_update_online_users() { ...@@ -131,9 +133,9 @@ pub async fn notify_all_clients_to_update_online_users() {
CLIENT_SENDERS.len() CLIENT_SENDERS.len()
); );
for entry in CLIENT_SENDERS.iter() { for entry in CLIENT_SENDERS.iter() {
let sender: &mpsc::Sender<ClientMessage> = entry.value(); let sender: &mpsc::UnboundedSender<ClientMessage> = entry.value();
let from_id = entry.key(); let from_id = entry.key();
if let Err(e) = sender.send(ClientMessage::CmdUpdateOnlineUsers).await { if let Err(e) = sender.send(ClientMessage::CmdUpdateOnlineUsers) {
println!("通知客户端 {:?} 更新在线用户列表失败: {:?}", from_id, e); println!("通知客户端 {:?} 更新在线用户列表失败: {:?}", from_id, e);
} else { } else {
println!("通知客户端 {:?} 更新在线用户列表成功 ===> $$$", from_id); println!("通知客户端 {:?} 更新在线用户列表成功 ===> $$$", from_id);
......
...@@ -23,6 +23,7 @@ async fn send_inside_message( ...@@ -23,6 +23,7 @@ async fn send_inside_message(
json_message: String, json_message: String,
from_id: &String, from_id: &String,
) { ) {
println!("发送给用户id {} 的消息 {}", from_id, json_message);
if let Err(e) = event_sender.send(Event::SendClientMessage( if let Err(e) = event_sender.send(Event::SendClientMessage(
target_sender.clone(), target_sender.clone(),
json_message, json_message,
...@@ -221,6 +222,7 @@ pub async fn handle_agora_call( ...@@ -221,6 +222,7 @@ pub async fn handle_agora_call(
let calling_to_id_vec: Vec<&str> = calling_to_id.split(',').collect(); let calling_to_id_vec: Vec<&str> = calling_to_id.split(',').collect();
// 储存需要计入CmdRefuse线程用户 // 储存需要计入CmdRefuse线程用户
let mut refuse_thread_users: Vec<String> = vec![]; let mut refuse_thread_users: Vec<String> = vec![];
println!("当前用户channelId {} 呼叫方id集合是 {:?}", channel_id_now, calling_to_id_vec);
for calling_to_id in calling_to_id_vec { for calling_to_id in calling_to_id_vec {
// 1.检查目标用户是否在线 // 1.检查目标用户是否在线
if !ONLINE_USERS.contains_key(calling_to_id) { if !ONLINE_USERS.contains_key(calling_to_id) {
...@@ -276,11 +278,13 @@ pub async fn handle_agora_call( ...@@ -276,11 +278,13 @@ pub async fn handle_agora_call(
if let Some(sender) = target_sender { if let Some(sender) = target_sender {
if let Err(e) = event_sender.send(Event::SendClientMessage( if let Err(e) = event_sender.send(Event::SendClientMessage(
sender.clone(), sender.clone(),
call_message_json, call_message_json.clone(),
false, false,
)) { )) {
println!("发送给用户id {:?} 独立消息失败:{:?}", sender, e); println!("发送给用户id {:?} 独立消息失败:{:?}", sender, e);
continue; continue;
} else {
println!("发送给用户id {} 独立消息成功 消息内容 {}", calling_to_id, call_message_json);
} }
} else { } else {
println!("找不到对应的sender,无法发送客户端消息!"); println!("找不到对应的sender,无法发送客户端消息!");
......
...@@ -76,7 +76,7 @@ pub async fn insert_this_connection( ...@@ -76,7 +76,7 @@ pub async fn insert_this_connection(
// callState,channelID,deviceID,fromID,hasCamera,hasMike,isHost,userCallGroup,fromName // callState,channelID,deviceID,fromID,hasCamera,hasMike,isHost,userCallGroup,fromName
let user_info_str = format!( let user_info_str = format!(
"{},{},{},{},{},{},0,{},{}", "{},{},{},{},{},{},0,{},{}",
"idle", "", from_id, device_id, has_camera, has_mike, user_call_group, from_name "idle", "", device_id, from_id, has_camera, has_mike, user_call_group, from_name
); );
if let Err(e) = con.hset::<&str, &str, &str, ()>("onlineUsers", from_id, &user_info_str) { if let Err(e) = con.hset::<&str, &str, &str, ()>("onlineUsers", from_id, &user_info_str) {
......
...@@ -24,7 +24,7 @@ async fn main() { ...@@ -24,7 +24,7 @@ async fn main() {
while let Ok((stream, _)) = listener.accept().await { while let Ok((stream, _)) = listener.accept().await {
let client_event_sender = event_sender.clone(); let client_event_sender = event_sender.clone();
// 创建一个用于事件中心向客户端发送消息的通道 // 创建一个用于事件中心向客户端发送消息的通道
let (center_to_client_sender, center_to_client_receiver) = mpsc::channel(10); let (center_to_client_sender, center_to_client_receiver) = mpsc::unbounded_channel();
tokio::spawn(handle_client( tokio::spawn(handle_client(
stream, stream,
client_event_sender, client_event_sender,
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment