Skip to main content

server/
server_state.rs

1use std::sync::Arc;
2
3use cqrs_es::Query;
4use postgres_es::{PostgresCqrs, PostgresViewRepository, default_postgress_pool, postgres_cqrs};
5use sqlx::{PgPool, migrate, postgres::*};
6use tokio::sync::broadcast;
7use tracing::{error, warn};
8
9use crate::{
10    database::Notification,
11    domain::{Game, GameServices},
12    error::{ServerError, bug},
13    projections::{GameQuery, GameView},
14};
15
16/// Shared application state for the server.
17#[derive(Clone)]
18pub struct ServerState {
19    /// SQLx connection pool to PostgreSQL
20    pub pool: Arc<PgPool>,
21
22    /// CQRS/es instance the `Game` aggregate
23    pub cqrs: Arc<PostgresCqrs<Game>>,
24
25    /// Repository that maintains the `GameView` read model
26    pub game_view_repo: Arc<PostgresViewRepository<GameView, Game>>,
27
28    /// Broadcast channel – subscribers receive a `Notification` whenever
29    /// the database changes (e.g. new event persisted)
30    pub database_changes_sender: broadcast::Sender<Notification>,
31}
32
33impl ServerState {
34    /// Returns a new subscriber to database change notifications.
35    pub fn subscribe_database_changes(&self) -> broadcast::Receiver<Notification> {
36        self.database_changes_sender.subscribe()
37    }
38}
39
40/// Constructs the shared `ServerState`.
41///
42/// - Loads `DATABASE_URL` from environment
43/// - Creates and migrates the PostgreSQL pool
44/// - Sets up the CQRS framework with `GameQuery` read model
45/// - Starts the database change broadcaster
46///
47/// Panics if the database URL is missing or migrations fail.
48pub async fn initialize_server_state() -> Result<ServerState, ServerError> {
49    let database_url = std::env::var("DATABASE_URL").expect("Database url is not specified");
50
51    let pool = default_postgress_pool(&database_url).await;
52    migrate!()
53        .run(&pool)
54        .await
55        .unwrap_or_else(|_| panic!("Failed to migrate data in database: {database_url}"));
56
57    let pool = Arc::new(pool);
58
59    let game_view_repo = Arc::new(PostgresViewRepository::new("game_query", (*pool).clone()));
60
61    let mut game_query = GameQuery::new(game_view_repo.clone());
62    game_query.use_error_handler(Box::new(|e| error!("{e}")));
63
64    let queries: Vec<Box<dyn Query<Game>>> = vec![Box::new(game_query)];
65    let services = GameServices {};
66    let cqrs = postgres_cqrs::<Game>((*pool).clone(), queries, services);
67    let cqrs = Arc::new(cqrs);
68
69    let database_changes_sender = create_database_changes_sender((*pool).clone()).await?;
70
71    let state = ServerState {
72        pool,
73        cqrs,
74        game_view_repo,
75        database_changes_sender,
76    };
77
78    Ok(state)
79}
80
81async fn create_database_changes_sender(
82    postgres_pool: PgPool,
83) -> Result<broadcast::Sender<Notification>, ServerError> {
84    let mut listener = PgListener::connect_with(&postgres_pool)
85        .await
86        .map_err(bug!())?;
87
88    listener
89        .listen_all(["game_query_change"])
90        .await
91        .map_err(bug!())?;
92
93    let (sender, _): (broadcast::Sender<Notification>, _) = broadcast::channel(10);
94    let task_sender = sender.clone();
95
96    tokio::spawn(async move {
97        loop {
98            match listener.recv().await {
99                Ok(notification) => {
100                    let payload = serde_json::from_str::<Notification>(notification.payload());
101                    match payload {
102                        Ok(payload) => {
103                            let _ = task_sender.send(payload);
104                        }
105                        Err(error) => {
106                            warn!("server_state: {error}");
107                        }
108                    }
109                }
110                Err(error) => {
111                    error!("server_state:database_listener: failed: {error}");
112                    break;
113                }
114            }
115        }
116    });
117
118    Ok(sender)
119}