1use std::sync::Arc;
2
3use cqrs_es::Query;
4use postgres_es::{PostgresCqrs, PostgresViewRepository, default_postgress_pool, postgres_cqrs};
5use sqlx::{PgPool, migrate, postgres::*};
6use tokio::sync::broadcast;
7use tracing::{error, warn};
8
9use crate::{
10 database::Notification,
11 domain::{Game, GameServices},
12 error::{ServerError, bug},
13 projections::{GameQuery, GameView},
14};
15
16#[derive(Clone)]
18pub struct ServerState {
19 pub pool: Arc<PgPool>,
21
22 pub cqrs: Arc<PostgresCqrs<Game>>,
24
25 pub game_view_repo: Arc<PostgresViewRepository<GameView, Game>>,
27
28 pub database_changes_sender: broadcast::Sender<Notification>,
31}
32
33impl ServerState {
34 pub fn subscribe_database_changes(&self) -> broadcast::Receiver<Notification> {
36 self.database_changes_sender.subscribe()
37 }
38}
39
40pub async fn initialize_server_state() -> Result<ServerState, ServerError> {
49 let database_url = std::env::var("DATABASE_URL").expect("Database url is not specified");
50
51 let pool = default_postgress_pool(&database_url).await;
52 migrate!()
53 .run(&pool)
54 .await
55 .unwrap_or_else(|_| panic!("Failed to migrate data in database: {database_url}"));
56
57 let pool = Arc::new(pool);
58
59 let game_view_repo = Arc::new(PostgresViewRepository::new("game_query", (*pool).clone()));
60
61 let mut game_query = GameQuery::new(game_view_repo.clone());
62 game_query.use_error_handler(Box::new(|e| error!("{e}")));
63
64 let queries: Vec<Box<dyn Query<Game>>> = vec![Box::new(game_query)];
65 let services = GameServices {};
66 let cqrs = postgres_cqrs::<Game>((*pool).clone(), queries, services);
67 let cqrs = Arc::new(cqrs);
68
69 let database_changes_sender = create_database_changes_sender((*pool).clone()).await?;
70
71 let state = ServerState {
72 pool,
73 cqrs,
74 game_view_repo,
75 database_changes_sender,
76 };
77
78 Ok(state)
79}
80
81async fn create_database_changes_sender(
82 postgres_pool: PgPool,
83) -> Result<broadcast::Sender<Notification>, ServerError> {
84 let mut listener = PgListener::connect_with(&postgres_pool)
85 .await
86 .map_err(bug!())?;
87
88 listener
89 .listen_all(["game_query_change"])
90 .await
91 .map_err(bug!())?;
92
93 let (sender, _): (broadcast::Sender<Notification>, _) = broadcast::channel(10);
94 let task_sender = sender.clone();
95
96 tokio::spawn(async move {
97 loop {
98 match listener.recv().await {
99 Ok(notification) => {
100 let payload = serde_json::from_str::<Notification>(notification.payload());
101 match payload {
102 Ok(payload) => {
103 let _ = task_sender.send(payload);
104 }
105 Err(error) => {
106 warn!("server_state: {error}");
107 }
108 }
109 }
110 Err(error) => {
111 error!("server_state:database_listener: failed: {error}");
112 break;
113 }
114 }
115 }
116 });
117
118 Ok(sender)
119}