diff options
Diffstat (limited to 'src/services/ping.rs')
-rw-r--r-- | src/services/ping.rs | 92 |
1 files changed, 48 insertions, 44 deletions
diff --git a/src/services/ping.rs b/src/services/ping.rs index f0cc4a3..a26dacc 100644 --- a/src/services/ping.rs +++ b/src/services/ping.rs | |||
@@ -1,14 +1,13 @@ | |||
1 | use std::borrow::Cow; | ||
2 | use std::sync::Arc; | 1 | use std::sync::Arc; |
3 | 2 | ||
4 | use axum::extract::{ws::WebSocket}; | 3 | use axum::extract::{ws::WebSocket}; |
5 | use axum::extract::ws::{CloseFrame, Message}; | 4 | use axum::extract::ws::Message; |
6 | use dashmap::DashMap; | 5 | use dashmap::DashMap; |
6 | use time::{Duration, Instant}; | ||
7 | use tokio::sync::broadcast::{Sender}; | 7 | use tokio::sync::broadcast::{Sender}; |
8 | use tracing::{debug, trace, warn}; | 8 | use tracing::{debug, error, trace}; |
9 | use crate::AppState; | 9 | use crate::AppState; |
10 | 10 | use crate::config::SETTINGS; | |
11 | use crate::error::WebolError; | ||
12 | 11 | ||
13 | pub type PingMap = DashMap<String, PingValue>; | 12 | pub type PingMap = DashMap<String, PingValue>; |
14 | 13 | ||
@@ -18,92 +17,97 @@ pub struct PingValue { | |||
18 | pub online: bool | 17 | pub online: bool |
19 | } | 18 | } |
20 | 19 | ||
21 | pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping_map: &PingMap) -> Result<(), WebolError> { | 20 | pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping_map: &PingMap) { |
21 | let timer = Instant::now(); | ||
22 | let payload = [0; 8]; | 22 | let payload = [0; 8]; |
23 | 23 | ||
24 | // TODO: Better while | ||
25 | let mut cont = true; | 24 | let mut cont = true; |
26 | while cont { | 25 | while cont { |
27 | let ping = surge_ping::ping( | 26 | let ping = surge_ping::ping( |
28 | ip.parse().map_err(WebolError::IpParse)?, | 27 | ip.parse().expect("bad ip"), |
29 | &payload | 28 | &payload |
30 | ).await; | 29 | ).await; |
31 | 30 | ||
32 | if let Err(ping) = ping { | 31 | if let Err(ping) = ping { |
33 | cont = matches!(ping, surge_ping::SurgeError::Timeout { .. }); | 32 | cont = matches!(ping, surge_ping::SurgeError::Timeout { .. }); |
34 | if !cont { | 33 | if !cont { |
35 | return Err(ping).map_err(WebolError::Ping) | 34 | error!("{}", ping.to_string()); |
35 | } | ||
36 | if timer.elapsed() >= Duration::minutes(SETTINGS.get_int("pingtimeout").unwrap_or(10)) { | ||
37 | let _ = tx.send(BroadcastCommands::PingTimeout(uuid.clone())); | ||
38 | trace!("remove {} from ping_map after timeout", uuid); | ||
39 | ping_map.remove(&uuid); | ||
40 | cont = false; | ||
36 | } | 41 | } |
37 | } else { | 42 | } else { |
38 | let (_, duration) = ping.unwrap(); | 43 | let (_, duration) = ping.map_err(|err| error!("{}", err.to_string())).expect("fatal error"); |
39 | debug!("Ping took {:?}", duration); | 44 | debug!("Ping took {:?}", duration); |
40 | cont = false; | 45 | cont = false; |
41 | handle_broadcast_send(&tx, ip.clone(), ping_map, uuid.clone()).await; | 46 | handle_broadcast_send(&tx, ip.clone(), ping_map, uuid.clone()).await; |
42 | }; | 47 | }; |
43 | } | 48 | } |
44 | |||
45 | Ok(()) | ||
46 | } | 49 | } |
47 | 50 | ||
48 | async fn handle_broadcast_send(tx: &Sender<BroadcastCommands>, ip: String, ping_map: &PingMap, uuid: String) { | 51 | async fn handle_broadcast_send(tx: &Sender<BroadcastCommands>, ip: String, ping_map: &PingMap, uuid: String) { |
49 | debug!("sending pingsuccess message"); | 52 | debug!("send pingsuccess message"); |
50 | ping_map.insert(uuid.clone(), PingValue { ip: ip.clone(), online: true }); | 53 | ping_map.insert(uuid.clone(), PingValue { ip: ip.clone(), online: true }); |
51 | let _ = tx.send(BroadcastCommands::PingSuccess(ip)); | 54 | let _ = tx.send(BroadcastCommands::PingSuccess(uuid.clone())); |
52 | tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; | 55 | tokio::time::sleep(tokio::time::Duration::from_secs(60)).await; |
53 | trace!("remove {} from ping_map", uuid); | 56 | trace!("remove {} from ping_map after success", uuid); |
54 | ping_map.remove(&uuid); | 57 | ping_map.remove(&uuid); |
55 | } | 58 | } |
56 | 59 | ||
57 | #[derive(Clone, Debug)] | 60 | #[derive(Clone, Debug)] |
58 | pub enum BroadcastCommands { | 61 | pub enum BroadcastCommands { |
59 | PingSuccess(String) | 62 | PingSuccess(String), |
63 | PingTimeout(String) | ||
60 | } | 64 | } |
61 | 65 | ||
62 | pub async fn status_websocket(mut socket: WebSocket, state: Arc<AppState>) { | 66 | pub async fn status_websocket(mut socket: WebSocket, state: Arc<AppState>) { |
63 | warn!("{:?}", state.ping_map); | ||
64 | |||
65 | trace!("wait for ws message (uuid)"); | 67 | trace!("wait for ws message (uuid)"); |
66 | let msg = socket.recv().await; | 68 | let msg = socket.recv().await; |
67 | let uuid = msg.unwrap().unwrap().into_text().unwrap(); | 69 | let uuid = msg.unwrap().unwrap().into_text().unwrap(); |
68 | 70 | ||
69 | trace!("Search for uuid: {:?}", uuid); | 71 | trace!("Search for uuid: {:?}", uuid); |
70 | 72 | ||
71 | // TODO: Handle Error | 73 | match state.ping_map.get(&uuid) { |
72 | let device = state.ping_map.get(&uuid).unwrap().to_owned(); | 74 | Some(device) => { |
75 | debug!("got device: {} (online: {})", device.ip, device.online); | ||
76 | let _ = socket.send(process_device(state.clone(), uuid, device.to_owned()).await).await; | ||
77 | }, | ||
78 | None => { | ||
79 | debug!("didn't find any device"); | ||
80 | let _ = socket.send(Message::Text(format!("notfound_{}", uuid))).await; | ||
81 | }, | ||
82 | }; | ||
73 | 83 | ||
74 | trace!("got device: {:?}", device); | 84 | let _ = socket.close().await; |
85 | } | ||
75 | 86 | ||
87 | async fn process_device(state: Arc<AppState>, uuid: String, device: PingValue) -> Message { | ||
76 | match device.online { | 88 | match device.online { |
77 | true => { | 89 | true => { |
78 | debug!("already started"); | 90 | debug!("already started"); |
79 | // TODO: What's better? | 91 | Message::Text(format!("start_{}", uuid)) |
80 | // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); | ||
81 | // socket.close().await.unwrap(); | ||
82 | socket.send(Message::Close(Some(CloseFrame { code: 4001, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap(); | ||
83 | }, | 92 | }, |
84 | false => { | 93 | false => { |
85 | let ip = device.ip.to_owned(); | ||
86 | loop{ | 94 | loop{ |
87 | trace!("wait for tx message"); | 95 | trace!("wait for tx message"); |
88 | let message = state.ping_send.subscribe().recv().await.unwrap(); | 96 | let message = state.ping_send.subscribe().recv().await.expect("fatal error"); |
89 | trace!("GOT = {:?}", message); | 97 | trace!("got message {:?}", message); |
90 | // if let BroadcastCommands::PingSuccess(msg_ip) = message { | 98 | return match message { |
91 | // if msg_ip == ip { | 99 | BroadcastCommands::PingSuccess(msg_uuid) => { |
92 | // trace!("message == ip"); | 100 | if msg_uuid != uuid { continue; } |
93 | // break; | 101 | trace!("message == uuid success"); |
94 | // } | 102 | Message::Text(format!("start_{}", uuid)) |
95 | // } | 103 | }, |
96 | let BroadcastCommands::PingSuccess(msg_ip) = message; | 104 | BroadcastCommands::PingTimeout(msg_uuid) => { |
97 | if msg_ip == ip { | 105 | if msg_uuid != uuid { continue; } |
98 | trace!("message == ip"); | 106 | trace!("message == uuid timeout"); |
99 | break; | 107 | Message::Text(format!("timeout_{}", uuid)) |
108 | } | ||
100 | } | 109 | } |
101 | }; | 110 | } |
102 | |||
103 | socket.send(Message::Close(Some(CloseFrame { code: 4000, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap(); | ||
104 | // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); | ||
105 | // socket.close().await.unwrap(); | ||
106 | warn!("{:?}", state.ping_map); | ||
107 | } | 111 | } |
108 | } | 112 | } |
109 | } \ No newline at end of file | 113 | } \ No newline at end of file |