Commit 007799d2 by qlintonger xeno

setup-first-done

parent fa206dac
......@@ -4,14 +4,9 @@ version = "0.1.0"
edition = "2021"
[dependencies]
actix-web = "4"
actix-ws = "0.3.0"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.42.0", features = ["rt", "rt-multi-thread", "macros"] }
tokio = { version = "1.42.0", features = ["rt", "rt-multi-thread", "macros", "time"] }
tokio-tungstenite = "0.26.1"
tungstenite = "0.26.1"
futures = "0.3.31"
url = "2.5.4"
log = "0.4.22"
env_logger = "0.10.0"
\ No newline at end of file
futures = "0.3.31"
\ No newline at end of file
use crate::handle_messages::handle_other_message;
use crate::heartbeat::handle_heartbeat;
use crate::json_utils::{make_common_resp, parse_message};
use crate::utils;
use futures::{SinkExt, StreamExt};
use std::collections::HashMap;
use std::time::{Duration, Instant};
use tokio::time;
use tokio_tungstenite::accept_hdr_async;
use tungstenite::handshake::server::{Request, Response};
use tungstenite::{Error, Message};
use crate::utils;
// 提取出来的处理握手的函数
fn handle_handshake(
req: &Request,
must_existed_params: &[&str],
) -> Result<HashMap<String, String>, String> {
println!("新客户端连接: {}", req.uri());
let connection_params = match utils::get_connection_params(req.uri().to_string()) {
Ok(p) => p,
Err(e) => {
let error_msg = format!("缺少重要连接数据段: {}", e);
println!("{}", error_msg);
return Err(error_msg);
}
};
let not_existed = must_existed_params
.iter()
.find(|param| !connection_params.contains_key(&param.to_string()));
if let Some(param) = not_existed {
let error_msg = format!("缺少重要连接数据段: {}", param);
println!("{}", error_msg);
return Err(error_msg);
}
Ok(connection_params)
}
pub(crate) async fn handle_client(stream: tokio::net::TcpStream) -> Result<(), Error> {
let must_existed_params = vec!["deviceId", "fromId", "wsPwd"];
let mut ws_stream = accept_hdr_async(stream, |req: &Request, resp: Response| {
println!("新客户端连接: {}", req.uri());
let connection_params = match utils::get_connection_params(req.uri().to_string()) {
Ok(p) => p,
Err(e) => {
println!("缺少重要连接数据段: {}", e);
return Err(Error::ConnectionClosed);
let must_existed_params = ["deviceId", "fromId", "wsPwd"];
let mut connection_params = None;
let ws_stream = match accept_hdr_async(stream, |req: &Request, resp| {
match handle_handshake(req, &must_existed_params) {
Ok(params) => {
connection_params = Some(params);
Ok(resp)
}
Err(error_msg) => {
// 可以在这里根据需要进一步处理错误响应,这里简单打印错误信息
println!("{}", error_msg);
let error_resp = Response::builder()
.status(400)
.header("Content-Type", "text/plain")
.body(Some(error_msg))
.unwrap();
Err(error_resp)
}
};
let not_existed = must_existed_params.find(|param| !connection_params.contains_key(param));
if not_existed != None {
println!("缺少重要连接数据段: {}", not_existed);
return Err(Error::ConnectionClosed);
}
Ok(resp)
})
.await
.expect("WebSocket handshake error");
while let Some(msg) = ws_stream.next().await {
let msg = msg?;
if msg.is_text() {
println!("Client message: {}", msg.to_text()?);
ws_stream
.send(Message::text(format!("Your message is {}", msg)))
.await?;
.await
{
Ok(ws) => ws,
Err(e) => {
println!("WebSocket握手失败: {}", e);
return Ok(());
}
};
// 将 WebSocketStream 拆分为发送器和接收器
let (mut sender, mut receiver) = ws_stream.split();
if let Some(params) = connection_params {
if let Some(from_id) = params.get("fromId") {
let from_id = from_id.clone();
let from_name = params.get("fromName").clone();
let mut last_heartbeat_time = Instant::now();
loop {
tokio::select! {
// 处理消息接收
maybe_msg = receiver.next() => {
match maybe_msg {
Some(Ok(msg)) => {
if msg.is_text() {
let text = msg.to_text()?;
match parse_message(text) {
Ok(data) => {
match data.msg_type.as_str() {
"Heart" => {
println!("收到客户端心跳消息 {}", &data);
handle_heartbeat(&mut last_heartbeat_time);
if let Ok(json_str) = make_common_resp(Default::default(), "Heart") {
sender.send(Message::text(json_str)).await?;
}
},
_ => {
handle_other_message(&mut sender, &data).await?;
}
}
}
Err(e) => {
println!("解析JSON数据出错: {}", e);
}
}
}
}
Some(Err(e)) => {
println!("接受客户端消息出错: {}", e);
break;
}
None => {
println!("客户端断开连接");
break;
}
}
}
// 处理心跳超时
_ = time::sleep_until(tokio::time::Instant::from(last_heartbeat_time + Duration::from_secs(20))) => {
println!("用户id-{},用户名-{} 20秒内没有发送心跳,挂断连接", from_id, from_name);
break;
}
}
}
println!("断开与用户id: {},用户名:{}连接", from_id, from_name);
}
} else {
println!("无法获取连接参数");
}
println!("Client finished");
Ok(())
}
use crate::json_utils::{make_common_resp, MessageData};
use futures::SinkExt;
use tungstenite::{Error, Message};
// 处理其他类型消息
pub(crate) async fn handle_other_message(
// 增加 + std::marker::Unpin 限制
sender: &mut (impl SinkExt<Message, Error = Error> + std::marker::Unpin),
data: &MessageData,
) -> Result<(), Error> {
println!("收到客户端消息: {:?}", data);
if let Ok(json_str) = make_common_resp(Default::default(), "") {
sender.send(Message::text(json_str)).await?;
}
Ok(())
}
\ No newline at end of file
use std::time::Instant;
pub fn handle_heartbeat(last_time: &mut Instant) {
*last_time = Instant::now();
}
\ No newline at end of file
use serde::{Deserialize, Serialize};
use serde_json::Result;
// 定义消息结构
#[derive(Serialize, Deserialize, Debug)]
pub struct MessageData {
#[serde(rename = "msgType")]
pub msg_type: String,
#[serde(rename = "fromId")]
pub from_id: String,
#[serde(rename = "fromName")]
pub from_name: String,
#[serde(rename = "msgData")]
pub msg_data: serde_json::Value,
}
pub fn make_common_resp(value: serde_json::Value, msg_type: &str) -> Result<String> {
serialize_message(&MessageData {
msg_type: msg_type.to_string(),
from_id: "0".to_string(),
from_name: "Server".to_string(),
msg_data: value,
})
}
// 解析 JSON 消息
pub fn parse_message(json_str: &str) -> Result<MessageData> {
serde_json::from_str(json_str)
}
// 序列化消息为 JSON 字符串
pub fn serialize_message(msg: &MessageData) -> Result<String> {
serde_json::to_string(msg)
}
\ No newline at end of file
extern crate core;
mod client;
mod utils;
mod json_utils;
mod heartbeat;
mod handle_messages;
use client::handle_client;
use tokio::net::TcpListener;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment