reactive_graph_dynamic_graph_impl/
dynamic_graph_schema_manager_impl.rs

1use std::ops::Deref;
2use std::sync::Arc;
3use std::sync::RwLock;
4use std::sync::atomic::AtomicBool;
5use std::sync::atomic::Ordering;
6use std::time::Instant;
7
8use async_graphql::dynamic::Schema;
9use async_graphql::dynamic::SchemaBuilder;
10use async_graphql::dynamic::SchemaError;
11use async_trait::async_trait;
12use log::debug;
13use log::error;
14use log::info;
15use log::trace;
16use springtime_di::Component;
17use springtime_di::component_alias;
18use uuid::Uuid;
19
20use crate::interface::component::get_interfaces;
21use crate::object::entity::mutation::register_entity_type_mutation_objects;
22use crate::object::entity::query::register_entity_type_query_objects;
23use crate::object::flow::mutation::register_flow_type_mutation_objects;
24use crate::object::flow::query::register_flow_type_query_objects;
25use crate::object::relation::mutation::register_relation_type_mutation_objects;
26use crate::object::relation::query::register_relation_type_query_objects;
27use crate::root::get_mutation;
28use crate::root::get_query;
29use crate::scalar::get_scalars;
30use crate::union::get_unions;
31use reactive_graph_dynamic_graph_api::DynamicGraphSchemaManager;
32use reactive_graph_dynamic_graph_api::SchemaBuilderContext;
33use reactive_graph_graph::PropertyTypeDefinition;
34use reactive_graph_lifecycle::Lifecycle;
35use reactive_graph_reactive_model_api::ReactivePropertyContainer;
36use reactive_graph_reactive_service_api::ReactiveEntityManager;
37use reactive_graph_reactive_service_api::ReactiveFlowManager;
38use reactive_graph_reactive_service_api::ReactiveRelationManager;
39use reactive_graph_runtime_model::EventProperties::EVENT;
40use reactive_graph_type_system_api::ComponentManager;
41use reactive_graph_type_system_api::EntityTypeManager;
42use reactive_graph_type_system_api::FlowTypeManager;
43use reactive_graph_type_system_api::NamespaceManager;
44use reactive_graph_type_system_api::RelationTypeManager;
45use reactive_graph_type_system_api::TypeSystemEventManager;
46use reactive_graph_type_system_api::TypeSystemEventTypes;
47
48static UUID_TYPE_SYSTEM_CHANGED_EVENT: Uuid = Uuid::from_u128(0x6ba7b8109e1511d150b900c04fe530c7);
49
50fn create_dynamic_schema() -> Arc<RwLock<Option<Arc<Schema>>>> {
51    Arc::new(RwLock::new(None))
52}
53
54fn create_dynamic_schema_modified() -> Arc<AtomicBool> {
55    Arc::new(AtomicBool::new(true))
56}
57
58#[derive(Component)]
59pub struct DynamicGraphSchemaManagerImpl {
60    type_system_event_manager: Arc<dyn TypeSystemEventManager + Send + Sync>,
61
62    component_manager: Arc<dyn ComponentManager + Send + Sync>,
63
64    entity_type_manager: Arc<dyn EntityTypeManager + Send + Sync>,
65
66    relation_type_manager: Arc<dyn RelationTypeManager + Send + Sync>,
67
68    flow_type_manager: Arc<dyn FlowTypeManager + Send + Sync>,
69
70    namespace_manager: Arc<dyn NamespaceManager + Send + Sync>,
71
72    reactive_entity_manager: Arc<dyn ReactiveEntityManager + Send + Sync>,
73
74    reactive_relation_manager: Arc<dyn ReactiveRelationManager + Send + Sync>,
75
76    reactive_flow_manager: Arc<dyn ReactiveFlowManager + Send + Sync>,
77
78    #[component(default = "create_dynamic_schema")]
79    dynamic_schema: Arc<RwLock<Option<Arc<Schema>>>>,
80
81    #[component(default = "create_dynamic_schema_modified")]
82    type_system_modified_state: Arc<AtomicBool>,
83}
84
85async fn build_dynamic_schema(context: SchemaBuilderContext, schema: SchemaBuilder) -> Result<Schema, SchemaError> {
86    let mut schema = get_scalars(schema);
87    schema = get_interfaces(schema, &context);
88    schema = get_unions(schema, &context);
89    schema = register_entity_type_query_objects(schema, &context);
90    schema = register_entity_type_mutation_objects(schema, &context);
91    schema = register_relation_type_query_objects(schema, &context);
92    schema = register_relation_type_mutation_objects(schema, &context);
93    schema = register_flow_type_query_objects(schema, &context);
94    schema = register_flow_type_mutation_objects(schema, &context);
95    schema = get_query(schema, &context);
96    schema = get_mutation(schema, &context);
97    schema.finish()
98}
99
100fn build_dynamic_schema_sync(context: SchemaBuilderContext, schema: SchemaBuilder) -> Result<Schema, SchemaError> {
101    let mut schema = get_scalars(schema);
102    schema = get_interfaces(schema, &context);
103    schema = get_unions(schema, &context);
104    schema = register_entity_type_query_objects(schema, &context);
105    schema = register_entity_type_mutation_objects(schema, &context);
106    schema = register_relation_type_query_objects(schema, &context);
107    schema = register_relation_type_mutation_objects(schema, &context);
108    schema = register_flow_type_query_objects(schema, &context);
109    schema = register_flow_type_mutation_objects(schema, &context);
110    schema = get_query(schema, &context);
111    schema = get_mutation(schema, &context);
112    schema.finish()
113}
114
115impl DynamicGraphSchemaManagerImpl {
116    async fn generate_dynamic_schema(&self) {
117        let context = self.get_schema_builder_context();
118        let schema = self.get_schema_builder();
119        let dynamic_schema_lock = self.dynamic_schema.clone();
120        let type_system_modified_state = self.type_system_modified_state.clone();
121        tokio::spawn(async move {
122            debug!("Start generating dynamic schema");
123            let start = Instant::now();
124            match build_dynamic_schema(context, schema).await {
125                Ok(dynamic_schema) => {
126                    let mut guard = dynamic_schema_lock.write().unwrap();
127                    *guard = Some(Arc::new(dynamic_schema));
128                    type_system_modified_state.store(false, Ordering::Relaxed);
129                    let duration = start.elapsed();
130                    debug!("Successfully generated dynamic schema in {duration:?}");
131                }
132                Err(e) => {
133                    error!("Failed to generate dynamic schema: {e}");
134                }
135            }
136        });
137    }
138}
139
140#[async_trait]
141#[component_alias]
142impl DynamicGraphSchemaManager for DynamicGraphSchemaManagerImpl {
143    fn is_type_system_modified(&self) -> bool {
144        self.type_system_modified_state.load(Ordering::Relaxed)
145    }
146
147    fn get_schema_builder_context(&self) -> SchemaBuilderContext {
148        SchemaBuilderContext::new(
149            self.namespace_manager.clone(),
150            self.component_manager.clone(),
151            self.entity_type_manager.clone(),
152            self.relation_type_manager.clone(),
153            self.flow_type_manager.clone(),
154        )
155    }
156
157    fn get_schema_builder(&self) -> SchemaBuilder {
158        Schema::build("Query", Some("Mutation"), None)
159            .data(self.namespace_manager.clone())
160            .data(self.component_manager.clone())
161            .data(self.entity_type_manager.clone())
162            .data(self.relation_type_manager.clone())
163            .data(self.flow_type_manager.clone())
164            .data(self.reactive_entity_manager.clone())
165            .data(self.reactive_relation_manager.clone())
166            .data(self.reactive_flow_manager.clone())
167    }
168
169    async fn create_dynamic_schema(&self) -> Result<Schema, SchemaError> {
170        build_dynamic_schema(self.get_schema_builder_context(), self.get_schema_builder()).await
171    }
172
173    fn create_dynamic_schema_sync(&self) -> Result<Schema, SchemaError> {
174        build_dynamic_schema_sync(self.get_schema_builder_context(), self.get_schema_builder())
175    }
176
177    async fn regenerate_dynamic_schema(&self) -> Result<(), SchemaError> {
178        trace!("Regenerating dynamic schema");
179        match self.create_dynamic_schema().await {
180            Ok(dynamic_schema) => {
181                info!("Successfully regenerated dynamic schema");
182                trace!("{}", dynamic_schema.sdl());
183                let mut guard = self.dynamic_schema.write().unwrap();
184                *guard = Some(Arc::new(dynamic_schema));
185                self.type_system_modified_state.store(false, Ordering::Relaxed);
186                Ok(())
187            }
188            Err(e) => {
189                error!("Failed to regenerate dynamic schema: {e}");
190                Err(e)
191            }
192        }
193    }
194
195    async fn regenerate_dynamic_schema_if_modified(&self) -> Result<(), SchemaError> {
196        if !self.is_type_system_modified() {
197            return Ok(());
198        }
199        trace!("The type system has been modified. Regenerating the dynamic schema");
200        self.regenerate_dynamic_schema().await
201    }
202
203    async fn get_dynamic_schema(&self) -> Result<Arc<Schema>, SchemaError> {
204        self.regenerate_dynamic_schema_if_modified().await?;
205        let guard = self.dynamic_schema.read().unwrap();
206        match guard.deref() {
207            Some(schema) => Ok(schema.clone()),
208            None => {
209                error!("Can't get dynamic schema!");
210                Err(SchemaError("Dynamic schema is empty".to_string()))
211            }
212        }
213    }
214}
215
216#[async_trait]
217impl Lifecycle for DynamicGraphSchemaManagerImpl {
218    async fn init(&self) {}
219
220    async fn post_init(&self) {
221        // Initially generate dynamic schema concurrently
222        self.generate_dynamic_schema().await;
223
224        // Listen on type system
225        if let Some(event_type_system_changed) = self
226            .type_system_event_manager
227            .get_type_system_event_instance(TypeSystemEventTypes::TypeSystemChanged)
228        {
229            let type_system_modified_state = self.type_system_modified_state.clone();
230            event_type_system_changed.observe_with_handle(
231                &EVENT.property_name(),
232                move |v| {
233                    if v.is_boolean() && v.as_bool().unwrap() {
234                        // The type system has changed -> regenerate the dynamic schema
235                        info!("The type system has changed -> regenerate the dynamic schema");
236                        type_system_modified_state.store(true, Ordering::Relaxed);
237                    }
238                },
239                UUID_TYPE_SYSTEM_CHANGED_EVENT.as_u128(),
240            );
241        }
242    }
243
244    async fn pre_shutdown(&self) {
245        if let Some(event_type_system_changed) = self
246            .type_system_event_manager
247            .get_type_system_event_instance(TypeSystemEventTypes::TypeSystemChanged)
248        {
249            event_type_system_changed.remove_observer(&EVENT.property_name(), UUID_TYPE_SYSTEM_CHANGED_EVENT.as_u128());
250        }
251    }
252
253    async fn shutdown(&self) {}
254}