aboutsummaryrefslogtreecommitdiff
path: root/src/services/ping.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/services/ping.rs')
-rw-r--r--src/services/ping.rs154
1 files changed, 68 insertions, 86 deletions
diff --git a/src/services/ping.rs b/src/services/ping.rs
index 9b164c8..9191f86 100644
--- a/src/services/ping.rs
+++ b/src/services/ping.rs
@@ -1,59 +1,58 @@
1use std::str::FromStr; 1use crate::config::Config;
2use std::net::IpAddr; 2use crate::db::Device;
3use std::sync::Arc;
4
5use axum::extract::ws::WebSocket;
6use axum::extract::ws::Message;
7use dashmap::DashMap; 3use dashmap::DashMap;
4use ipnetwork::IpNetwork;
8use sqlx::PgPool; 5use sqlx::PgPool;
6use std::fmt::Display;
9use time::{Duration, Instant}; 7use time::{Duration, Instant};
10use tokio::sync::broadcast::Sender; 8use tokio::sync::broadcast::Sender;
11use tracing::{debug, error, trace}; 9use tracing::{debug, error, trace};
12use crate::AppState;
13use crate::config::Config;
14use crate::db::Device;
15 10
16pub type StatusMap = DashMap<String, Value>; 11pub type StatusMap = DashMap<String, Value>;
17 12
18#[derive(Debug, Clone)] 13#[derive(Debug, Clone)]
19pub struct Value { 14pub struct Value {
20 pub ip: String, 15 pub ip: IpNetwork,
21 pub online: bool 16 pub online: bool,
22} 17}
23 18
24pub async fn spawn(tx: Sender<BroadcastCommands>, config: &Config, device: Device, uuid: String, ping_map: &StatusMap, db: &PgPool) { 19pub async fn spawn(
20 tx: Sender<BroadcastCommand>,
21 config: &Config,
22 device: Device,
23 uuid: String,
24 ping_map: &StatusMap,
25 db: &PgPool,
26) {
25 let timer = Instant::now(); 27 let timer = Instant::now();
26 let payload = [0; 8]; 28 let payload = [0; 8];
27 29
28 let ping_ip = IpAddr::from_str(&device.ip).expect("bad ip"); 30 let mut msg: Option<BroadcastCommand> = None;
29
30 let mut msg: Option<BroadcastCommands> = None;
31 while msg.is_none() { 31 while msg.is_none() {
32 let ping = surge_ping::ping( 32 let ping = surge_ping::ping(device.ip.ip(), &payload).await;
33 ping_ip,
34 &payload
35 ).await;
36 33
37 if let Err(ping) = ping { 34 if let Err(ping) = ping {
38 let ping_timeout = matches!(ping, surge_ping::SurgeError::Timeout { .. }); 35 let ping_timeout = matches!(ping, surge_ping::SurgeError::Timeout { .. });
39 if !ping_timeout { 36 if !ping_timeout {
40 error!("{}", ping.to_string()); 37 error!("{}", ping.to_string());
41 msg = Some(BroadcastCommands::Error(uuid.clone())); 38 msg = Some(BroadcastCommand::error(uuid.clone()));
42 } 39 }
43 if timer.elapsed() >= Duration::minutes(config.pingtimeout) { 40 if timer.elapsed() >= Duration::minutes(config.pingtimeout) {
44 msg = Some(BroadcastCommands::Timeout(uuid.clone())); 41 msg = Some(BroadcastCommand::timeout(uuid.clone()));
45 } 42 }
46 } else { 43 } else {
47 let (_, duration) = ping.map_err(|err| error!("{}", err.to_string())).expect("fatal error"); 44 let (_, duration) = ping
45 .map_err(|err| error!("{}", err.to_string()))
46 .expect("fatal error");
48 debug!("ping took {:?}", duration); 47 debug!("ping took {:?}", duration);
49 msg = Some(BroadcastCommands::Success(uuid.clone())); 48 msg = Some(BroadcastCommand::success(uuid.clone()));
50 }; 49 };
51 } 50 }
52 51
53 let msg = msg.expect("fatal error"); 52 let msg = msg.expect("fatal error");
54 53
55 let _ = tx.send(msg.clone()); 54 let _ = tx.send(msg.clone());
56 if let BroadcastCommands::Success(..) = msg { 55 if let BroadcastCommands::Success = msg.command {
57 sqlx::query!( 56 sqlx::query!(
58 r#" 57 r#"
59 UPDATE devices 58 UPDATE devices
@@ -62,8 +61,17 @@ pub async fn spawn(tx: Sender<BroadcastCommands>, config: &Config, device: Devic
62 "#, 61 "#,
63 timer.elapsed().whole_seconds(), 62 timer.elapsed().whole_seconds(),
64 device.id 63 device.id
65 ).execute(db).await.unwrap(); 64 )
66 ping_map.insert(uuid.clone(), Value { ip: device.ip.clone(), online: true }); 65 .execute(db)
66 .await
67 .unwrap();
68 ping_map.insert(
69 uuid.clone(),
70 Value {
71 ip: device.ip,
72 online: true,
73 },
74 );
67 tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; 75 tokio::time::sleep(tokio::time::Duration::from_secs(60)).await;
68 } 76 }
69 trace!("remove {} from ping_map", uuid); 77 trace!("remove {} from ping_map", uuid);
@@ -72,74 +80,48 @@ pub async fn spawn(tx: Sender<BroadcastCommands>, config: &Config, device: Devic
72 80
73#[derive(Clone, Debug, PartialEq)] 81#[derive(Clone, Debug, PartialEq)]
74pub enum BroadcastCommands { 82pub enum BroadcastCommands {
75 Success(String), 83 Success,
76 Timeout(String), 84 Timeout,
77 Error(String), 85 Error,
78} 86}
79 87
80pub async fn status_websocket(mut socket: WebSocket, state: Arc<AppState>) { 88#[derive(Clone, Debug, PartialEq)]
81 trace!("wait for ws message (uuid)"); 89pub struct BroadcastCommand {
82 let msg = socket.recv().await; 90 pub uuid: String,
83 let uuid = msg.unwrap().unwrap().into_text().unwrap(); 91 pub command: BroadcastCommands,
84 92}
85 trace!("Search for uuid: {}", uuid);
86
87 let eta = get_eta(&state.db).await;
88 let _ = socket.send(Message::Text(format!("eta_{eta}_{uuid}"))).await;
89 93
90 let device_exists = state.ping_map.contains_key(&uuid); 94impl Display for BroadcastCommand {
91 if device_exists { 95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
92 let _ = socket.send(process_device(state.clone(), uuid).await).await; 96 let prefix = match self.command {
93 } else { 97 BroadcastCommands::Success => "start",
94 debug!("didn't find any device"); 98 BroadcastCommands::Timeout => "timeout",
95 let _ = socket.send(Message::Text(format!("notfound_{uuid}"))).await; 99 BroadcastCommands::Error => "error",
96 }; 100 };
97 101
98 let _ = socket.close().await; 102 f.write_str(format!("{prefix}_{}", self.uuid).as_str())
103 }
99} 104}
100 105
101async fn get_eta(db: &PgPool) -> i64 { 106impl BroadcastCommand {
102 let query = sqlx::query!( 107 pub fn success(uuid: String) -> Self {
103 r#"SELECT times FROM devices;"# 108 Self {
104 ).fetch_one(db).await.unwrap(); 109 uuid,
105 110 command: BroadcastCommands::Success,
106 let times = match query.times { 111 }
107 None => { vec![0] }, 112 }
108 Some(t) => t,
109 };
110 times.iter().sum::<i64>() / i64::try_from(times.len()).unwrap()
111 113
112} 114 pub fn timeout(uuid: String) -> Self {
115 Self {
116 uuid,
117 command: BroadcastCommands::Timeout,
118 }
119 }
113 120
114async fn process_device(state: Arc<AppState>, uuid: String) -> Message { 121 pub fn error(uuid: String) -> Self {
115 let pm = state.ping_map.clone().into_read_only(); 122 Self {
116 let device = pm.get(&uuid).expect("fatal error"); 123 uuid,
117 debug!("got device: {} (online: {})", device.ip, device.online); 124 command: BroadcastCommands::Error,
118 if device.online {
119 debug!("already started");
120 Message::Text(format!("start_{uuid}"))
121 } else {
122 loop {
123 trace!("wait for tx message");
124 let message = state.ping_send.subscribe().recv().await.expect("fatal error");
125 trace!("got message {:?}", message);
126 return match message {
127 BroadcastCommands::Success(msg_uuid) => {
128 if msg_uuid != uuid { continue; }
129 trace!("message == uuid success");
130 Message::Text(format!("start_{uuid}"))
131 },
132 BroadcastCommands::Timeout(msg_uuid) => {
133 if msg_uuid != uuid { continue; }
134 trace!("message == uuid timeout");
135 Message::Text(format!("timeout_{uuid}"))
136 },
137 BroadcastCommands::Error(msg_uuid) => {
138 if msg_uuid != uuid { continue; }
139 trace!("message == uuid error");
140 Message::Text(format!("error_{uuid}"))
141 }
142 }
143 } 125 }
144 } 126 }
145} 127}