reactive_graph_reactive_service_impl/
reactive_flow_manager_impl.rs1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::RwLock;
4
5use async_trait::async_trait;
6use dashmap::DashMap;
7use log::debug;
8use log::error;
9use log::trace;
10use path_tree::PathTree;
11use springtime_di::Component;
12use springtime_di::component_alias;
13use uuid::Uuid;
14
15use reactive_graph_graph::EntityInstance;
16use reactive_graph_graph::EntityInstances;
17use reactive_graph_graph::ExtensionContainer;
18use reactive_graph_graph::FlowInstance;
19use reactive_graph_graph::FlowTypeId;
20use reactive_graph_graph::Mutability::Mutable;
21use reactive_graph_graph::MutablePropertyInstanceSetter;
22use reactive_graph_graph::NamespacedTypeGetter;
23use reactive_graph_graph::PropertyInstanceGetter;
24use reactive_graph_graph::PropertyInstanceSetter;
25use reactive_graph_graph::PropertyInstances;
26use reactive_graph_graph::PropertyTypeDefinition;
27use reactive_graph_graph::RelationInstance;
28use reactive_graph_graph::RelationInstanceId;
29use reactive_graph_graph::RelationInstances;
30use reactive_graph_graph::TypeDefinitionGetter;
31use reactive_graph_lifecycle::Lifecycle;
32use reactive_graph_model_flow::EXTENSION_FLOW_RESOLVE_EXISTING_INSTANCE;
33use reactive_graph_model_flow::EXTENSION_FLOW_UUID_TYPE_EXTENSION;
34use reactive_graph_model_flow::EXTENSION_FLOW_UUID_TYPE_VARIABLE;
35use reactive_graph_reactive_model_api::ReactiveInstance;
36use reactive_graph_reactive_model_api::ReactivePropertyContainer;
37use reactive_graph_reactive_model_impl::ReactiveEntity;
38use reactive_graph_reactive_model_impl::ReactiveFlow;
39use reactive_graph_reactive_model_impl::ReactiveRelation;
40use reactive_graph_reactive_service_api::FlowInstanceProvider;
41use reactive_graph_reactive_service_api::ReactiveEntityManager;
42use reactive_graph_reactive_service_api::ReactiveFlowCreationError;
43use reactive_graph_reactive_service_api::ReactiveFlowManager;
44use reactive_graph_reactive_service_api::ReactiveInstanceEvent;
45use reactive_graph_reactive_service_api::ReactiveInstanceEventManager;
46use reactive_graph_reactive_service_api::ReactiveRelationManager;
47use reactive_graph_runtime_model::LabeledProperties::LABEL;
48use reactive_graph_type_system_api::ComponentManager;
49use reactive_graph_type_system_api::EntityTypeManager;
50use reactive_graph_type_system_api::FlowTypeManager;
51use reactive_graph_type_system_api::RelationTypeManager;
52
53fn create_label_path_tree() -> RwLock<PathTree<Uuid>> {
54 RwLock::new(PathTree::<Uuid>::new())
55}
56
57#[derive(Component)]
58pub struct ReactiveFlowManagerImpl {
59 reactive_instance_event_manager: Arc<dyn ReactiveInstanceEventManager + Send + Sync>,
60
61 component_manager: Arc<dyn ComponentManager + Send + Sync>,
63
64 entity_type_manager: Arc<dyn EntityTypeManager + Send + Sync>,
65
66 relation_type_manager: Arc<dyn RelationTypeManager + Send + Sync>,
67
68 flow_type_manager: Arc<dyn FlowTypeManager + Send + Sync>,
69
70 reactive_entity_manager: Arc<dyn ReactiveEntityManager + Send + Sync>,
72
73 reactive_relation_manager: Arc<dyn ReactiveRelationManager + Send + Sync>,
74
75 #[component(default = "DashMap::new")]
76 reactive_flow_instances: DashMap<Uuid, ReactiveFlow>, #[component(default = "DashMap::new")]
80 flow_instance_providers: DashMap<Uuid, Arc<dyn FlowInstanceProvider>>, #[component(default = "create_label_path_tree")]
83 label_path_tree: RwLock<PathTree<Uuid>>, }
85
86impl ReactiveFlowManagerImpl {
87 fn get_entity_instance_id_by_extension(&self, entity_instance: &EntityInstance, variables: &PropertyInstances) -> Uuid {
88 if entity_instance.has_own_extension(&EXTENSION_FLOW_RESOLVE_EXISTING_INSTANCE.clone()) {
90 return entity_instance.id;
91 }
92 if let Some(id) = entity_instance
94 .get_own_extension(&EXTENSION_FLOW_UUID_TYPE_VARIABLE.clone())
95 .and_then(|extension| extension.extension.as_str().map(|s| s.to_string()))
96 .and_then(|variable_name| variables.get(&variable_name))
97 .and_then(|variable_value| variable_value.as_str().map(|s| s.to_string()))
98 .and_then(|variable_value| Uuid::parse_str(&variable_value).ok())
99 {
100 return id;
101 }
102 if let Some(id) = entity_instance
104 .get_own_extension(&EXTENSION_FLOW_UUID_TYPE_EXTENSION.clone())
105 .and_then(|extension| extension.extension.as_str().map(|s| s.to_string()))
106 .and_then(|extension_value| Uuid::parse_str(extension_value.as_str()).ok())
107 {
108 return id;
109 }
110 Uuid::new_v4()
112 }
113}
114
115#[async_trait]
116#[component_alias]
117impl ReactiveFlowManager for ReactiveFlowManagerImpl {
118 fn has(&self, id: Uuid) -> bool {
119 self.reactive_flow_instances.contains_key(&id)
120 }
121
122 fn get(&self, id: Uuid) -> Option<ReactiveFlow> {
123 self.reactive_flow_instances.get(&id).map(|reactive_flow| reactive_flow.value().clone())
124 }
125
126 fn get_by_label(&self, label: &str) -> Option<ReactiveFlow> {
127 let reader = self.label_path_tree.read().unwrap();
128 reader.find(label).and_then(|(id, _path)| self.get(*id))
129 }
130
131 fn get_all(&self) -> Vec<ReactiveFlow> {
132 self.reactive_flow_instances.iter().map(|reactive_flow| reactive_flow.value().clone()).collect()
133 }
134
135 fn get_by_type(&self, ty: &FlowTypeId) -> Vec<ReactiveFlow> {
137 let Some(flow_type) = self.flow_type_manager.get(ty) else {
138 return Vec::new();
139 };
140 let entity_ty = flow_type.wrapper_type();
141 self.reactive_flow_instances
142 .iter()
143 .filter(|reactive_flow| reactive_flow.ty == entity_ty)
144 .map(|reactive_flow| reactive_flow.value().clone())
145 .collect()
146 }
147
148 fn count_flow_instances(&self) -> usize {
149 self.reactive_flow_instances.len()
150 }
151
152 fn create_reactive_flow(&self, flow_instance: FlowInstance) -> Result<ReactiveFlow, ReactiveFlowCreationError> {
153 let reactive_flow_instance = ReactiveFlow::try_from(flow_instance);
154 if reactive_flow_instance.is_err() {
155 return Err(ReactiveFlowCreationError::ReactiveFlowConstructionError(reactive_flow_instance.err().unwrap()));
156 }
157 let reactive_flow_instance = reactive_flow_instance.unwrap();
158 self.register_flow_instance_and_reactive_instances(reactive_flow_instance.clone());
159 Ok(reactive_flow_instance)
160 }
161
162 fn create_from_type(
163 &self,
164 ty: &FlowTypeId,
165 id: Option<Uuid>,
166 variables: PropertyInstances,
167 properties: PropertyInstances,
168 ) -> Result<ReactiveFlow, ReactiveFlowCreationError> {
169 let flow_type = self
170 .flow_type_manager
171 .get(ty)
172 .ok_or_else(|| ReactiveFlowCreationError::FlowTypeDoesntExist(ty.clone()))?;
173 for variable in flow_type.variables.iter() {
174 if !variables.contains_key(variable.name.as_str()) {
175 return Err(ReactiveFlowCreationError::MissingVariable(variable.name.clone()));
176 }
177 }
178 let wrapper_entity_type = self
179 .entity_type_manager
180 .get(&flow_type.wrapper_entity_instance.ty)
181 .ok_or_else(|| ReactiveFlowCreationError::EntityTypeDoesntExist(flow_type.wrapper_entity_instance.ty.clone()))?;
182 let mut wrapper_entity_instance = flow_type.wrapper_entity_instance.clone();
183 let wrapper_entity_instance_id = id.unwrap_or_else(|| self.get_entity_instance_id_by_extension(&wrapper_entity_instance, &variables));
184 let mut entity_instance_id_mapping: HashMap<Uuid, Uuid> = HashMap::new();
185 entity_instance_id_mapping.insert(wrapper_entity_instance.id, wrapper_entity_instance_id);
186 wrapper_entity_instance.id = wrapper_entity_instance_id;
187
188 for property in wrapper_entity_type.properties.iter() {
190 trace!("Adding property {} from entity type {}", &property.name, &wrapper_entity_type.type_definition().to_string());
191 if !wrapper_entity_instance.properties.contains_key(&property.name) {
192 wrapper_entity_instance
193 .properties
194 .insert(property.name.clone(), property.data_type.default_value());
195 }
196 }
197
198 for component_ty in wrapper_entity_type.components.iter() {
200 if let Some(component) = self.component_manager.get(&component_ty) {
201 for property in component.properties.iter() {
202 trace!("Adding property {} from component {}", &property.name, &component_ty.type_definition().to_string());
203 if !wrapper_entity_instance.properties.contains_key(&property.name) {
204 wrapper_entity_instance
208 .properties
209 .insert(property.name.clone(), property.data_type.default_value());
210 }
211 }
212 }
213 }
214
215 for property in properties.iter() {
216 let property_name = property.key();
217 let value = property.value();
218 trace!("Setting property {property_name} with value {value} from parameter");
219 wrapper_entity_instance.set(property_name, value.clone());
220 }
221
222 let entity_instances = EntityInstances::new_with_instance(wrapper_entity_instance);
223 for entity_instance in flow_type.entity_instances.iter() {
224 let entity_type = self
225 .entity_type_manager
226 .get(&entity_instance.ty)
227 .ok_or_else(|| ReactiveFlowCreationError::EntityTypeDoesntExist(entity_instance.ty.clone()))?;
228 let entity_instance_id = self.get_entity_instance_id_by_extension(&entity_instance, &variables);
229 entity_instance_id_mapping.insert(entity_instance.id, entity_instance_id);
230 let mut entity_instance_copy = entity_instance.clone();
231 entity_instance_copy.id = entity_instance_id;
232
233 for property in entity_type.properties.iter() {
235 trace!("Adding property {} from entity type {}", &property.name, &entity_type.type_definition().to_string());
236 if !entity_instance_copy.properties.contains_key(&property.name) {
237 entity_instance_copy
241 .properties
242 .insert(property.name.clone(), property.data_type.default_value());
243 }
244 }
245
246 for component_ty in entity_type.components.iter() {
248 if let Some(component) = self.component_manager.get(&component_ty) {
249 for property in component.properties.iter() {
250 trace!("Adding property {} from component {}", &property.name, component_ty.type_definition());
251 if !entity_instance_copy.properties.contains_key(&property.name) {
252 entity_instance_copy
253 .properties
254 .insert(property.name.clone(), property.data_type.default_value());
255 }
256 }
257 }
258 }
259
260 entity_instances.push(entity_instance_copy);
264 }
265 for (uf, ut) in entity_instance_id_mapping.iter() {
266 trace!("Mapping flow type entity instance id {uf} to actual entity instance id {ut}");
267 }
268
269 let relation_instances = RelationInstances::new();
270
271 for relation_instance in flow_type.relation_instances.iter() {
272 let relation_ty = relation_instance.relation_type_id();
273 trace!("Relation instance type: {}", &relation_instance.ty);
274 let relation_type = self
275 .relation_type_manager
276 .get(&relation_ty)
277 .ok_or(ReactiveFlowCreationError::RelationTypeDoesntExist(relation_ty))?;
278 let mut relation_instance_copy = relation_instance.clone();
279 match entity_instance_id_mapping.get(&relation_instance.outbound_id) {
280 Some(replaced_id) => relation_instance_copy.outbound_id = *replaced_id,
281 None => return Err(ReactiveFlowCreationError::InvalidOutboundId(relation_instance.outbound_id)),
282 }
283 match entity_instance_id_mapping.get(&relation_instance.inbound_id) {
284 Some(replaced_id) => relation_instance_copy.inbound_id = *replaced_id,
285 None => return Err(ReactiveFlowCreationError::InvalidInboundId(relation_instance.inbound_id)),
286 }
287
288 for property in relation_type.properties.iter() {
290 if !relation_instance_copy.properties.contains_key(&property.name) {
291 relation_instance_copy
295 .properties
296 .insert(property.name.clone(), property.data_type.default_value());
297 }
298 }
299
300 for component_ty in relation_type.components.iter() {
302 if let Some(component) = self.component_manager.get(&component_ty) {
303 for property in component.properties.iter() {
304 if !relation_instance_copy.properties.contains_key(&property.name) {
305 relation_instance_copy
309 .properties
310 .insert(property.name.clone(), property.data_type.default_value());
311 }
312 }
313 }
314 }
315
316 relation_instances.push(relation_instance_copy);
320 }
321
322 let flow_instance = FlowInstance::builder()
324 .ty(flow_type.wrapper_type())
325 .id(wrapper_entity_instance_id)
326 .name(flow_type.type_name()) .description(flow_type.description.clone())
328 .entity_instances(entity_instances)
329 .relation_instances(relation_instances)
330 .build();
331
332 trace!("{flow_instance:?}");
334 match ReactiveFlow::try_from(flow_instance) {
335 Ok(reactive_flow_instance) => {
336 self.register_flow_instance_and_reactive_instances(reactive_flow_instance.clone());
337
338 if let Some(wrapper_entity_instance) = reactive_flow_instance.get_wrapper_entity_instance() {
340 for property in properties.iter() {
341 let property_name = property.key();
342 let property_value = property.value();
343 if !wrapper_entity_instance.has_property(property_name) {
344 trace!("Adding parameter property {property_name} with value {property_value} from parameter");
345 wrapper_entity_instance.add_property(property_name, Mutable, property_value.clone());
346 } else {
347 trace!("Set parameter property {property_name} with value {property_value} from parameter");
348 wrapper_entity_instance.set(property_name, property_value.clone());
349 }
350 }
351 }
352
353 Ok(reactive_flow_instance)
354 }
355 Err(e) => Err(ReactiveFlowCreationError::ReactiveFlowConstructionError(e)),
356 }
357 }
358
359 fn register_flow_instance_and_reactive_instances(&self, reactive_flow_instance: ReactiveFlow) {
360 if !self.has(reactive_flow_instance.id) {
361 {
362 let mut entity_instances = reactive_flow_instance.entity_instances.write().unwrap();
364 let mut replaced_entity_instances = HashMap::<Uuid, ReactiveEntity>::new();
365 for (uuid, entity_instance) in entity_instances.iter() {
366 match self.reactive_entity_manager.register_or_merge_reactive_instance(entity_instance.clone()) {
370 Ok(entity_instance) => {
371 replaced_entity_instances.insert(*uuid, entity_instance);
373 }
374 Err(e) => {
375 debug!("Failed to register entity instance {uuid}: {e:?}");
377 }
378 }
379 }
385
386 entity_instances.clear();
388 for (uuid, entity_instance) in replaced_entity_instances.iter() {
389 entity_instances.insert(*uuid, entity_instance.clone());
390 }
391
392 let mut relation_instances = reactive_flow_instance.relation_instances.write().unwrap();
395 let mut replaced_relation_instances = HashMap::<RelationInstanceId, ReactiveRelation>::new();
396 for (relation_instance_id, relation_instance) in relation_instances.iter() {
397 let inbound_id = relation_instance.inbound.id;
398 let outbound_id = relation_instance.outbound.id;
399
400 let recreated_relation_instance = ReactiveRelation::new_from_instance(
401 entity_instances.get(&outbound_id).unwrap().clone(),
402 entity_instances.get(&inbound_id).unwrap().clone(),
403 RelationInstance::from(relation_instance.clone()),
404 );
405 replaced_relation_instances.insert(relation_instance_id.clone(), recreated_relation_instance);
406 }
409
410 relation_instances.clear();
412 for (relation_instance_id, relation_instance) in replaced_relation_instances.iter() {
413 relation_instances.insert(relation_instance_id.clone(), relation_instance.clone());
414 }
415
416 let mut replaced_relation_instances = HashMap::<RelationInstanceId, ReactiveRelation>::new();
418 for (relation_instance_id, relation_instance) in relation_instances.iter() {
419 match self.reactive_relation_manager.register_or_merge_reactive_instance(relation_instance.clone()) {
420 Ok(relation_instance) => {
421 replaced_relation_instances.insert(relation_instance_id.clone(), relation_instance);
423 }
424 Err(e) => {
425 debug!("Failed to register relation instance {relation_instance_id:?}: {e:?}");
427 }
428 }
429 }
430
431 relation_instances.clear();
433 for (relation_instance_id, relation_instance) in replaced_relation_instances.iter() {
434 relation_instances.insert(relation_instance_id.clone(), relation_instance.clone());
435 }
436 } self.register_flow_instance(reactive_flow_instance);
438 }
439 }
440
441 fn register_flow_instance(&self, reactive_flow_instance: ReactiveFlow) {
442 if !self.reactive_entity_manager.has(reactive_flow_instance.id) {
443 if let Some(wrapper_entity_instance) = reactive_flow_instance.get_entity(reactive_flow_instance.id) {
444 if let Err(e) = self.reactive_entity_manager.register_reactive_instance(wrapper_entity_instance) {
445 error!("Failed to register wrapper entity instance of flow {}: {:?}", reactive_flow_instance.id, e);
446 }
447 }
448 }
449 self.reactive_flow_instances.insert(reactive_flow_instance.id, reactive_flow_instance.clone());
450 if let Some(value) = reactive_flow_instance.get(LABEL.property_name()) {
456 if let Some(label) = value.as_str() {
457 let mut writer = self.label_path_tree.write().unwrap();
458 let _ = writer.insert(label, reactive_flow_instance.id);
459 }
460 }
461 self.reactive_instance_event_manager
462 .emit_event(ReactiveInstanceEvent::FlowInstanceCreated(reactive_flow_instance.id))
463 }
464
465 fn delete(&self, id: Uuid) -> bool {
521 let Some(reactive_flow_instance) = self.get(id) else {
522 return false;
523 };
524 for (_, entity_instance) in reactive_flow_instance.entity_instances.read().unwrap().iter() {
525 self.reactive_entity_manager.unregister_reactive_instance(entity_instance.id);
526 }
527 for (_, relation_instance) in reactive_flow_instance.relation_instances.read().unwrap().iter() {
528 self.reactive_relation_manager.unregister_reactive_instance(&relation_instance.id());
529 }
530 let result = self.reactive_flow_instances.remove(&id).is_some();
531 self.reactive_instance_event_manager.emit_event(ReactiveInstanceEvent::FlowInstanceDeleted(id));
534 result
535 }
536
537 fn register_provider(&self, id: Uuid, provider: Arc<dyn FlowInstanceProvider>) {
556 self.flow_instance_providers.insert(id, provider);
557 }
558
559 fn unregister_provider(&self, id: &Uuid) {
560 self.flow_instance_providers.remove(id);
561 }
562}
563
564#[async_trait]
565impl Lifecycle for ReactiveFlowManagerImpl {
566 async fn init(&self) {
567 debug!("Importing provided flow instances");
568 for flow_instance_provider in self.flow_instance_providers.iter() {
569 for flow_instance in flow_instance_provider.get_flow_instances() {
570 debug!("Creating provided flow instance {}", flow_instance.id);
571 let reactive_flow_instance = self.create_reactive_flow(flow_instance.clone());
572 match reactive_flow_instance {
573 Ok(reactive_flow_instance) => {
574 let created_flow_instance: Result<FlowInstance, _> = reactive_flow_instance.try_into();
575 match created_flow_instance {
576 Ok(created_flow_instance) => {
577 let json = serde_json::to_string_pretty(&created_flow_instance).unwrap();
578 debug!("Successfully created reactive flow instance:\r\n{json}");
579 }
580 Err(err) => {
581 debug!("Successfully created reactive flow instance {}, but failed to serialize: {:?}", flow_instance.id, err);
582 }
583 }
584 }
585 Err(err) => {
586 error!("Failed to create provided flow instance {}: {}", flow_instance.id, err);
587 }
588 }
589 }
590 }
591 }
592
593 async fn shutdown(&self) {
594 }
597}