diff options
-rw-r--r-- | migrations/20231009123228_devices.sql | 7 | ||||
-rw-r--r-- | src/db.rs | 3 | ||||
-rw-r--r-- | src/main.rs | 2 | ||||
-rw-r--r-- | src/routes/device.rs | 16 | ||||
-rw-r--r-- | src/routes/start.rs | 11 | ||||
-rw-r--r-- | src/routes/status.rs | 2 | ||||
-rw-r--r-- | src/services/ping.rs | 20 |
7 files changed, 34 insertions, 27 deletions
diff --git a/migrations/20231009123228_devices.sql b/migrations/20231009123228_devices.sql index 9d0c7ca..b911b19 100644 --- a/migrations/20231009123228_devices.sql +++ b/migrations/20231009123228_devices.sql | |||
@@ -1,7 +1,8 @@ | |||
1 | -- Add migration script here | 1 | -- Add migration script here |
2 | CREATE TABLE IF NOT EXISTS "devices" | 2 | CREATE TABLE IF NOT EXISTS "devices" |
3 | ( | 3 | ( |
4 | "id" TEXT PRIMARY KEY NOT NULL, | 4 | "id" VARCHAR(255) PRIMARY KEY NOT NULL, |
5 | "mac" TEXT NOT NULL, | 5 | "mac" VARCHAR(17) NOT NULL, |
6 | "broadcast_addr" TEXT NOT NULL | 6 | "broadcast_addr" VARCHAR(39) NOT NULL, |
7 | "ip" VARCHAR(39) NOT NULL | ||
7 | ) | 8 | ) |
@@ -12,7 +12,8 @@ use crate::config::SETTINGS; | |||
12 | pub struct Device { | 12 | pub struct Device { |
13 | pub id: String, | 13 | pub id: String, |
14 | pub mac: String, | 14 | pub mac: String, |
15 | pub broadcast_addr: String | 15 | pub broadcast_addr: String, |
16 | pub ip: String | ||
16 | } | 17 | } |
17 | 18 | ||
18 | pub async fn init_db_pool() -> PgPool { | 19 | pub async fn init_db_pool() -> PgPool { |
diff --git a/src/main.rs b/src/main.rs index 762a817..ee540af 100644 --- a/src/main.rs +++ b/src/main.rs | |||
@@ -52,7 +52,7 @@ async fn main() { | |||
52 | 52 | ||
53 | let ping_map: DashMap<String, (String, bool)> = DashMap::new(); | 53 | let ping_map: DashMap<String, (String, bool)> = DashMap::new(); |
54 | 54 | ||
55 | let shared_state = Arc::new(AppState { db, ping_send: tx, ping_map: Arc::new(ping_map) }); | 55 | let shared_state = Arc::new(AppState { db, ping_send: tx, ping_map }); |
56 | 56 | ||
57 | let app = Router::new() | 57 | let app = Router::new() |
58 | .route("/start", post(start)) | 58 | .route("/start", post(start)) |
diff --git a/src/routes/device.rs b/src/routes/device.rs index 248d1e0..7353733 100644 --- a/src/routes/device.rs +++ b/src/routes/device.rs | |||
@@ -16,7 +16,7 @@ pub async fn get_device(State(state): State<Arc<crate::AppState>>, headers: Head | |||
16 | let device = sqlx::query_as!( | 16 | let device = sqlx::query_as!( |
17 | Device, | 17 | Device, |
18 | r#" | 18 | r#" |
19 | SELECT id, mac, broadcast_addr | 19 | SELECT id, mac, broadcast_addr, ip |
20 | FROM devices | 20 | FROM devices |
21 | WHERE id = $1; | 21 | WHERE id = $1; |
22 | "#, | 22 | "#, |
@@ -40,12 +40,13 @@ pub async fn put_device(State(state): State<Arc<crate::AppState>>, headers: Head | |||
40 | if auth(secret).map_err(WebolError::Auth)? { | 40 | if auth(secret).map_err(WebolError::Auth)? { |
41 | sqlx::query!( | 41 | sqlx::query!( |
42 | r#" | 42 | r#" |
43 | INSERT INTO devices (id, mac, broadcast_addr) | 43 | INSERT INTO devices (id, mac, broadcast_addr, ip) |
44 | VALUES ($1, $2, $3); | 44 | VALUES ($1, $2, $3, $4); |
45 | "#, | 45 | "#, |
46 | payload.id, | 46 | payload.id, |
47 | payload.mac, | 47 | payload.mac, |
48 | payload.broadcast_addr | 48 | payload.broadcast_addr, |
49 | payload.ip | ||
49 | ).execute(&state.db).await.map_err(WebolError::DB)?; | 50 | ).execute(&state.db).await.map_err(WebolError::DB)?; |
50 | 51 | ||
51 | Ok(Json(json!(PutDeviceResponse { success: true }))) | 52 | Ok(Json(json!(PutDeviceResponse { success: true }))) |
@@ -59,6 +60,7 @@ pub struct PutDevicePayload { | |||
59 | id: String, | 60 | id: String, |
60 | mac: String, | 61 | mac: String, |
61 | broadcast_addr: String, | 62 | broadcast_addr: String, |
63 | ip: String | ||
62 | } | 64 | } |
63 | 65 | ||
64 | #[derive(Serialize)] | 66 | #[derive(Serialize)] |
@@ -74,11 +76,12 @@ pub async fn post_device(State(state): State<Arc<crate::AppState>>, headers: Hea | |||
74 | Device, | 76 | Device, |
75 | r#" | 77 | r#" |
76 | UPDATE devices | 78 | UPDATE devices |
77 | SET mac = $1, broadcast_addr = $2 WHERE id = $3 | 79 | SET mac = $1, broadcast_addr = $2, ip = $3 WHERE id = $4 |
78 | RETURNING id, mac, broadcast_addr; | 80 | RETURNING id, mac, broadcast_addr, ip; |
79 | "#, | 81 | "#, |
80 | payload.mac, | 82 | payload.mac, |
81 | payload.broadcast_addr, | 83 | payload.broadcast_addr, |
84 | payload.ip, | ||
82 | payload.id | 85 | payload.id |
83 | ).fetch_one(&state.db).await.map_err(WebolError::DB)?; | 86 | ).fetch_one(&state.db).await.map_err(WebolError::DB)?; |
84 | 87 | ||
@@ -93,4 +96,5 @@ pub struct PostDevicePayload { | |||
93 | id: String, | 96 | id: String, |
94 | mac: String, | 97 | mac: String, |
95 | broadcast_addr: String, | 98 | broadcast_addr: String, |
99 | ip: String, | ||
96 | } | 100 | } |
diff --git a/src/routes/start.rs b/src/routes/start.rs index 5b73281..3bccb0f 100644 --- a/src/routes/start.rs +++ b/src/routes/start.rs | |||
@@ -22,7 +22,7 @@ pub async fn start(State(state): State<Arc<crate::AppState>>, headers: HeaderMap | |||
22 | let device = sqlx::query_as!( | 22 | let device = sqlx::query_as!( |
23 | Device, | 23 | Device, |
24 | r#" | 24 | r#" |
25 | SELECT id, mac, broadcast_addr | 25 | SELECT id, mac, broadcast_addr, ip |
26 | FROM devices | 26 | FROM devices |
27 | WHERE id = $1; | 27 | WHERE id = $1; |
28 | "#, | 28 | "#, |
@@ -44,16 +44,15 @@ pub async fn start(State(state): State<Arc<crate::AppState>>, headers: HeaderMap | |||
44 | let uuid = if payload.ping.is_some_and(|ping| ping) { | 44 | let uuid = if payload.ping.is_some_and(|ping| ping) { |
45 | let uuid_gen = Uuid::new_v4().to_string(); | 45 | let uuid_gen = Uuid::new_v4().to_string(); |
46 | let uuid_genc = uuid_gen.clone(); | 46 | let uuid_genc = uuid_gen.clone(); |
47 | let uuid_gencc = uuid_gen.clone(); | 47 | tokio::spawn(async move { |
48 | tokio::spawn(async move{ | ||
49 | debug!("Init ping service"); | 48 | debug!("Init ping service"); |
50 | state.ping_map.insert(uuid_gen, ("192.168.178.94".to_string(), false)); | 49 | state.ping_map.insert(uuid_gen.clone(), (device.ip.clone(), false)); |
51 | 50 | ||
52 | warn!("{:?}", state.ping_map); | 51 | warn!("{:?}", state.ping_map); |
53 | 52 | ||
54 | crate::services::ping::spawn(state.ping_send.clone(), "192.168.178.94".to_string(), uuid_genc.clone(), state.ping_map.clone()).await | 53 | crate::services::ping::spawn(state.ping_send.clone(), device.ip, uuid_gen.clone(), &state.ping_map).await |
55 | }); | 54 | }); |
56 | Some(uuid_gencc) | 55 | Some(uuid_genc) |
57 | } else { None }; | 56 | } else { None }; |
58 | Ok(Json(json!(StartResponse { id: device.id, boot: true, uuid }))) | 57 | Ok(Json(json!(StartResponse { id: device.id, boot: true, uuid }))) |
59 | } else { | 58 | } else { |
diff --git a/src/routes/status.rs b/src/routes/status.rs index 4a5ec67..45f3e51 100644 --- a/src/routes/status.rs +++ b/src/routes/status.rs | |||
@@ -6,5 +6,5 @@ use crate::services::ping::status_websocket; | |||
6 | 6 | ||
7 | #[axum_macros::debug_handler] | 7 | #[axum_macros::debug_handler] |
8 | pub async fn status(State(state): State<Arc<AppState>>, ws: WebSocketUpgrade) -> Response { | 8 | pub async fn status(State(state): State<Arc<AppState>>, ws: WebSocketUpgrade) -> Response { |
9 | ws.on_upgrade(move |socket| status_websocket(socket, state.ping_send.clone(), state.ping_map.clone())) | 9 | ws.on_upgrade(move |socket| status_websocket(socket, state)) |
10 | } \ No newline at end of file | 10 | } \ No newline at end of file |
diff --git a/src/services/ping.rs b/src/services/ping.rs index ed848fc..04ad511 100644 --- a/src/services/ping.rs +++ b/src/services/ping.rs | |||
@@ -6,12 +6,13 @@ use axum::extract::ws::{CloseFrame, Message}; | |||
6 | use dashmap::DashMap; | 6 | use dashmap::DashMap; |
7 | use tokio::sync::broadcast::{Sender}; | 7 | use tokio::sync::broadcast::{Sender}; |
8 | use tracing::{debug, trace, warn}; | 8 | use tracing::{debug, trace, warn}; |
9 | use crate::AppState; | ||
9 | 10 | ||
10 | use crate::error::WebolError; | 11 | use crate::error::WebolError; |
11 | 12 | ||
12 | pub type PingMap = Arc<DashMap<String, (String, bool)>>; | 13 | pub type PingMap = DashMap<String, (String, bool)>; |
13 | 14 | ||
14 | pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping_map: PingMap) -> Result<(), WebolError> { | 15 | pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping_map: &PingMap) -> Result<(), WebolError> { |
15 | let payload = [0; 8]; | 16 | let payload = [0; 8]; |
16 | 17 | ||
17 | // TODO: Better while | 18 | // TODO: Better while |
@@ -31,14 +32,14 @@ pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping | |||
31 | let (_, duration) = ping.unwrap(); | 32 | let (_, duration) = ping.unwrap(); |
32 | debug!("Ping took {:?}", duration); | 33 | debug!("Ping took {:?}", duration); |
33 | cont = false; | 34 | cont = false; |
34 | handle_broadcast_send(&tx, ip.clone(), ping_map.clone(), uuid.clone()).await; | 35 | handle_broadcast_send(&tx, ip.clone(), &ping_map, uuid.clone()).await; |
35 | }; | 36 | }; |
36 | } | 37 | } |
37 | 38 | ||
38 | Ok(()) | 39 | Ok(()) |
39 | } | 40 | } |
40 | 41 | ||
41 | async fn handle_broadcast_send(tx: &Sender<BroadcastCommands>, ip: String, ping_map: PingMap, uuid: String) { | 42 | async fn handle_broadcast_send(tx: &Sender<BroadcastCommands>, ip: String, ping_map: &PingMap, uuid: String) { |
42 | debug!("sending pingsuccess message"); | 43 | debug!("sending pingsuccess message"); |
43 | ping_map.insert(uuid.clone(), (ip.clone(), true)); | 44 | ping_map.insert(uuid.clone(), (ip.clone(), true)); |
44 | let _ = tx.send(BroadcastCommands::PingSuccess(ip)); | 45 | let _ = tx.send(BroadcastCommands::PingSuccess(ip)); |
@@ -52,8 +53,8 @@ pub enum BroadcastCommands { | |||
52 | PingSuccess(String) | 53 | PingSuccess(String) |
53 | } | 54 | } |
54 | 55 | ||
55 | pub async fn status_websocket(mut socket: WebSocket, tx: Sender<BroadcastCommands>, ping_map: PingMap) { | 56 | pub async fn status_websocket(mut socket: WebSocket, state: Arc<AppState>) { |
56 | warn!("{:?}", ping_map); | 57 | warn!("{:?}", state.ping_map); |
57 | 58 | ||
58 | trace!("wait for ws message (uuid)"); | 59 | trace!("wait for ws message (uuid)"); |
59 | let msg = socket.recv().await; | 60 | let msg = socket.recv().await; |
@@ -62,13 +63,14 @@ pub async fn status_websocket(mut socket: WebSocket, tx: Sender<BroadcastCommand | |||
62 | trace!("Search for uuid: {:?}", uuid); | 63 | trace!("Search for uuid: {:?}", uuid); |
63 | 64 | ||
64 | // TODO: Handle Error | 65 | // TODO: Handle Error |
65 | let device = ping_map.get(&uuid).unwrap().to_owned(); | 66 | let device = state.ping_map.get(&uuid).unwrap().to_owned(); |
66 | 67 | ||
67 | trace!("got device: {:?}", device); | 68 | trace!("got device: {:?}", device); |
68 | 69 | ||
69 | match device.1 { | 70 | match device.1 { |
70 | true => { | 71 | true => { |
71 | debug!("already started"); | 72 | debug!("already started"); |
73 | // TODO: What's better? | ||
72 | // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); | 74 | // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); |
73 | // socket.close().await.unwrap(); | 75 | // socket.close().await.unwrap(); |
74 | socket.send(Message::Close(Some(CloseFrame { code: 4001, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap(); | 76 | socket.send(Message::Close(Some(CloseFrame { code: 4001, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap(); |
@@ -77,7 +79,7 @@ pub async fn status_websocket(mut socket: WebSocket, tx: Sender<BroadcastCommand | |||
77 | let ip = device.0.to_owned(); | 79 | let ip = device.0.to_owned(); |
78 | loop{ | 80 | loop{ |
79 | trace!("wait for tx message"); | 81 | trace!("wait for tx message"); |
80 | let message = tx.subscribe().recv().await.unwrap(); | 82 | let message = state.ping_send.subscribe().recv().await.unwrap(); |
81 | trace!("GOT = {:?}", message); | 83 | trace!("GOT = {:?}", message); |
82 | // if let BroadcastCommands::PingSuccess(msg_ip) = message { | 84 | // if let BroadcastCommands::PingSuccess(msg_ip) = message { |
83 | // if msg_ip == ip { | 85 | // if msg_ip == ip { |
@@ -95,7 +97,7 @@ pub async fn status_websocket(mut socket: WebSocket, tx: Sender<BroadcastCommand | |||
95 | socket.send(Message::Close(Some(CloseFrame { code: 4000, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap(); | 97 | socket.send(Message::Close(Some(CloseFrame { code: 4000, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap(); |
96 | // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); | 98 | // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); |
97 | // socket.close().await.unwrap(); | 99 | // socket.close().await.unwrap(); |
98 | warn!("{:?}", ping_map); | 100 | warn!("{:?}", state.ping_map); |
99 | } | 101 | } |
100 | } | 102 | } |
101 | } \ No newline at end of file | 103 | } \ No newline at end of file |