server/services/stream/
available_game_events.rs1use std::str::FromStr;
2
3use futures::{Stream, TryStreamExt};
4use tokio_stream::{StreamExt, wrappers::BroadcastStream};
5use tracing::warn;
6
7use crate::{
8 database::{Operation, Timing},
9 domain::{GameId, UserId},
10 error::{ServerError, bug},
11 server_state::ServerState,
12 services::queries::get_game,
13};
14
15pub enum AvailableGameEvent {
17 Created {
20 game_id: GameId,
22
23 name: String,
25 },
26}
27
28pub async fn available_game_events(
43 server_state: ServerState,
44 user_id: UserId,
45) -> Result<impl Stream<Item = AvailableGameEvent>, ServerError> {
46 let stream =
47 BroadcastStream::new(server_state.database_changes_sender.subscribe()).map_err(bug!());
48
49 let stream = stream.try_filter_map(move |notification| {
50 let server_state = server_state.clone();
51
52 async move {
53 let is_game_query = notification.table_name == "game_query";
54 let is_after = matches!(notification.timing, Timing::After);
55
56 let may_emit = is_game_query && is_after;
57
58 let event = may_emit
59 .then_some({
60 let game_id = notification
61 .primary_key
62 .iter()
63 .find(|pk| pk.column == "view_id")
64 .and_then(|pk| pk.value.as_str())
65 .map(GameId::from_str)
66 .transpose()
67 .map_err(bug!())?;
68
69 if let Some(game_id) = game_id
70 && let Some(game) = get_game(server_state, game_id).await?
71 {
72 let user_can_join = game.host() != user_id && game.guest().is_none();
73 let valid_player = game.validate_user(user_id).is_some();
74
75 match notification.operation {
76 Operation::Insert if user_can_join || valid_player => {
77 Some(AvailableGameEvent::Created {
78 game_id: game.id(),
79 name: String::from(game.name()),
80 })
81 }
82 _ => None,
83 }
84 } else {
85 None
86 }
87 })
88 .flatten();
89
90 Ok(event)
91 }
92 });
93
94 let stream = stream.filter_map(|result| match result {
95 Ok(event) => Some(event),
96 Err(error) => {
97 warn!("server:services:available_game_events: error: {error}");
98 None
99 }
100 });
101
102 Ok(stream)
103}