Skip to main content

server/services/stream/
game_stream.rs

1use futures::{Stream, TryStreamExt};
2use tokio_stream::{StreamExt, wrappers::BroadcastStream};
3use tracing::warn;
4
5use crate::{
6    database::{Operation, Timing},
7    domain::{Game, GameId},
8    error::{ServerError, bug},
9    server_state::ServerState,
10    services::queries::get_game,
11};
12
13/// Streams updates for a specific game.
14///
15/// This function returns a stream of `Game` objects representing changes
16/// to the specified game. Each item in the stream reflects the current
17/// state of the game after a change (creation, update, or deletion).
18///
19/// # Parameters
20///
21/// - `server_state`: The shared server state, including the database change broadcaster.
22/// - `game_id`: The ID of the game to track.
23///
24/// # Returns
25///
26/// A `Stream` of `Game`s wrapped in `Result`. Errors may occur due to
27/// internal server issues, in which case a `ServerError` is returned.
28pub async fn game_stream(
29    server_state: ServerState,
30    game_id: GameId,
31) -> Result<impl Stream<Item = Game>, ServerError> {
32    let stream =
33        BroadcastStream::new(server_state.database_changes_sender.subscribe()).map_err(bug!());
34
35    let stream = stream.try_filter_map(move |notification| {
36        let server_state = server_state.clone();
37
38        async move {
39            let is_game_query = notification.table_name == "game_query";
40            let is_after = matches!(notification.timing, Timing::After);
41            let is_upsert = !matches!(notification.operation, Operation::Delete);
42            let matches_game = notification
43                .primary_key
44                .iter()
45                .find(|pk| pk.column == "view_id")
46                .and_then(|pk| pk.value.as_str())
47                .map(|v| v == game_id.value().to_string())
48                .unwrap_or(false);
49
50            let should_emit = is_game_query && is_after && is_upsert && matches_game;
51
52            let game = if should_emit {
53                get_game(server_state, game_id).await?
54            } else {
55                None
56            };
57
58            Ok(game)
59        }
60    });
61
62    let stream = stream.filter_map(|result| match result {
63        Ok(game) => Some(game),
64        Err(error) => {
65            warn!("server:services:game_stream: error: {error}");
66            None
67        }
68    });
69
70    Ok(stream)
71}