diff options
Diffstat (limited to 'src/services/ping.rs')
-rw-r--r-- | src/services/ping.rs | 154 |
1 files changed, 68 insertions, 86 deletions
diff --git a/src/services/ping.rs b/src/services/ping.rs index 9b164c8..9191f86 100644 --- a/src/services/ping.rs +++ b/src/services/ping.rs | |||
@@ -1,59 +1,58 @@ | |||
1 | use std::str::FromStr; | 1 | use crate::config::Config; |
2 | use std::net::IpAddr; | 2 | use crate::db::Device; |
3 | use std::sync::Arc; | ||
4 | |||
5 | use axum::extract::ws::WebSocket; | ||
6 | use axum::extract::ws::Message; | ||
7 | use dashmap::DashMap; | 3 | use dashmap::DashMap; |
4 | use ipnetwork::IpNetwork; | ||
8 | use sqlx::PgPool; | 5 | use sqlx::PgPool; |
6 | use std::fmt::Display; | ||
9 | use time::{Duration, Instant}; | 7 | use time::{Duration, Instant}; |
10 | use tokio::sync::broadcast::Sender; | 8 | use tokio::sync::broadcast::Sender; |
11 | use tracing::{debug, error, trace}; | 9 | use tracing::{debug, error, trace}; |
12 | use crate::AppState; | ||
13 | use crate::config::Config; | ||
14 | use crate::db::Device; | ||
15 | 10 | ||
16 | pub type StatusMap = DashMap<String, Value>; | 11 | pub type StatusMap = DashMap<String, Value>; |
17 | 12 | ||
18 | #[derive(Debug, Clone)] | 13 | #[derive(Debug, Clone)] |
19 | pub struct Value { | 14 | pub struct Value { |
20 | pub ip: String, | 15 | pub ip: IpNetwork, |
21 | pub online: bool | 16 | pub online: bool, |
22 | } | 17 | } |
23 | 18 | ||
24 | pub async fn spawn(tx: Sender<BroadcastCommands>, config: &Config, device: Device, uuid: String, ping_map: &StatusMap, db: &PgPool) { | 19 | pub async fn spawn( |
20 | tx: Sender<BroadcastCommand>, | ||
21 | config: &Config, | ||
22 | device: Device, | ||
23 | uuid: String, | ||
24 | ping_map: &StatusMap, | ||
25 | db: &PgPool, | ||
26 | ) { | ||
25 | let timer = Instant::now(); | 27 | let timer = Instant::now(); |
26 | let payload = [0; 8]; | 28 | let payload = [0; 8]; |
27 | 29 | ||
28 | let ping_ip = IpAddr::from_str(&device.ip).expect("bad ip"); | 30 | let mut msg: Option<BroadcastCommand> = None; |
29 | |||
30 | let mut msg: Option<BroadcastCommands> = None; | ||
31 | while msg.is_none() { | 31 | while msg.is_none() { |
32 | let ping = surge_ping::ping( | 32 | let ping = surge_ping::ping(device.ip.ip(), &payload).await; |
33 | ping_ip, | ||
34 | &payload | ||
35 | ).await; | ||
36 | 33 | ||
37 | if let Err(ping) = ping { | 34 | if let Err(ping) = ping { |
38 | let ping_timeout = matches!(ping, surge_ping::SurgeError::Timeout { .. }); | 35 | let ping_timeout = matches!(ping, surge_ping::SurgeError::Timeout { .. }); |
39 | if !ping_timeout { | 36 | if !ping_timeout { |
40 | error!("{}", ping.to_string()); | 37 | error!("{}", ping.to_string()); |
41 | msg = Some(BroadcastCommands::Error(uuid.clone())); | 38 | msg = Some(BroadcastCommand::error(uuid.clone())); |
42 | } | 39 | } |
43 | if timer.elapsed() >= Duration::minutes(config.pingtimeout) { | 40 | if timer.elapsed() >= Duration::minutes(config.pingtimeout) { |
44 | msg = Some(BroadcastCommands::Timeout(uuid.clone())); | 41 | msg = Some(BroadcastCommand::timeout(uuid.clone())); |
45 | } | 42 | } |
46 | } else { | 43 | } else { |
47 | let (_, duration) = ping.map_err(|err| error!("{}", err.to_string())).expect("fatal error"); | 44 | let (_, duration) = ping |
45 | .map_err(|err| error!("{}", err.to_string())) | ||
46 | .expect("fatal error"); | ||
48 | debug!("ping took {:?}", duration); | 47 | debug!("ping took {:?}", duration); |
49 | msg = Some(BroadcastCommands::Success(uuid.clone())); | 48 | msg = Some(BroadcastCommand::success(uuid.clone())); |
50 | }; | 49 | }; |
51 | } | 50 | } |
52 | 51 | ||
53 | let msg = msg.expect("fatal error"); | 52 | let msg = msg.expect("fatal error"); |
54 | 53 | ||
55 | let _ = tx.send(msg.clone()); | 54 | let _ = tx.send(msg.clone()); |
56 | if let BroadcastCommands::Success(..) = msg { | 55 | if let BroadcastCommands::Success = msg.command { |
57 | sqlx::query!( | 56 | sqlx::query!( |
58 | r#" | 57 | r#" |
59 | UPDATE devices | 58 | UPDATE devices |
@@ -62,8 +61,17 @@ pub async fn spawn(tx: Sender<BroadcastCommands>, config: &Config, device: Devic | |||
62 | "#, | 61 | "#, |
63 | timer.elapsed().whole_seconds(), | 62 | timer.elapsed().whole_seconds(), |
64 | device.id | 63 | device.id |
65 | ).execute(db).await.unwrap(); | 64 | ) |
66 | ping_map.insert(uuid.clone(), Value { ip: device.ip.clone(), online: true }); | 65 | .execute(db) |
66 | .await | ||
67 | .unwrap(); | ||
68 | ping_map.insert( | ||
69 | uuid.clone(), | ||
70 | Value { | ||
71 | ip: device.ip, | ||
72 | online: true, | ||
73 | }, | ||
74 | ); | ||
67 | tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; | 75 | tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; |
68 | } | 76 | } |
69 | trace!("remove {} from ping_map", uuid); | 77 | trace!("remove {} from ping_map", uuid); |
@@ -72,74 +80,48 @@ pub async fn spawn(tx: Sender<BroadcastCommands>, config: &Config, device: Devic | |||
72 | 80 | ||
73 | #[derive(Clone, Debug, PartialEq)] | 81 | #[derive(Clone, Debug, PartialEq)] |
74 | pub enum BroadcastCommands { | 82 | pub enum BroadcastCommands { |
75 | Success(String), | 83 | Success, |
76 | Timeout(String), | 84 | Timeout, |
77 | Error(String), | 85 | Error, |
78 | } | 86 | } |
79 | 87 | ||
80 | pub async fn status_websocket(mut socket: WebSocket, state: Arc<AppState>) { | 88 | #[derive(Clone, Debug, PartialEq)] |
81 | trace!("wait for ws message (uuid)"); | 89 | pub struct BroadcastCommand { |
82 | let msg = socket.recv().await; | 90 | pub uuid: String, |
83 | let uuid = msg.unwrap().unwrap().into_text().unwrap(); | 91 | pub command: BroadcastCommands, |
84 | 92 | } | |
85 | trace!("Search for uuid: {}", uuid); | ||
86 | |||
87 | let eta = get_eta(&state.db).await; | ||
88 | let _ = socket.send(Message::Text(format!("eta_{eta}_{uuid}"))).await; | ||
89 | 93 | ||
90 | let device_exists = state.ping_map.contains_key(&uuid); | 94 | impl Display for BroadcastCommand { |
91 | if device_exists { | 95 | fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { |
92 | let _ = socket.send(process_device(state.clone(), uuid).await).await; | 96 | let prefix = match self.command { |
93 | } else { | 97 | BroadcastCommands::Success => "start", |
94 | debug!("didn't find any device"); | 98 | BroadcastCommands::Timeout => "timeout", |
95 | let _ = socket.send(Message::Text(format!("notfound_{uuid}"))).await; | 99 | BroadcastCommands::Error => "error", |
96 | }; | 100 | }; |
97 | 101 | ||
98 | let _ = socket.close().await; | 102 | f.write_str(format!("{prefix}_{}", self.uuid).as_str()) |
103 | } | ||
99 | } | 104 | } |
100 | 105 | ||
101 | async fn get_eta(db: &PgPool) -> i64 { | 106 | impl BroadcastCommand { |
102 | let query = sqlx::query!( | 107 | pub fn success(uuid: String) -> Self { |
103 | r#"SELECT times FROM devices;"# | 108 | Self { |
104 | ).fetch_one(db).await.unwrap(); | 109 | uuid, |
105 | 110 | command: BroadcastCommands::Success, | |
106 | let times = match query.times { | 111 | } |
107 | None => { vec![0] }, | 112 | } |
108 | Some(t) => t, | ||
109 | }; | ||
110 | times.iter().sum::<i64>() / i64::try_from(times.len()).unwrap() | ||
111 | 113 | ||
112 | } | 114 | pub fn timeout(uuid: String) -> Self { |
115 | Self { | ||
116 | uuid, | ||
117 | command: BroadcastCommands::Timeout, | ||
118 | } | ||
119 | } | ||
113 | 120 | ||
114 | async fn process_device(state: Arc<AppState>, uuid: String) -> Message { | 121 | pub fn error(uuid: String) -> Self { |
115 | let pm = state.ping_map.clone().into_read_only(); | 122 | Self { |
116 | let device = pm.get(&uuid).expect("fatal error"); | 123 | uuid, |
117 | debug!("got device: {} (online: {})", device.ip, device.online); | 124 | command: BroadcastCommands::Error, |
118 | if device.online { | ||
119 | debug!("already started"); | ||
120 | Message::Text(format!("start_{uuid}")) | ||
121 | } else { | ||
122 | loop { | ||
123 | trace!("wait for tx message"); | ||
124 | let message = state.ping_send.subscribe().recv().await.expect("fatal error"); | ||
125 | trace!("got message {:?}", message); | ||
126 | return match message { | ||
127 | BroadcastCommands::Success(msg_uuid) => { | ||
128 | if msg_uuid != uuid { continue; } | ||
129 | trace!("message == uuid success"); | ||
130 | Message::Text(format!("start_{uuid}")) | ||
131 | }, | ||
132 | BroadcastCommands::Timeout(msg_uuid) => { | ||
133 | if msg_uuid != uuid { continue; } | ||
134 | trace!("message == uuid timeout"); | ||
135 | Message::Text(format!("timeout_{uuid}")) | ||
136 | }, | ||
137 | BroadcastCommands::Error(msg_uuid) => { | ||
138 | if msg_uuid != uuid { continue; } | ||
139 | trace!("message == uuid error"); | ||
140 | Message::Text(format!("error_{uuid}")) | ||
141 | } | ||
142 | } | ||
143 | } | 125 | } |
144 | } | 126 | } |
145 | } | 127 | } |