summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/db.rs3
-rw-r--r--src/routes/device.rs4
-rw-r--r--src/routes/start.rs9
-rw-r--r--src/services/ping.rs37
4 files changed, 42 insertions, 11 deletions
diff --git a/src/db.rs b/src/db.rs
index c012b47..8a6b16e 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -13,7 +13,8 @@ pub struct Device {
13 pub id: String, 13 pub id: String,
14 pub mac: String, 14 pub mac: String,
15 pub broadcast_addr: String, 15 pub broadcast_addr: String,
16 pub ip: String 16 pub ip: String,
17 pub times: Option<Vec<i64>>
17} 18}
18 19
19pub async fn init_db_pool() -> PgPool { 20pub async fn init_db_pool() -> PgPool {
diff --git a/src/routes/device.rs b/src/routes/device.rs
index 1eeff0b..678d117 100644
--- a/src/routes/device.rs
+++ b/src/routes/device.rs
@@ -16,7 +16,7 @@ pub async fn get_device(State(state): State<Arc<crate::AppState>>, headers: Head
16 let device = sqlx::query_as!( 16 let device = sqlx::query_as!(
17 Device, 17 Device,
18 r#" 18 r#"
19 SELECT id, mac, broadcast_addr, ip 19 SELECT id, mac, broadcast_addr, ip, times
20 FROM devices 20 FROM devices
21 WHERE id = $1; 21 WHERE id = $1;
22 "#, 22 "#,
@@ -79,7 +79,7 @@ pub async fn post_device(State(state): State<Arc<crate::AppState>>, headers: Hea
79 r#" 79 r#"
80 UPDATE devices 80 UPDATE devices
81 SET mac = $1, broadcast_addr = $2, ip = $3 WHERE id = $4 81 SET mac = $1, broadcast_addr = $2, ip = $3 WHERE id = $4
82 RETURNING id, mac, broadcast_addr, ip; 82 RETURNING id, mac, broadcast_addr, ip, times;
83 "#, 83 "#,
84 payload.mac, 84 payload.mac,
85 payload.broadcast_addr, 85 payload.broadcast_addr,
diff --git a/src/routes/start.rs b/src/routes/start.rs
index 271f924..401ae97 100644
--- a/src/routes/start.rs
+++ b/src/routes/start.rs
@@ -22,7 +22,7 @@ pub async fn start(State(state): State<Arc<crate::AppState>>, headers: HeaderMap
22 let device = sqlx::query_as!( 22 let device = sqlx::query_as!(
23 Device, 23 Device,
24 r#" 24 r#"
25 SELECT id, mac, broadcast_addr, ip 25 SELECT id, mac, broadcast_addr, ip, times
26 FROM devices 26 FROM devices
27 WHERE id = $1; 27 WHERE id = $1;
28 "#, 28 "#,
@@ -40,19 +40,20 @@ pub async fn start(State(state): State<Arc<crate::AppState>>, headers: HeaderMap
40 &device.broadcast_addr.parse().map_err(WebolError::IpParse)?, 40 &device.broadcast_addr.parse().map_err(WebolError::IpParse)?,
41 create_buffer(&device.mac)? 41 create_buffer(&device.mac)?
42 )?; 42 )?;
43 43 let dev_id = device.id.clone();
44 let uuid = if payload.ping.is_some_and(|ping| ping) { 44 let uuid = if payload.ping.is_some_and(|ping| ping) {
45 let uuid_gen = Uuid::new_v4().to_string(); 45 let uuid_gen = Uuid::new_v4().to_string();
46 let uuid_genc = uuid_gen.clone(); 46 let uuid_genc = uuid_gen.clone();
47 // TODO: Check if service already runs
47 tokio::spawn(async move { 48 tokio::spawn(async move {
48 debug!("init ping service"); 49 debug!("init ping service");
49 state.ping_map.insert(uuid_gen.clone(), PingValue { ip: device.ip.clone(), online: false }); 50 state.ping_map.insert(uuid_gen.clone(), PingValue { ip: device.ip.clone(), online: false });
50 51
51 crate::services::ping::spawn(state.ping_send.clone(), device.ip, uuid_gen.clone(), &state.ping_map).await 52 crate::services::ping::spawn(state.ping_send.clone(), device, uuid_gen.clone(), &state.ping_map, &state.db).await
52 }); 53 });
53 Some(uuid_genc) 54 Some(uuid_genc)
54 } else { None }; 55 } else { None };
55 Ok(Json(json!(StartResponse { id: device.id, boot: true, uuid }))) 56 Ok(Json(json!(StartResponse { id: dev_id, boot: true, uuid })))
56 } else { 57 } else {
57 Err(WebolError::Generic) 58 Err(WebolError::Generic)
58 } 59 }
diff --git a/src/services/ping.rs b/src/services/ping.rs
index 67acc1c..2bff61f 100644
--- a/src/services/ping.rs
+++ b/src/services/ping.rs
@@ -5,11 +5,13 @@ use std::sync::Arc;
5use axum::extract::{ws::WebSocket}; 5use axum::extract::{ws::WebSocket};
6use axum::extract::ws::Message; 6use axum::extract::ws::Message;
7use dashmap::DashMap; 7use dashmap::DashMap;
8use sqlx::PgPool;
8use time::{Duration, Instant}; 9use time::{Duration, Instant};
9use tokio::sync::broadcast::{Sender}; 10use tokio::sync::broadcast::{Sender};
10use tracing::{debug, error, trace}; 11use tracing::{debug, error, trace};
11use crate::AppState; 12use crate::AppState;
12use crate::config::SETTINGS; 13use crate::config::SETTINGS;
14use crate::db::Device;
13 15
14pub type PingMap = DashMap<String, PingValue>; 16pub type PingMap = DashMap<String, PingValue>;
15 17
@@ -19,11 +21,11 @@ pub struct PingValue {
19 pub online: bool 21 pub online: bool
20} 22}
21 23
22pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping_map: &PingMap) { 24pub async fn spawn(tx: Sender<BroadcastCommands>, device: Device, uuid: String, ping_map: &PingMap, db: &PgPool) {
23 let timer = Instant::now(); 25 let timer = Instant::now();
24 let payload = [0; 8]; 26 let payload = [0; 8];
25 27
26 let ping_ip = IpAddr::from_str(&ip).expect("bad ip"); 28 let ping_ip = IpAddr::from_str(&device.ip).expect("bad ip");
27 29
28 let mut msg: Option<BroadcastCommands> = None; 30 let mut msg: Option<BroadcastCommands> = None;
29 while msg.is_none() { 31 while msg.is_none() {
@@ -52,7 +54,16 @@ pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping
52 54
53 let _ = tx.send(msg.clone()); 55 let _ = tx.send(msg.clone());
54 if let BroadcastCommands::Success(..) = msg { 56 if let BroadcastCommands::Success(..) = msg {
55 ping_map.insert(uuid.clone(), PingValue { ip: ip.clone(), online: true }); 57 sqlx::query!(
58 r#"
59 UPDATE devices
60 SET times = array_append(times, $1)
61 WHERE id = $2;
62 "#,
63 timer.elapsed().whole_seconds(),
64 device.id
65 ).execute(db).await.unwrap();
66 ping_map.insert(uuid.clone(), PingValue { ip: device.ip.clone(), online: true });
56 tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; 67 tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
57 } 68 }
58 trace!("remove {} from ping_map", uuid); 69 trace!("remove {} from ping_map", uuid);
@@ -71,7 +82,10 @@ pub async fn status_websocket(mut socket: WebSocket, state: Arc<AppState>) {
71 let msg = socket.recv().await; 82 let msg = socket.recv().await;
72 let uuid = msg.unwrap().unwrap().into_text().unwrap(); 83 let uuid = msg.unwrap().unwrap().into_text().unwrap();
73 84
74 trace!("Search for uuid: {:?}", uuid); 85 trace!("Search for uuid: {}", uuid);
86
87 let eta = get_eta(&state.db).await;
88 let _ = socket.send(Message::Text(format!("eta_{}_{}", eta, uuid))).await;
75 89
76 let device_exists = state.ping_map.contains_key(&uuid); 90 let device_exists = state.ping_map.contains_key(&uuid);
77 match device_exists { 91 match device_exists {
@@ -87,6 +101,21 @@ pub async fn status_websocket(mut socket: WebSocket, state: Arc<AppState>) {
87 let _ = socket.close().await; 101 let _ = socket.close().await;
88} 102}
89 103
104async fn get_eta(db: &PgPool) -> i64 {
105 let query = sqlx::query!(
106 r#"SELECT times FROM devices;"#
107 ).fetch_optional(db).await.unwrap();
108
109 match query {
110 None => { -1 },
111 Some(rec) => {
112 let times = rec.times.unwrap();
113 times.iter().sum::<i64>() / times.len() as i64
114 }
115 }
116
117}
118
90async fn process_device(state: Arc<AppState>, uuid: String) -> Message { 119async fn process_device(state: Arc<AppState>, uuid: String) -> Message {
91 let pm = state.ping_map.clone().into_read_only(); 120 let pm = state.ping_map.clone().into_read_only();
92 let device = pm.get(&uuid).expect("fatal error"); 121 let device = pm.get(&uuid).expect("fatal error");