diff options
author | FxQnLr <[email protected]> | 2024-02-18 21:16:46 +0100 |
---|---|---|
committer | FxQnLr <[email protected]> | 2024-02-18 21:16:46 +0100 |
commit | 2f9f18b80a9e2134f674f345e48a5f21de5efadd (patch) | |
tree | c4202bb5c1a490233e89d928cf8c5b91258d4c90 /src/routes | |
parent | 016fa3a31f8847d3f52800941b7f8fe5ef872240 (diff) | |
download | webol-2f9f18b80a9e2134f674f345e48a5f21de5efadd.tar webol-2f9f18b80a9e2134f674f345e48a5f21de5efadd.tar.gz webol-2f9f18b80a9e2134f674f345e48a5f21de5efadd.zip |
Refactor stuff. Use Postgres Types
Diffstat (limited to 'src/routes')
-rw-r--r-- | src/routes/device.rs | 19 | ||||
-rw-r--r-- | src/routes/mod.rs | 3 | ||||
-rw-r--r-- | src/routes/start.rs | 78 | ||||
-rw-r--r-- | src/routes/status.rs | 79 |
4 files changed, 128 insertions, 51 deletions
diff --git a/src/routes/device.rs b/src/routes/device.rs index 5ca574a..2f0093d 100644 --- a/src/routes/device.rs +++ b/src/routes/device.rs | |||
@@ -4,9 +4,11 @@ use crate::error::Error; | |||
4 | use axum::extract::State; | 4 | use axum::extract::State; |
5 | use axum::http::HeaderMap; | 5 | use axum::http::HeaderMap; |
6 | use axum::Json; | 6 | use axum::Json; |
7 | use mac_address::MacAddress; | ||
7 | use serde::{Deserialize, Serialize}; | 8 | use serde::{Deserialize, Serialize}; |
8 | use serde_json::{json, Value}; | 9 | use serde_json::{json, Value}; |
9 | use std::sync::Arc; | 10 | use sqlx::types::ipnetwork::IpNetwork; |
11 | use std::{sync::Arc, str::FromStr}; | ||
10 | use tracing::{debug, info}; | 12 | use tracing::{debug, info}; |
11 | 13 | ||
12 | pub async fn get( | 14 | pub async fn get( |
@@ -14,7 +16,7 @@ pub async fn get( | |||
14 | headers: HeaderMap, | 16 | headers: HeaderMap, |
15 | Json(payload): Json<GetDevicePayload>, | 17 | Json(payload): Json<GetDevicePayload>, |
16 | ) -> Result<Json<Value>, Error> { | 18 | ) -> Result<Json<Value>, Error> { |
17 | info!("add device {}", payload.id); | 19 | info!("get device {}", payload.id); |
18 | let secret = headers.get("authorization"); | 20 | let secret = headers.get("authorization"); |
19 | let authorized = matches!(auth(&state.config, secret)?, crate::auth::Response::Success); | 21 | let authorized = matches!(auth(&state.config, secret)?, crate::auth::Response::Success); |
20 | if authorized { | 22 | if authorized { |
@@ -52,18 +54,21 @@ pub async fn put( | |||
52 | "add device {} ({}, {}, {})", | 54 | "add device {} ({}, {}, {})", |
53 | payload.id, payload.mac, payload.broadcast_addr, payload.ip | 55 | payload.id, payload.mac, payload.broadcast_addr, payload.ip |
54 | ); | 56 | ); |
57 | |||
55 | let secret = headers.get("authorization"); | 58 | let secret = headers.get("authorization"); |
56 | let authorized = matches!(auth(&state.config, secret)?, crate::auth::Response::Success); | 59 | let authorized = matches!(auth(&state.config, secret)?, crate::auth::Response::Success); |
57 | if authorized { | 60 | if authorized { |
61 | let ip = IpNetwork::from_str(&payload.ip)?; | ||
62 | let mac = MacAddress::from_str(&payload.mac)?; | ||
58 | sqlx::query!( | 63 | sqlx::query!( |
59 | r#" | 64 | r#" |
60 | INSERT INTO devices (id, mac, broadcast_addr, ip) | 65 | INSERT INTO devices (id, mac, broadcast_addr, ip) |
61 | VALUES ($1, $2, $3, $4); | 66 | VALUES ($1, $2, $3, $4); |
62 | "#, | 67 | "#, |
63 | payload.id, | 68 | payload.id, |
64 | payload.mac, | 69 | mac, |
65 | payload.broadcast_addr, | 70 | payload.broadcast_addr, |
66 | payload.ip | 71 | ip |
67 | ) | 72 | ) |
68 | .execute(&state.db) | 73 | .execute(&state.db) |
69 | .await?; | 74 | .await?; |
@@ -99,6 +104,8 @@ pub async fn post( | |||
99 | let secret = headers.get("authorization"); | 104 | let secret = headers.get("authorization"); |
100 | let authorized = matches!(auth(&state.config, secret)?, crate::auth::Response::Success); | 105 | let authorized = matches!(auth(&state.config, secret)?, crate::auth::Response::Success); |
101 | if authorized { | 106 | if authorized { |
107 | let ip = IpNetwork::from_str(&payload.ip)?; | ||
108 | let mac = MacAddress::from_str(&payload.mac)?; | ||
102 | let device = sqlx::query_as!( | 109 | let device = sqlx::query_as!( |
103 | Device, | 110 | Device, |
104 | r#" | 111 | r#" |
@@ -106,9 +113,9 @@ pub async fn post( | |||
106 | SET mac = $1, broadcast_addr = $2, ip = $3 WHERE id = $4 | 113 | SET mac = $1, broadcast_addr = $2, ip = $3 WHERE id = $4 |
107 | RETURNING id, mac, broadcast_addr, ip, times; | 114 | RETURNING id, mac, broadcast_addr, ip, times; |
108 | "#, | 115 | "#, |
109 | payload.mac, | 116 | mac, |
110 | payload.broadcast_addr, | 117 | payload.broadcast_addr, |
111 | payload.ip, | 118 | ip, |
112 | payload.id | 119 | payload.id |
113 | ) | 120 | ) |
114 | .fetch_one(&state.db) | 121 | .fetch_one(&state.db) |
diff --git a/src/routes/mod.rs b/src/routes/mod.rs deleted file mode 100644 index d5ab0d6..0000000 --- a/src/routes/mod.rs +++ /dev/null | |||
@@ -1,3 +0,0 @@ | |||
1 | pub mod start; | ||
2 | pub mod device; | ||
3 | pub mod status; \ No newline at end of file | ||
diff --git a/src/routes/start.rs b/src/routes/start.rs index ec4f98f..4888325 100644 --- a/src/routes/start.rs +++ b/src/routes/start.rs | |||
@@ -12,7 +12,6 @@ use std::sync::Arc; | |||
12 | use tracing::{debug, info}; | 12 | use tracing::{debug, info}; |
13 | use uuid::Uuid; | 13 | use uuid::Uuid; |
14 | 14 | ||
15 | #[axum_macros::debug_handler] | ||
16 | pub async fn start( | 15 | pub async fn start( |
17 | State(state): State<Arc<crate::AppState>>, | 16 | State(state): State<Arc<crate::AppState>>, |
18 | headers: HeaderMap, | 17 | headers: HeaderMap, |
@@ -41,45 +40,11 @@ pub async fn start( | |||
41 | let _ = send_packet( | 40 | let _ = send_packet( |
42 | bind_addr, | 41 | bind_addr, |
43 | &device.broadcast_addr, | 42 | &device.broadcast_addr, |
44 | &create_buffer(&device.mac)?, | 43 | &create_buffer(&device.mac.to_string())?, |
45 | )?; | 44 | )?; |
46 | let dev_id = device.id.clone(); | 45 | let dev_id = device.id.clone(); |
47 | let uuid = if payload.ping.is_some_and(|ping| ping) { | 46 | let uuid = if payload.ping.is_some_and(|ping| ping) { |
48 | let mut uuid: Option<String> = None; | 47 | Some(setup_ping(state, device)) |
49 | for (key, value) in state.ping_map.clone() { | ||
50 | if value.ip == device.ip { | ||
51 | debug!("service already exists"); | ||
52 | uuid = Some(key); | ||
53 | break; | ||
54 | } | ||
55 | } | ||
56 | let uuid_gen = match uuid { | ||
57 | Some(u) => u, | ||
58 | None => Uuid::new_v4().to_string(), | ||
59 | }; | ||
60 | let uuid_genc = uuid_gen.clone(); | ||
61 | |||
62 | tokio::spawn(async move { | ||
63 | debug!("init ping service"); | ||
64 | state.ping_map.insert( | ||
65 | uuid_gen.clone(), | ||
66 | PingValue { | ||
67 | ip: device.ip.clone(), | ||
68 | online: false, | ||
69 | }, | ||
70 | ); | ||
71 | |||
72 | crate::services::ping::spawn( | ||
73 | state.ping_send.clone(), | ||
74 | &state.config, | ||
75 | device, | ||
76 | uuid_gen.clone(), | ||
77 | &state.ping_map, | ||
78 | &state.db, | ||
79 | ) | ||
80 | .await; | ||
81 | }); | ||
82 | Some(uuid_genc) | ||
83 | } else { | 48 | } else { |
84 | None | 49 | None |
85 | }; | 50 | }; |
@@ -93,6 +58,45 @@ pub async fn start( | |||
93 | } | 58 | } |
94 | } | 59 | } |
95 | 60 | ||
61 | fn setup_ping(state: Arc<crate::AppState>, device: Device) -> String { | ||
62 | let mut uuid: Option<String> = None; | ||
63 | for (key, value) in state.ping_map.clone() { | ||
64 | if value.ip == device.ip { | ||
65 | debug!("service already exists"); | ||
66 | uuid = Some(key); | ||
67 | break; | ||
68 | } | ||
69 | } | ||
70 | let uuid_gen = match uuid { | ||
71 | Some(u) => u, | ||
72 | None => Uuid::new_v4().to_string(), | ||
73 | }; | ||
74 | let uuid_ret = uuid_gen.clone(); | ||
75 | |||
76 | debug!("init ping service"); | ||
77 | state.ping_map.insert( | ||
78 | uuid_gen.clone(), | ||
79 | PingValue { | ||
80 | ip: device.ip, | ||
81 | online: false, | ||
82 | }, | ||
83 | ); | ||
84 | |||
85 | tokio::spawn(async move { | ||
86 | crate::services::ping::spawn( | ||
87 | state.ping_send.clone(), | ||
88 | &state.config, | ||
89 | device, | ||
90 | uuid_gen, | ||
91 | &state.ping_map, | ||
92 | &state.db, | ||
93 | ) | ||
94 | .await; | ||
95 | }); | ||
96 | |||
97 | uuid_ret | ||
98 | } | ||
99 | |||
96 | #[derive(Deserialize)] | 100 | #[derive(Deserialize)] |
97 | pub struct Payload { | 101 | pub struct Payload { |
98 | id: String, | 102 | id: String, |
diff --git a/src/routes/status.rs b/src/routes/status.rs index 31ef996..0e25f7d 100644 --- a/src/routes/status.rs +++ b/src/routes/status.rs | |||
@@ -1,10 +1,79 @@ | |||
1 | use std::sync::Arc; | 1 | use crate::services::ping::BroadcastCommand; |
2 | use crate::AppState; | ||
3 | use axum::extract::ws::{Message, WebSocket}; | ||
2 | use axum::extract::{State, WebSocketUpgrade}; | 4 | use axum::extract::{State, WebSocketUpgrade}; |
3 | use axum::response::Response; | 5 | use axum::response::Response; |
4 | use crate::AppState; | 6 | use sqlx::PgPool; |
5 | use crate::services::ping::status_websocket; | 7 | use std::sync::Arc; |
8 | use tracing::{debug, trace}; | ||
6 | 9 | ||
7 | #[axum_macros::debug_handler] | ||
8 | pub async fn status(State(state): State<Arc<AppState>>, ws: WebSocketUpgrade) -> Response { | 10 | pub async fn status(State(state): State<Arc<AppState>>, ws: WebSocketUpgrade) -> Response { |
9 | ws.on_upgrade(move |socket| status_websocket(socket, state)) | 11 | ws.on_upgrade(move |socket| websocket(socket, state)) |
12 | } | ||
13 | |||
14 | pub async fn websocket(mut socket: WebSocket, state: Arc<AppState>) { | ||
15 | trace!("wait for ws message (uuid)"); | ||
16 | let msg = socket.recv().await; | ||
17 | let uuid = msg.unwrap().unwrap().into_text().unwrap(); | ||
18 | |||
19 | trace!("Search for uuid: {}", uuid); | ||
20 | |||
21 | let eta = get_eta(&state.db).await; | ||
22 | let _ = socket | ||
23 | .send(Message::Text(format!("eta_{eta}_{uuid}"))) | ||
24 | .await; | ||
25 | |||
26 | let device_exists = state.ping_map.contains_key(&uuid); | ||
27 | if device_exists { | ||
28 | let _ = socket | ||
29 | .send(receive_ping_broadcast(state.clone(), uuid).await) | ||
30 | .await; | ||
31 | } else { | ||
32 | debug!("didn't find any device"); | ||
33 | let _ = socket.send(Message::Text(format!("notfound_{uuid}"))).await; | ||
34 | }; | ||
35 | |||
36 | let _ = socket.close().await; | ||
37 | } | ||
38 | |||
39 | async fn receive_ping_broadcast(state: Arc<AppState>, uuid: String) -> Message { | ||
40 | let pm = state.ping_map.clone().into_read_only(); | ||
41 | let device = pm.get(&uuid).expect("fatal error"); | ||
42 | debug!("got device: {} (online: {})", device.ip, device.online); | ||
43 | if device.online { | ||
44 | debug!("already started"); | ||
45 | Message::Text(BroadcastCommand::success(uuid).to_string()) | ||
46 | } else { | ||
47 | loop { | ||
48 | trace!("wait for tx message"); | ||
49 | let message = state | ||
50 | .ping_send | ||
51 | .subscribe() | ||
52 | .recv() | ||
53 | .await | ||
54 | .expect("fatal error"); | ||
55 | trace!("got message {:?}", message); | ||
56 | |||
57 | if message.uuid != uuid { | ||
58 | continue; | ||
59 | } | ||
60 | trace!("message == uuid success"); | ||
61 | return Message::Text(message.to_string()); | ||
62 | } | ||
63 | } | ||
64 | } | ||
65 | |||
66 | async fn get_eta(db: &PgPool) -> i64 { | ||
67 | let query = sqlx::query!(r#"SELECT times FROM devices;"#) | ||
68 | .fetch_one(db) | ||
69 | .await | ||
70 | .unwrap(); | ||
71 | |||
72 | let times = if let Some(times) = query.times { | ||
73 | times | ||
74 | } else { | ||
75 | vec![0] | ||
76 | }; | ||
77 | |||
78 | times.iter().sum::<i64>() / i64::try_from(times.len()).unwrap() | ||
10 | } | 79 | } |