summaryrefslogtreecommitdiff
path: root/src/play.rs
diff options
context:
space:
mode:
authorJomar Milan <jomarm@jomarm.com>2026-06-10 14:59:07 -0700
committerJomar Milan <jomarm@jomarm.com>2026-06-10 14:59:07 -0700
commit067e9602ba4feccf947a138c9e73462c446268bc (patch)
tree7f9320d3d6bbe735d1bee519109c24661353eabd /src/play.rs
parent13374b7928788e8cdc6c7905209bafdf943dc02e (diff)
Add syncing of player hand updates to browsers
Diffstat (limited to 'src/play.rs')
-rw-r--r--src/play.rs234
1 files changed, 150 insertions, 84 deletions
diff --git a/src/play.rs b/src/play.rs
index f401fea..82380f9 100644
--- a/src/play.rs
+++ b/src/play.rs
@@ -1,19 +1,13 @@
use crate::AppState;
-use crate::session::HandObject;
+use crate::session::{HandObject, PlayUpdate, Session};
use axum::extract::ws::{Message, Utf8Bytes, WebSocket};
+use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
-use std::error::Error;
-use std::sync::Arc;
-
-macro_rules! send_message_or_break {
- ($socket:expr, $message:expr) => {{
- let result = send_outgoing_message($socket, $message).await;
- if let Err(err) = result {
- eprintln!("Failed to send message to socket: {}", err);
- break;
- }
- }};
-}
+use std::sync::{Arc, Mutex, RwLock, Weak};
+use tokio::sync::broadcast::Receiver;
+use tokio::sync::broadcast::error::RecvError;
+use tokio::sync::mpsc;
+use tokio::sync::mpsc::Sender;
#[derive(Deserialize)]
enum IncomingPlayMessage {
@@ -21,97 +15,169 @@ enum IncomingPlayMessage {
Color(String),
}
+// TODO: Maybe derive Clone, reference interior vals
#[derive(Serialize)]
-enum OutgoingPlayMessage<'a> {
- Initialize { colors: Vec<&'a String> },
- Hand(Vec<&'a HandObject>),
+enum OutgoingPlayMessage {
+ Initialize { colors: Vec<String> },
+ Hand(Vec<HandObject>),
// TODO: include error details
Error,
}
-async fn send_outgoing_message(
- socket: &mut WebSocket,
- message: &OutgoingPlayMessage<'_>,
-) -> Result<(), Box<dyn Error>> {
- let serialized = serde_json::to_string(message)?;
- socket
- .send(Message::Text(Utf8Bytes::from(serialized)))
- .await
- .map_err(Box::from)
-}
+pub async fn handle_play(socket: WebSocket, app_state: Arc<AppState>) {
+ let (mut sender, mut receiver) = socket.split();
+ let (sender_tx, mut sender_rx) = mpsc::channel(2);
-pub async fn handle_play(mut socket: WebSocket, app_state: Arc<AppState>) {
- let mut player_session = None;
+ let mut send_task = tokio::spawn(async move {
+ while let Some(message) = sender_rx.recv().await {
+ let serialized = match serde_json::to_string(&message) {
+ Ok(serialized) => serialized,
+ Err(err) => {
+ eprintln!("Failed to serialize outgoing websocket message: {}", err);
+ break;
+ }
+ };
+ if let Err(err) = sender
+ .send(Message::Text(Utf8Bytes::from(serialized)))
+ .await
+ {
+ eprintln!("Failed to send serialized websocket message: {}", err);
+ break;
+ }
+ }
+ });
- while let Some(msg) = socket.recv().await {
- let Ok(Message::Text(text)) = msg else {
- continue;
- };
+ let mut recv_task = {
+ let sender_tx = sender_tx.clone();
+ tokio::spawn(async move {
+ let mut player_session = None;
+ let player_color = Arc::new(RwLock::new(String::new()));
- match serde_json::from_str(text.as_str()) {
- Ok(IncomingPlayMessage::Initialize { id }) => {
- let session = {
- let sessions = app_state.sessions.read().unwrap();
- sessions
- .get(&id)
- .map(Arc::to_owned)
- .ok_or("Session did not exist")
+ while let Some(msg) = receiver.next().await {
+ let Ok(Message::Text(text)) = msg else {
+ continue;
};
- match session {
- Ok(session) => {
- let colors: Vec<String> = session
+ match serde_json::from_str(text.as_str()) {
+ Ok(IncomingPlayMessage::Initialize { id }) => {
+ let session = {
+ let sessions = app_state.sessions.read().unwrap();
+ sessions
+ .get(&id)
+ .map(Arc::to_owned)
+ .ok_or("Session did not exist")
+ };
+
+ match session {
+ Ok(session) => {
+ let (colors, update_rx) = {
+ let session = session.lock().unwrap();
+
+ let colors: Vec<String> =
+ session.seats.keys().cloned().collect();
+ let update_rx = session.update_tx.subscribe();
+
+ (colors, update_rx)
+ };
+
+ player_session = Some(Arc::downgrade(&session));
+ {
+ let sender_tx = sender_tx.clone();
+ let player_session = Arc::downgrade(&session);
+ let player_color = player_color.clone();
+
+ tokio::spawn(async move {
+ handle_update(
+ update_rx,
+ sender_tx,
+ player_session,
+ player_color,
+ )
+ .await
+ });
+ }
+
+ let response = OutgoingPlayMessage::Initialize { colors };
+ if sender_tx.send(response).await.is_err() {
+ break;
+ }
+ }
+ Err(err) => {
+ eprintln!("Failed to access session: {}", err);
+ let response = OutgoingPlayMessage::Error;
+ if sender_tx.send(response).await.is_err() {
+ break;
+ }
+ }
+ }
+ }
+ Ok(IncomingPlayMessage::Color(color)) => {
+ let Some(session) =
+ player_session.clone().and_then(|session| session.upgrade())
+ else {
+ let response = OutgoingPlayMessage::Error;
+ if sender_tx.send(response).await.is_err() {
+ break;
+ }
+ break;
+ };
+ let hand = session
.lock()
.unwrap()
.seats
- .keys()
- .map(String::to_owned)
- .collect();
- player_session = Some(Arc::downgrade(&session));
- let response = OutgoingPlayMessage::Initialize {
- colors: colors.iter().collect(),
+ .get(&color)
+ .map(|seat| seat.hand.to_owned());
+ match hand {
+ Some(hand) => {
+ *player_color.write().unwrap() = color;
+ if sender_tx
+ .send(OutgoingPlayMessage::Hand(hand))
+ .await
+ .is_err()
+ {
+ break;
+ }
+ }
+ None => {
+ if sender_tx.send(OutgoingPlayMessage::Error).await.is_err() {
+ break;
+ }
+ }
};
- send_message_or_break!(&mut socket, &response);
}
Err(err) => {
- eprintln!("Failed to access session: {}", err);
- let response = OutgoingPlayMessage::Error;
- send_message_or_break!(&mut socket, &response);
+ eprintln!(
+ "Encountered an error while handling a message from a player: {}",
+ err
+ );
+ break;
}
}
}
- Ok(IncomingPlayMessage::Color(color)) => {
- let Some(session) = player_session.clone().and_then(|session| session.upgrade())
- else {
- let response = OutgoingPlayMessage::Error;
- send_message_or_break!(&mut socket, &response);
- break;
- };
- let hand = session
- .lock()
- .unwrap()
- .seats
- .get(&color)
- .map(|seat| (&seat.hand).to_owned());
- match hand {
- Some(hand) => {
- // Response constructed here because the inner value of the Option would be dropped outside the match block
- let response = OutgoingPlayMessage::Hand(hand.iter().collect());
- send_message_or_break!(&mut socket, &response);
- }
- None => {
- let response = OutgoingPlayMessage::Error;
- send_message_or_break!(&mut socket, &response);
- }
- };
- }
- Err(err) => {
- eprintln!(
- "Encountered an error while handling a message from a player: {}",
- err
- );
- break;
+ })
+ };
+
+ tokio::select! {
+ _ = &mut send_task => recv_task.abort(),
+ _ = &mut recv_task => send_task.abort(),
+ }
+}
+
+async fn handle_update(
+ mut update_rx: Receiver<PlayUpdate>,
+ sender_tx: Sender<OutgoingPlayMessage>,
+ _player_session: Weak<Mutex<Session>>,
+ player_color: Arc<RwLock<String>>,
+) {
+ loop {
+ match update_rx.recv().await {
+ Ok(PlayUpdate::HandUpdate(color, hand)) => {
+ if *player_color.read().unwrap() == color {
+ let _ = sender_tx.send(OutgoingPlayMessage::Hand(hand)).await;
+ }
}
+ Err(RecvError::Closed) => break,
+ Err(RecvError::Lagged(_)) => continue,
}
}
}