aboutsummaryrefslogtreecommitdiff
path: root/src/services
diff options
context:
space:
mode:
Diffstat (limited to 'src/services')
-rw-r--r--src/services/ping.rs20
1 files changed, 11 insertions, 9 deletions
diff --git a/src/services/ping.rs b/src/services/ping.rs
index ed848fc..04ad511 100644
--- a/src/services/ping.rs
+++ b/src/services/ping.rs
@@ -6,12 +6,13 @@ use axum::extract::ws::{CloseFrame, Message};
6use dashmap::DashMap; 6use dashmap::DashMap;
7use tokio::sync::broadcast::{Sender}; 7use tokio::sync::broadcast::{Sender};
8use tracing::{debug, trace, warn}; 8use tracing::{debug, trace, warn};
9use crate::AppState;
9 10
10use crate::error::WebolError; 11use crate::error::WebolError;
11 12
12pub type PingMap = Arc<DashMap<String, (String, bool)>>; 13pub type PingMap = DashMap<String, (String, bool)>;
13 14
14pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping_map: PingMap) -> Result<(), WebolError> { 15pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping_map: &PingMap) -> Result<(), WebolError> {
15 let payload = [0; 8]; 16 let payload = [0; 8];
16 17
17 // TODO: Better while 18 // TODO: Better while
@@ -31,14 +32,14 @@ pub async fn spawn(tx: Sender<BroadcastCommands>, ip: String, uuid: String, ping
31 let (_, duration) = ping.unwrap(); 32 let (_, duration) = ping.unwrap();
32 debug!("Ping took {:?}", duration); 33 debug!("Ping took {:?}", duration);
33 cont = false; 34 cont = false;
34 handle_broadcast_send(&tx, ip.clone(), ping_map.clone(), uuid.clone()).await; 35 handle_broadcast_send(&tx, ip.clone(), &ping_map, uuid.clone()).await;
35 }; 36 };
36 } 37 }
37 38
38 Ok(()) 39 Ok(())
39} 40}
40 41
41async fn handle_broadcast_send(tx: &Sender<BroadcastCommands>, ip: String, ping_map: PingMap, uuid: String) { 42async fn handle_broadcast_send(tx: &Sender<BroadcastCommands>, ip: String, ping_map: &PingMap, uuid: String) {
42 debug!("sending pingsuccess message"); 43 debug!("sending pingsuccess message");
43 ping_map.insert(uuid.clone(), (ip.clone(), true)); 44 ping_map.insert(uuid.clone(), (ip.clone(), true));
44 let _ = tx.send(BroadcastCommands::PingSuccess(ip)); 45 let _ = tx.send(BroadcastCommands::PingSuccess(ip));
@@ -52,8 +53,8 @@ pub enum BroadcastCommands {
52 PingSuccess(String) 53 PingSuccess(String)
53} 54}
54 55
55pub async fn status_websocket(mut socket: WebSocket, tx: Sender<BroadcastCommands>, ping_map: PingMap) { 56pub async fn status_websocket(mut socket: WebSocket, state: Arc<AppState>) {
56 warn!("{:?}", ping_map); 57 warn!("{:?}", state.ping_map);
57 58
58 trace!("wait for ws message (uuid)"); 59 trace!("wait for ws message (uuid)");
59 let msg = socket.recv().await; 60 let msg = socket.recv().await;
@@ -62,13 +63,14 @@ pub async fn status_websocket(mut socket: WebSocket, tx: Sender<BroadcastCommand
62 trace!("Search for uuid: {:?}", uuid); 63 trace!("Search for uuid: {:?}", uuid);
63 64
64 // TODO: Handle Error 65 // TODO: Handle Error
65 let device = ping_map.get(&uuid).unwrap().to_owned(); 66 let device = state.ping_map.get(&uuid).unwrap().to_owned();
66 67
67 trace!("got device: {:?}", device); 68 trace!("got device: {:?}", device);
68 69
69 match device.1 { 70 match device.1 {
70 true => { 71 true => {
71 debug!("already started"); 72 debug!("already started");
73 // TODO: What's better?
72 // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); 74 // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap();
73 // socket.close().await.unwrap(); 75 // socket.close().await.unwrap();
74 socket.send(Message::Close(Some(CloseFrame { code: 4001, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap(); 76 socket.send(Message::Close(Some(CloseFrame { code: 4001, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap();
@@ -77,7 +79,7 @@ pub async fn status_websocket(mut socket: WebSocket, tx: Sender<BroadcastCommand
77 let ip = device.0.to_owned(); 79 let ip = device.0.to_owned();
78 loop{ 80 loop{
79 trace!("wait for tx message"); 81 trace!("wait for tx message");
80 let message = tx.subscribe().recv().await.unwrap(); 82 let message = state.ping_send.subscribe().recv().await.unwrap();
81 trace!("GOT = {:?}", message); 83 trace!("GOT = {:?}", message);
82 // if let BroadcastCommands::PingSuccess(msg_ip) = message { 84 // if let BroadcastCommands::PingSuccess(msg_ip) = message {
83 // if msg_ip == ip { 85 // if msg_ip == ip {
@@ -95,7 +97,7 @@ pub async fn status_websocket(mut socket: WebSocket, tx: Sender<BroadcastCommand
95 socket.send(Message::Close(Some(CloseFrame { code: 4000, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap(); 97 socket.send(Message::Close(Some(CloseFrame { code: 4000, reason: Cow::from(format!("start_{}", uuid)) }))).await.unwrap();
96 // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap(); 98 // socket.send(Message::Text(format!("start_{}", uuid))).await.unwrap();
97 // socket.close().await.unwrap(); 99 // socket.close().await.unwrap();
98 warn!("{:?}", ping_map); 100 warn!("{:?}", state.ping_map);
99 } 101 }
100 } 102 }
101} \ No newline at end of file 103} \ No newline at end of file