reactive_graph_dynamic_graph_impl/
dynamic_graph_schema_manager_impl.rs1use std::ops::Deref;
2use std::sync::Arc;
3use std::sync::RwLock;
4use std::sync::atomic::AtomicBool;
5use std::sync::atomic::Ordering;
6use std::time::Instant;
7
8use async_graphql::dynamic::Schema;
9use async_graphql::dynamic::SchemaBuilder;
10use async_graphql::dynamic::SchemaError;
11use async_trait::async_trait;
12use log::debug;
13use log::error;
14use log::info;
15use log::trace;
16use springtime_di::Component;
17use springtime_di::component_alias;
18use uuid::Uuid;
19
20use crate::interface::component::get_interfaces;
21use crate::object::entity::mutation::register_entity_type_mutation_objects;
22use crate::object::entity::query::register_entity_type_query_objects;
23use crate::object::flow::mutation::register_flow_type_mutation_objects;
24use crate::object::flow::query::register_flow_type_query_objects;
25use crate::object::relation::mutation::register_relation_type_mutation_objects;
26use crate::object::relation::query::register_relation_type_query_objects;
27use crate::root::get_mutation;
28use crate::root::get_query;
29use crate::scalar::get_scalars;
30use crate::union::get_unions;
31use reactive_graph_dynamic_graph_api::DynamicGraphSchemaManager;
32use reactive_graph_dynamic_graph_api::SchemaBuilderContext;
33use reactive_graph_graph::PropertyTypeDefinition;
34use reactive_graph_lifecycle::Lifecycle;
35use reactive_graph_reactive_model_api::ReactivePropertyContainer;
36use reactive_graph_reactive_service_api::ReactiveEntityManager;
37use reactive_graph_reactive_service_api::ReactiveFlowManager;
38use reactive_graph_reactive_service_api::ReactiveRelationManager;
39use reactive_graph_runtime_model::EventProperties::EVENT;
40use reactive_graph_type_system_api::ComponentManager;
41use reactive_graph_type_system_api::EntityTypeManager;
42use reactive_graph_type_system_api::FlowTypeManager;
43use reactive_graph_type_system_api::NamespaceManager;
44use reactive_graph_type_system_api::RelationTypeManager;
45use reactive_graph_type_system_api::TypeSystemEventManager;
46use reactive_graph_type_system_api::TypeSystemEventTypes;
47
48static UUID_TYPE_SYSTEM_CHANGED_EVENT: Uuid = Uuid::from_u128(0x6ba7b8109e1511d150b900c04fe530c7);
49
50fn create_dynamic_schema() -> Arc<RwLock<Option<Arc<Schema>>>> {
51 Arc::new(RwLock::new(None))
52}
53
54fn create_dynamic_schema_modified() -> Arc<AtomicBool> {
55 Arc::new(AtomicBool::new(true))
56}
57
58#[derive(Component)]
59pub struct DynamicGraphSchemaManagerImpl {
60 type_system_event_manager: Arc<dyn TypeSystemEventManager + Send + Sync>,
61
62 component_manager: Arc<dyn ComponentManager + Send + Sync>,
63
64 entity_type_manager: Arc<dyn EntityTypeManager + Send + Sync>,
65
66 relation_type_manager: Arc<dyn RelationTypeManager + Send + Sync>,
67
68 flow_type_manager: Arc<dyn FlowTypeManager + Send + Sync>,
69
70 namespace_manager: Arc<dyn NamespaceManager + Send + Sync>,
71
72 reactive_entity_manager: Arc<dyn ReactiveEntityManager + Send + Sync>,
73
74 reactive_relation_manager: Arc<dyn ReactiveRelationManager + Send + Sync>,
75
76 reactive_flow_manager: Arc<dyn ReactiveFlowManager + Send + Sync>,
77
78 #[component(default = "create_dynamic_schema")]
79 dynamic_schema: Arc<RwLock<Option<Arc<Schema>>>>,
80
81 #[component(default = "create_dynamic_schema_modified")]
82 type_system_modified_state: Arc<AtomicBool>,
83}
84
85async fn build_dynamic_schema(context: SchemaBuilderContext, schema: SchemaBuilder) -> Result<Schema, SchemaError> {
86 let mut schema = get_scalars(schema);
87 schema = get_interfaces(schema, &context);
88 schema = get_unions(schema, &context);
89 schema = register_entity_type_query_objects(schema, &context);
90 schema = register_entity_type_mutation_objects(schema, &context);
91 schema = register_relation_type_query_objects(schema, &context);
92 schema = register_relation_type_mutation_objects(schema, &context);
93 schema = register_flow_type_query_objects(schema, &context);
94 schema = register_flow_type_mutation_objects(schema, &context);
95 schema = get_query(schema, &context);
96 schema = get_mutation(schema, &context);
97 schema.finish()
98}
99
100fn build_dynamic_schema_sync(context: SchemaBuilderContext, schema: SchemaBuilder) -> Result<Schema, SchemaError> {
101 let mut schema = get_scalars(schema);
102 schema = get_interfaces(schema, &context);
103 schema = get_unions(schema, &context);
104 schema = register_entity_type_query_objects(schema, &context);
105 schema = register_entity_type_mutation_objects(schema, &context);
106 schema = register_relation_type_query_objects(schema, &context);
107 schema = register_relation_type_mutation_objects(schema, &context);
108 schema = register_flow_type_query_objects(schema, &context);
109 schema = register_flow_type_mutation_objects(schema, &context);
110 schema = get_query(schema, &context);
111 schema = get_mutation(schema, &context);
112 schema.finish()
113}
114
115impl DynamicGraphSchemaManagerImpl {
116 async fn generate_dynamic_schema(&self) {
117 let context = self.get_schema_builder_context();
118 let schema = self.get_schema_builder();
119 let dynamic_schema_lock = self.dynamic_schema.clone();
120 let type_system_modified_state = self.type_system_modified_state.clone();
121 tokio::spawn(async move {
122 debug!("Start generating dynamic schema");
123 let start = Instant::now();
124 match build_dynamic_schema(context, schema).await {
125 Ok(dynamic_schema) => {
126 let mut guard = dynamic_schema_lock.write().unwrap();
127 *guard = Some(Arc::new(dynamic_schema));
128 type_system_modified_state.store(false, Ordering::Relaxed);
129 let duration = start.elapsed();
130 debug!("Successfully generated dynamic schema in {duration:?}");
131 }
132 Err(e) => {
133 error!("Failed to generate dynamic schema: {e}");
134 }
135 }
136 });
137 }
138}
139
140#[async_trait]
141#[component_alias]
142impl DynamicGraphSchemaManager for DynamicGraphSchemaManagerImpl {
143 fn is_type_system_modified(&self) -> bool {
144 self.type_system_modified_state.load(Ordering::Relaxed)
145 }
146
147 fn get_schema_builder_context(&self) -> SchemaBuilderContext {
148 SchemaBuilderContext::new(
149 self.namespace_manager.clone(),
150 self.component_manager.clone(),
151 self.entity_type_manager.clone(),
152 self.relation_type_manager.clone(),
153 self.flow_type_manager.clone(),
154 )
155 }
156
157 fn get_schema_builder(&self) -> SchemaBuilder {
158 Schema::build("Query", Some("Mutation"), None)
159 .data(self.namespace_manager.clone())
160 .data(self.component_manager.clone())
161 .data(self.entity_type_manager.clone())
162 .data(self.relation_type_manager.clone())
163 .data(self.flow_type_manager.clone())
164 .data(self.reactive_entity_manager.clone())
165 .data(self.reactive_relation_manager.clone())
166 .data(self.reactive_flow_manager.clone())
167 }
168
169 async fn create_dynamic_schema(&self) -> Result<Schema, SchemaError> {
170 build_dynamic_schema(self.get_schema_builder_context(), self.get_schema_builder()).await
171 }
172
173 fn create_dynamic_schema_sync(&self) -> Result<Schema, SchemaError> {
174 build_dynamic_schema_sync(self.get_schema_builder_context(), self.get_schema_builder())
175 }
176
177 async fn regenerate_dynamic_schema(&self) -> Result<(), SchemaError> {
178 trace!("Regenerating dynamic schema");
179 match self.create_dynamic_schema().await {
180 Ok(dynamic_schema) => {
181 info!("Successfully regenerated dynamic schema");
182 trace!("{}", dynamic_schema.sdl());
183 let mut guard = self.dynamic_schema.write().unwrap();
184 *guard = Some(Arc::new(dynamic_schema));
185 self.type_system_modified_state.store(false, Ordering::Relaxed);
186 Ok(())
187 }
188 Err(e) => {
189 error!("Failed to regenerate dynamic schema: {e}");
190 Err(e)
191 }
192 }
193 }
194
195 async fn regenerate_dynamic_schema_if_modified(&self) -> Result<(), SchemaError> {
196 if !self.is_type_system_modified() {
197 return Ok(());
198 }
199 trace!("The type system has been modified. Regenerating the dynamic schema");
200 self.regenerate_dynamic_schema().await
201 }
202
203 async fn get_dynamic_schema(&self) -> Result<Arc<Schema>, SchemaError> {
204 self.regenerate_dynamic_schema_if_modified().await?;
205 let guard = self.dynamic_schema.read().unwrap();
206 match guard.deref() {
207 Some(schema) => Ok(schema.clone()),
208 None => {
209 error!("Can't get dynamic schema!");
210 Err(SchemaError("Dynamic schema is empty".to_string()))
211 }
212 }
213 }
214}
215
216#[async_trait]
217impl Lifecycle for DynamicGraphSchemaManagerImpl {
218 async fn init(&self) {}
219
220 async fn post_init(&self) {
221 self.generate_dynamic_schema().await;
223
224 if let Some(event_type_system_changed) = self
226 .type_system_event_manager
227 .get_type_system_event_instance(TypeSystemEventTypes::TypeSystemChanged)
228 {
229 let type_system_modified_state = self.type_system_modified_state.clone();
230 event_type_system_changed.observe_with_handle(
231 &EVENT.property_name(),
232 move |v| {
233 if v.is_boolean() && v.as_bool().unwrap() {
234 info!("The type system has changed -> regenerate the dynamic schema");
236 type_system_modified_state.store(true, Ordering::Relaxed);
237 }
238 },
239 UUID_TYPE_SYSTEM_CHANGED_EVENT.as_u128(),
240 );
241 }
242 }
243
244 async fn pre_shutdown(&self) {
245 if let Some(event_type_system_changed) = self
246 .type_system_event_manager
247 .get_type_system_event_instance(TypeSystemEventTypes::TypeSystemChanged)
248 {
249 event_type_system_changed.remove_observer(&EVENT.property_name(), UUID_TYPE_SYSTEM_CHANGED_EVENT.as_u128());
250 }
251 }
252
253 async fn shutdown(&self) {}
254}