diff options
author | FxQnLr <[email protected]> | 2023-11-17 11:19:43 +0100 |
---|---|---|
committer | GitHub <[email protected]> | 2023-11-17 11:19:43 +0100 |
commit | da6367885d31698464e1bec122e3e673974427c6 (patch) | |
tree | b7ee38cc5a247151d99f0fbb75a529e5b07718e9 /src | |
parent | 4f124e6ba636e6191c2960d96d0057f3061988fc (diff) | |
parent | 075b0bdc47713e303f9954556fa4b4bb472b441a (diff) | |
download | webol-da6367885d31698464e1bec122e3e673974427c6.tar webol-da6367885d31698464e1bec122e3e673974427c6.tar.gz webol-da6367885d31698464e1bec122e3e673974427c6.zip |
Merge pull request #8 from FxQnLr/eta
Eta
Diffstat (limited to 'src')
-rw-r--r-- | src/db.rs | 3 | ||||
-rw-r--r-- | src/routes/device.rs | 4 | ||||
-rw-r--r-- | src/routes/start.rs | 22 | ||||
-rw-r--r-- | src/services/ping.rs | 35 |
4 files changed, 52 insertions, 12 deletions
@@ -13,7 +13,8 @@ pub struct Device { | |||
13 | pub id: String, | 13 | pub id: String, |
14 | pub mac: String, | 14 | pub mac: String, |
15 | pub broadcast_addr: String, | 15 | pub broadcast_addr: String, |
16 | pub ip: String | 16 | pub ip: String, |
17 | pub times: Option<Vec<i64>> | ||
17 | } | 18 | } |
18 | 19 | ||
19 | pub async fn init_db_pool() -> PgPool { | 20 | pub async fn init_db_pool() -> PgPool { |
diff --git a/src/routes/device.rs b/src/routes/device.rs index 1eeff0b..678d117 100644 --- a/src/routes/device.rs +++ b/src/routes/device.rs | |||
@@ -16,7 +16,7 @@ pub async fn get_device(State(state): State<Arc<crate::AppState>>, headers: Head | |||
16 | let device = sqlx::query_as!( | 16 | let device = sqlx::query_as!( |
17 | Device, | 17 | Device, |
18 | r#" | 18 | r#" |
19 | SELECT id, mac, broadcast_addr, ip | 19 | SELECT id, mac, broadcast_addr, ip, times |
20 | FROM devices | 20 | FROM devices |
21 | WHERE id = $1; | 21 | WHERE id = $1; |
22 | "#, | 22 | "#, |
@@ -79,7 +79,7 @@ pub async fn post_device(State(state): State<Arc<crate::AppState>>, headers: Hea | |||
79 | r#" | 79 | r#" |
80 | UPDATE devices | 80 | UPDATE devices |
81 | SET mac = $1, broadcast_addr = $2, ip = $3 WHERE id = $4 | 81 | SET mac = $1, broadcast_addr = $2, ip = $3 WHERE id = $4 |
82 | RETURNING id, mac, broadcast_addr, ip; | 82 | RETURNING id, mac, broadcast_addr, ip, times; |
83 | "#, | 83 | "#, |
84 | payload.mac, | 84 | payload.mac, |
85 | payload.broadcast_addr, | 85 | payload.broadcast_addr, |
diff --git a/src/routes/start.rs b/src/routes/start.rs index 271f924..1555db3 100644 --- a/src/routes/start.rs +++ b/src/routes/start.rs | |||
@@ -22,7 +22,7 @@ pub async fn start(State(state): State<Arc<crate::AppState>>, headers: HeaderMap | |||
22 | let device = sqlx::query_as!( | 22 | let device = sqlx::query_as!( |
23 | Device, | 23 | Device, |
24 | r#" | 24 | r#" |
25 | SELECT id, mac, broadcast_addr, ip | 25 | SELECT id, mac, broadcast_addr, ip, times |
26 | FROM devices | 26 | FROM devices |
27 | WHERE id = $1; | 27 | WHERE id = $1; |
28 | "#, | 28 | "#, |
@@ -40,19 +40,31 @@ pub async fn start(State(state): State<Arc<crate::AppState>>, headers: HeaderMap | |||
40 | &device.broadcast_addr.parse().map_err(WebolError::IpParse)?, | 40 | &device.broadcast_addr.parse().map_err(WebolError::IpParse)?, |
41 | create_buffer(&device.mac)? | 41 | create_buffer(&device.mac)? |
42 | )?; | 42 | )?; |
43 | 43 | let dev_id = device.id.clone(); | |
44 | let uuid = if payload.ping.is_some_and(|ping| ping) { | 44 | let uuid = if payload.ping.is_some_and(|ping| ping) { |
45 | let uuid_gen = Uuid::new_v4().to_string(); | 45 | let mut uuid: Option<String> = None; |
46 | for (key, value) in state.ping_map.clone() { | ||
47 | if value.ip == device.ip { | ||
48 | debug!("service already exists"); | ||
49 | uuid = Some(key); | ||
50 | break; | ||
51 | } | ||
52 | }; | ||
53 | let uuid_gen = match uuid { | ||
54 | Some(u) => u, | ||
55 | None => Uuid::new_v4().to_string(), | ||
56 | }; | ||
46 | let uuid_genc = uuid_gen.clone(); | 57 | let uuid_genc = uuid_gen.clone(); |
58 | |||
47 | tokio::spawn(async move { | 59 | tokio::spawn(async move { |
48 | debug!("init ping service"); | 60 | debug!("init ping service"); |
49 | state.ping_map.insert(uuid_gen.clone(), PingValue { ip: device.ip.clone(), online: false }); | 61 | state.ping_map.insert(uuid_gen.clone(), PingValue { ip: device.ip.clone(), online: false }); |
50 | 62 | ||
51 | crate::services::ping::spawn(state.ping_send.clone(), device.ip, uuid_gen.clone(), &state.ping_map).await | 63 | crate::services::ping::spawn(state.ping_send.clone(), device, uuid_gen.clone(), &state.ping_map, &state.db).await |
52 | }); | 64 | }); |
53 | Some(uuid_genc) | 65 | Some(uuid_genc) |
54 | } else { None }; | 66 | } else { None }; |
55 | Ok(Json(json!(StartResponse { id: device.id, boot: true, uuid }))) | 67 | Ok(Json(json!(StartResponse { id: dev_id, boot: true, uuid }))) |
56 | } else { | 68 | } else { |
57 | Err(WebolError::Generic) | 69 | Err(WebolError::Generic) |
58 | } | 70 | } |
diff --git a/src/services/ping.rs b/src/services/ping.rs index 67acc1c..c3bdced 100644 --- a/src/services/ping.rs +++ b/src/services/ping.rs | |||
@@ -5,11 +5,13 @@ use std::sync::Arc; | |||
5 | use axum::extract::{ws::WebSocket}; | 5 | use axum::extract::{ws::WebSocket}; |
6 | use axum::extract::ws::Message; | 6 | use axum::extract::ws::Message; |
7 | use dashmap::DashMap; | 7 | use dashmap::DashMap; |
8 | use sqlx::PgPool; | ||
8 | use time::{Duration, Instant}; | 9 | use time::{Duration, Instant}; |
9 | use tokio::sync::broadcast::{Sender}; | 10 | use tokio::sync::broadcast::{Sender}; |
10 | use tracing::{debug, error, trace}; | 11 | use tracing::{debug, error, trace}; |
11 | use crate::AppState; | 12 | use crate::AppState; |
12 | use crate::config::SETTINGS; | 13 | use crate::config::SETTINGS; |
14 | use crate::db::Device; | ||
13 | 15 | ||
14 | pub type PingMap = DashMap<String, PingValue>; | 16 | pub type PingMap = DashMap<String, PingValue>; |
15 | 17 | ||
@@ -19,11 +21,11 @@ pub struct PingValue { | |||
19 | pub online: bool | 21 | pub online: bool |
20 | } | 22 | } |
21 | 23 | ||
22 | pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping_map: &PingMap) { | 24 | pub async fn spawn(tx: Sender<BroadcastCommands>, device: Device, uuid: String, ping_map: &PingMap, db: &PgPool) { |
23 | let timer = Instant::now(); | 25 | let timer = Instant::now(); |
24 | let payload = [0; 8]; | 26 | let payload = [0; 8]; |
25 | 27 | ||
26 | let ping_ip = IpAddr::from_str(&ip).expect("bad ip"); | 28 | let ping_ip = IpAddr::from_str(&device.ip).expect("bad ip"); |
27 | 29 | ||
28 | let mut msg: Option<BroadcastCommands> = None; | 30 | let mut msg: Option<BroadcastCommands> = None; |
29 | while msg.is_none() { | 31 | while msg.is_none() { |
@@ -52,7 +54,16 @@ pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping | |||
52 | 54 | ||
53 | let _ = tx.send(msg.clone()); | 55 | let _ = tx.send(msg.clone()); |
54 | if let BroadcastCommands::Success(..) = msg { | 56 | if let BroadcastCommands::Success(..) = msg { |
55 | ping_map.insert(uuid.clone(), PingValue { ip: ip.clone(), online: true }); | 57 | sqlx::query!( |
58 | r#" | ||
59 | UPDATE devices | ||
60 | SET times = array_append(times, $1) | ||
61 | WHERE id = $2; | ||
62 | "#, | ||
63 | timer.elapsed().whole_seconds(), | ||
64 | device.id | ||
65 | ).execute(db).await.unwrap(); | ||
66 | ping_map.insert(uuid.clone(), PingValue { ip: device.ip.clone(), online: true }); | ||
56 | tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; | 67 | tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; |
57 | } | 68 | } |
58 | trace!("remove {} from ping_map", uuid); | 69 | trace!("remove {} from ping_map", uuid); |
@@ -71,7 +82,10 @@ pub async fn status_websocket(mut socket: WebSocket, state: Arc<AppState>) { | |||
71 | let msg = socket.recv().await; | 82 | let msg = socket.recv().await; |
72 | let uuid = msg.unwrap().unwrap().into_text().unwrap(); | 83 | let uuid = msg.unwrap().unwrap().into_text().unwrap(); |
73 | 84 | ||
74 | trace!("Search for uuid: {:?}", uuid); | 85 | trace!("Search for uuid: {}", uuid); |
86 | |||
87 | let eta = get_eta(&state.db).await; | ||
88 | let _ = socket.send(Message::Text(format!("eta_{}_{}", eta, uuid))).await; | ||
75 | 89 | ||
76 | let device_exists = state.ping_map.contains_key(&uuid); | 90 | let device_exists = state.ping_map.contains_key(&uuid); |
77 | match device_exists { | 91 | match device_exists { |
@@ -87,6 +101,19 @@ pub async fn status_websocket(mut socket: WebSocket, state: Arc<AppState>) { | |||
87 | let _ = socket.close().await; | 101 | let _ = socket.close().await; |
88 | } | 102 | } |
89 | 103 | ||
104 | async fn get_eta(db: &PgPool) -> i64 { | ||
105 | let query = sqlx::query!( | ||
106 | r#"SELECT times FROM devices;"# | ||
107 | ).fetch_one(db).await.unwrap(); | ||
108 | |||
109 | let times = match query.times { | ||
110 | None => { vec![0] }, | ||
111 | Some(t) => t, | ||
112 | }; | ||
113 | times.iter().sum::<i64>() / times.len() as i64 | ||
114 | |||
115 | } | ||
116 | |||
90 | async fn process_device(state: Arc<AppState>, uuid: String) -> Message { | 117 | async fn process_device(state: Arc<AppState>, uuid: String) -> Message { |
91 | let pm = state.ping_map.clone().into_read_only(); | 118 | let pm = state.ping_map.clone().into_read_only(); |
92 | let device = pm.get(&uuid).expect("fatal error"); | 119 | let device = pm.get(&uuid).expect("fatal error"); |