From 067e9602ba4feccf947a138c9e73462c446268bc Mon Sep 17 00:00:00 2001 From: Jomar Milan Date: Wed, 10 Jun 2026 14:59:07 -0700 Subject: Add syncing of player hand updates to browsers --- Cargo.lock | 13 ++++ Cargo.toml | 1 + src/main.rs | 7 +- src/play.rs | 234 ++++++++++++++++++++++++++++++++++++--------------------- src/session.rs | 14 +++- 5 files changed, 180 insertions(+), 89 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2ee6374..6f40677 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -227,6 +227,17 @@ version = "0.3.32" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7e3450815272ef58cec6d564423f6e755e25379b217b0bc688e295ba24df6b1d" +[[package]] +name = "futures-macro" +version = "0.3.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e835b70203e41293343137df5c0664546da5745f82ec9b84d40be8336958447b" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.32" @@ -246,6 +257,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "389ca41296e6190b48053de0321d02a77f32f8a5d2461dd38762c0593805c6d6" dependencies = [ "futures-core", + "futures-macro", "futures-sink", "futures-task", "pin-project-lite", @@ -574,6 +586,7 @@ version = "0.1.0" dependencies = [ "askama", "axum", + "futures-util", "rust-embed", "serde", "serde_json", diff --git a/Cargo.toml b/Cargo.toml index f612f2e..a95c7e0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ edition = "2024" [dependencies] askama = "0.16.0" axum = { version = "0.8.9", features = ["ws"] } +futures-util = "0.3.32" rust-embed = "8.11.0" serde = { version = "1.0.228", features = ["derive"] } serde_json = "1.0.150" diff --git a/src/main.rs b/src/main.rs index 9e7f21b..7162402 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,7 +3,7 @@ mod session; mod template; use crate::play::handle_play; -use crate::session::{HandObject, Seat, Session}; +use crate::session::{HandObject, PlayUpdate, Seat, Session}; use crate::template::{IndexTemplate, SessionTemplate}; use askama::Template; use axum::extract::{Path, Query, State, WebSocketUpgrade}; @@ -128,10 +128,11 @@ async fn update_hands( for (color, hand) in payload { let seat = session .seats - .entry(color) + .entry(color.to_owned()) .or_insert_with(|| Seat { hand: Vec::new() }); - seat.hand = hand; + seat.hand = hand.to_owned(); + let _ = session.update_tx.send(PlayUpdate::HandUpdate(color, hand)); } StatusCode::NO_CONTENT } diff --git a/src/play.rs b/src/play.rs index f401fea..82380f9 100644 --- a/src/play.rs +++ b/src/play.rs @@ -1,19 +1,13 @@ use crate::AppState; -use crate::session::HandObject; +use crate::session::{HandObject, PlayUpdate, Session}; use axum::extract::ws::{Message, Utf8Bytes, WebSocket}; +use futures_util::{SinkExt, StreamExt}; use serde::{Deserialize, Serialize}; -use std::error::Error; -use std::sync::Arc; - -macro_rules! send_message_or_break { - ($socket:expr, $message:expr) => {{ - let result = send_outgoing_message($socket, $message).await; - if let Err(err) = result { - eprintln!("Failed to send message to socket: {}", err); - break; - } - }}; -} +use std::sync::{Arc, Mutex, RwLock, Weak}; +use tokio::sync::broadcast::Receiver; +use tokio::sync::broadcast::error::RecvError; +use tokio::sync::mpsc; +use tokio::sync::mpsc::Sender; #[derive(Deserialize)] enum IncomingPlayMessage { @@ -21,97 +15,169 @@ enum IncomingPlayMessage { Color(String), } +// TODO: Maybe derive Clone, reference interior vals #[derive(Serialize)] -enum OutgoingPlayMessage<'a> { - Initialize { colors: Vec<&'a String> }, - Hand(Vec<&'a HandObject>), +enum OutgoingPlayMessage { + Initialize { colors: Vec }, + Hand(Vec), // TODO: include error details Error, } -async fn send_outgoing_message( - socket: &mut WebSocket, - message: &OutgoingPlayMessage<'_>, -) -> Result<(), Box> { - let serialized = serde_json::to_string(message)?; - socket - .send(Message::Text(Utf8Bytes::from(serialized))) - .await - .map_err(Box::from) -} +pub async fn handle_play(socket: WebSocket, app_state: Arc) { + let (mut sender, mut receiver) = socket.split(); + let (sender_tx, mut sender_rx) = mpsc::channel(2); -pub async fn handle_play(mut socket: WebSocket, app_state: Arc) { - let mut player_session = None; + let mut send_task = tokio::spawn(async move { + while let Some(message) = sender_rx.recv().await { + let serialized = match serde_json::to_string(&message) { + Ok(serialized) => serialized, + Err(err) => { + eprintln!("Failed to serialize outgoing websocket message: {}", err); + break; + } + }; + if let Err(err) = sender + .send(Message::Text(Utf8Bytes::from(serialized))) + .await + { + eprintln!("Failed to send serialized websocket message: {}", err); + break; + } + } + }); - while let Some(msg) = socket.recv().await { - let Ok(Message::Text(text)) = msg else { - continue; - }; + let mut recv_task = { + let sender_tx = sender_tx.clone(); + tokio::spawn(async move { + let mut player_session = None; + let player_color = Arc::new(RwLock::new(String::new())); - match serde_json::from_str(text.as_str()) { - Ok(IncomingPlayMessage::Initialize { id }) => { - let session = { - let sessions = app_state.sessions.read().unwrap(); - sessions - .get(&id) - .map(Arc::to_owned) - .ok_or("Session did not exist") + while let Some(msg) = receiver.next().await { + let Ok(Message::Text(text)) = msg else { + continue; }; - match session { - Ok(session) => { - let colors: Vec = session + match serde_json::from_str(text.as_str()) { + Ok(IncomingPlayMessage::Initialize { id }) => { + let session = { + let sessions = app_state.sessions.read().unwrap(); + sessions + .get(&id) + .map(Arc::to_owned) + .ok_or("Session did not exist") + }; + + match session { + Ok(session) => { + let (colors, update_rx) = { + let session = session.lock().unwrap(); + + let colors: Vec = + session.seats.keys().cloned().collect(); + let update_rx = session.update_tx.subscribe(); + + (colors, update_rx) + }; + + player_session = Some(Arc::downgrade(&session)); + { + let sender_tx = sender_tx.clone(); + let player_session = Arc::downgrade(&session); + let player_color = player_color.clone(); + + tokio::spawn(async move { + handle_update( + update_rx, + sender_tx, + player_session, + player_color, + ) + .await + }); + } + + let response = OutgoingPlayMessage::Initialize { colors }; + if sender_tx.send(response).await.is_err() { + break; + } + } + Err(err) => { + eprintln!("Failed to access session: {}", err); + let response = OutgoingPlayMessage::Error; + if sender_tx.send(response).await.is_err() { + break; + } + } + } + } + Ok(IncomingPlayMessage::Color(color)) => { + let Some(session) = + player_session.clone().and_then(|session| session.upgrade()) + else { + let response = OutgoingPlayMessage::Error; + if sender_tx.send(response).await.is_err() { + break; + } + break; + }; + let hand = session .lock() .unwrap() .seats - .keys() - .map(String::to_owned) - .collect(); - player_session = Some(Arc::downgrade(&session)); - let response = OutgoingPlayMessage::Initialize { - colors: colors.iter().collect(), + .get(&color) + .map(|seat| seat.hand.to_owned()); + match hand { + Some(hand) => { + *player_color.write().unwrap() = color; + if sender_tx + .send(OutgoingPlayMessage::Hand(hand)) + .await + .is_err() + { + break; + } + } + None => { + if sender_tx.send(OutgoingPlayMessage::Error).await.is_err() { + break; + } + } }; - send_message_or_break!(&mut socket, &response); } Err(err) => { - eprintln!("Failed to access session: {}", err); - let response = OutgoingPlayMessage::Error; - send_message_or_break!(&mut socket, &response); + eprintln!( + "Encountered an error while handling a message from a player: {}", + err + ); + break; } } } - Ok(IncomingPlayMessage::Color(color)) => { - let Some(session) = player_session.clone().and_then(|session| session.upgrade()) - else { - let response = OutgoingPlayMessage::Error; - send_message_or_break!(&mut socket, &response); - break; - }; - let hand = session - .lock() - .unwrap() - .seats - .get(&color) - .map(|seat| (&seat.hand).to_owned()); - match hand { - Some(hand) => { - // Response constructed here because the inner value of the Option would be dropped outside the match block - let response = OutgoingPlayMessage::Hand(hand.iter().collect()); - send_message_or_break!(&mut socket, &response); - } - None => { - let response = OutgoingPlayMessage::Error; - send_message_or_break!(&mut socket, &response); - } - }; - } - Err(err) => { - eprintln!( - "Encountered an error while handling a message from a player: {}", - err - ); - break; + }) + }; + + tokio::select! { + _ = &mut send_task => recv_task.abort(), + _ = &mut recv_task => send_task.abort(), + } +} + +async fn handle_update( + mut update_rx: Receiver, + sender_tx: Sender, + _player_session: Weak>, + player_color: Arc>, +) { + loop { + match update_rx.recv().await { + Ok(PlayUpdate::HandUpdate(color, hand)) => { + if *player_color.read().unwrap() == color { + let _ = sender_tx.send(OutgoingPlayMessage::Hand(hand)).await; + } } + Err(RecvError::Closed) => break, + Err(RecvError::Lagged(_)) => continue, } } } diff --git a/src/session.rs b/src/session.rs index acd7615..4797c5c 100644 --- a/src/session.rs +++ b/src/session.rs @@ -1,19 +1,21 @@ use serde::{Deserialize, Serialize}; use std::collections::HashMap; +use tokio::sync::broadcast; pub struct Session { pub steam_name: String, pub seats: HashMap, + pub update_tx: broadcast::Sender, } -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] pub enum HandObject { CustomDeck(CustomDeck), } // TODO: These fields will be used in the future. When they are, the dead_code lint should no longer // be suppressed. -#[derive(Clone, Serialize, Deserialize)] +#[derive(Clone, Debug, Serialize, Deserialize)] #[allow(dead_code)] pub struct CustomDeck { /// The path/URL of the face cardsheet. @@ -41,11 +43,19 @@ pub struct Seat { pub hand: Vec, } +#[derive(Clone)] +pub enum PlayUpdate { + HandUpdate(String, Vec), +} + impl Session { pub fn new(steam_name: String) -> Self { + let (update_tx, _) = broadcast::channel(10); + Session { steam_name, seats: HashMap::new(), + update_tx, } } } -- cgit v1.2.3