From dcfb83fb2069bfcf4642b03453253e35479bf3da Mon Sep 17 00:00:00 2001 From: fx Date: Tue, 24 Oct 2023 01:15:22 +0200 Subject: first ping impl baseline, doesnt work --- src/main.rs | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) (limited to 'src/main.rs') diff --git a/src/main.rs b/src/main.rs index ce12cf6..9c31ec8 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,12 +4,14 @@ use axum::{Router, routing::post}; use axum::routing::{get, put}; use sqlx::PgPool; use time::util::local_offset; +use tokio::sync::mpsc::{self, Sender}; use tracing::{info, level_filters::LevelFilter}; use tracing_subscriber::{EnvFilter, fmt::{self, time::LocalTime}, prelude::*}; use crate::config::SETTINGS; use crate::db::init_db_pool; use crate::routes::device::{get_device, post_device, put_device}; use crate::routes::start::start; +use crate::services::ping::ws_ping; mod auth; mod config; @@ -17,6 +19,7 @@ mod routes; mod wol; mod db; mod error; +mod services; #[tokio::main] async fn main() { @@ -43,13 +46,23 @@ async fn main() { let db = init_db_pool().await; sqlx::migrate!().run(&db).await.unwrap(); - let shared_state = Arc::new(AppState { db }); + let (tx, mut rx) = mpsc::channel(32); + + // FIXME: once_cell? or just static mutable + tokio::spawn( async move { + while let Some(message) = rx.recv().await { + println!("GOT = {}", message); + } + }); + + let shared_state = Arc::new(AppState { db, ping_send: tx }); let app = Router::new() .route("/start", post(start)) .route("/device", get(get_device)) .route("/device", put(put_device)) .route("/device", post(post_device)) + .route("/status", get(ws_ping)) .with_state(shared_state); let addr = SETTINGS.get_string("serveraddr").unwrap_or("0.0.0.0:7229".to_string()); @@ -61,5 +74,7 @@ async fn main() { } pub struct AppState { - db: PgPool + db: PgPool, + ping_send: Sender, + // ping_receive: Receiver } -- cgit v1.2.3 From f9224ff02e688dec819ab81893320a0611f2a198 Mon Sep 17 00:00:00 2001 From: FxQnLr Date: Tue, 24 Oct 2023 14:56:17 +0200 Subject: Seems to work --- src/main.rs | 12 ++---------- src/routes/start.rs | 10 +++++++--- src/services/ping.rs | 28 +++++++++++++++++----------- 3 files changed, 26 insertions(+), 24 deletions(-) (limited to 'src/main.rs') diff --git a/src/main.rs b/src/main.rs index 9c31ec8..124c44e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -4,7 +4,7 @@ use axum::{Router, routing::post}; use axum::routing::{get, put}; use sqlx::PgPool; use time::util::local_offset; -use tokio::sync::mpsc::{self, Sender}; +use tokio::sync::broadcast::{channel, Sender}; use tracing::{info, level_filters::LevelFilter}; use tracing_subscriber::{EnvFilter, fmt::{self, time::LocalTime}, prelude::*}; use crate::config::SETTINGS; @@ -46,15 +46,8 @@ async fn main() { let db = init_db_pool().await; sqlx::migrate!().run(&db).await.unwrap(); - let (tx, mut rx) = mpsc::channel(32); + let (tx, _) = channel(32); - // FIXME: once_cell? or just static mutable - tokio::spawn( async move { - while let Some(message) = rx.recv().await { - println!("GOT = {}", message); - } - }); - let shared_state = Arc::new(AppState { db, ping_send: tx }); let app = Router::new() @@ -76,5 +69,4 @@ async fn main() { pub struct AppState { db: PgPool, ping_send: Sender, - // ping_receive: Receiver } diff --git a/src/routes/start.rs b/src/routes/start.rs index b45fe5b..863ef16 100644 --- a/src/routes/start.rs +++ b/src/routes/start.rs @@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; use axum::extract::State; use serde_json::{json, Value}; -use tracing::info; +use tracing::{debug, info}; use crate::auth::auth; use crate::config::SETTINGS; use crate::wol::{create_buffer, send_packet}; @@ -39,8 +39,12 @@ pub async fn start(State(state): State>, headers: HeaderMap )?; if payload.ping.is_some_and(|ping| ping) { - tokio::spawn(async move {crate::services::ping::spawn(state.ping_send.clone()).await}); - } + debug!("ping true"); + tokio::spawn(async move { + debug!("Init ping service"); + crate::services::ping::spawn(state.ping_send.clone()).await + }); + }; Ok(Json(json!(StartResponse { id: device.id, boot: true }))) } else { Err(WebolError::Generic) diff --git a/src/services/ping.rs b/src/services/ping.rs index 6e710ec..ff328a5 100644 --- a/src/services/ping.rs +++ b/src/services/ping.rs @@ -1,8 +1,9 @@ +use std::borrow::Cow; use std::sync::Arc; use axum::{extract::{WebSocketUpgrade, ws::WebSocket, State}, response::Response}; -use tokio::sync::mpsc::Sender; -use tracing::{debug, error}; +use tokio::sync::broadcast::{Sender}; +use tracing::{debug, error, trace}; use crate::{error::WebolError, AppState}; @@ -12,7 +13,7 @@ pub async fn spawn(tx: Sender) -> Result<(), WebolError> { let mut cont = true; while cont { let ping = surge_ping::ping( - "192.168.178.28".parse().map_err(WebolError::IpParse)?, + "127.0.0.1".parse().map_err(WebolError::IpParse)?, &payload ).await; @@ -30,24 +31,29 @@ pub async fn spawn(tx: Sender) -> Result<(), WebolError> { debug!("Ping took {:?}", duration); cont = false; // FIXME: remove unwrap - tx.send("Got ping".to_string()).await.unwrap(); + tx.send("Got ping".to_string()).unwrap(); }; } Ok(()) } -pub async fn ws_ping(ws: WebSocketUpgrade, State(_state): State>) -> Response { - ws.on_upgrade(handle_socket) +// TODO: Status to routes, websocket here +pub async fn ws_ping(State(state): State>, ws: WebSocketUpgrade) -> Response { + ws.on_upgrade(move |socket| handle_socket(socket, state.ping_send.clone())) } // FIXME: Handle commands through enum -async fn handle_socket(mut socket: WebSocket) { +async fn handle_socket(mut socket: WebSocket, tx: Sender) { // TODO: Understand Cow - - // match socket.send(axum::extract::ws::Message::Close(Some(CloseFrame { code: 4000, reason: Cow::Owned("started".to_owned()) }))).await.map_err(WebolError::Axum) { - match socket.send(axum::extract::ws::Message::Text("started".to_string())).await.map_err(WebolError::Axum) { + while let message = tx.subscribe().recv().await.unwrap() { + trace!("GOT = {}", message); + if &message == "Got ping" { + break; + } + }; + match socket.send(axum::extract::ws::Message::Close(Some(axum::extract::ws::CloseFrame { code: 4000, reason: Cow::Owned("started".to_owned()) }))).await.map_err(WebolError::Axum) { Ok(..) => (), Err(err) => { error!("Server Error: {:?}", err) } }; -} +} \ No newline at end of file -- cgit v1.2.3 From 00dd8a9abee6b9f0cfc37c6f20f30f0d99dfe91a Mon Sep 17 00:00:00 2001 From: fx Date: Wed, 25 Oct 2023 12:53:31 +0200 Subject: runs, no error handling --- Cargo.lock | 24 ++++++++++++++++ Cargo.toml | 2 ++ src/main.rs | 13 ++++++--- src/routes/mod.rs | 3 +- src/routes/start.rs | 24 +++++++++++----- src/routes/status.rs | 12 ++++++++ src/services/ping.rs | 79 +++++++++++++++++++++++++++++++++++++--------------- 7 files changed, 122 insertions(+), 35 deletions(-) create mode 100644 src/routes/status.rs (limited to 'src/main.rs') diff --git a/Cargo.lock b/Cargo.lock index bf813bf..2650edc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -134,6 +134,18 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-macros" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdca6a10ecad987bda04e95606ef85a5417dcaac1a78455242d72e031e2b6b62" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.38", +] + [[package]] name = "backtrace" version = "0.3.69" @@ -2127,6 +2139,16 @@ version = "0.7.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" +[[package]] +name = "uuid" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "88ad59a7560b41a70d191093a945f0b87bc1deeda46fb237479708a1d6b6cdfc" +dependencies = [ + "getrandom", + "rand", +] + [[package]] name = "valuable" version = "0.1.0" @@ -2165,6 +2187,7 @@ name = "webol" version = "0.1.0" dependencies = [ "axum", + "axum-macros", "config", "once_cell", "serde", @@ -2175,6 +2198,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "uuid", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 9bdc4da..d29a1b3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,3 +17,5 @@ config = "0.13.3" once_cell = "1.18.0" sqlx = { version = "0.7.1", features = ["postgres", "runtime-tokio"]} surge-ping = "0.8.0" +axum-macros = "0.3.8" +uuid = { version = "1.5.0", features = ["v4", "fast-rng"] } diff --git a/src/main.rs b/src/main.rs index 124c44e..854b59d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +use std::collections::HashMap; use std::env; use std::sync::Arc; use axum::{Router, routing::post}; @@ -5,13 +6,14 @@ use axum::routing::{get, put}; use sqlx::PgPool; use time::util::local_offset; use tokio::sync::broadcast::{channel, Sender}; +use tokio::sync::Mutex; use tracing::{info, level_filters::LevelFilter}; use tracing_subscriber::{EnvFilter, fmt::{self, time::LocalTime}, prelude::*}; use crate::config::SETTINGS; use crate::db::init_db_pool; use crate::routes::device::{get_device, post_device, put_device}; use crate::routes::start::start; -use crate::services::ping::ws_ping; +use crate::routes::status::status; mod auth; mod config; @@ -47,15 +49,17 @@ async fn main() { sqlx::migrate!().run(&db).await.unwrap(); let (tx, _) = channel(32); + + let ping_map: HashMap = HashMap::new(); - let shared_state = Arc::new(AppState { db, ping_send: tx }); + let shared_state = Arc::new(AppState { db, ping_send: tx, ping_map: Arc::new(Mutex::new(ping_map)) }); let app = Router::new() .route("/start", post(start)) .route("/device", get(get_device)) .route("/device", put(put_device)) .route("/device", post(post_device)) - .route("/status", get(ws_ping)) + .route("/status", get(status)) .with_state(shared_state); let addr = SETTINGS.get_string("serveraddr").unwrap_or("0.0.0.0:7229".to_string()); @@ -69,4 +73,5 @@ async fn main() { pub struct AppState { db: PgPool, ping_send: Sender, -} + ping_map: Arc>>, +} \ No newline at end of file diff --git a/src/routes/mod.rs b/src/routes/mod.rs index 12fbfab..d5ab0d6 100644 --- a/src/routes/mod.rs +++ b/src/routes/mod.rs @@ -1,2 +1,3 @@ pub mod start; -pub mod device; \ No newline at end of file +pub mod device; +pub mod status; \ No newline at end of file diff --git a/src/routes/start.rs b/src/routes/start.rs index 863ef16..45e7ec8 100644 --- a/src/routes/start.rs +++ b/src/routes/start.rs @@ -4,15 +4,18 @@ use serde::{Deserialize, Serialize}; use std::sync::Arc; use axum::extract::State; use serde_json::{json, Value}; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; +use uuid::Uuid; use crate::auth::auth; use crate::config::SETTINGS; use crate::wol::{create_buffer, send_packet}; use crate::db::Device; use crate::error::WebolError; +#[axum_macros::debug_handler] pub async fn start(State(state): State>, headers: HeaderMap, Json(payload): Json) -> Result, WebolError> { info!("POST request"); + warn!("{:?}", state.ping_map); let secret = headers.get("authorization"); let authorized = auth(secret).map_err(WebolError::Auth)?; if authorized { @@ -38,14 +41,20 @@ pub async fn start(State(state): State>, headers: HeaderMap create_buffer(&device.mac)? )?; - if payload.ping.is_some_and(|ping| ping) { - debug!("ping true"); - tokio::spawn(async move { + let uuid = if payload.ping.is_some_and(|ping| ping) { + let uuid_gen = Uuid::new_v4().to_string(); + let uuid_genc = uuid_gen.clone(); + tokio::spawn(async move{ debug!("Init ping service"); - crate::services::ping::spawn(state.ping_send.clone()).await + state.ping_map.lock().await.insert(uuid_gen, ("192.168.178.94".to_string(), false)); + + warn!("{:?}", state.ping_map); + + crate::services::ping::spawn(state.ping_send.clone(), "192.168.178.94".to_string()).await; }); - }; - Ok(Json(json!(StartResponse { id: device.id, boot: true }))) + Some(uuid_genc) + } else { None }; + Ok(Json(json!(StartResponse { id: device.id, boot: true, uuid }))) } else { Err(WebolError::Generic) } @@ -61,4 +70,5 @@ pub struct StartPayload { struct StartResponse { id: String, boot: bool, + uuid: Option, } diff --git a/src/routes/status.rs b/src/routes/status.rs new file mode 100644 index 0000000..cdecf6a --- /dev/null +++ b/src/routes/status.rs @@ -0,0 +1,12 @@ +use std::sync::Arc; +use axum::extract::{State, WebSocketUpgrade}; +use axum::response::Response; +use serde::Deserialize; +use crate::AppState; +use crate::services::ping::status_websocket; + +#[axum_macros::debug_handler] +pub async fn status(State(state): State>, ws: WebSocketUpgrade) -> Response { + // TODO: remove unwrap + ws.on_upgrade(move |socket| status_websocket(socket, state.ping_send.clone(), state.ping_map.clone())) +} \ No newline at end of file diff --git a/src/services/ping.rs b/src/services/ping.rs index ff328a5..e3d465d 100644 --- a/src/services/ping.rs +++ b/src/services/ping.rs @@ -1,26 +1,29 @@ use std::borrow::Cow; +use std::collections::HashMap; use std::sync::Arc; -use axum::{extract::{WebSocketUpgrade, ws::WebSocket, State}, response::Response}; +use axum::extract::{ws::WebSocket}; +use axum::extract::ws::Message; use tokio::sync::broadcast::{Sender}; -use tracing::{debug, error, trace}; +use tokio::sync::Mutex; +use tracing::{debug, error, trace, warn}; -use crate::{error::WebolError, AppState}; +use crate::error::WebolError; -pub async fn spawn(tx: Sender) -> Result<(), WebolError> { +pub async fn spawn(tx: Sender, ip: String) -> Result<(), WebolError> { let payload = [0; 8]; let mut cont = true; while cont { let ping = surge_ping::ping( - "127.0.0.1".parse().map_err(WebolError::IpParse)?, + ip.parse().map_err(WebolError::IpParse)?, &payload ).await; if let Err(ping) = ping { cont = matches!(ping, surge_ping::SurgeError::Timeout { .. }); - debug!("{}", cont); + // debug!("{}", cont); if !cont { return Err(ping).map_err(WebolError::Ping) @@ -31,29 +34,59 @@ pub async fn spawn(tx: Sender) -> Result<(), WebolError> { debug!("Ping took {:?}", duration); cont = false; // FIXME: remove unwrap - tx.send("Got ping".to_string()).unwrap(); + // FIXME: if error: SendError because no listener, then handle the entry directly + tx.send(ip.clone()); }; } Ok(()) } -// TODO: Status to routes, websocket here -pub async fn ws_ping(State(state): State>, ws: WebSocketUpgrade) -> Response { - ws.on_upgrade(move |socket| handle_socket(socket, state.ping_send.clone())) -} - // FIXME: Handle commands through enum -async fn handle_socket(mut socket: WebSocket, tx: Sender) { - // TODO: Understand Cow - while let message = tx.subscribe().recv().await.unwrap() { - trace!("GOT = {}", message); - if &message == "Got ping" { - break; +pub async fn status_websocket(mut socket: WebSocket, tx: Sender, ping_map: Arc>>) { + warn!("{:?}", ping_map); + + let mut uuid: Option = None; + + trace!("wait for ws message (uuid)"); + let msg = socket.recv().await; + uuid = Some(msg.unwrap().unwrap().into_text().unwrap()); + + let uuid = uuid.unwrap(); + + trace!("Search for uuid: {:?}", uuid); + + let device = ping_map.lock().await.get(&uuid).unwrap().to_owned(); + + trace!("got device: {:?}", device); + + match device.1 { + true => { + debug!("already started"); + socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); + socket.close().await.unwrap(); + }, + false => { + let ip = device.0.to_owned(); + let mut i = 0; + loop{ + trace!("{}", i); + // TODO: Check if older than 10 minutes, close if true + trace!("wait for tx message"); + let message = tx.subscribe().recv().await.unwrap(); + trace!("GOT = {}", message); + if message == ip { + trace!("message == ip"); + break; + } + i += 1; + }; + + socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); + socket.close().await.unwrap(); + tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; + ping_map.lock().await.remove(&uuid); + warn!("{:?}", ping_map); } - }; - match socket.send(axum::extract::ws::Message::Close(Some(axum::extract::ws::CloseFrame { code: 4000, reason: Cow::Owned("started".to_owned()) }))).await.map_err(WebolError::Axum) { - Ok(..) => (), - Err(err) => { error!("Server Error: {:?}", err) } - }; + } } \ No newline at end of file -- cgit v1.2.3 From 0cca10290d089aabac8f2e4356cfaf80f06ae194 Mon Sep 17 00:00:00 2001 From: FxQnLr Date: Sun, 29 Oct 2023 19:55:26 +0100 Subject: does what is expected, but badly --- src/error.rs | 20 ++++++++++++---- src/main.rs | 5 ++-- src/routes/start.rs | 5 ++-- src/routes/status.rs | 2 -- src/services/ping.rs | 68 ++++++++++++++++++++++++++++++---------------------- 5 files changed, 60 insertions(+), 40 deletions(-) (limited to 'src/main.rs') diff --git a/src/error.rs b/src/error.rs index f143ee9..1592a78 100644 --- a/src/error.rs +++ b/src/error.rs @@ -10,19 +10,20 @@ use crate::auth::AuthError; #[derive(Debug)] pub enum WebolError { Generic, + // User(UserError), Auth(AuthError), Ping(surge_ping::SurgeError), DB(sqlx::Error), IpParse(::Err), BufferParse(std::num::ParseIntError), Broadcast(io::Error), - Axum(axum::Error) } impl IntoResponse for WebolError { fn into_response(self) -> Response { let (status, error_message) = match self { Self::Auth(err) => err.get(), + // Self::User(err) => err.get(), Self::Generic => (StatusCode::INTERNAL_SERVER_ERROR, ""), Self::Ping(err) => { error!("Ping: {}", err.source().unwrap()); @@ -44,10 +45,6 @@ impl IntoResponse for WebolError { error!("server error: {}", err.to_string()); (StatusCode::INTERNAL_SERVER_ERROR, "Server Error") }, - Self::Axum(err) => { - error!("server error: {}", err.to_string()); - (StatusCode::INTERNAL_SERVER_ERROR, "Server Error") - }, }; let body = Json(json!({ "error": error_message, @@ -55,3 +52,16 @@ impl IntoResponse for WebolError { (status, body).into_response() } } + +// #[derive(Debug)] +// pub enum UserError { +// UnknownUUID, +// } +// +// impl UserError { +// pub fn get(self) -> (StatusCode, &'static str) { +// match self { +// Self::UnknownUUID => (StatusCode::UNPROCESSABLE_ENTITY, "Unknown UUID"), +// } +// } +// } diff --git a/src/main.rs b/src/main.rs index 854b59d..545d8fe 100644 --- a/src/main.rs +++ b/src/main.rs @@ -14,6 +14,7 @@ use crate::db::init_db_pool; use crate::routes::device::{get_device, post_device, put_device}; use crate::routes::start::start; use crate::routes::status::status; +use crate::services::ping::{BroadcastCommands, PingMap}; mod auth; mod config; @@ -72,6 +73,6 @@ async fn main() { pub struct AppState { db: PgPool, - ping_send: Sender, - ping_map: Arc>>, + ping_send: Sender, + ping_map: PingMap, } \ No newline at end of file diff --git a/src/routes/start.rs b/src/routes/start.rs index 45e7ec8..b1c8a73 100644 --- a/src/routes/start.rs +++ b/src/routes/start.rs @@ -44,15 +44,16 @@ pub async fn start(State(state): State>, headers: HeaderMap let uuid = if payload.ping.is_some_and(|ping| ping) { let uuid_gen = Uuid::new_v4().to_string(); let uuid_genc = uuid_gen.clone(); + let uuid_gencc = uuid_gen.clone(); tokio::spawn(async move{ debug!("Init ping service"); state.ping_map.lock().await.insert(uuid_gen, ("192.168.178.94".to_string(), false)); warn!("{:?}", state.ping_map); - crate::services::ping::spawn(state.ping_send.clone(), "192.168.178.94".to_string()).await; + crate::services::ping::spawn(state.ping_send.clone(), "192.168.178.94".to_string(), uuid_genc.clone(), state.ping_map.clone()).await }); - Some(uuid_genc) + Some(uuid_gencc) } else { None }; Ok(Json(json!(StartResponse { id: device.id, boot: true, uuid }))) } else { diff --git a/src/routes/status.rs b/src/routes/status.rs index cdecf6a..4a5ec67 100644 --- a/src/routes/status.rs +++ b/src/routes/status.rs @@ -1,12 +1,10 @@ use std::sync::Arc; use axum::extract::{State, WebSocketUpgrade}; use axum::response::Response; -use serde::Deserialize; use crate::AppState; use crate::services::ping::status_websocket; #[axum_macros::debug_handler] pub async fn status(State(state): State>, ws: WebSocketUpgrade) -> Response { - // TODO: remove unwrap ws.on_upgrade(move |socket| status_websocket(socket, state.ping_send.clone(), state.ping_map.clone())) } \ No newline at end of file diff --git a/src/services/ping.rs b/src/services/ping.rs index e3d465d..6835fc0 100644 --- a/src/services/ping.rs +++ b/src/services/ping.rs @@ -3,16 +3,19 @@ use std::collections::HashMap; use std::sync::Arc; use axum::extract::{ws::WebSocket}; -use axum::extract::ws::Message; +use axum::extract::ws::{CloseFrame, Message}; use tokio::sync::broadcast::{Sender}; use tokio::sync::Mutex; -use tracing::{debug, error, trace, warn}; +use tracing::{debug, trace, warn}; use crate::error::WebolError; -pub async fn spawn(tx: Sender, ip: String) -> Result<(), WebolError> { +pub type PingMap = Arc>>; + +pub async fn spawn(tx: Sender, ip: String, uuid: String, ping_map: PingMap) -> Result<(), WebolError> { let payload = [0; 8]; + // TODO: Better while let mut cont = true; while cont { let ping = surge_ping::ping( @@ -22,40 +25,44 @@ pub async fn spawn(tx: Sender, ip: String) -> Result<(), WebolError> { if let Err(ping) = ping { cont = matches!(ping, surge_ping::SurgeError::Timeout { .. }); - - // debug!("{}", cont); - if !cont { return Err(ping).map_err(WebolError::Ping) } - } else { let (_, duration) = ping.unwrap(); debug!("Ping took {:?}", duration); cont = false; - // FIXME: remove unwrap - // FIXME: if error: SendError because no listener, then handle the entry directly - tx.send(ip.clone()); + handle_broadcast_send(&tx, ip.clone(), ping_map.clone(), uuid.clone()).await; }; } Ok(()) } -// FIXME: Handle commands through enum -pub async fn status_websocket(mut socket: WebSocket, tx: Sender, ping_map: Arc>>) { - warn!("{:?}", ping_map); +async fn handle_broadcast_send(tx: &Sender, ip: String, ping_map: PingMap, uuid: String) { + debug!("sending pingsuccess message"); + ping_map.lock().await.insert(uuid.clone(), (ip.clone(), true)); + let _ = tx.send(BroadcastCommands::PingSuccess(ip)); + tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; + trace!("remove {} from ping_map", uuid); + ping_map.lock().await.remove(&uuid); +} - let mut uuid: Option = None; +#[derive(Clone, Debug)] +pub enum BroadcastCommands { + PingSuccess(String) +} + +pub async fn status_websocket(mut socket: WebSocket, tx: Sender, ping_map: PingMap) { + warn!("{:?}", ping_map); trace!("wait for ws message (uuid)"); let msg = socket.recv().await; - uuid = Some(msg.unwrap().unwrap().into_text().unwrap()); - - let uuid = uuid.unwrap(); + let uuid = msg.unwrap().unwrap().into_text().unwrap(); trace!("Search for uuid: {:?}", uuid); + // TODO: Handle Error let device = ping_map.lock().await.get(&uuid).unwrap().to_owned(); trace!("got device: {:?}", device); @@ -63,29 +70,32 @@ pub async fn status_websocket(mut socket: WebSocket, tx: Sender, ping_ma match device.1 { true => { debug!("already started"); - socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); - socket.close().await.unwrap(); + // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); + // socket.close().await.unwrap(); + socket.send(Message::Close(Some(CloseFrame { code: 4001, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap(); }, false => { let ip = device.0.to_owned(); - let mut i = 0; loop{ - trace!("{}", i); - // TODO: Check if older than 10 minutes, close if true trace!("wait for tx message"); let message = tx.subscribe().recv().await.unwrap(); - trace!("GOT = {}", message); - if message == ip { + trace!("GOT = {:?}", message); + // if let BroadcastCommands::PingSuccess(msg_ip) = message { + // if msg_ip == ip { + // trace!("message == ip"); + // break; + // } + // } + let BroadcastCommands::PingSuccess(msg_ip) = message; + if msg_ip == ip { trace!("message == ip"); break; } - i += 1; }; - socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); - socket.close().await.unwrap(); - tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; - ping_map.lock().await.remove(&uuid); + socket.send(Message::Close(Some(CloseFrame { code: 4000, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap(); + // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); + // socket.close().await.unwrap(); warn!("{:?}", ping_map); } } -- cgit v1.2.3 From 84c32953ae5f52be44af4b48381747f55cb04f4a Mon Sep 17 00:00:00 2001 From: FxQnLr Date: Sun, 29 Oct 2023 20:30:01 +0100 Subject: impl dashmap --- Cargo.lock | 14 ++++++++++++++ Cargo.toml | 1 + src/main.rs | 7 +++---- src/routes/start.rs | 2 +- src/services/ping.rs | 11 +++++------ 5 files changed, 24 insertions(+), 11 deletions(-) (limited to 'src/main.rs') diff --git a/Cargo.lock b/Cargo.lock index 2650edc..a60c07d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -308,6 +308,19 @@ dependencies = [ "typenum", ] +[[package]] +name = "dashmap" +version = "5.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "978747c1d849a7d2ee5e8adc0159961c48fb7e5db2f06af6723b80123bb53856" +dependencies = [ + "cfg-if", + "hashbrown 0.14.2", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "data-encoding" version = "2.4.0" @@ -2189,6 +2202,7 @@ dependencies = [ "axum", "axum-macros", "config", + "dashmap", "once_cell", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index d29a1b3..cf38752 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,3 +19,4 @@ sqlx = { version = "0.7.1", features = ["postgres", "runtime-tokio"]} surge-ping = "0.8.0" axum-macros = "0.3.8" uuid = { version = "1.5.0", features = ["v4", "fast-rng"] } +dashmap = "5.5.3" diff --git a/src/main.rs b/src/main.rs index 545d8fe..762a817 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,11 @@ -use std::collections::HashMap; use std::env; use std::sync::Arc; use axum::{Router, routing::post}; use axum::routing::{get, put}; +use dashmap::DashMap; use sqlx::PgPool; use time::util::local_offset; use tokio::sync::broadcast::{channel, Sender}; -use tokio::sync::Mutex; use tracing::{info, level_filters::LevelFilter}; use tracing_subscriber::{EnvFilter, fmt::{self, time::LocalTime}, prelude::*}; use crate::config::SETTINGS; @@ -51,9 +50,9 @@ async fn main() { let (tx, _) = channel(32); - let ping_map: HashMap = HashMap::new(); + let ping_map: DashMap = DashMap::new(); - let shared_state = Arc::new(AppState { db, ping_send: tx, ping_map: Arc::new(Mutex::new(ping_map)) }); + let shared_state = Arc::new(AppState { db, ping_send: tx, ping_map: Arc::new(ping_map) }); let app = Router::new() .route("/start", post(start)) diff --git a/src/routes/start.rs b/src/routes/start.rs index b1c8a73..5b73281 100644 --- a/src/routes/start.rs +++ b/src/routes/start.rs @@ -47,7 +47,7 @@ pub async fn start(State(state): State>, headers: HeaderMap let uuid_gencc = uuid_gen.clone(); tokio::spawn(async move{ debug!("Init ping service"); - state.ping_map.lock().await.insert(uuid_gen, ("192.168.178.94".to_string(), false)); + state.ping_map.insert(uuid_gen, ("192.168.178.94".to_string(), false)); warn!("{:?}", state.ping_map); diff --git a/src/services/ping.rs b/src/services/ping.rs index 6835fc0..ed848fc 100644 --- a/src/services/ping.rs +++ b/src/services/ping.rs @@ -1,16 +1,15 @@ use std::borrow::Cow; -use std::collections::HashMap; use std::sync::Arc; use axum::extract::{ws::WebSocket}; use axum::extract::ws::{CloseFrame, Message}; +use dashmap::DashMap; use tokio::sync::broadcast::{Sender}; -use tokio::sync::Mutex; use tracing::{debug, trace, warn}; use crate::error::WebolError; -pub type PingMap = Arc>>; +pub type PingMap = Arc>; pub async fn spawn(tx: Sender, ip: String, uuid: String, ping_map: PingMap) -> Result<(), WebolError> { let payload = [0; 8]; @@ -41,11 +40,11 @@ pub async fn spawn(tx: Sender, ip: String, uuid: String, ping async fn handle_broadcast_send(tx: &Sender, ip: String, ping_map: PingMap, uuid: String) { debug!("sending pingsuccess message"); - ping_map.lock().await.insert(uuid.clone(), (ip.clone(), true)); + ping_map.insert(uuid.clone(), (ip.clone(), true)); let _ = tx.send(BroadcastCommands::PingSuccess(ip)); tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; trace!("remove {} from ping_map", uuid); - ping_map.lock().await.remove(&uuid); + ping_map.remove(&uuid); } #[derive(Clone, Debug)] @@ -63,7 +62,7 @@ pub async fn status_websocket(mut socket: WebSocket, tx: Sender Date: Sun, 29 Oct 2023 21:09:46 +0100 Subject: add ip to database and use for ping, remove arc from pingmap --- migrations/20231009123228_devices.sql | 7 ++++--- src/db.rs | 3 ++- src/main.rs | 2 +- src/routes/device.rs | 16 ++++++++++------ src/routes/start.rs | 11 +++++------ src/routes/status.rs | 2 +- src/services/ping.rs | 20 +++++++++++--------- 7 files changed, 34 insertions(+), 27 deletions(-) (limited to 'src/main.rs') diff --git a/migrations/20231009123228_devices.sql b/migrations/20231009123228_devices.sql index 9d0c7ca..b911b19 100644 --- a/migrations/20231009123228_devices.sql +++ b/migrations/20231009123228_devices.sql @@ -1,7 +1,8 @@ -- Add migration script here CREATE TABLE IF NOT EXISTS "devices" ( - "id" TEXT PRIMARY KEY NOT NULL, - "mac" TEXT NOT NULL, - "broadcast_addr" TEXT NOT NULL + "id" VARCHAR(255) PRIMARY KEY NOT NULL, + "mac" VARCHAR(17) NOT NULL, + "broadcast_addr" VARCHAR(39) NOT NULL, + "ip" VARCHAR(39) NOT NULL ) diff --git a/src/db.rs b/src/db.rs index 3c51e2b..51ea469 100644 --- a/src/db.rs +++ b/src/db.rs @@ -12,7 +12,8 @@ use crate::config::SETTINGS; pub struct Device { pub id: String, pub mac: String, - pub broadcast_addr: String + pub broadcast_addr: String, + pub ip: String } pub async fn init_db_pool() -> PgPool { diff --git a/src/main.rs b/src/main.rs index 762a817..ee540af 100644 --- a/src/main.rs +++ b/src/main.rs @@ -52,7 +52,7 @@ async fn main() { let ping_map: DashMap = DashMap::new(); - let shared_state = Arc::new(AppState { db, ping_send: tx, ping_map: Arc::new(ping_map) }); + let shared_state = Arc::new(AppState { db, ping_send: tx, ping_map }); let app = Router::new() .route("/start", post(start)) diff --git a/src/routes/device.rs b/src/routes/device.rs index 248d1e0..7353733 100644 --- a/src/routes/device.rs +++ b/src/routes/device.rs @@ -16,7 +16,7 @@ pub async fn get_device(State(state): State>, headers: Head let device = sqlx::query_as!( Device, r#" - SELECT id, mac, broadcast_addr + SELECT id, mac, broadcast_addr, ip FROM devices WHERE id = $1; "#, @@ -40,12 +40,13 @@ pub async fn put_device(State(state): State>, headers: Head if auth(secret).map_err(WebolError::Auth)? { sqlx::query!( r#" - INSERT INTO devices (id, mac, broadcast_addr) - VALUES ($1, $2, $3); + INSERT INTO devices (id, mac, broadcast_addr, ip) + VALUES ($1, $2, $3, $4); "#, payload.id, payload.mac, - payload.broadcast_addr + payload.broadcast_addr, + payload.ip ).execute(&state.db).await.map_err(WebolError::DB)?; Ok(Json(json!(PutDeviceResponse { success: true }))) @@ -59,6 +60,7 @@ pub struct PutDevicePayload { id: String, mac: String, broadcast_addr: String, + ip: String } #[derive(Serialize)] @@ -74,11 +76,12 @@ pub async fn post_device(State(state): State>, headers: Hea Device, r#" UPDATE devices - SET mac = $1, broadcast_addr = $2 WHERE id = $3 - RETURNING id, mac, broadcast_addr; + SET mac = $1, broadcast_addr = $2, ip = $3 WHERE id = $4 + RETURNING id, mac, broadcast_addr, ip; "#, payload.mac, payload.broadcast_addr, + payload.ip, payload.id ).fetch_one(&state.db).await.map_err(WebolError::DB)?; @@ -93,4 +96,5 @@ pub struct PostDevicePayload { id: String, mac: String, broadcast_addr: String, + ip: String, } diff --git a/src/routes/start.rs b/src/routes/start.rs index 5b73281..3bccb0f 100644 --- a/src/routes/start.rs +++ b/src/routes/start.rs @@ -22,7 +22,7 @@ pub async fn start(State(state): State>, headers: HeaderMap let device = sqlx::query_as!( Device, r#" - SELECT id, mac, broadcast_addr + SELECT id, mac, broadcast_addr, ip FROM devices WHERE id = $1; "#, @@ -44,16 +44,15 @@ pub async fn start(State(state): State>, headers: HeaderMap let uuid = if payload.ping.is_some_and(|ping| ping) { let uuid_gen = Uuid::new_v4().to_string(); let uuid_genc = uuid_gen.clone(); - let uuid_gencc = uuid_gen.clone(); - tokio::spawn(async move{ + tokio::spawn(async move { debug!("Init ping service"); - state.ping_map.insert(uuid_gen, ("192.168.178.94".to_string(), false)); + state.ping_map.insert(uuid_gen.clone(), (device.ip.clone(), false)); warn!("{:?}", state.ping_map); - crate::services::ping::spawn(state.ping_send.clone(), "192.168.178.94".to_string(), uuid_genc.clone(), state.ping_map.clone()).await + crate::services::ping::spawn(state.ping_send.clone(), device.ip, uuid_gen.clone(), &state.ping_map).await }); - Some(uuid_gencc) + Some(uuid_genc) } else { None }; Ok(Json(json!(StartResponse { id: device.id, boot: true, uuid }))) } else { diff --git a/src/routes/status.rs b/src/routes/status.rs index 4a5ec67..45f3e51 100644 --- a/src/routes/status.rs +++ b/src/routes/status.rs @@ -6,5 +6,5 @@ use crate::services::ping::status_websocket; #[axum_macros::debug_handler] pub async fn status(State(state): State>, ws: WebSocketUpgrade) -> Response { - ws.on_upgrade(move |socket| status_websocket(socket, state.ping_send.clone(), state.ping_map.clone())) + ws.on_upgrade(move |socket| status_websocket(socket, state)) } \ No newline at end of file diff --git a/src/services/ping.rs b/src/services/ping.rs index ed848fc..04ad511 100644 --- a/src/services/ping.rs +++ b/src/services/ping.rs @@ -6,12 +6,13 @@ use axum::extract::ws::{CloseFrame, Message}; use dashmap::DashMap; use tokio::sync::broadcast::{Sender}; use tracing::{debug, trace, warn}; +use crate::AppState; use crate::error::WebolError; -pub type PingMap = Arc>; +pub type PingMap = DashMap; -pub async fn spawn(tx: Sender, ip: String, uuid: String, ping_map: PingMap) -> Result<(), WebolError> { +pub async fn spawn(tx: Sender, ip: String, uuid: String, ping_map: &PingMap) -> Result<(), WebolError> { let payload = [0; 8]; // TODO: Better while @@ -31,14 +32,14 @@ pub async fn spawn(tx: Sender, ip: String, uuid: String, ping let (_, duration) = ping.unwrap(); debug!("Ping took {:?}", duration); cont = false; - handle_broadcast_send(&tx, ip.clone(), ping_map.clone(), uuid.clone()).await; + handle_broadcast_send(&tx, ip.clone(), &ping_map, uuid.clone()).await; }; } Ok(()) } -async fn handle_broadcast_send(tx: &Sender, ip: String, ping_map: PingMap, uuid: String) { +async fn handle_broadcast_send(tx: &Sender, ip: String, ping_map: &PingMap, uuid: String) { debug!("sending pingsuccess message"); ping_map.insert(uuid.clone(), (ip.clone(), true)); let _ = tx.send(BroadcastCommands::PingSuccess(ip)); @@ -52,8 +53,8 @@ pub enum BroadcastCommands { PingSuccess(String) } -pub async fn status_websocket(mut socket: WebSocket, tx: Sender, ping_map: PingMap) { - warn!("{:?}", ping_map); +pub async fn status_websocket(mut socket: WebSocket, state: Arc) { + warn!("{:?}", state.ping_map); trace!("wait for ws message (uuid)"); let msg = socket.recv().await; @@ -62,13 +63,14 @@ pub async fn status_websocket(mut socket: WebSocket, tx: Sender { debug!("already started"); + // TODO: What's better? // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); // socket.close().await.unwrap(); socket.send(Message::Close(Some(CloseFrame { code: 4001, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap(); @@ -77,7 +79,7 @@ pub async fn status_websocket(mut socket: WebSocket, tx: Sender Date: Mon, 30 Oct 2023 12:22:31 +0100 Subject: changed pingmap tuple to own struct --- src/main.rs | 2 +- src/routes/start.rs | 3 ++- src/services/ping.rs | 16 +++++++++++----- 3 files changed, 14 insertions(+), 7 deletions(-) (limited to 'src/main.rs') diff --git a/src/main.rs b/src/main.rs index ee540af..e96b736 100644 --- a/src/main.rs +++ b/src/main.rs @@ -50,7 +50,7 @@ async fn main() { let (tx, _) = channel(32); - let ping_map: DashMap = DashMap::new(); + let ping_map: PingMap = DashMap::new(); let shared_state = Arc::new(AppState { db, ping_send: tx, ping_map }); diff --git a/src/routes/start.rs b/src/routes/start.rs index 3bccb0f..c2c9378 100644 --- a/src/routes/start.rs +++ b/src/routes/start.rs @@ -11,6 +11,7 @@ use crate::config::SETTINGS; use crate::wol::{create_buffer, send_packet}; use crate::db::Device; use crate::error::WebolError; +use crate::services::ping::PingValue; #[axum_macros::debug_handler] pub async fn start(State(state): State>, headers: HeaderMap, Json(payload): Json) -> Result, WebolError> { @@ -46,7 +47,7 @@ pub async fn start(State(state): State>, headers: HeaderMap let uuid_genc = uuid_gen.clone(); tokio::spawn(async move { debug!("Init ping service"); - state.ping_map.insert(uuid_gen.clone(), (device.ip.clone(), false)); + state.ping_map.insert(uuid_gen.clone(), PingValue { ip: device.ip.clone(), online: false }); warn!("{:?}", state.ping_map); diff --git a/src/services/ping.rs b/src/services/ping.rs index 04ad511..f0cc4a3 100644 --- a/src/services/ping.rs +++ b/src/services/ping.rs @@ -10,7 +10,13 @@ use crate::AppState; use crate::error::WebolError; -pub type PingMap = DashMap; +pub type PingMap = DashMap; + +#[derive(Debug, Clone)] +pub struct PingValue { + pub ip: String, + pub online: bool +} pub async fn spawn(tx: Sender, ip: String, uuid: String, ping_map: &PingMap) -> Result<(), WebolError> { let payload = [0; 8]; @@ -32,7 +38,7 @@ pub async fn spawn(tx: Sender, ip: String, uuid: String, ping let (_, duration) = ping.unwrap(); debug!("Ping took {:?}", duration); cont = false; - handle_broadcast_send(&tx, ip.clone(), &ping_map, uuid.clone()).await; + handle_broadcast_send(&tx, ip.clone(), ping_map, uuid.clone()).await; }; } @@ -41,7 +47,7 @@ pub async fn spawn(tx: Sender, ip: String, uuid: String, ping async fn handle_broadcast_send(tx: &Sender, ip: String, ping_map: &PingMap, uuid: String) { debug!("sending pingsuccess message"); - ping_map.insert(uuid.clone(), (ip.clone(), true)); + ping_map.insert(uuid.clone(), PingValue { ip: ip.clone(), online: true }); let _ = tx.send(BroadcastCommands::PingSuccess(ip)); tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; trace!("remove {} from ping_map", uuid); @@ -67,7 +73,7 @@ pub async fn status_websocket(mut socket: WebSocket, state: Arc) { trace!("got device: {:?}", device); - match device.1 { + match device.online { true => { debug!("already started"); // TODO: What's better? @@ -76,7 +82,7 @@ pub async fn status_websocket(mut socket: WebSocket, state: Arc) { socket.send(Message::Close(Some(CloseFrame { code: 4001, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap(); }, false => { - let ip = device.0.to_owned(); + let ip = device.ip.to_owned(); loop{ trace!("wait for tx message"); let message = state.ping_send.subscribe().recv().await.unwrap(); -- cgit v1.2.3