diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/db.rs | 2 | ||||
-rw-r--r-- | src/error.rs | 27 | ||||
-rw-r--r-- | src/routes/device.rs | 10 | ||||
-rw-r--r-- | src/routes/start.rs | 2 | ||||
-rw-r--r-- | src/services/ping.rs | 92 |
5 files changed, 59 insertions, 74 deletions
@@ -8,7 +8,7 @@ use tracing::{debug, info}; | |||
8 | #[cfg(not(debug_assertions))] | 8 | #[cfg(not(debug_assertions))] |
9 | use crate::config::SETTINGS; | 9 | use crate::config::SETTINGS; |
10 | 10 | ||
11 | #[derive(Serialize)] | 11 | #[derive(Serialize, Debug)] |
12 | pub struct Device { | 12 | pub struct Device { |
13 | pub id: String, | 13 | pub id: String, |
14 | pub mac: String, | 14 | pub mac: String, |
diff --git a/src/error.rs b/src/error.rs index 1592a78..5b82534 100644 --- a/src/error.rs +++ b/src/error.rs | |||
@@ -1,4 +1,3 @@ | |||
1 | use std::error::Error; | ||
2 | use std::io; | 1 | use std::io; |
3 | use axum::http::StatusCode; | 2 | use axum::http::StatusCode; |
4 | use axum::Json; | 3 | use axum::Json; |
@@ -10,9 +9,7 @@ use crate::auth::AuthError; | |||
10 | #[derive(Debug)] | 9 | #[derive(Debug)] |
11 | pub enum WebolError { | 10 | pub enum WebolError { |
12 | Generic, | 11 | Generic, |
13 | // User(UserError), | ||
14 | Auth(AuthError), | 12 | Auth(AuthError), |
15 | Ping(surge_ping::SurgeError), | ||
16 | DB(sqlx::Error), | 13 | DB(sqlx::Error), |
17 | IpParse(<std::net::IpAddr as std::str::FromStr>::Err), | 14 | IpParse(<std::net::IpAddr as std::str::FromStr>::Err), |
18 | BufferParse(std::num::ParseIntError), | 15 | BufferParse(std::num::ParseIntError), |
@@ -22,13 +19,10 @@ pub enum WebolError { | |||
22 | impl IntoResponse for WebolError { | 19 | impl IntoResponse for WebolError { |
23 | fn into_response(self) -> Response { | 20 | fn into_response(self) -> Response { |
24 | let (status, error_message) = match self { | 21 | let (status, error_message) = match self { |
25 | Self::Auth(err) => err.get(), | 22 | Self::Auth(err) => { |
26 | // Self::User(err) => err.get(), | 23 | err.get() |
27 | Self::Generic => (StatusCode::INTERNAL_SERVER_ERROR, ""), | ||
28 | Self::Ping(err) => { | ||
29 | error!("Ping: {}", err.source().unwrap()); | ||
30 | (StatusCode::INTERNAL_SERVER_ERROR, "Server Error") | ||
31 | }, | 24 | }, |
25 | Self::Generic => (StatusCode::INTERNAL_SERVER_ERROR, ""), | ||
32 | Self::IpParse(err) => { | 26 | Self::IpParse(err) => { |
33 | error!("server error: {}", err.to_string()); | 27 | error!("server error: {}", err.to_string()); |
34 | (StatusCode::INTERNAL_SERVER_ERROR, "Server Error") | 28 | (StatusCode::INTERNAL_SERVER_ERROR, "Server Error") |
@@ -51,17 +45,4 @@ impl IntoResponse for WebolError { | |||
51 | })); | 45 | })); |
52 | (status, body).into_response() | 46 | (status, body).into_response() |
53 | } | 47 | } |
54 | } | 48 | } \ No newline at end of file |
55 | |||
56 | // #[derive(Debug)] | ||
57 | // pub enum UserError { | ||
58 | // UnknownUUID, | ||
59 | // } | ||
60 | // | ||
61 | // impl UserError { | ||
62 | // pub fn get(self) -> (StatusCode, &'static str) { | ||
63 | // match self { | ||
64 | // Self::UnknownUUID => (StatusCode::UNPROCESSABLE_ENTITY, "Unknown UUID"), | ||
65 | // } | ||
66 | // } | ||
67 | // } | ||
diff --git a/src/routes/device.rs b/src/routes/device.rs index 7353733..1eeff0b 100644 --- a/src/routes/device.rs +++ b/src/routes/device.rs | |||
@@ -4,13 +4,13 @@ use axum::headers::HeaderMap; | |||
4 | use axum::Json; | 4 | use axum::Json; |
5 | use serde::{Deserialize, Serialize}; | 5 | use serde::{Deserialize, Serialize}; |
6 | use serde_json::{json, Value}; | 6 | use serde_json::{json, Value}; |
7 | use tracing::info; | 7 | use tracing::{debug, info}; |
8 | use crate::auth::auth; | 8 | use crate::auth::auth; |
9 | use crate::db::Device; | 9 | use crate::db::Device; |
10 | use crate::error::WebolError; | 10 | use crate::error::WebolError; |
11 | 11 | ||
12 | pub async fn get_device(State(state): State<Arc<crate::AppState>>, headers: HeaderMap, Json(payload): Json<GetDevicePayload>) -> Result<Json<Value>, WebolError> { | 12 | pub async fn get_device(State(state): State<Arc<crate::AppState>>, headers: HeaderMap, Json(payload): Json<GetDevicePayload>) -> Result<Json<Value>, WebolError> { |
13 | info!("GET request"); | 13 | info!("add device {}", payload.id); |
14 | let secret = headers.get("authorization"); | 14 | let secret = headers.get("authorization"); |
15 | if auth(secret).map_err(WebolError::Auth)? { | 15 | if auth(secret).map_err(WebolError::Auth)? { |
16 | let device = sqlx::query_as!( | 16 | let device = sqlx::query_as!( |
@@ -23,6 +23,8 @@ pub async fn get_device(State(state): State<Arc<crate::AppState>>, headers: Head | |||
23 | payload.id | 23 | payload.id |
24 | ).fetch_one(&state.db).await.map_err(WebolError::DB)?; | 24 | ).fetch_one(&state.db).await.map_err(WebolError::DB)?; |
25 | 25 | ||
26 | debug!("got device {:?}", device); | ||
27 | |||
26 | Ok(Json(json!(device))) | 28 | Ok(Json(json!(device))) |
27 | } else { | 29 | } else { |
28 | Err(WebolError::Generic) | 30 | Err(WebolError::Generic) |
@@ -35,7 +37,7 @@ pub struct GetDevicePayload { | |||
35 | } | 37 | } |
36 | 38 | ||
37 | pub async fn put_device(State(state): State<Arc<crate::AppState>>, headers: HeaderMap, Json(payload): Json<PutDevicePayload>) -> Result<Json<Value>, WebolError> { | 39 | pub async fn put_device(State(state): State<Arc<crate::AppState>>, headers: HeaderMap, Json(payload): Json<PutDevicePayload>) -> Result<Json<Value>, WebolError> { |
38 | info!("PUT request"); | 40 | info!("add device {} ({}, {}, {})", payload.id, payload.mac, payload.broadcast_addr, payload.ip); |
39 | let secret = headers.get("authorization"); | 41 | let secret = headers.get("authorization"); |
40 | if auth(secret).map_err(WebolError::Auth)? { | 42 | if auth(secret).map_err(WebolError::Auth)? { |
41 | sqlx::query!( | 43 | sqlx::query!( |
@@ -69,7 +71,7 @@ pub struct PutDeviceResponse { | |||
69 | } | 71 | } |
70 | 72 | ||
71 | pub async fn post_device(State(state): State<Arc<crate::AppState>>, headers: HeaderMap, Json(payload): Json<PostDevicePayload>) -> Result<Json<Value>, WebolError> { | 73 | pub async fn post_device(State(state): State<Arc<crate::AppState>>, headers: HeaderMap, Json(payload): Json<PostDevicePayload>) -> Result<Json<Value>, WebolError> { |
72 | info!("POST request"); | 74 | info!("edit device {} ({}, {}, {})", payload.id, payload.mac, payload.broadcast_addr, payload.ip); |
73 | let secret = headers.get("authorization"); | 75 | let secret = headers.get("authorization"); |
74 | if auth(secret).map_err(WebolError::Auth)? { | 76 | if auth(secret).map_err(WebolError::Auth)? { |
75 | let device = sqlx::query_as!( | 77 | let device = sqlx::query_as!( |
diff --git a/src/routes/start.rs b/src/routes/start.rs index c2c9378..9cd358b 100644 --- a/src/routes/start.rs +++ b/src/routes/start.rs | |||
@@ -49,8 +49,6 @@ pub async fn start(State(state): State<Arc<crate::AppState>>, headers: HeaderMap | |||
49 | debug!("Init ping service"); | 49 | debug!("Init ping service"); |
50 | state.ping_map.insert(uuid_gen.clone(), PingValue { ip: device.ip.clone(), online: false }); | 50 | state.ping_map.insert(uuid_gen.clone(), PingValue { ip: device.ip.clone(), online: false }); |
51 | 51 | ||
52 | warn!("{:?}", state.ping_map); | ||
53 | |||
54 | crate::services::ping::spawn(state.ping_send.clone(), device.ip, uuid_gen.clone(), &state.ping_map).await | 52 | crate::services::ping::spawn(state.ping_send.clone(), device.ip, uuid_gen.clone(), &state.ping_map).await |
55 | }); | 53 | }); |
56 | Some(uuid_genc) | 54 | Some(uuid_genc) |
diff --git a/src/services/ping.rs b/src/services/ping.rs index f0cc4a3..a26dacc 100644 --- a/src/services/ping.rs +++ b/src/services/ping.rs | |||
@@ -1,14 +1,13 @@ | |||
1 | use std::borrow::Cow; | ||
2 | use std::sync::Arc; | 1 | use std::sync::Arc; |
3 | 2 | ||
4 | use axum::extract::{ws::WebSocket}; | 3 | use axum::extract::{ws::WebSocket}; |
5 | use axum::extract::ws::{CloseFrame, Message}; | 4 | use axum::extract::ws::Message; |
6 | use dashmap::DashMap; | 5 | use dashmap::DashMap; |
6 | use time::{Duration, Instant}; | ||
7 | use tokio::sync::broadcast::{Sender}; | 7 | use tokio::sync::broadcast::{Sender}; |
8 | use tracing::{debug, trace, warn}; | 8 | use tracing::{debug, error, trace}; |
9 | use crate::AppState; | 9 | use crate::AppState; |
10 | 10 | use crate::config::SETTINGS; | |
11 | use crate::error::WebolError; | ||
12 | 11 | ||
13 | pub type PingMap = DashMap<String, PingValue>; | 12 | pub type PingMap = DashMap<String, PingValue>; |
14 | 13 | ||
@@ -18,92 +17,97 @@ pub struct PingValue { | |||
18 | pub online: bool | 17 | pub online: bool |
19 | } | 18 | } |
20 | 19 | ||
21 | pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping_map: &PingMap) -> Result<(), WebolError> { | 20 | pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping_map: &PingMap) { |
21 | let timer = Instant::now(); | ||
22 | let payload = [0; 8]; | 22 | let payload = [0; 8]; |
23 | 23 | ||
24 | // TODO: Better while | ||
25 | let mut cont = true; | 24 | let mut cont = true; |
26 | while cont { | 25 | while cont { |
27 | let ping = surge_ping::ping( | 26 | let ping = surge_ping::ping( |
28 | ip.parse().map_err(WebolError::IpParse)?, | 27 | ip.parse().expect("bad ip"), |
29 | &payload | 28 | &payload |
30 | ).await; | 29 | ).await; |
31 | 30 | ||
32 | if let Err(ping) = ping { | 31 | if let Err(ping) = ping { |
33 | cont = matches!(ping, surge_ping::SurgeError::Timeout { .. }); | 32 | cont = matches!(ping, surge_ping::SurgeError::Timeout { .. }); |
34 | if !cont { | 33 | if !cont { |
35 | return Err(ping).map_err(WebolError::Ping) | 34 | error!("{}", ping.to_string()); |
35 | } | ||
36 | if timer.elapsed() >= Duration::minutes(SETTINGS.get_int("pingtimeout").unwrap_or(10)) { | ||
37 | let _ = tx.send(BroadcastCommands::PingTimeout(uuid.clone())); | ||
38 | trace!("remove {} from ping_map after timeout", uuid); | ||
39 | ping_map.remove(&uuid); | ||
40 | cont = false; | ||
36 | } | 41 | } |
37 | } else { | 42 | } else { |
38 | let (_, duration) = ping.unwrap(); | 43 | let (_, duration) = ping.map_err(|err| error!("{}", err.to_string())).expect("fatal error"); |
39 | debug!("Ping took {:?}", duration); | 44 | debug!("Ping took {:?}", duration); |
40 | cont = false; | 45 | cont = false; |
41 | handle_broadcast_send(&tx, ip.clone(), ping_map, uuid.clone()).await; | 46 | handle_broadcast_send(&tx, ip.clone(), ping_map, uuid.clone()).await; |
42 | }; | 47 | }; |
43 | } | 48 | } |
44 | |||
45 | Ok(()) | ||
46 | } | 49 | } |
47 | 50 | ||
48 | async fn handle_broadcast_send(tx: &Sender<BroadcastCommands>, ip: String, ping_map: &PingMap, uuid: String) { | 51 | async fn handle_broadcast_send(tx: &Sender<BroadcastCommands>, ip: String, ping_map: &PingMap, uuid: String) { |
49 | debug!("sending pingsuccess message"); | 52 | debug!("send pingsuccess message"); |
50 | ping_map.insert(uuid.clone(), PingValue { ip: ip.clone(), online: true }); | 53 | ping_map.insert(uuid.clone(), PingValue { ip: ip.clone(), online: true }); |
51 | let _ = tx.send(BroadcastCommands::PingSuccess(ip)); | 54 | let _ = tx.send(BroadcastCommands::PingSuccess(uuid.clone())); |
52 | tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; | 55 | tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; |
53 | trace!("remove {} from ping_map", uuid); | 56 | trace!("remove {} from ping_map after success", uuid); |
54 | ping_map.remove(&uuid); | 57 | ping_map.remove(&uuid); |
55 | } | 58 | } |
56 | 59 | ||
57 | #[derive(Clone, Debug)] | 60 | #[derive(Clone, Debug)] |
58 | pub enum BroadcastCommands { | 61 | pub enum BroadcastCommands { |
59 | PingSuccess(String) | 62 | PingSuccess(String), |
63 | PingTimeout(String) | ||
60 | } | 64 | } |
61 | 65 | ||
62 | pub async fn status_websocket(mut socket: WebSocket, state: Arc<AppState>) { | 66 | pub async fn status_websocket(mut socket: WebSocket, state: Arc<AppState>) { |
63 | warn!("{:?}", state.ping_map); | ||
64 | |||
65 | trace!("wait for ws message (uuid)"); | 67 | trace!("wait for ws message (uuid)"); |
66 | let msg = socket.recv().await; | 68 | let msg = socket.recv().await; |
67 | let uuid = msg.unwrap().unwrap().into_text().unwrap(); | 69 | let uuid = msg.unwrap().unwrap().into_text().unwrap(); |
68 | 70 | ||
69 | trace!("Search for uuid: {:?}", uuid); | 71 | trace!("Search for uuid: {:?}", uuid); |
70 | 72 | ||
71 | // TODO: Handle Error | 73 | match state.ping_map.get(&uuid) { |
72 | let device = state.ping_map.get(&uuid).unwrap().to_owned(); | 74 | Some(device) => { |
75 | debug!("got device: {} (online: {})", device.ip, device.online); | ||
76 | let _ = socket.send(process_device(state.clone(), uuid, device.to_owned()).await).await; | ||
77 | }, | ||
78 | None => { | ||
79 | debug!("didn't find any device"); | ||
80 | let _ = socket.send(Message::Text(format!("notfound_{}", uuid))).await; | ||
81 | }, | ||
82 | }; | ||
73 | 83 | ||
74 | trace!("got device: {:?}", device); | 84 | let _ = socket.close().await; |
85 | } | ||
75 | 86 | ||
87 | async fn process_device(state: Arc<AppState>, uuid: String, device: PingValue) -> Message { | ||
76 | match device.online { | 88 | match device.online { |
77 | true => { | 89 | true => { |
78 | debug!("already started"); | 90 | debug!("already started"); |
79 | // TODO: What's better? | 91 | Message::Text(format!("start_{}", uuid)) |
80 | // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); | ||
81 | // socket.close().await.unwrap(); | ||
82 | socket.send(Message::Close(Some(CloseFrame { code: 4001, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap(); | ||
83 | }, | 92 | }, |
84 | false => { | 93 | false => { |
85 | let ip = device.ip.to_owned(); | ||
86 | loop{ | 94 | loop{ |
87 | trace!("wait for tx message"); | 95 | trace!("wait for tx message"); |
88 | let message = state.ping_send.subscribe().recv().await.unwrap(); | 96 | let message = state.ping_send.subscribe().recv().await.expect("fatal error"); |
89 | trace!("GOT = {:?}", message); | 97 | trace!("got message {:?}", message); |
90 | // if let BroadcastCommands::PingSuccess(msg_ip) = message { | 98 | return match message { |
91 | // if msg_ip == ip { | 99 | BroadcastCommands::PingSuccess(msg_uuid) => { |
92 | // trace!("message == ip"); | 100 | if msg_uuid != uuid { continue; } |
93 | // break; | 101 | trace!("message == uuid success"); |
94 | // } | 102 | Message::Text(format!("start_{}", uuid)) |
95 | // } | 103 | }, |
96 | let BroadcastCommands::PingSuccess(msg_ip) = message; | 104 | BroadcastCommands::PingTimeout(msg_uuid) => { |
97 | if msg_ip == ip { | 105 | if msg_uuid != uuid { continue; } |
98 | trace!("message == ip"); | 106 | trace!("message == uuid timeout"); |
99 | break; | 107 | Message::Text(format!("timeout_{}", uuid)) |
108 | } | ||
100 | } | 109 | } |
101 | }; | 110 | } |
102 | |||
103 | socket.send(Message::Close(Some(CloseFrame { code: 4000, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap(); | ||
104 | // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); | ||
105 | // socket.close().await.unwrap(); | ||
106 | warn!("{:?}", state.ping_map); | ||
107 | } | 111 | } |
108 | } | 112 | } |
109 | } \ No newline at end of file | 113 | } \ No newline at end of file |