diff options
Diffstat (limited to 'src/services/ping.rs')
-rw-r--r-- | src/services/ping.rs | 37 |
1 files changed, 33 insertions, 4 deletions
diff --git a/src/services/ping.rs b/src/services/ping.rs index 67acc1c..2bff61f 100644 --- a/src/services/ping.rs +++ b/src/services/ping.rs | |||
@@ -5,11 +5,13 @@ use std::sync::Arc; | |||
5 | use axum::extract::{ws::WebSocket}; | 5 | use axum::extract::{ws::WebSocket}; |
6 | use axum::extract::ws::Message; | 6 | use axum::extract::ws::Message; |
7 | use dashmap::DashMap; | 7 | use dashmap::DashMap; |
8 | use sqlx::PgPool; | ||
8 | use time::{Duration, Instant}; | 9 | use time::{Duration, Instant}; |
9 | use tokio::sync::broadcast::{Sender}; | 10 | use tokio::sync::broadcast::{Sender}; |
10 | use tracing::{debug, error, trace}; | 11 | use tracing::{debug, error, trace}; |
11 | use crate::AppState; | 12 | use crate::AppState; |
12 | use crate::config::SETTINGS; | 13 | use crate::config::SETTINGS; |
14 | use crate::db::Device; | ||
13 | 15 | ||
14 | pub type PingMap = DashMap<String, PingValue>; | 16 | pub type PingMap = DashMap<String, PingValue>; |
15 | 17 | ||
@@ -19,11 +21,11 @@ pub struct PingValue { | |||
19 | pub online: bool | 21 | pub online: bool |
20 | } | 22 | } |
21 | 23 | ||
22 | pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping_map: &PingMap) { | 24 | pub async fn spawn(tx: Sender<BroadcastCommands>, device: Device, uuid: String, ping_map: &PingMap, db: &PgPool) { |
23 | let timer = Instant::now(); | 25 | let timer = Instant::now(); |
24 | let payload = [0; 8]; | 26 | let payload = [0; 8]; |
25 | 27 | ||
26 | let ping_ip = IpAddr::from_str(&ip).expect("bad ip"); | 28 | let ping_ip = IpAddr::from_str(&device.ip).expect("bad ip"); |
27 | 29 | ||
28 | let mut msg: Option<BroadcastCommands> = None; | 30 | let mut msg: Option<BroadcastCommands> = None; |
29 | while msg.is_none() { | 31 | while msg.is_none() { |
@@ -52,7 +54,16 @@ pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping | |||
52 | 54 | ||
53 | let _ = tx.send(msg.clone()); | 55 | let _ = tx.send(msg.clone()); |
54 | if let BroadcastCommands::Success(..) = msg { | 56 | if let BroadcastCommands::Success(..) = msg { |
55 | ping_map.insert(uuid.clone(), PingValue { ip: ip.clone(), online: true }); | 57 | sqlx::query!( |
58 | r#" | ||
59 | UPDATE devices | ||
60 | SET times = array_append(times, $1) | ||
61 | WHERE id = $2; | ||
62 | "#, | ||
63 | timer.elapsed().whole_seconds(), | ||
64 | device.id | ||
65 | ).execute(db).await.unwrap(); | ||
66 | ping_map.insert(uuid.clone(), PingValue { ip: device.ip.clone(), online: true }); | ||
56 | tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; | 67 | tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; |
57 | } | 68 | } |
58 | trace!("remove {} from ping_map", uuid); | 69 | trace!("remove {} from ping_map", uuid); |
@@ -71,7 +82,10 @@ pub async fn status_websocket(mut socket: WebSocket, state: Arc<AppState>) { | |||
71 | let msg = socket.recv().await; | 82 | let msg = socket.recv().await; |
72 | let uuid = msg.unwrap().unwrap().into_text().unwrap(); | 83 | let uuid = msg.unwrap().unwrap().into_text().unwrap(); |
73 | 84 | ||
74 | trace!("Search for uuid: {:?}", uuid); | 85 | trace!("Search for uuid: {}", uuid); |
86 | |||
87 | let eta = get_eta(&state.db).await; | ||
88 | let _ = socket.send(Message::Text(format!("eta_{}_{}", eta, uuid))).await; | ||
75 | 89 | ||
76 | let device_exists = state.ping_map.contains_key(&uuid); | 90 | let device_exists = state.ping_map.contains_key(&uuid); |
77 | match device_exists { | 91 | match device_exists { |
@@ -87,6 +101,21 @@ pub async fn status_websocket(mut socket: WebSocket, state: Arc<AppState>) { | |||
87 | let _ = socket.close().await; | 101 | let _ = socket.close().await; |
88 | } | 102 | } |
89 | 103 | ||
104 | async fn get_eta(db: &PgPool) -> i64 { | ||
105 | let query = sqlx::query!( | ||
106 | r#"SELECT times FROM devices;"# | ||
107 | ).fetch_optional(db).await.unwrap(); | ||
108 | |||
109 | match query { | ||
110 | None => { -1 }, | ||
111 | Some(rec) => { | ||
112 | let times = rec.times.unwrap(); | ||
113 | times.iter().sum::<i64>() / times.len() as i64 | ||
114 | } | ||
115 | } | ||
116 | |||
117 | } | ||
118 | |||
90 | async fn process_device(state: Arc<AppState>, uuid: String) -> Message { | 119 | async fn process_device(state: Arc<AppState>, uuid: String) -> Message { |
91 | let pm = state.ping_map.clone().into_read_only(); | 120 | let pm = state.ping_map.clone().into_read_only(); |
92 | let device = pm.get(&uuid).expect("fatal error"); | 121 | let device = pm.get(&uuid).expect("fatal error"); |