From 0cca10290d089aabac8f2e4356cfaf80f06ae194 Mon Sep 17 00:00:00 2001 From: FxQnLr <felixquinn03@gmail.com> Date: Sun, 29 Oct 2023 19:55:26 +0100 Subject: does what is expected, but badly --- src/error.rs | 20 ++++++++++++---- src/main.rs | 5 ++-- src/routes/start.rs | 5 ++-- src/routes/status.rs | 2 -- src/services/ping.rs | 68 ++++++++++++++++++++++++++++++---------------------- 5 files changed, 60 insertions(+), 40 deletions(-) diff --git a/src/error.rs b/src/error.rs index f143ee9..1592a78 100644 --- a/src/error.rs +++ b/src/error.rs @@ -10,19 +10,20 @@ use crate::auth::AuthError; #[derive(Debug)] pub enum WebolError { Generic, + // User(UserError), Auth(AuthError), Ping(surge_ping::SurgeError), DB(sqlx::Error), IpParse(<std::net::IpAddr as std::str::FromStr>::Err), BufferParse(std::num::ParseIntError), Broadcast(io::Error), - Axum(axum::Error) } impl IntoResponse for WebolError { fn into_response(self) -> Response { let (status, error_message) = match self { Self::Auth(err) => err.get(), + // Self::User(err) => err.get(), Self::Generic => (StatusCode::INTERNAL_SERVER_ERROR, ""), Self::Ping(err) => { error!("Ping: {}", err.source().unwrap()); @@ -44,10 +45,6 @@ impl IntoResponse for WebolError { error!("server error: {}", err.to_string()); (StatusCode::INTERNAL_SERVER_ERROR, "Server Error") }, - Self::Axum(err) => { - error!("server error: {}", err.to_string()); - (StatusCode::INTERNAL_SERVER_ERROR, "Server Error") - }, }; let body = Json(json!({ "error": error_message, @@ -55,3 +52,16 @@ impl IntoResponse for WebolError { (status, body).into_response() } } + +// #[derive(Debug)] +// pub enum UserError { +// UnknownUUID, +// } +// +// impl UserError { +// pub fn get(self) -> (StatusCode, &'static str) { +// match self { +// Self::UnknownUUID => (StatusCode::UNPROCESSABLE_ENTITY, "Unknown UUID"), +// } +// } +// } diff --git a/src/main.rs b/src/main.rs index 854b59d..545d8fe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,6 +14,7 @@ use crate::db::init_db_pool; use crate::routes::device::{get_device, post_device, put_device}; use crate::routes::start::start; use crate::routes::status::status; +use crate::services::ping::{BroadcastCommands, PingMap}; mod auth; mod config; @@ -72,6 +73,6 @@ async fn main() { pub struct AppState { db: PgPool, - ping_send: Sender<String>, - ping_map: Arc<Mutex<HashMap<String, (String, bool)>>>, + ping_send: Sender<BroadcastCommands>, + ping_map: PingMap, } \ No newline at end of file diff --git a/src/routes/start.rs b/src/routes/start.rs index 45e7ec8..b1c8a73 100644 --- a/src/routes/start.rs +++ b/src/routes/start.rs @@ -44,15 +44,16 @@ pub async fn start(State(state): State<Arc<crate::AppState>>, headers: HeaderMap let uuid = if payload.ping.is_some_and(|ping| ping) { let uuid_gen = Uuid::new_v4().to_string(); let uuid_genc = uuid_gen.clone(); + let uuid_gencc = uuid_gen.clone(); tokio::spawn(async move{ debug!("Init ping service"); state.ping_map.lock().await.insert(uuid_gen, ("192.168.178.94".to_string(), false)); warn!("{:?}", state.ping_map); - crate::services::ping::spawn(state.ping_send.clone(), "192.168.178.94".to_string()).await; + crate::services::ping::spawn(state.ping_send.clone(), "192.168.178.94".to_string(), uuid_genc.clone(), state.ping_map.clone()).await }); - Some(uuid_genc) + Some(uuid_gencc) } else { None }; Ok(Json(json!(StartResponse { id: device.id, boot: true, uuid }))) } else { diff --git a/src/routes/status.rs b/src/routes/status.rs index cdecf6a..4a5ec67 100644 --- a/src/routes/status.rs +++ b/src/routes/status.rs @@ -1,12 +1,10 @@ use std::sync::Arc; use axum::extract::{State, WebSocketUpgrade}; use axum::response::Response; -use serde::Deserialize; use crate::AppState; use crate::services::ping::status_websocket; #[axum_macros::debug_handler] pub async fn status(State(state): State<Arc<AppState>>, ws: WebSocketUpgrade) -> Response { - // TODO: remove unwrap ws.on_upgrade(move |socket| status_websocket(socket, state.ping_send.clone(), state.ping_map.clone())) } \ No newline at end of file diff --git a/src/services/ping.rs b/src/services/ping.rs index e3d465d..6835fc0 100644 --- a/src/services/ping.rs +++ b/src/services/ping.rs @@ -3,16 +3,19 @@ use std::collections::HashMap; use std::sync::Arc; use axum::extract::{ws::WebSocket}; -use axum::extract::ws::Message; +use axum::extract::ws::{CloseFrame, Message}; use tokio::sync::broadcast::{Sender}; use tokio::sync::Mutex; -use tracing::{debug, error, trace, warn}; +use tracing::{debug, trace, warn}; use crate::error::WebolError; -pub async fn spawn(tx: Sender<String>, ip: String) -> Result<(), WebolError> { +pub type PingMap = Arc<Mutex<HashMap<String, (String, bool)>>>; + +pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping_map: PingMap) -> Result<(), WebolError> { let payload = [0; 8]; + // TODO: Better while let mut cont = true; while cont { let ping = surge_ping::ping( @@ -22,40 +25,44 @@ pub async fn spawn(tx: Sender<String>, ip: String) -> Result<(), WebolError> { if let Err(ping) = ping { cont = matches!(ping, surge_ping::SurgeError::Timeout { .. }); - - // debug!("{}", cont); - if !cont { return Err(ping).map_err(WebolError::Ping) } - } else { let (_, duration) = ping.unwrap(); debug!("Ping took {:?}", duration); cont = false; - // FIXME: remove unwrap - // FIXME: if error: SendError because no listener, then handle the entry directly - tx.send(ip.clone()); + handle_broadcast_send(&tx, ip.clone(), ping_map.clone(), uuid.clone()).await; }; } Ok(()) } -// FIXME: Handle commands through enum -pub async fn status_websocket(mut socket: WebSocket, tx: Sender<String>, ping_map: Arc<Mutex<HashMap<String, (String, bool)>>>) { - warn!("{:?}", ping_map); +async fn handle_broadcast_send(tx: &Sender<BroadcastCommands>, ip: String, ping_map: PingMap, uuid: String) { + debug!("sending pingsuccess message"); + ping_map.lock().await.insert(uuid.clone(), (ip.clone(), true)); + let _ = tx.send(BroadcastCommands::PingSuccess(ip)); + tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; + trace!("remove {} from ping_map", uuid); + ping_map.lock().await.remove(&uuid); +} - let mut uuid: Option<String> = None; +#[derive(Clone, Debug)] +pub enum BroadcastCommands { + PingSuccess(String) +} + +pub async fn status_websocket(mut socket: WebSocket, tx: Sender<BroadcastCommands>, ping_map: PingMap) { + warn!("{:?}", ping_map); trace!("wait for ws message (uuid)"); let msg = socket.recv().await; - uuid = Some(msg.unwrap().unwrap().into_text().unwrap()); - - let uuid = uuid.unwrap(); + let uuid = msg.unwrap().unwrap().into_text().unwrap(); trace!("Search for uuid: {:?}", uuid); + // TODO: Handle Error let device = ping_map.lock().await.get(&uuid).unwrap().to_owned(); trace!("got device: {:?}", device); @@ -63,29 +70,32 @@ pub async fn status_websocket(mut socket: WebSocket, tx: Sender<String>, ping_ma match device.1 { true => { debug!("already started"); - socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); - socket.close().await.unwrap(); + // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); + // socket.close().await.unwrap(); + socket.send(Message::Close(Some(CloseFrame { code: 4001, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap(); }, false => { let ip = device.0.to_owned(); - let mut i = 0; loop{ - trace!("{}", i); - // TODO: Check if older than 10 minutes, close if true trace!("wait for tx message"); let message = tx.subscribe().recv().await.unwrap(); - trace!("GOT = {}", message); - if message == ip { + trace!("GOT = {:?}", message); + // if let BroadcastCommands::PingSuccess(msg_ip) = message { + // if msg_ip == ip { + // trace!("message == ip"); + // break; + // } + // } + let BroadcastCommands::PingSuccess(msg_ip) = message; + if msg_ip == ip { trace!("message == ip"); break; } - i += 1; }; - socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); - socket.close().await.unwrap(); - tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; - ping_map.lock().await.remove(&uuid); + socket.send(Message::Close(Some(CloseFrame { code: 4000, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap(); + // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); + // socket.close().await.unwrap(); warn!("{:?}", ping_map); } } -- cgit v1.2.3