summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/main.rs7
-rw-r--r--src/play.rs234
-rw-r--r--src/session.rs14
3 files changed, 166 insertions, 89 deletions
diff --git a/src/main.rs b/src/main.rs
index 9e7f21b..7162402 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -3,7 +3,7 @@ mod session;
mod template;
use crate::play::handle_play;
-use crate::session::{HandObject, Seat, Session};
+use crate::session::{HandObject, PlayUpdate, Seat, Session};
use crate::template::{IndexTemplate, SessionTemplate};
use askama::Template;
use axum::extract::{Path, Query, State, WebSocketUpgrade};
@@ -128,10 +128,11 @@ async fn update_hands(
for (color, hand) in payload {
let seat = session
.seats
- .entry(color)
+ .entry(color.to_owned())
.or_insert_with(|| Seat { hand: Vec::new() });
- seat.hand = hand;
+ seat.hand = hand.to_owned();
+ let _ = session.update_tx.send(PlayUpdate::HandUpdate(color, hand));
}
StatusCode::NO_CONTENT
}
diff --git a/src/play.rs b/src/play.rs
index f401fea..82380f9 100644
--- a/src/play.rs
+++ b/src/play.rs
@@ -1,19 +1,13 @@
use crate::AppState;
-use crate::session::HandObject;
+use crate::session::{HandObject, PlayUpdate, Session};
use axum::extract::ws::{Message, Utf8Bytes, WebSocket};
+use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
-use std::error::Error;
-use std::sync::Arc;
-
-macro_rules! send_message_or_break {
- ($socket:expr, $message:expr) => {{
- let result = send_outgoing_message($socket, $message).await;
- if let Err(err) = result {
- eprintln!("Failed to send message to socket: {}", err);
- break;
- }
- }};
-}
+use std::sync::{Arc, Mutex, RwLock, Weak};
+use tokio::sync::broadcast::Receiver;
+use tokio::sync::broadcast::error::RecvError;
+use tokio::sync::mpsc;
+use tokio::sync::mpsc::Sender;
#[derive(Deserialize)]
enum IncomingPlayMessage {
@@ -21,97 +15,169 @@ enum IncomingPlayMessage {
Color(String),
}
+// TODO: Maybe derive Clone, reference interior vals
#[derive(Serialize)]
-enum OutgoingPlayMessage<'a> {
- Initialize { colors: Vec<&'a String> },
- Hand(Vec<&'a HandObject>),
+enum OutgoingPlayMessage {
+ Initialize { colors: Vec<String> },
+ Hand(Vec<HandObject>),
// TODO: include error details
Error,
}
-async fn send_outgoing_message(
- socket: &mut WebSocket,
- message: &OutgoingPlayMessage<'_>,
-) -> Result<(), Box<dyn Error>> {
- let serialized = serde_json::to_string(message)?;
- socket
- .send(Message::Text(Utf8Bytes::from(serialized)))
- .await
- .map_err(Box::from)
-}
+pub async fn handle_play(socket: WebSocket, app_state: Arc<AppState>) {
+ let (mut sender, mut receiver) = socket.split();
+ let (sender_tx, mut sender_rx) = mpsc::channel(2);
-pub async fn handle_play(mut socket: WebSocket, app_state: Arc<AppState>) {
- let mut player_session = None;
+ let mut send_task = tokio::spawn(async move {
+ while let Some(message) = sender_rx.recv().await {
+ let serialized = match serde_json::to_string(&message) {
+ Ok(serialized) => serialized,
+ Err(err) => {
+ eprintln!("Failed to serialize outgoing websocket message: {}", err);
+ break;
+ }
+ };
+ if let Err(err) = sender
+ .send(Message::Text(Utf8Bytes::from(serialized)))
+ .await
+ {
+ eprintln!("Failed to send serialized websocket message: {}", err);
+ break;
+ }
+ }
+ });
- while let Some(msg) = socket.recv().await {
- let Ok(Message::Text(text)) = msg else {
- continue;
- };
+ let mut recv_task = {
+ let sender_tx = sender_tx.clone();
+ tokio::spawn(async move {
+ let mut player_session = None;
+ let player_color = Arc::new(RwLock::new(String::new()));
- match serde_json::from_str(text.as_str()) {
- Ok(IncomingPlayMessage::Initialize { id }) => {
- let session = {
- let sessions = app_state.sessions.read().unwrap();
- sessions
- .get(&id)
- .map(Arc::to_owned)
- .ok_or("Session did not exist")
+ while let Some(msg) = receiver.next().await {
+ let Ok(Message::Text(text)) = msg else {
+ continue;
};
- match session {
- Ok(session) => {
- let colors: Vec<String> = session
+ match serde_json::from_str(text.as_str()) {
+ Ok(IncomingPlayMessage::Initialize { id }) => {
+ let session = {
+ let sessions = app_state.sessions.read().unwrap();
+ sessions
+ .get(&id)
+ .map(Arc::to_owned)
+ .ok_or("Session did not exist")
+ };
+
+ match session {
+ Ok(session) => {
+ let (colors, update_rx) = {
+ let session = session.lock().unwrap();
+
+ let colors: Vec<String> =
+ session.seats.keys().cloned().collect();
+ let update_rx = session.update_tx.subscribe();
+
+ (colors, update_rx)
+ };
+
+ player_session = Some(Arc::downgrade(&session));
+ {
+ let sender_tx = sender_tx.clone();
+ let player_session = Arc::downgrade(&session);
+ let player_color = player_color.clone();
+
+ tokio::spawn(async move {
+ handle_update(
+ update_rx,
+ sender_tx,
+ player_session,
+ player_color,
+ )
+ .await
+ });
+ }
+
+ let response = OutgoingPlayMessage::Initialize { colors };
+ if sender_tx.send(response).await.is_err() {
+ break;
+ }
+ }
+ Err(err) => {
+ eprintln!("Failed to access session: {}", err);
+ let response = OutgoingPlayMessage::Error;
+ if sender_tx.send(response).await.is_err() {
+ break;
+ }
+ }
+ }
+ }
+ Ok(IncomingPlayMessage::Color(color)) => {
+ let Some(session) =
+ player_session.clone().and_then(|session| session.upgrade())
+ else {
+ let response = OutgoingPlayMessage::Error;
+ if sender_tx.send(response).await.is_err() {
+ break;
+ }
+ break;
+ };
+ let hand = session
.lock()
.unwrap()
.seats
- .keys()
- .map(String::to_owned)
- .collect();
- player_session = Some(Arc::downgrade(&session));
- let response = OutgoingPlayMessage::Initialize {
- colors: colors.iter().collect(),
+ .get(&color)
+ .map(|seat| seat.hand.to_owned());
+ match hand {
+ Some(hand) => {
+ *player_color.write().unwrap() = color;
+ if sender_tx
+ .send(OutgoingPlayMessage::Hand(hand))
+ .await
+ .is_err()
+ {
+ break;
+ }
+ }
+ None => {
+ if sender_tx.send(OutgoingPlayMessage::Error).await.is_err() {
+ break;
+ }
+ }
};
- send_message_or_break!(&mut socket, &response);
}
Err(err) => {
- eprintln!("Failed to access session: {}", err);
- let response = OutgoingPlayMessage::Error;
- send_message_or_break!(&mut socket, &response);
+ eprintln!(
+ "Encountered an error while handling a message from a player: {}",
+ err
+ );
+ break;
}
}
}
- Ok(IncomingPlayMessage::Color(color)) => {
- let Some(session) = player_session.clone().and_then(|session| session.upgrade())
- else {
- let response = OutgoingPlayMessage::Error;
- send_message_or_break!(&mut socket, &response);
- break;
- };
- let hand = session
- .lock()
- .unwrap()
- .seats
- .get(&color)
- .map(|seat| (&seat.hand).to_owned());
- match hand {
- Some(hand) => {
- // Response constructed here because the inner value of the Option would be dropped outside the match block
- let response = OutgoingPlayMessage::Hand(hand.iter().collect());
- send_message_or_break!(&mut socket, &response);
- }
- None => {
- let response = OutgoingPlayMessage::Error;
- send_message_or_break!(&mut socket, &response);
- }
- };
- }
- Err(err) => {
- eprintln!(
- "Encountered an error while handling a message from a player: {}",
- err
- );
- break;
+ })
+ };
+
+ tokio::select! {
+ _ = &mut send_task => recv_task.abort(),
+ _ = &mut recv_task => send_task.abort(),
+ }
+}
+
+async fn handle_update(
+ mut update_rx: Receiver<PlayUpdate>,
+ sender_tx: Sender<OutgoingPlayMessage>,
+ _player_session: Weak<Mutex<Session>>,
+ player_color: Arc<RwLock<String>>,
+) {
+ loop {
+ match update_rx.recv().await {
+ Ok(PlayUpdate::HandUpdate(color, hand)) => {
+ if *player_color.read().unwrap() == color {
+ let _ = sender_tx.send(OutgoingPlayMessage::Hand(hand)).await;
+ }
}
+ Err(RecvError::Closed) => break,
+ Err(RecvError::Lagged(_)) => continue,
}
}
}
diff --git a/src/session.rs b/src/session.rs
index acd7615..4797c5c 100644
--- a/src/session.rs
+++ b/src/session.rs
@@ -1,19 +1,21 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
+use tokio::sync::broadcast;
pub struct Session {
pub steam_name: String,
pub seats: HashMap<String, Seat>,
+ pub update_tx: broadcast::Sender<PlayUpdate>,
}
-#[derive(Clone, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum HandObject {
CustomDeck(CustomDeck),
}
// TODO: These fields will be used in the future. When they are, the dead_code lint should no longer
// be suppressed.
-#[derive(Clone, Serialize, Deserialize)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
#[allow(dead_code)]
pub struct CustomDeck {
/// The path/URL of the face cardsheet.
@@ -41,11 +43,19 @@ pub struct Seat {
pub hand: Vec<HandObject>,
}
+#[derive(Clone)]
+pub enum PlayUpdate {
+ HandUpdate(String, Vec<HandObject>),
+}
+
impl Session {
pub fn new(steam_name: String) -> Self {
+ let (update_tx, _) = broadcast::channel(10);
+
Session {
steam_name,
seats: HashMap::new(),
+ update_tx,
}
}
}