Skip to main content

server/services/stream/
available_game_events.rs

1use std::str::FromStr;
2
3use futures::{Stream, TryStreamExt};
4use tokio_stream::{StreamExt, wrappers::BroadcastStream};
5use tracing::warn;
6
7use crate::{
8    database::{Operation, Timing},
9    domain::{GameId, UserId},
10    error::{ServerError, bug},
11    server_state::ServerState,
12    services::queries::get_game,
13};
14
15/// Represents a change in the list of available games for a user.
16pub enum AvailableGameEvent {
17    /// A new game was created and is available to join (hosted)
18    /// or play (computer).
19    Created {
20        /// The game idenitifier of the game that was created.
21        game_id: GameId,
22
23        /// The game's name.
24        name: String,
25    },
26}
27
28/// Streams updates about games that are available to the given user.
29///
30/// The stream yields `AvailableGameEvent`s when a game is created or removed
31/// from the list of games available to the user.
32///
33/// # Parameters
34///
35/// - `server_state`: The shared server state, including the database change broadcaster.
36/// - `user_id`: The ID of the user for whom available games are tracked.
37///
38/// # Returns
39///
40/// A `Stream` of `AvailableGameEvent`s wrapped in `Result`.
41/// Errors may occur due to internal server issues.
42pub async fn available_game_events(
43    server_state: ServerState,
44    user_id: UserId,
45) -> Result<impl Stream<Item = AvailableGameEvent>, ServerError> {
46    let stream =
47        BroadcastStream::new(server_state.database_changes_sender.subscribe()).map_err(bug!());
48
49    let stream = stream.try_filter_map(move |notification| {
50        let server_state = server_state.clone();
51
52        async move {
53            let is_game_query = notification.table_name == "game_query";
54            let is_after = matches!(notification.timing, Timing::After);
55
56            let may_emit = is_game_query && is_after;
57
58            let event = may_emit
59                .then_some({
60                    let game_id = notification
61                        .primary_key
62                        .iter()
63                        .find(|pk| pk.column == "view_id")
64                        .and_then(|pk| pk.value.as_str())
65                        .map(GameId::from_str)
66                        .transpose()
67                        .map_err(bug!())?;
68
69                    if let Some(game_id) = game_id
70                        && let Some(game) = get_game(server_state, game_id).await?
71                    {
72                        let user_can_join = game.host() != user_id && game.guest().is_none();
73                        let valid_player = game.validate_user(user_id).is_some();
74
75                        match notification.operation {
76                            Operation::Insert if user_can_join || valid_player => {
77                                Some(AvailableGameEvent::Created {
78                                    game_id: game.id(),
79                                    name: String::from(game.name()),
80                                })
81                            }
82                            _ => None,
83                        }
84                    } else {
85                        None
86                    }
87                })
88                .flatten();
89
90            Ok(event)
91        }
92    });
93
94    let stream = stream.filter_map(|result| match result {
95        Ok(event) => Some(event),
96        Err(error) => {
97            warn!("server:services:available_game_events: error: {error}");
98            None
99        }
100    });
101
102    Ok(stream)
103}