reactive_graph_runtime_web_impl/
graphql_server_impl.rs

1use std::fmt::Debug;
2use std::fs::File;
3use std::io::BufReader;
4use std::sync::Arc;
5use std::sync::atomic::AtomicBool;
6use std::sync::atomic::Ordering;
7
8use actix_cors::Cors;
9use actix_web::App;
10use actix_web::HttpServer;
11use actix_web::ResponseError;
12use actix_web::Result;
13use actix_web::dev::Server;
14use actix_web::guard;
15use actix_web::web;
16use actix_web_extras::middleware::Condition;
17use async_trait::async_trait;
18use crossbeam::channel::Receiver;
19use log::debug;
20use log::error;
21use log::info;
22use log::trace;
23use rustls::ServerConfig;
24use rustls::pki_types::CertificateDer;
25use rustls::pki_types::PrivateKeyDer;
26use rustls_pemfile::Item;
27use rustls_pemfile::certs;
28use rustls_pemfile::read_all;
29use springtime_di::Component;
30use springtime_di::component_alias;
31
32use reactive_graph_config_api::ConfigManager;
33use reactive_graph_dynamic_graph_api::DynamicGraphSchemaManager;
34use reactive_graph_dynamic_graph_web::query_dynamic_graph;
35use reactive_graph_graphql_api::GraphQLSchemaManager;
36use reactive_graph_graphql_web::query_graphql;
37use reactive_graph_graphql_web::subscription_websocket;
38use reactive_graph_lifecycle::Lifecycle;
39use reactive_graph_plugin_graphql_api::PluginSchemaManager;
40use reactive_graph_plugin_graphql_web::query_plugin_graphql;
41use reactive_graph_reactive_service_api::ReactiveEntityManager;
42use reactive_graph_reactive_service_api::ReactiveFlowManager;
43use reactive_graph_reactive_service_api::ReactiveRelationManager;
44use reactive_graph_runtime_graphql_api::RuntimeSchemaManager;
45use reactive_graph_runtime_graphql_web::query_runtime_graphql;
46use reactive_graph_runtime_web_api::GraphQLServer;
47use reactive_graph_runtime_web_api::WebResourceManager;
48use reactive_graph_type_system_api::ComponentManager;
49use reactive_graph_type_system_api::EntityTypeManager;
50use reactive_graph_type_system_api::FlowTypeManager;
51use reactive_graph_type_system_api::RelationTypeManager;
52
53use crate::get_logger_middleware;
54use crate::web_resource_manager_handler::handle_root_web_resource;
55use crate::web_resource_manager_handler::handle_web_resource;
56
57#[derive(Debug, thiserror::Error)]
58pub enum GraphQLServerError {
59    #[error("The certificate chain is empty")]
60    EmptyCertificateChain,
61    #[error("No private key was found")]
62    NoPrivateKeyFound,
63    #[error("rustls error: {0}")]
64    RustlsError(#[from] rustls::Error),
65}
66
67impl ResponseError for GraphQLServerError {}
68
69#[derive(Component)]
70pub struct GraphQLServerImpl {
71    web_resource_manager: Arc<dyn WebResourceManager + Send + Sync>,
72
73    config_manager: Arc<dyn ConfigManager + Send + Sync>,
74
75    component_manager: Arc<dyn ComponentManager + Send + Sync>,
76
77    entity_type_manager: Arc<dyn EntityTypeManager + Send + Sync>,
78
79    relation_type_manager: Arc<dyn RelationTypeManager + Send + Sync>,
80
81    flow_type_manager: Arc<dyn FlowTypeManager + Send + Sync>,
82
83    reactive_entity_manager: Arc<dyn ReactiveEntityManager + Send + Sync>,
84
85    reactive_relation_manager: Arc<dyn ReactiveRelationManager + Send + Sync>,
86
87    reactive_flow_manager: Arc<dyn ReactiveFlowManager + Send + Sync>,
88
89    graphql_schema_manager: Arc<dyn GraphQLSchemaManager + Send + Sync>,
90
91    dynamic_graph_schema_manager: Arc<dyn DynamicGraphSchemaManager + Send + Sync>,
92
93    runtime_schema_manager: Arc<dyn RuntimeSchemaManager + Send + Sync>,
94
95    plugin_schema_manager: Arc<dyn PluginSchemaManager + Send + Sync>,
96}
97
98#[async_trait]
99#[component_alias]
100impl GraphQLServer for GraphQLServerImpl {
101    async fn serve(&self, stopper: Receiver<()>) {
102        trace!("Initialize GraphQL server");
103        let server = match self.setup() {
104            Ok(server) => server,
105            Err(e) => {
106                error!("Failed to setup graphql server: {e}!");
107                return;
108            }
109        };
110        let server_handle = server.handle();
111        let t_server_handle = server_handle.clone();
112
113        let terminate = Arc::new(AtomicBool::new(false));
114        let t_terminate = terminate.clone();
115
116        // This thread handles the server stop routine from the main thread
117        tokio::spawn(async move {
118            trace!("Waiting for shutdown signal");
119            // wait for shutdown signal
120            stopper.recv().unwrap_or(());
121            debug!("Received shutdown signal. Stopping GraphQL server thread.");
122
123            // stop server gracefully
124            trace!("Stopping server gracefully");
125            t_server_handle.stop(true).await;
126            debug!("Successfully stopped GraphQL server thread.");
127
128            t_terminate.store(true, Ordering::Relaxed);
129            debug!("Stopping actix system.");
130        });
131
132        let _ = server.await;
133        trace!("GraphQL server finished");
134    }
135}
136
137impl GraphQLServerImpl {
138    fn setup(&self) -> Result<Server> {
139        let graphql_server_config = self.config_manager.get_graphql_server_config();
140        let graphql_logging_config = graphql_server_config.logging.as_ref().cloned().unwrap_or_default();
141
142        // GraphQL Schema
143        let schema = self.graphql_schema_manager.get_schema();
144        let schema_data = web::Data::new(schema);
145
146        // Runtime GraphQL Schema
147        let runtime_schema = self.runtime_schema_manager.get_schema();
148        let runtime_schema_data = web::Data::new(runtime_schema);
149
150        // Plugin GraphQL Schema
151        let plugin_schema = self.plugin_schema_manager.get_schema();
152        let plugin_schema_data = web::Data::new(plugin_schema);
153
154        // REST SERVICES
155        let component_manager = web::Data::new(self.component_manager.clone());
156        let entity_type_manager = web::Data::new(self.entity_type_manager.clone());
157        let relation_type_manager = web::Data::new(self.relation_type_manager.clone());
158        let flow_type_manager = web::Data::new(self.flow_type_manager.clone());
159
160        let entity_instance_manager = web::Data::new(self.reactive_entity_manager.clone());
161        let relation_instance_manager = web::Data::new(self.reactive_relation_manager.clone());
162        let flow_instance_manager = web::Data::new(self.reactive_flow_manager.clone());
163
164        let web_resource_manager = web::Data::new(self.web_resource_manager.clone());
165
166        let dynamic_graph_schema_manager = web::Data::new(self.dynamic_graph_schema_manager.clone());
167
168        let http_server = HttpServer::new(move || {
169            let graphql_logging_config = graphql_logging_config.clone();
170            App::new()
171                .wrap(Cors::permissive())
172                .wrap(Condition::from_option(get_logger_middleware(&graphql_logging_config)))
173                // Type System
174                .app_data(component_manager.clone())
175                .app_data(entity_type_manager.clone())
176                .app_data(relation_type_manager.clone())
177                .app_data(flow_type_manager.clone())
178                // Instance System
179                .app_data(entity_instance_manager.clone())
180                .app_data(relation_instance_manager.clone())
181                .app_data(flow_instance_manager.clone())
182                // Web Resources
183                .app_data(web_resource_manager.clone())
184                // GraphQL Schema
185                .app_data(schema_data.clone())
186                // Runtime GraphQL Schema
187                .app_data(runtime_schema_data.clone())
188                // Plugin GraphQL Schema
189                .app_data(plugin_schema_data.clone())
190                // Dynamic Graph
191                .app_data(dynamic_graph_schema_manager.clone())
192                // GraphQL API
193                .service(query_graphql)
194                .service(
195                    web::resource("/graphql")
196                        .guard(guard::Get())
197                        .guard(guard::Header("upgrade", "websocket"))
198                        .to(subscription_websocket),
199                )
200                // Dynamic GraphQL API
201                .service(query_dynamic_graph)
202                // Runtime GraphQL API
203                .service(query_runtime_graphql)
204                .service(query_plugin_graphql)
205                // Type System REST API
206                .service(reactive_graph_type_system_rest::components::get_components)
207                .service(reactive_graph_type_system_rest::components::get_component)
208                .service(reactive_graph_type_system_rest::entities::get_entity_types)
209                .service(reactive_graph_type_system_rest::entities::get_entity_type)
210                .service(reactive_graph_type_system_rest::relations::get_relation_types)
211                .service(reactive_graph_type_system_rest::relations::get_relation_type)
212                .service(reactive_graph_type_system_rest::flows::get_flow_types)
213                .service(reactive_graph_type_system_rest::flows::get_flow_type)
214                // Type System JSON Schema
215                .service(reactive_graph_type_system_rest::components::json_schema_components)
216                .service(reactive_graph_type_system_rest::entities::json_schema_entity_types)
217                .service(reactive_graph_type_system_rest::relations::json_schema_relation_types)
218                .service(reactive_graph_type_system_rest::flows::json_schema_flow_types)
219                // Instance System JSON Schema
220                .service(reactive_graph_instance_system_rest::entities::json_schema_entity_instances)
221                .service(reactive_graph_instance_system_rest::relations::json_schema_relation_instances)
222                .service(reactive_graph_instance_system_rest::flows::json_schema_flow_instances)
223                // Dynamic Graph JSON Schema
224                // .service(reactive_graph_dynamic_graph_json_schema::schema_)
225                // Web Resource API
226                .service(web::resource("/{web_resource_context_path}/{path:.*}").route(web::get().to(handle_web_resource)))
227                .service(web::resource("/{path:.*}").route(web::get().to(handle_root_web_resource)))
228        })
229        .disable_signals()
230        .shutdown_timeout(graphql_server_config.shutdown_timeout())
231        .workers(graphql_server_config.workers());
232
233        let r_http_server = if graphql_server_config.is_secure() {
234            let cert_file = File::open(graphql_server_config.ssl_certificate_path())?;
235            let cert_file = &mut BufReader::new(cert_file);
236            let cert_chain: Vec<CertificateDer> = certs(cert_file).filter_map(|cert| cert.ok()).collect();
237            if cert_chain.is_empty() {
238                error!("The certificate chain is empty.");
239                return Err(GraphQLServerError::EmptyCertificateChain.into());
240            }
241
242            let key_file = File::open(graphql_server_config.ssl_private_key_path())?;
243            let key_file = &mut BufReader::new(key_file);
244            let mut keys: Vec<PrivateKeyDer> = read_all(key_file)
245                .filter_map(|item| match item {
246                    Ok(Item::Pkcs1Key(key)) => Some(key.into()),
247                    Ok(Item::Pkcs8Key(key)) => Some(key.into()),
248                    Ok(Item::Sec1Key(key)) => Some(key.into()),
249                    Ok(_) => {
250                        error!("Could not load private key: The file does not contain a private key in either format PKCS1, PKCS8, SEC1!");
251                        None
252                    }
253                    Err(e) => {
254                        error!("Failed to load private key: {e}");
255                        None
256                    }
257                })
258                .collect();
259            if keys.is_empty() {
260                error!("Could not load private keys.");
261                return Err(GraphQLServerError::NoPrivateKeyFound.into());
262            }
263            if rustls::crypto::aws_lc_rs::default_provider().install_default().is_err() {
264                error!("CryptoProvider Error");
265                return Err(GraphQLServerError::NoPrivateKeyFound.into());
266            }
267            let tls_config = ServerConfig::builder()
268                .with_no_client_auth()
269                .with_single_cert(cert_chain, keys.remove(0))
270                .map_err(GraphQLServerError::RustlsError)?;
271            info!("Starting HTTPS/GraphQL server on {}", graphql_server_config.url());
272            http_server.bind_rustls_0_23(graphql_server_config.addr(), tls_config)?.run()
273        } else {
274            info!("Starting HTTP/GraphQL server on {}", graphql_server_config.url());
275            http_server.bind(graphql_server_config.addr())?.run()
276        };
277        Ok(r_http_server)
278    }
279}
280
281#[async_trait]
282impl Lifecycle for GraphQLServerImpl {
283    async fn init(&self) {}
284
285    async fn post_init(&self) {}
286
287    async fn pre_shutdown(&self) {}
288
289    async fn shutdown(&self) {}
290}