server/services/stream/
game_stream.rs1use futures::{Stream, TryStreamExt};
2use tokio_stream::{StreamExt, wrappers::BroadcastStream};
3use tracing::warn;
4
5use crate::{
6 database::{Operation, Timing},
7 domain::{Game, GameId},
8 error::{ServerError, bug},
9 server_state::ServerState,
10 services::queries::get_game,
11};
12
13pub async fn game_stream(
29 server_state: ServerState,
30 game_id: GameId,
31) -> Result<impl Stream<Item = Game>, ServerError> {
32 let stream =
33 BroadcastStream::new(server_state.database_changes_sender.subscribe()).map_err(bug!());
34
35 let stream = stream.try_filter_map(move |notification| {
36 let server_state = server_state.clone();
37
38 async move {
39 let is_game_query = notification.table_name == "game_query";
40 let is_after = matches!(notification.timing, Timing::After);
41 let is_upsert = !matches!(notification.operation, Operation::Delete);
42 let matches_game = notification
43 .primary_key
44 .iter()
45 .find(|pk| pk.column == "view_id")
46 .and_then(|pk| pk.value.as_str())
47 .map(|v| v == game_id.value().to_string())
48 .unwrap_or(false);
49
50 let should_emit = is_game_query && is_after && is_upsert && matches_game;
51
52 let game = if should_emit {
53 get_game(server_state, game_id).await?
54 } else {
55 None
56 };
57
58 Ok(game)
59 }
60 });
61
62 let stream = stream.filter_map(|result| match result {
63 Ok(game) => Some(game),
64 Err(error) => {
65 warn!("server:services:game_stream: error: {error}");
66 None
67 }
68 });
69
70 Ok(stream)
71}