reactive_graph_runtime_web_impl/
graphql_server_impl.rs1use std::fmt::Debug;
2use std::fs::File;
3use std::io::BufReader;
4use std::sync::Arc;
5use std::sync::atomic::AtomicBool;
6use std::sync::atomic::Ordering;
7
8use actix_cors::Cors;
9use actix_web::App;
10use actix_web::HttpServer;
11use actix_web::ResponseError;
12use actix_web::Result;
13use actix_web::dev::Server;
14use actix_web::guard;
15use actix_web::web;
16use actix_web_extras::middleware::Condition;
17use async_trait::async_trait;
18use crossbeam::channel::Receiver;
19use log::debug;
20use log::error;
21use log::info;
22use log::trace;
23use rustls::ServerConfig;
24use rustls::pki_types::CertificateDer;
25use rustls::pki_types::PrivateKeyDer;
26use rustls_pemfile::Item;
27use rustls_pemfile::certs;
28use rustls_pemfile::read_all;
29use springtime_di::Component;
30use springtime_di::component_alias;
31
32use reactive_graph_config_api::ConfigManager;
33use reactive_graph_dynamic_graph_api::DynamicGraphSchemaManager;
34use reactive_graph_dynamic_graph_web::query_dynamic_graph;
35use reactive_graph_graphql_api::GraphQLSchemaManager;
36use reactive_graph_graphql_web::query_graphql;
37use reactive_graph_graphql_web::subscription_websocket;
38use reactive_graph_lifecycle::Lifecycle;
39use reactive_graph_plugin_graphql_api::PluginSchemaManager;
40use reactive_graph_plugin_graphql_web::query_plugin_graphql;
41use reactive_graph_reactive_service_api::ReactiveEntityManager;
42use reactive_graph_reactive_service_api::ReactiveFlowManager;
43use reactive_graph_reactive_service_api::ReactiveRelationManager;
44use reactive_graph_runtime_graphql_api::RuntimeSchemaManager;
45use reactive_graph_runtime_graphql_web::query_runtime_graphql;
46use reactive_graph_runtime_web_api::GraphQLServer;
47use reactive_graph_runtime_web_api::WebResourceManager;
48use reactive_graph_type_system_api::ComponentManager;
49use reactive_graph_type_system_api::EntityTypeManager;
50use reactive_graph_type_system_api::FlowTypeManager;
51use reactive_graph_type_system_api::RelationTypeManager;
52
53use crate::get_logger_middleware;
54use crate::web_resource_manager_handler::handle_root_web_resource;
55use crate::web_resource_manager_handler::handle_web_resource;
56
57#[derive(Debug, thiserror::Error)]
58pub enum GraphQLServerError {
59 #[error("The certificate chain is empty")]
60 EmptyCertificateChain,
61 #[error("No private key was found")]
62 NoPrivateKeyFound,
63 #[error("rustls error: {0}")]
64 RustlsError(#[from] rustls::Error),
65}
66
67impl ResponseError for GraphQLServerError {}
68
69#[derive(Component)]
70pub struct GraphQLServerImpl {
71 web_resource_manager: Arc<dyn WebResourceManager + Send + Sync>,
72
73 config_manager: Arc<dyn ConfigManager + Send + Sync>,
74
75 component_manager: Arc<dyn ComponentManager + Send + Sync>,
76
77 entity_type_manager: Arc<dyn EntityTypeManager + Send + Sync>,
78
79 relation_type_manager: Arc<dyn RelationTypeManager + Send + Sync>,
80
81 flow_type_manager: Arc<dyn FlowTypeManager + Send + Sync>,
82
83 reactive_entity_manager: Arc<dyn ReactiveEntityManager + Send + Sync>,
84
85 reactive_relation_manager: Arc<dyn ReactiveRelationManager + Send + Sync>,
86
87 reactive_flow_manager: Arc<dyn ReactiveFlowManager + Send + Sync>,
88
89 graphql_schema_manager: Arc<dyn GraphQLSchemaManager + Send + Sync>,
90
91 dynamic_graph_schema_manager: Arc<dyn DynamicGraphSchemaManager + Send + Sync>,
92
93 runtime_schema_manager: Arc<dyn RuntimeSchemaManager + Send + Sync>,
94
95 plugin_schema_manager: Arc<dyn PluginSchemaManager + Send + Sync>,
96}
97
98#[async_trait]
99#[component_alias]
100impl GraphQLServer for GraphQLServerImpl {
101 async fn serve(&self, stopper: Receiver<()>) {
102 trace!("Initialize GraphQL server");
103 let server = match self.setup() {
104 Ok(server) => server,
105 Err(e) => {
106 error!("Failed to setup graphql server: {e}!");
107 return;
108 }
109 };
110 let server_handle = server.handle();
111 let t_server_handle = server_handle.clone();
112
113 let terminate = Arc::new(AtomicBool::new(false));
114 let t_terminate = terminate.clone();
115
116 tokio::spawn(async move {
118 trace!("Waiting for shutdown signal");
119 stopper.recv().unwrap_or(());
121 debug!("Received shutdown signal. Stopping GraphQL server thread.");
122
123 trace!("Stopping server gracefully");
125 t_server_handle.stop(true).await;
126 debug!("Successfully stopped GraphQL server thread.");
127
128 t_terminate.store(true, Ordering::Relaxed);
129 debug!("Stopping actix system.");
130 });
131
132 let _ = server.await;
133 trace!("GraphQL server finished");
134 }
135}
136
137impl GraphQLServerImpl {
138 fn setup(&self) -> Result<Server> {
139 let graphql_server_config = self.config_manager.get_graphql_server_config();
140 let graphql_logging_config = graphql_server_config.logging.as_ref().cloned().unwrap_or_default();
141
142 let schema = self.graphql_schema_manager.get_schema();
144 let schema_data = web::Data::new(schema);
145
146 let runtime_schema = self.runtime_schema_manager.get_schema();
148 let runtime_schema_data = web::Data::new(runtime_schema);
149
150 let plugin_schema = self.plugin_schema_manager.get_schema();
152 let plugin_schema_data = web::Data::new(plugin_schema);
153
154 let component_manager = web::Data::new(self.component_manager.clone());
156 let entity_type_manager = web::Data::new(self.entity_type_manager.clone());
157 let relation_type_manager = web::Data::new(self.relation_type_manager.clone());
158 let flow_type_manager = web::Data::new(self.flow_type_manager.clone());
159
160 let entity_instance_manager = web::Data::new(self.reactive_entity_manager.clone());
161 let relation_instance_manager = web::Data::new(self.reactive_relation_manager.clone());
162 let flow_instance_manager = web::Data::new(self.reactive_flow_manager.clone());
163
164 let web_resource_manager = web::Data::new(self.web_resource_manager.clone());
165
166 let dynamic_graph_schema_manager = web::Data::new(self.dynamic_graph_schema_manager.clone());
167
168 let http_server = HttpServer::new(move || {
169 let graphql_logging_config = graphql_logging_config.clone();
170 App::new()
171 .wrap(Cors::permissive())
172 .wrap(Condition::from_option(get_logger_middleware(&graphql_logging_config)))
173 .app_data(component_manager.clone())
175 .app_data(entity_type_manager.clone())
176 .app_data(relation_type_manager.clone())
177 .app_data(flow_type_manager.clone())
178 .app_data(entity_instance_manager.clone())
180 .app_data(relation_instance_manager.clone())
181 .app_data(flow_instance_manager.clone())
182 .app_data(web_resource_manager.clone())
184 .app_data(schema_data.clone())
186 .app_data(runtime_schema_data.clone())
188 .app_data(plugin_schema_data.clone())
190 .app_data(dynamic_graph_schema_manager.clone())
192 .service(query_graphql)
194 .service(
195 web::resource("/graphql")
196 .guard(guard::Get())
197 .guard(guard::Header("upgrade", "websocket"))
198 .to(subscription_websocket),
199 )
200 .service(query_dynamic_graph)
202 .service(query_runtime_graphql)
204 .service(query_plugin_graphql)
205 .service(reactive_graph_type_system_rest::components::get_components)
207 .service(reactive_graph_type_system_rest::components::get_component)
208 .service(reactive_graph_type_system_rest::entities::get_entity_types)
209 .service(reactive_graph_type_system_rest::entities::get_entity_type)
210 .service(reactive_graph_type_system_rest::relations::get_relation_types)
211 .service(reactive_graph_type_system_rest::relations::get_relation_type)
212 .service(reactive_graph_type_system_rest::flows::get_flow_types)
213 .service(reactive_graph_type_system_rest::flows::get_flow_type)
214 .service(reactive_graph_type_system_rest::components::json_schema_components)
216 .service(reactive_graph_type_system_rest::entities::json_schema_entity_types)
217 .service(reactive_graph_type_system_rest::relations::json_schema_relation_types)
218 .service(reactive_graph_type_system_rest::flows::json_schema_flow_types)
219 .service(reactive_graph_instance_system_rest::entities::json_schema_entity_instances)
221 .service(reactive_graph_instance_system_rest::relations::json_schema_relation_instances)
222 .service(reactive_graph_instance_system_rest::flows::json_schema_flow_instances)
223 .service(web::resource("/{web_resource_context_path}/{path:.*}").route(web::get().to(handle_web_resource)))
227 .service(web::resource("/{path:.*}").route(web::get().to(handle_root_web_resource)))
228 })
229 .disable_signals()
230 .shutdown_timeout(graphql_server_config.shutdown_timeout())
231 .workers(graphql_server_config.workers());
232
233 let r_http_server = if graphql_server_config.is_secure() {
234 let cert_file = File::open(graphql_server_config.ssl_certificate_path())?;
235 let cert_file = &mut BufReader::new(cert_file);
236 let cert_chain: Vec<CertificateDer> = certs(cert_file).filter_map(|cert| cert.ok()).collect();
237 if cert_chain.is_empty() {
238 error!("The certificate chain is empty.");
239 return Err(GraphQLServerError::EmptyCertificateChain.into());
240 }
241
242 let key_file = File::open(graphql_server_config.ssl_private_key_path())?;
243 let key_file = &mut BufReader::new(key_file);
244 let mut keys: Vec<PrivateKeyDer> = read_all(key_file)
245 .filter_map(|item| match item {
246 Ok(Item::Pkcs1Key(key)) => Some(key.into()),
247 Ok(Item::Pkcs8Key(key)) => Some(key.into()),
248 Ok(Item::Sec1Key(key)) => Some(key.into()),
249 Ok(_) => {
250 error!("Could not load private key: The file does not contain a private key in either format PKCS1, PKCS8, SEC1!");
251 None
252 }
253 Err(e) => {
254 error!("Failed to load private key: {e}");
255 None
256 }
257 })
258 .collect();
259 if keys.is_empty() {
260 error!("Could not load private keys.");
261 return Err(GraphQLServerError::NoPrivateKeyFound.into());
262 }
263 if rustls::crypto::aws_lc_rs::default_provider().install_default().is_err() {
264 error!("CryptoProvider Error");
265 return Err(GraphQLServerError::NoPrivateKeyFound.into());
266 }
267 let tls_config = ServerConfig::builder()
268 .with_no_client_auth()
269 .with_single_cert(cert_chain, keys.remove(0))
270 .map_err(GraphQLServerError::RustlsError)?;
271 info!("Starting HTTPS/GraphQL server on {}", graphql_server_config.url());
272 http_server.bind_rustls_0_23(graphql_server_config.addr(), tls_config)?.run()
273 } else {
274 info!("Starting HTTP/GraphQL server on {}", graphql_server_config.url());
275 http_server.bind(graphql_server_config.addr())?.run()
276 };
277 Ok(r_http_server)
278 }
279}
280
281#[async_trait]
282impl Lifecycle for GraphQLServerImpl {
283 async fn init(&self) {}
284
285 async fn post_init(&self) {}
286
287 async fn pre_shutdown(&self) {}
288
289 async fn shutdown(&self) {}
290}