From 00dd8a9abee6b9f0cfc37c6f20f30f0d99dfe91a Mon Sep 17 00:00:00 2001 From: fx Date: Wed, 25 Oct 2023 12:53:31 +0200 Subject: runs, no error handling --- src/services/ping.rs | 79 +++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 56 insertions(+), 23 deletions(-) (limited to 'src/services') diff --git a/src/services/ping.rs b/src/services/ping.rs index ff328a5..e3d465d 100644 --- a/src/services/ping.rs +++ b/src/services/ping.rs @@ -1,26 +1,29 @@ use std::borrow::Cow; +use std::collections::HashMap; use std::sync::Arc; -use axum::{extract::{WebSocketUpgrade, ws::WebSocket, State}, response::Response}; +use axum::extract::{ws::WebSocket}; +use axum::extract::ws::Message; use tokio::sync::broadcast::{Sender}; -use tracing::{debug, error, trace}; +use tokio::sync::Mutex; +use tracing::{debug, error, trace, warn}; -use crate::{error::WebolError, AppState}; +use crate::error::WebolError; -pub async fn spawn(tx: Sender) -> Result<(), WebolError> { +pub async fn spawn(tx: Sender, ip: String) -> Result<(), WebolError> { let payload = [0; 8]; let mut cont = true; while cont { let ping = surge_ping::ping( - "127.0.0.1".parse().map_err(WebolError::IpParse)?, + ip.parse().map_err(WebolError::IpParse)?, &payload ).await; if let Err(ping) = ping { cont = matches!(ping, surge_ping::SurgeError::Timeout { .. }); - debug!("{}", cont); + // debug!("{}", cont); if !cont { return Err(ping).map_err(WebolError::Ping) @@ -31,29 +34,59 @@ pub async fn spawn(tx: Sender) -> Result<(), WebolError> { debug!("Ping took {:?}", duration); cont = false; // FIXME: remove unwrap - tx.send("Got ping".to_string()).unwrap(); + // FIXME: if error: SendError because no listener, then handle the entry directly + tx.send(ip.clone()); }; } Ok(()) } -// TODO: Status to routes, websocket here -pub async fn ws_ping(State(state): State>, ws: WebSocketUpgrade) -> Response { - ws.on_upgrade(move |socket| handle_socket(socket, state.ping_send.clone())) -} - // FIXME: Handle commands through enum -async fn handle_socket(mut socket: WebSocket, tx: Sender) { - // TODO: Understand Cow - while let message = tx.subscribe().recv().await.unwrap() { - trace!("GOT = {}", message); - if &message == "Got ping" { - break; +pub async fn status_websocket(mut socket: WebSocket, tx: Sender, ping_map: Arc>>) { + warn!("{:?}", ping_map); + + let mut uuid: Option = None; + + trace!("wait for ws message (uuid)"); + let msg = socket.recv().await; + uuid = Some(msg.unwrap().unwrap().into_text().unwrap()); + + let uuid = uuid.unwrap(); + + trace!("Search for uuid: {:?}", uuid); + + let device = ping_map.lock().await.get(&uuid).unwrap().to_owned(); + + trace!("got device: {:?}", device); + + match device.1 { + true => { + debug!("already started"); + socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); + socket.close().await.unwrap(); + }, + false => { + let ip = device.0.to_owned(); + let mut i = 0; + loop{ + trace!("{}", i); + // TODO: Check if older than 10 minutes, close if true + trace!("wait for tx message"); + let message = tx.subscribe().recv().await.unwrap(); + trace!("GOT = {}", message); + if message == ip { + trace!("message == ip"); + break; + } + i += 1; + }; + + socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); + socket.close().await.unwrap(); + tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; + ping_map.lock().await.remove(&uuid); + warn!("{:?}", ping_map); } - }; - match socket.send(axum::extract::ws::Message::Close(Some(axum::extract::ws::CloseFrame { code: 4000, reason: Cow::Owned("started".to_owned()) }))).await.map_err(WebolError::Axum) { - Ok(..) => (), - Err(err) => { error!("Server Error: {:?}", err) } - }; + } } \ No newline at end of file -- cgit v1.2.3