aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/db.rs3
-rw-r--r--src/routes/device.rs4
-rw-r--r--src/routes/start.rs22
-rw-r--r--src/services/ping.rs35
4 files changed, 52 insertions, 12 deletions
diff --git a/src/db.rs b/src/db.rs
index c012b47..8a6b16e 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -13,7 +13,8 @@ pub struct Device {
13 pub id: String, 13 pub id: String,
14 pub mac: String, 14 pub mac: String,
15 pub broadcast_addr: String, 15 pub broadcast_addr: String,
16 pub ip: String 16 pub ip: String,
17 pub times: Option<Vec<i64>>
17} 18}
18 19
19pub async fn init_db_pool() -> PgPool { 20pub async fn init_db_pool() -> PgPool {
diff --git a/src/routes/device.rs b/src/routes/device.rs
index 1eeff0b..678d117 100644
--- a/src/routes/device.rs
+++ b/src/routes/device.rs
@@ -16,7 +16,7 @@ pub async fn get_device(State(state): State<Arc<crate::AppState>>, headers: Head
16 let device = sqlx::query_as!( 16 let device = sqlx::query_as!(
17 Device, 17 Device,
18 r#" 18 r#"
19 SELECT id, mac, broadcast_addr, ip 19 SELECT id, mac, broadcast_addr, ip, times
20 FROM devices 20 FROM devices
21 WHERE id = $1; 21 WHERE id = $1;
22 "#, 22 "#,
@@ -79,7 +79,7 @@ pub async fn post_device(State(state): State<Arc<crate::AppState>>, headers: Hea
79 r#" 79 r#"
80 UPDATE devices 80 UPDATE devices
81 SET mac = $1, broadcast_addr = $2, ip = $3 WHERE id = $4 81 SET mac = $1, broadcast_addr = $2, ip = $3 WHERE id = $4
82 RETURNING id, mac, broadcast_addr, ip; 82 RETURNING id, mac, broadcast_addr, ip, times;
83 "#, 83 "#,
84 payload.mac, 84 payload.mac,
85 payload.broadcast_addr, 85 payload.broadcast_addr,
diff --git a/src/routes/start.rs b/src/routes/start.rs
index 271f924..1555db3 100644
--- a/src/routes/start.rs
+++ b/src/routes/start.rs
@@ -22,7 +22,7 @@ pub async fn start(State(state): State<Arc<crate::AppState>>, headers: HeaderMap
22 let device = sqlx::query_as!( 22 let device = sqlx::query_as!(
23 Device, 23 Device,
24 r#" 24 r#"
25 SELECT id, mac, broadcast_addr, ip 25 SELECT id, mac, broadcast_addr, ip, times
26 FROM devices 26 FROM devices
27 WHERE id = $1; 27 WHERE id = $1;
28 "#, 28 "#,
@@ -40,19 +40,31 @@ pub async fn start(State(state): State<Arc<crate::AppState>>, headers: HeaderMap
40 &device.broadcast_addr.parse().map_err(WebolError::IpParse)?, 40 &device.broadcast_addr.parse().map_err(WebolError::IpParse)?,
41 create_buffer(&device.mac)? 41 create_buffer(&device.mac)?
42 )?; 42 )?;
43 43 let dev_id = device.id.clone();
44 let uuid = if payload.ping.is_some_and(|ping| ping) { 44 let uuid = if payload.ping.is_some_and(|ping| ping) {
45 let uuid_gen = Uuid::new_v4().to_string(); 45 let mut uuid: Option<String> = None;
46 for (key, value) in state.ping_map.clone() {
47 if value.ip == device.ip {
48 debug!("service already exists");
49 uuid = Some(key);
50 break;
51 }
52 };
53 let uuid_gen = match uuid {
54 Some(u) => u,
55 None => Uuid::new_v4().to_string(),
56 };
46 let uuid_genc = uuid_gen.clone(); 57 let uuid_genc = uuid_gen.clone();
58
47 tokio::spawn(async move { 59 tokio::spawn(async move {
48 debug!("init ping service"); 60 debug!("init ping service");
49 state.ping_map.insert(uuid_gen.clone(), PingValue { ip: device.ip.clone(), online: false }); 61 state.ping_map.insert(uuid_gen.clone(), PingValue { ip: device.ip.clone(), online: false });
50 62
51 crate::services::ping::spawn(state.ping_send.clone(), device.ip, uuid_gen.clone(), &state.ping_map).await 63 crate::services::ping::spawn(state.ping_send.clone(), device, uuid_gen.clone(), &state.ping_map, &state.db).await
52 }); 64 });
53 Some(uuid_genc) 65 Some(uuid_genc)
54 } else { None }; 66 } else { None };
55 Ok(Json(json!(StartResponse { id: device.id, boot: true, uuid }))) 67 Ok(Json(json!(StartResponse { id: dev_id, boot: true, uuid })))
56 } else { 68 } else {
57 Err(WebolError::Generic) 69 Err(WebolError::Generic)
58 } 70 }
diff --git a/src/services/ping.rs b/src/services/ping.rs
index 67acc1c..c3bdced 100644
--- a/src/services/ping.rs
+++ b/src/services/ping.rs
@@ -5,11 +5,13 @@ use std::sync::Arc;
5use axum::extract::{ws::WebSocket}; 5use axum::extract::{ws::WebSocket};
6use axum::extract::ws::Message; 6use axum::extract::ws::Message;
7use dashmap::DashMap; 7use dashmap::DashMap;
8use sqlx::PgPool;
8use time::{Duration, Instant}; 9use time::{Duration, Instant};
9use tokio::sync::broadcast::{Sender}; 10use tokio::sync::broadcast::{Sender};
10use tracing::{debug, error, trace}; 11use tracing::{debug, error, trace};
11use crate::AppState; 12use crate::AppState;
12use crate::config::SETTINGS; 13use crate::config::SETTINGS;
14use crate::db::Device;
13 15
14pub type PingMap = DashMap<String, PingValue>; 16pub type PingMap = DashMap<String, PingValue>;
15 17
@@ -19,11 +21,11 @@ pub struct PingValue {
19 pub online: bool 21 pub online: bool
20} 22}
21 23
22pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping_map: &PingMap) { 24pub async fn spawn(tx: Sender<BroadcastCommands>, device: Device, uuid: String, ping_map: &PingMap, db: &PgPool) {
23 let timer = Instant::now(); 25 let timer = Instant::now();
24 let payload = [0; 8]; 26 let payload = [0; 8];
25 27
26 let ping_ip = IpAddr::from_str(&ip).expect("bad ip"); 28 let ping_ip = IpAddr::from_str(&device.ip).expect("bad ip");
27 29
28 let mut msg: Option<BroadcastCommands> = None; 30 let mut msg: Option<BroadcastCommands> = None;
29 while msg.is_none() { 31 while msg.is_none() {
@@ -52,7 +54,16 @@ pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping
52 54
53 let _ = tx.send(msg.clone()); 55 let _ = tx.send(msg.clone());
54 if let BroadcastCommands::Success(..) = msg { 56 if let BroadcastCommands::Success(..) = msg {
55 ping_map.insert(uuid.clone(), PingValue { ip: ip.clone(), online: true }); 57 sqlx::query!(
58 r#"
59 UPDATE devices
60 SET times = array_append(times, $1)
61 WHERE id = $2;
62 "#,
63 timer.elapsed().whole_seconds(),
64 device.id
65 ).execute(db).await.unwrap();
66 ping_map.insert(uuid.clone(), PingValue { ip: device.ip.clone(), online: true });
56 tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; 67 tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
57 } 68 }
58 trace!("remove {} from ping_map", uuid); 69 trace!("remove {} from ping_map", uuid);
@@ -71,7 +82,10 @@ pub async fn status_websocket(mut socket: WebSocket, state: Arc<AppState>) {
71 let msg = socket.recv().await; 82 let msg = socket.recv().await;
72 let uuid = msg.unwrap().unwrap().into_text().unwrap(); 83 let uuid = msg.unwrap().unwrap().into_text().unwrap();
73 84
74 trace!("Search for uuid: {:?}", uuid); 85 trace!("Search for uuid: {}", uuid);
86
87 let eta = get_eta(&state.db).await;
88 let _ = socket.send(Message::Text(format!("eta_{}_{}", eta, uuid))).await;
75 89
76 let device_exists = state.ping_map.contains_key(&uuid); 90 let device_exists = state.ping_map.contains_key(&uuid);
77 match device_exists { 91 match device_exists {
@@ -87,6 +101,19 @@ pub async fn status_websocket(mut socket: WebSocket, state: Arc<AppState>) {
87 let _ = socket.close().await; 101 let _ = socket.close().await;
88} 102}
89 103
104async fn get_eta(db: &PgPool) -> i64 {
105 let query = sqlx::query!(
106 r#"SELECT times FROM devices;"#
107 ).fetch_one(db).await.unwrap();
108
109 let times = match query.times {
110 None => { vec![0] },
111 Some(t) => t,
112 };
113 times.iter().sum::<i64>() / times.len() as i64
114
115}
116
90async fn process_device(state: Arc<AppState>, uuid: String) -> Message { 117async fn process_device(state: Arc<AppState>, uuid: String) -> Message {
91 let pm = state.ping_map.clone().into_read_only(); 118 let pm = state.ping_map.clone().into_read_only();
92 let device = pm.get(&uuid).expect("fatal error"); 119 let device = pm.get(&uuid).expect("fatal error");