From dcfb83fb2069bfcf4642b03453253e35479bf3da Mon Sep 17 00:00:00 2001 From: fx Date: Tue, 24 Oct 2023 01:15:22 +0200 Subject: first ping impl baseline, doesnt work --- src/error.rs | 37 +++++++++++++++++++++++++++++++------ 1 file changed, 31 insertions(+), 6 deletions(-) (limited to 'src/error.rs') diff --git a/src/error.rs b/src/error.rs index db2fc86..f143ee9 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,4 +1,5 @@ use std::error::Error; +use std::io; use axum::http::StatusCode; use axum::Json; use axum::response::{IntoResponse, Response}; @@ -8,21 +9,45 @@ use crate::auth::AuthError; #[derive(Debug)] pub enum WebolError { - Auth(AuthError), Generic, - Server(Box), + Auth(AuthError), + Ping(surge_ping::SurgeError), + DB(sqlx::Error), + IpParse(::Err), + BufferParse(std::num::ParseIntError), + Broadcast(io::Error), + Axum(axum::Error) } impl IntoResponse for WebolError { fn into_response(self) -> Response { let (status, error_message) = match self { - WebolError::Auth(err) => err.get(), - WebolError::Generic => (StatusCode::INTERNAL_SERVER_ERROR, ""), - WebolError::Server(err) => { + Self::Auth(err) => err.get(), + Self::Generic => (StatusCode::INTERNAL_SERVER_ERROR, ""), + Self::Ping(err) => { + error!("Ping: {}", err.source().unwrap()); + (StatusCode::INTERNAL_SERVER_ERROR, "Server Error") + }, + Self::IpParse(err) => { + error!("server error: {}", err.to_string()); + (StatusCode::INTERNAL_SERVER_ERROR, "Server Error") + }, + Self::DB(err) => { + error!("server error: {}", err.to_string()); + (StatusCode::INTERNAL_SERVER_ERROR, "Server Error") + }, + Self::Broadcast(err) => { + error!("server error: {}", err.to_string()); + (StatusCode::INTERNAL_SERVER_ERROR, "Server Error") + }, + Self::BufferParse(err) => { + error!("server error: {}", err.to_string()); + (StatusCode::INTERNAL_SERVER_ERROR, "Server Error") + }, + Self::Axum(err) => { error!("server error: {}", err.to_string()); (StatusCode::INTERNAL_SERVER_ERROR, "Server Error") }, - }; let body = Json(json!({ "error": error_message, -- cgit v1.2.3 From 0cca10290d089aabac8f2e4356cfaf80f06ae194 Mon Sep 17 00:00:00 2001 From: FxQnLr Date: Sun, 29 Oct 2023 19:55:26 +0100 Subject: does what is expected, but badly --- src/error.rs | 20 ++++++++++++---- src/main.rs | 5 ++-- src/routes/start.rs | 5 ++-- src/routes/status.rs | 2 -- src/services/ping.rs | 68 ++++++++++++++++++++++++++++++---------------------- 5 files changed, 60 insertions(+), 40 deletions(-) (limited to 'src/error.rs') diff --git a/src/error.rs b/src/error.rs index f143ee9..1592a78 100644 --- a/src/error.rs +++ b/src/error.rs @@ -10,19 +10,20 @@ use crate::auth::AuthError; #[derive(Debug)] pub enum WebolError { Generic, + // User(UserError), Auth(AuthError), Ping(surge_ping::SurgeError), DB(sqlx::Error), IpParse(::Err), BufferParse(std::num::ParseIntError), Broadcast(io::Error), - Axum(axum::Error) } impl IntoResponse for WebolError { fn into_response(self) -> Response { let (status, error_message) = match self { Self::Auth(err) => err.get(), + // Self::User(err) => err.get(), Self::Generic => (StatusCode::INTERNAL_SERVER_ERROR, ""), Self::Ping(err) => { error!("Ping: {}", err.source().unwrap()); @@ -44,10 +45,6 @@ impl IntoResponse for WebolError { error!("server error: {}", err.to_string()); (StatusCode::INTERNAL_SERVER_ERROR, "Server Error") }, - Self::Axum(err) => { - error!("server error: {}", err.to_string()); - (StatusCode::INTERNAL_SERVER_ERROR, "Server Error") - }, }; let body = Json(json!({ "error": error_message, @@ -55,3 +52,16 @@ impl IntoResponse for WebolError { (status, body).into_response() } } + +// #[derive(Debug)] +// pub enum UserError { +// UnknownUUID, +// } +// +// impl UserError { +// pub fn get(self) -> (StatusCode, &'static str) { +// match self { +// Self::UnknownUUID => (StatusCode::UNPROCESSABLE_ENTITY, "Unknown UUID"), +// } +// } +// } diff --git a/src/main.rs b/src/main.rs index 854b59d..545d8fe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,6 +14,7 @@ use crate::db::init_db_pool; use crate::routes::device::{get_device, post_device, put_device}; use crate::routes::start::start; use crate::routes::status::status; +use crate::services::ping::{BroadcastCommands, PingMap}; mod auth; mod config; @@ -72,6 +73,6 @@ async fn main() { pub struct AppState { db: PgPool, - ping_send: Sender, - ping_map: Arc>>, + ping_send: Sender, + ping_map: PingMap, } \ No newline at end of file diff --git a/src/routes/start.rs b/src/routes/start.rs index 45e7ec8..b1c8a73 100644 --- a/src/routes/start.rs +++ b/src/routes/start.rs @@ -44,15 +44,16 @@ pub async fn start(State(state): State>, headers: HeaderMap let uuid = if payload.ping.is_some_and(|ping| ping) { let uuid_gen = Uuid::new_v4().to_string(); let uuid_genc = uuid_gen.clone(); + let uuid_gencc = uuid_gen.clone(); tokio::spawn(async move{ debug!("Init ping service"); state.ping_map.lock().await.insert(uuid_gen, ("192.168.178.94".to_string(), false)); warn!("{:?}", state.ping_map); - crate::services::ping::spawn(state.ping_send.clone(), "192.168.178.94".to_string()).await; + crate::services::ping::spawn(state.ping_send.clone(), "192.168.178.94".to_string(), uuid_genc.clone(), state.ping_map.clone()).await }); - Some(uuid_genc) + Some(uuid_gencc) } else { None }; Ok(Json(json!(StartResponse { id: device.id, boot: true, uuid }))) } else { diff --git a/src/routes/status.rs b/src/routes/status.rs index cdecf6a..4a5ec67 100644 --- a/src/routes/status.rs +++ b/src/routes/status.rs @@ -1,12 +1,10 @@ use std::sync::Arc; use axum::extract::{State, WebSocketUpgrade}; use axum::response::Response; -use serde::Deserialize; use crate::AppState; use crate::services::ping::status_websocket; #[axum_macros::debug_handler] pub async fn status(State(state): State>, ws: WebSocketUpgrade) -> Response { - // TODO: remove unwrap ws.on_upgrade(move |socket| status_websocket(socket, state.ping_send.clone(), state.ping_map.clone())) } \ No newline at end of file diff --git a/src/services/ping.rs b/src/services/ping.rs index e3d465d..6835fc0 100644 --- a/src/services/ping.rs +++ b/src/services/ping.rs @@ -3,16 +3,19 @@ use std::collections::HashMap; use std::sync::Arc; use axum::extract::{ws::WebSocket}; -use axum::extract::ws::Message; +use axum::extract::ws::{CloseFrame, Message}; use tokio::sync::broadcast::{Sender}; use tokio::sync::Mutex; -use tracing::{debug, error, trace, warn}; +use tracing::{debug, trace, warn}; use crate::error::WebolError; -pub async fn spawn(tx: Sender, ip: String) -> Result<(), WebolError> { +pub type PingMap = Arc>>; + +pub async fn spawn(tx: Sender, ip: String, uuid: String, ping_map: PingMap) -> Result<(), WebolError> { let payload = [0; 8]; + // TODO: Better while let mut cont = true; while cont { let ping = surge_ping::ping( @@ -22,40 +25,44 @@ pub async fn spawn(tx: Sender, ip: String) -> Result<(), WebolError> { if let Err(ping) = ping { cont = matches!(ping, surge_ping::SurgeError::Timeout { .. }); - - // debug!("{}", cont); - if !cont { return Err(ping).map_err(WebolError::Ping) } - } else { let (_, duration) = ping.unwrap(); debug!("Ping took {:?}", duration); cont = false; - // FIXME: remove unwrap - // FIXME: if error: SendError because no listener, then handle the entry directly - tx.send(ip.clone()); + handle_broadcast_send(&tx, ip.clone(), ping_map.clone(), uuid.clone()).await; }; } Ok(()) } -// FIXME: Handle commands through enum -pub async fn status_websocket(mut socket: WebSocket, tx: Sender, ping_map: Arc>>) { - warn!("{:?}", ping_map); +async fn handle_broadcast_send(tx: &Sender, ip: String, ping_map: PingMap, uuid: String) { + debug!("sending pingsuccess message"); + ping_map.lock().await.insert(uuid.clone(), (ip.clone(), true)); + let _ = tx.send(BroadcastCommands::PingSuccess(ip)); + tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; + trace!("remove {} from ping_map", uuid); + ping_map.lock().await.remove(&uuid); +} - let mut uuid: Option = None; +#[derive(Clone, Debug)] +pub enum BroadcastCommands { + PingSuccess(String) +} + +pub async fn status_websocket(mut socket: WebSocket, tx: Sender, ping_map: PingMap) { + warn!("{:?}", ping_map); trace!("wait for ws message (uuid)"); let msg = socket.recv().await; - uuid = Some(msg.unwrap().unwrap().into_text().unwrap()); - - let uuid = uuid.unwrap(); + let uuid = msg.unwrap().unwrap().into_text().unwrap(); trace!("Search for uuid: {:?}", uuid); + // TODO: Handle Error let device = ping_map.lock().await.get(&uuid).unwrap().to_owned(); trace!("got device: {:?}", device); @@ -63,29 +70,32 @@ pub async fn status_websocket(mut socket: WebSocket, tx: Sender, ping_ma match device.1 { true => { debug!("already started"); - socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); - socket.close().await.unwrap(); + // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); + // socket.close().await.unwrap(); + socket.send(Message::Close(Some(CloseFrame { code: 4001, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap(); }, false => { let ip = device.0.to_owned(); - let mut i = 0; loop{ - trace!("{}", i); - // TODO: Check if older than 10 minutes, close if true trace!("wait for tx message"); let message = tx.subscribe().recv().await.unwrap(); - trace!("GOT = {}", message); - if message == ip { + trace!("GOT = {:?}", message); + // if let BroadcastCommands::PingSuccess(msg_ip) = message { + // if msg_ip == ip { + // trace!("message == ip"); + // break; + // } + // } + let BroadcastCommands::PingSuccess(msg_ip) = message; + if msg_ip == ip { trace!("message == ip"); break; } - i += 1; }; - socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); - socket.close().await.unwrap(); - tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; - ping_map.lock().await.remove(&uuid); + socket.send(Message::Close(Some(CloseFrame { code: 4000, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap(); + // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); + // socket.close().await.unwrap(); warn!("{:?}", ping_map); } } -- cgit v1.2.3 From 5b7302cf9be4e0badd691203e160ca110613e34c Mon Sep 17 00:00:00 2001 From: FxQnLr Date: Thu, 2 Nov 2023 19:44:29 +0100 Subject: ping timeout and cleanup --- Cargo.lock | 22 ++++++------- Cargo.toml | 2 +- README.md | 2 ++ src/db.rs | 2 +- src/error.rs | 27 +++------------ src/routes/device.rs | 10 +++--- src/routes/start.rs | 2 -- src/services/ping.rs | 92 +++++++++++++++++++++++++++------------------------- 8 files changed, 73 insertions(+), 86 deletions(-) (limited to 'src/error.rs') diff --git a/Cargo.lock b/Cargo.lock index df6537a..350c8e9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -276,9 +276,9 @@ dependencies = [ [[package]] name = "crc-catalog" -version = "2.2.0" +version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cace84e55f07e7301bae1c519df89cdad8cc3cd868413d3fdbdeca9ff3db484" +checksum = "4939f9ed1444bd8c896d37f3090012fa6e7834fe84ef8c9daa166109515732f9" [[package]] name = "crossbeam-queue" @@ -726,9 +726,9 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.0.2" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8adf3ddd720272c6ea8bf59463c04e0f93d0bbf7c5439b691bca2987e0270897" +checksum = "d530e1a18b1cb4c484e6e34556a0d948706958449fca0cab753d649f2bce3d1f" dependencies = [ "equivalent", "hashbrown 0.14.2", @@ -1416,9 +1416,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.107" +version = "1.0.108" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b420ce6e3d8bd882e9b243c6eed35dbc9a6110c9769e74b584e0d68d1f20c65" +checksum = "3d1c7e3eac408d115102c4c24ad393e0821bb3a5df4d506a80f85f7a742a526b" dependencies = [ "itoa", "ryu", @@ -2187,7 +2187,7 @@ checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" [[package]] name = "webol" -version = "0.1.0" +version = "0.2.0" dependencies = [ "axum", "axum-macros", @@ -2310,18 +2310,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.7.20" +version = "0.7.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dd66a62464e3ffd4e37bd09950c2b9dd6c4f8767380fabba0d523f9a775bc85a" +checksum = "e50cbb27c30666a6108abd6bc7577556265b44f243e2be89a8bc4e07a528c107" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.7.20" +version = "0.7.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "255c4596d41e6916ced49cfafea18727b24d67878fa180ddfd69b9df34fd1726" +checksum = "a25f293fe55f0a48e7010d65552bb63704f6ceb55a1a385da10d41d8f78e4a3d" dependencies = [ "proc-macro2", "quote", diff --git a/Cargo.toml b/Cargo.toml index 7b2c22b..1bf823f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "webol" -version = "0.1.0" +version = "0.2.0" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html diff --git a/README.md b/README.md index d177df2..e4a6c44 100644 --- a/README.md +++ b/README.md @@ -7,3 +7,5 @@ WEBOL_APIKEY: `String` WEBOL_SERVERADDR: `Option` (0.0.0.0:7229) WEBOL_BINDADDR: `Option` (0.0.0.0:1111) + +WEBOL_PINGTIMEOUT: `Option` (10) \ No newline at end of file diff --git a/src/db.rs b/src/db.rs index 51ea469..c012b47 100644 --- a/src/db.rs +++ b/src/db.rs @@ -8,7 +8,7 @@ use tracing::{debug, info}; #[cfg(not(debug_assertions))] use crate::config::SETTINGS; -#[derive(Serialize)] +#[derive(Serialize, Debug)] pub struct Device { pub id: String, pub mac: String, diff --git a/src/error.rs b/src/error.rs index 1592a78..5b82534 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,4 +1,3 @@ -use std::error::Error; use std::io; use axum::http::StatusCode; use axum::Json; @@ -10,9 +9,7 @@ use crate::auth::AuthError; #[derive(Debug)] pub enum WebolError { Generic, - // User(UserError), Auth(AuthError), - Ping(surge_ping::SurgeError), DB(sqlx::Error), IpParse(::Err), BufferParse(std::num::ParseIntError), @@ -22,13 +19,10 @@ pub enum WebolError { impl IntoResponse for WebolError { fn into_response(self) -> Response { let (status, error_message) = match self { - Self::Auth(err) => err.get(), - // Self::User(err) => err.get(), - Self::Generic => (StatusCode::INTERNAL_SERVER_ERROR, ""), - Self::Ping(err) => { - error!("Ping: {}", err.source().unwrap()); - (StatusCode::INTERNAL_SERVER_ERROR, "Server Error") + Self::Auth(err) => { + err.get() }, + Self::Generic => (StatusCode::INTERNAL_SERVER_ERROR, ""), Self::IpParse(err) => { error!("server error: {}", err.to_string()); (StatusCode::INTERNAL_SERVER_ERROR, "Server Error") @@ -51,17 +45,4 @@ impl IntoResponse for WebolError { })); (status, body).into_response() } -} - -// #[derive(Debug)] -// pub enum UserError { -// UnknownUUID, -// } -// -// impl UserError { -// pub fn get(self) -> (StatusCode, &'static str) { -// match self { -// Self::UnknownUUID => (StatusCode::UNPROCESSABLE_ENTITY, "Unknown UUID"), -// } -// } -// } +} \ No newline at end of file diff --git a/src/routes/device.rs b/src/routes/device.rs index 7353733..1eeff0b 100644 --- a/src/routes/device.rs +++ b/src/routes/device.rs @@ -4,13 +4,13 @@ use axum::headers::HeaderMap; use axum::Json; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; -use tracing::info; +use tracing::{debug, info}; use crate::auth::auth; use crate::db::Device; use crate::error::WebolError; pub async fn get_device(State(state): State>, headers: HeaderMap, Json(payload): Json) -> Result, WebolError> { - info!("GET request"); + info!("add device {}", payload.id); let secret = headers.get("authorization"); if auth(secret).map_err(WebolError::Auth)? { let device = sqlx::query_as!( @@ -23,6 +23,8 @@ pub async fn get_device(State(state): State>, headers: Head payload.id ).fetch_one(&state.db).await.map_err(WebolError::DB)?; + debug!("got device {:?}", device); + Ok(Json(json!(device))) } else { Err(WebolError::Generic) @@ -35,7 +37,7 @@ pub struct GetDevicePayload { } pub async fn put_device(State(state): State>, headers: HeaderMap, Json(payload): Json) -> Result, WebolError> { - info!("PUT request"); + info!("add device {} ({}, {}, {})", payload.id, payload.mac, payload.broadcast_addr, payload.ip); let secret = headers.get("authorization"); if auth(secret).map_err(WebolError::Auth)? { sqlx::query!( @@ -69,7 +71,7 @@ pub struct PutDeviceResponse { } pub async fn post_device(State(state): State>, headers: HeaderMap, Json(payload): Json) -> Result, WebolError> { - info!("POST request"); + info!("edit device {} ({}, {}, {})", payload.id, payload.mac, payload.broadcast_addr, payload.ip); let secret = headers.get("authorization"); if auth(secret).map_err(WebolError::Auth)? { let device = sqlx::query_as!( diff --git a/src/routes/start.rs b/src/routes/start.rs index c2c9378..9cd358b 100644 --- a/src/routes/start.rs +++ b/src/routes/start.rs @@ -49,8 +49,6 @@ pub async fn start(State(state): State>, headers: HeaderMap debug!("Init ping service"); state.ping_map.insert(uuid_gen.clone(), PingValue { ip: device.ip.clone(), online: false }); - warn!("{:?}", state.ping_map); - crate::services::ping::spawn(state.ping_send.clone(), device.ip, uuid_gen.clone(), &state.ping_map).await }); Some(uuid_genc) diff --git a/src/services/ping.rs b/src/services/ping.rs index f0cc4a3..a26dacc 100644 --- a/src/services/ping.rs +++ b/src/services/ping.rs @@ -1,14 +1,13 @@ -use std::borrow::Cow; use std::sync::Arc; use axum::extract::{ws::WebSocket}; -use axum::extract::ws::{CloseFrame, Message}; +use axum::extract::ws::Message; use dashmap::DashMap; +use time::{Duration, Instant}; use tokio::sync::broadcast::{Sender}; -use tracing::{debug, trace, warn}; +use tracing::{debug, error, trace}; use crate::AppState; - -use crate::error::WebolError; +use crate::config::SETTINGS; pub type PingMap = DashMap; @@ -18,92 +17,97 @@ pub struct PingValue { pub online: bool } -pub async fn spawn(tx: Sender, ip: String, uuid: String, ping_map: &PingMap) -> Result<(), WebolError> { +pub async fn spawn(tx: Sender, ip: String, uuid: String, ping_map: &PingMap) { + let timer = Instant::now(); let payload = [0; 8]; - // TODO: Better while let mut cont = true; while cont { let ping = surge_ping::ping( - ip.parse().map_err(WebolError::IpParse)?, + ip.parse().expect("bad ip"), &payload ).await; if let Err(ping) = ping { cont = matches!(ping, surge_ping::SurgeError::Timeout { .. }); if !cont { - return Err(ping).map_err(WebolError::Ping) + error!("{}", ping.to_string()); + } + if timer.elapsed() >= Duration::minutes(SETTINGS.get_int("pingtimeout").unwrap_or(10)) { + let _ = tx.send(BroadcastCommands::PingTimeout(uuid.clone())); + trace!("remove {} from ping_map after timeout", uuid); + ping_map.remove(&uuid); + cont = false; } } else { - let (_, duration) = ping.unwrap(); + let (_, duration) = ping.map_err(|err| error!("{}", err.to_string())).expect("fatal error"); debug!("Ping took {:?}", duration); cont = false; handle_broadcast_send(&tx, ip.clone(), ping_map, uuid.clone()).await; }; } - - Ok(()) } async fn handle_broadcast_send(tx: &Sender, ip: String, ping_map: &PingMap, uuid: String) { - debug!("sending pingsuccess message"); + debug!("send pingsuccess message"); ping_map.insert(uuid.clone(), PingValue { ip: ip.clone(), online: true }); - let _ = tx.send(BroadcastCommands::PingSuccess(ip)); + let _ = tx.send(BroadcastCommands::PingSuccess(uuid.clone())); tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; - trace!("remove {} from ping_map", uuid); + trace!("remove {} from ping_map after success", uuid); ping_map.remove(&uuid); } #[derive(Clone, Debug)] pub enum BroadcastCommands { - PingSuccess(String) + PingSuccess(String), + PingTimeout(String) } pub async fn status_websocket(mut socket: WebSocket, state: Arc) { - warn!("{:?}", state.ping_map); - trace!("wait for ws message (uuid)"); let msg = socket.recv().await; let uuid = msg.unwrap().unwrap().into_text().unwrap(); trace!("Search for uuid: {:?}", uuid); - // TODO: Handle Error - let device = state.ping_map.get(&uuid).unwrap().to_owned(); + match state.ping_map.get(&uuid) { + Some(device) => { + debug!("got device: {} (online: {})", device.ip, device.online); + let _ = socket.send(process_device(state.clone(), uuid, device.to_owned()).await).await; + }, + None => { + debug!("didn't find any device"); + let _ = socket.send(Message::Text(format!("notfound_{}", uuid))).await; + }, + }; - trace!("got device: {:?}", device); + let _ = socket.close().await; +} +async fn process_device(state: Arc, uuid: String, device: PingValue) -> Message { match device.online { true => { debug!("already started"); - // TODO: What's better? - // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); - // socket.close().await.unwrap(); - socket.send(Message::Close(Some(CloseFrame { code: 4001, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap(); + Message::Text(format!("start_{}", uuid)) }, false => { - let ip = device.ip.to_owned(); loop{ trace!("wait for tx message"); - let message = state.ping_send.subscribe().recv().await.unwrap(); - trace!("GOT = {:?}", message); - // if let BroadcastCommands::PingSuccess(msg_ip) = message { - // if msg_ip == ip { - // trace!("message == ip"); - // break; - // } - // } - let BroadcastCommands::PingSuccess(msg_ip) = message; - if msg_ip == ip { - trace!("message == ip"); - break; + let message = state.ping_send.subscribe().recv().await.expect("fatal error"); + trace!("got message {:?}", message); + return match message { + BroadcastCommands::PingSuccess(msg_uuid) => { + if msg_uuid != uuid { continue; } + trace!("message == uuid success"); + Message::Text(format!("start_{}", uuid)) + }, + BroadcastCommands::PingTimeout(msg_uuid) => { + if msg_uuid != uuid { continue; } + trace!("message == uuid timeout"); + Message::Text(format!("timeout_{}", uuid)) + } } - }; - - socket.send(Message::Close(Some(CloseFrame { code: 4000, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap(); - // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); - // socket.close().await.unwrap(); - warn!("{:?}", state.ping_map); + } } } } \ No newline at end of file -- cgit v1.2.3