reactive_graph_reactive_service_impl/
reactive_flow_manager_impl.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::sync::RwLock;
4
5use async_trait::async_trait;
6use dashmap::DashMap;
7use log::debug;
8use log::error;
9use log::trace;
10use path_tree::PathTree;
11use springtime_di::Component;
12use springtime_di::component_alias;
13use uuid::Uuid;
14
15use reactive_graph_graph::EntityInstance;
16use reactive_graph_graph::EntityInstances;
17use reactive_graph_graph::ExtensionContainer;
18use reactive_graph_graph::FlowInstance;
19use reactive_graph_graph::FlowTypeId;
20use reactive_graph_graph::Mutability::Mutable;
21use reactive_graph_graph::MutablePropertyInstanceSetter;
22use reactive_graph_graph::NamespacedTypeGetter;
23use reactive_graph_graph::PropertyInstanceGetter;
24use reactive_graph_graph::PropertyInstanceSetter;
25use reactive_graph_graph::PropertyInstances;
26use reactive_graph_graph::PropertyTypeDefinition;
27use reactive_graph_graph::RelationInstance;
28use reactive_graph_graph::RelationInstanceId;
29use reactive_graph_graph::RelationInstances;
30use reactive_graph_graph::TypeDefinitionGetter;
31use reactive_graph_lifecycle::Lifecycle;
32use reactive_graph_model_flow::EXTENSION_FLOW_RESOLVE_EXISTING_INSTANCE;
33use reactive_graph_model_flow::EXTENSION_FLOW_UUID_TYPE_EXTENSION;
34use reactive_graph_model_flow::EXTENSION_FLOW_UUID_TYPE_VARIABLE;
35use reactive_graph_reactive_model_api::ReactiveInstance;
36use reactive_graph_reactive_model_api::ReactivePropertyContainer;
37use reactive_graph_reactive_model_impl::ReactiveEntity;
38use reactive_graph_reactive_model_impl::ReactiveFlow;
39use reactive_graph_reactive_model_impl::ReactiveRelation;
40use reactive_graph_reactive_service_api::FlowInstanceProvider;
41use reactive_graph_reactive_service_api::ReactiveEntityManager;
42use reactive_graph_reactive_service_api::ReactiveFlowCreationError;
43use reactive_graph_reactive_service_api::ReactiveFlowManager;
44use reactive_graph_reactive_service_api::ReactiveInstanceEvent;
45use reactive_graph_reactive_service_api::ReactiveInstanceEventManager;
46use reactive_graph_reactive_service_api::ReactiveRelationManager;
47use reactive_graph_runtime_model::LabeledProperties::LABEL;
48use reactive_graph_type_system_api::ComponentManager;
49use reactive_graph_type_system_api::EntityTypeManager;
50use reactive_graph_type_system_api::FlowTypeManager;
51use reactive_graph_type_system_api::RelationTypeManager;
52
53fn create_label_path_tree() -> RwLock<PathTree<Uuid>> {
54    RwLock::new(PathTree::<Uuid>::new())
55}
56
/// Component that keeps track of the reactive flows and creates them either from
/// ready-made flow instances or from flow types.
///
/// All collaborators are injected; the three collections at the end are created
/// through their `#[component(default = ...)]` factories.
#[derive(Component)]
pub struct ReactiveFlowManagerImpl {
    /// Receives the `FlowInstanceCreated` / `FlowInstanceDeleted` events emitted by this manager.
    reactive_instance_event_manager: Arc<dyn ReactiveInstanceEventManager + Send + Sync>,

    // type_system_event_manager: Arc<dyn TypeSystemEventManager + Send + Sync>,
    /// Looked up while instantiating a flow type to add the default properties of components.
    component_manager: Arc<dyn ComponentManager + Send + Sync>,

    /// Resolves the entity types of the entity instances declared by a flow type.
    entity_type_manager: Arc<dyn EntityTypeManager + Send + Sync>,

    /// Resolves the relation types of the relation instances declared by a flow type.
    relation_type_manager: Arc<dyn RelationTypeManager + Send + Sync>,

    /// Resolves flow types by their id.
    flow_type_manager: Arc<dyn FlowTypeManager + Send + Sync>,

    // flow_instance_manager: Arc<dyn FlowInstanceManager>,
    /// Registers and unregisters the reactive entities that belong to a flow.
    reactive_entity_manager: Arc<dyn ReactiveEntityManager + Send + Sync>,

    /// Registers and unregisters the reactive relations that belong to a flow.
    reactive_relation_manager: Arc<dyn ReactiveRelationManager + Send + Sync>,

    /// The registered reactive flows, keyed by the id of the flow.
    #[component(default = "DashMap::new")]
    reactive_flow_instances: DashMap<Uuid, ReactiveFlow>, // ReactiveFlowsStorage,

    /// The registered flow instance providers, keyed by the id they were registered with.
    // TODO: move into own service
    #[component(default = "DashMap::new")]
    flow_instance_providers: DashMap<Uuid, Arc<dyn FlowInstanceProvider>>, // FlowInstanceProviders,

    /// Index from the label of a flow to the id of the flow.
    #[component(default = "create_label_path_tree")]
    label_path_tree: RwLock<PathTree<Uuid>>, // LabelPathTree,
}
85
86impl ReactiveFlowManagerImpl {
87    fn get_entity_instance_id_by_extension(&self, entity_instance: &EntityInstance, variables: &PropertyInstances) -> Uuid {
88        // Resolve an existing entity instance: Do not replace the uuid
89        if entity_instance.has_own_extension(&EXTENSION_FLOW_RESOLVE_EXISTING_INSTANCE.clone()) {
90            return entity_instance.id;
91        }
92        // Parse the UUID from the variable with the name specified by the extension value.
93        if let Some(id) = entity_instance
94            .get_own_extension(&EXTENSION_FLOW_UUID_TYPE_VARIABLE.clone())
95            .and_then(|extension| extension.extension.as_str().map(|s| s.to_string()))
96            .and_then(|variable_name| variables.get(&variable_name))
97            .and_then(|variable_value| variable_value.as_str().map(|s| s.to_string()))
98            .and_then(|variable_value| Uuid::parse_str(&variable_value).ok())
99        {
100            return id;
101        }
102        // Parse the UUID from the extension value.
103        if let Some(id) = entity_instance
104            .get_own_extension(&EXTENSION_FLOW_UUID_TYPE_EXTENSION.clone())
105            .and_then(|extension| extension.extension.as_str().map(|s| s.to_string()))
106            .and_then(|extension_value| Uuid::parse_str(extension_value.as_str()).ok())
107        {
108            return id;
109        }
110        // Default: Generate a random UUID
111        Uuid::new_v4()
112    }
113}
114
115#[async_trait]
116#[component_alias]
117impl ReactiveFlowManager for ReactiveFlowManagerImpl {
118    fn has(&self, id: Uuid) -> bool {
119        self.reactive_flow_instances.contains_key(&id)
120    }
121
122    fn get(&self, id: Uuid) -> Option<ReactiveFlow> {
123        self.reactive_flow_instances.get(&id).map(|reactive_flow| reactive_flow.value().clone())
124    }
125
126    fn get_by_label(&self, label: &str) -> Option<ReactiveFlow> {
127        let reader = self.label_path_tree.read().unwrap();
128        reader.find(label).and_then(|(id, _path)| self.get(*id))
129    }
130
131    fn get_all(&self) -> Vec<ReactiveFlow> {
132        self.reactive_flow_instances.iter().map(|reactive_flow| reactive_flow.value().clone()).collect()
133    }
134
135    // TODO: Assuming, that an EntityType is used in only one FlowType!
136    fn get_by_type(&self, ty: &FlowTypeId) -> Vec<ReactiveFlow> {
137        let Some(flow_type) = self.flow_type_manager.get(ty) else {
138            return Vec::new();
139        };
140        let entity_ty = flow_type.wrapper_type();
141        self.reactive_flow_instances
142            .iter()
143            .filter(|reactive_flow| reactive_flow.ty == entity_ty)
144            .map(|reactive_flow| reactive_flow.value().clone())
145            .collect()
146    }
147
148    fn count_flow_instances(&self) -> usize {
149        self.reactive_flow_instances.len()
150    }
151
152    fn create_reactive_flow(&self, flow_instance: FlowInstance) -> Result<ReactiveFlow, ReactiveFlowCreationError> {
153        let reactive_flow_instance = ReactiveFlow::try_from(flow_instance);
154        if reactive_flow_instance.is_err() {
155            return Err(ReactiveFlowCreationError::ReactiveFlowConstructionError(reactive_flow_instance.err().unwrap()));
156        }
157        let reactive_flow_instance = reactive_flow_instance.unwrap();
158        self.register_flow_instance_and_reactive_instances(reactive_flow_instance.clone());
159        Ok(reactive_flow_instance)
160    }
161
162    fn create_from_type(
163        &self,
164        ty: &FlowTypeId,
165        id: Option<Uuid>,
166        variables: PropertyInstances,
167        properties: PropertyInstances,
168    ) -> Result<ReactiveFlow, ReactiveFlowCreationError> {
169        let flow_type = self
170            .flow_type_manager
171            .get(ty)
172            .ok_or_else(|| ReactiveFlowCreationError::FlowTypeDoesntExist(ty.clone()))?;
173        for variable in flow_type.variables.iter() {
174            if !variables.contains_key(variable.name.as_str()) {
175                return Err(ReactiveFlowCreationError::MissingVariable(variable.name.clone()));
176            }
177        }
178        let wrapper_entity_type = self
179            .entity_type_manager
180            .get(&flow_type.wrapper_entity_instance.ty)
181            .ok_or_else(|| ReactiveFlowCreationError::EntityTypeDoesntExist(flow_type.wrapper_entity_instance.ty.clone()))?;
182        let mut wrapper_entity_instance = flow_type.wrapper_entity_instance.clone();
183        let wrapper_entity_instance_id = id.unwrap_or_else(|| self.get_entity_instance_id_by_extension(&wrapper_entity_instance, &variables));
184        let mut entity_instance_id_mapping: HashMap<Uuid, Uuid> = HashMap::new();
185        entity_instance_id_mapping.insert(wrapper_entity_instance.id, wrapper_entity_instance_id);
186        wrapper_entity_instance.id = wrapper_entity_instance_id;
187
188        // Add properties from entity_type if not existing
189        for property in wrapper_entity_type.properties.iter() {
190            trace!("Adding property {} from entity type {}", &property.name, &wrapper_entity_type.type_definition().to_string());
191            if !wrapper_entity_instance.properties.contains_key(&property.name) {
192                wrapper_entity_instance
193                    .properties
194                    .insert(property.name.clone(), property.data_type.default_value());
195            }
196        }
197
198        // Add properties from components if not existing
199        for component_ty in wrapper_entity_type.components.iter() {
200            if let Some(component) = self.component_manager.get(&component_ty) {
201                for property in component.properties.iter() {
202                    trace!("Adding property {} from component {}", &property.name, &component_ty.type_definition().to_string());
203                    if !wrapper_entity_instance.properties.contains_key(&property.name) {
204                        //
205                        // TODO: templating using the variables
206                        //
207                        wrapper_entity_instance
208                            .properties
209                            .insert(property.name.clone(), property.data_type.default_value());
210                    }
211                }
212            }
213        }
214
215        for property in properties.iter() {
216            let property_name = property.key();
217            let value = property.value();
218            trace!("Setting property {property_name} with value {value} from parameter");
219            wrapper_entity_instance.set(property_name, value.clone());
220        }
221
222        let entity_instances = EntityInstances::new_with_instance(wrapper_entity_instance);
223        for entity_instance in flow_type.entity_instances.iter() {
224            let entity_type = self
225                .entity_type_manager
226                .get(&entity_instance.ty)
227                .ok_or_else(|| ReactiveFlowCreationError::EntityTypeDoesntExist(entity_instance.ty.clone()))?;
228            let entity_instance_id = self.get_entity_instance_id_by_extension(&entity_instance, &variables);
229            entity_instance_id_mapping.insert(entity_instance.id, entity_instance_id);
230            let mut entity_instance_copy = entity_instance.clone();
231            entity_instance_copy.id = entity_instance_id;
232
233            // Add properties from entity_type if not existing
234            for property in entity_type.properties.iter() {
235                trace!("Adding property {} from entity type {}", &property.name, &entity_type.type_definition().to_string());
236                if !entity_instance_copy.properties.contains_key(&property.name) {
237                    //
238                    // TODO: templating using the variables
239                    //
240                    entity_instance_copy
241                        .properties
242                        .insert(property.name.clone(), property.data_type.default_value());
243                }
244            }
245
246            // Add properties from components if not existing
247            for component_ty in entity_type.components.iter() {
248                if let Some(component) = self.component_manager.get(&component_ty) {
249                    for property in component.properties.iter() {
250                        trace!("Adding property {} from component {}", &property.name, component_ty.type_definition());
251                        if !entity_instance_copy.properties.contains_key(&property.name) {
252                            entity_instance_copy
253                                .properties
254                                .insert(property.name.clone(), property.data_type.default_value());
255                        }
256                    }
257                }
258            }
259
260            // TODO: templating using the variables
261
262            // flow_instance_builder.entity(entity_instance_copy);
263            entity_instances.push(entity_instance_copy);
264        }
265        for (uf, ut) in entity_instance_id_mapping.iter() {
266            trace!("Mapping flow type entity instance id {uf} to actual entity instance id {ut}");
267        }
268
269        let relation_instances = RelationInstances::new();
270
271        for relation_instance in flow_type.relation_instances.iter() {
272            let relation_ty = relation_instance.relation_type_id();
273            trace!("Relation instance type: {}", &relation_instance.ty);
274            let relation_type = self
275                .relation_type_manager
276                .get(&relation_ty)
277                .ok_or(ReactiveFlowCreationError::RelationTypeDoesntExist(relation_ty))?;
278            let mut relation_instance_copy = relation_instance.clone();
279            match entity_instance_id_mapping.get(&relation_instance.outbound_id) {
280                Some(replaced_id) => relation_instance_copy.outbound_id = *replaced_id,
281                None => return Err(ReactiveFlowCreationError::InvalidOutboundId(relation_instance.outbound_id)),
282            }
283            match entity_instance_id_mapping.get(&relation_instance.inbound_id) {
284                Some(replaced_id) => relation_instance_copy.inbound_id = *replaced_id,
285                None => return Err(ReactiveFlowCreationError::InvalidInboundId(relation_instance.inbound_id)),
286            }
287
288            // Add properties from relation type if not existing
289            for property in relation_type.properties.iter() {
290                if !relation_instance_copy.properties.contains_key(&property.name) {
291                    //
292                    // TODO: templating using the variables
293                    //
294                    relation_instance_copy
295                        .properties
296                        .insert(property.name.clone(), property.data_type.default_value());
297                }
298            }
299
300            // Add properties from components if not existing
301            for component_ty in relation_type.components.iter() {
302                if let Some(component) = self.component_manager.get(&component_ty) {
303                    for property in component.properties.iter() {
304                        if !relation_instance_copy.properties.contains_key(&property.name) {
305                            //
306                            // TODO: templating using the variables
307                            //
308                            relation_instance_copy
309                                .properties
310                                .insert(property.name.clone(), property.data_type.default_value());
311                        }
312                    }
313                }
314            }
315
316            // TODO: templating using the variables
317
318            // flow_instance_builder.relation(relation_instance_copy);
319            relation_instances.push(relation_instance_copy);
320        }
321
322        // TODO
323        let flow_instance = FlowInstance::builder()
324            .ty(flow_type.wrapper_type())
325            .id(wrapper_entity_instance_id)
326            .name(flow_type.type_name()) // Default name for the flow instance is the flow type name.
327            .description(flow_type.description.clone())
328            .entity_instances(entity_instances)
329            .relation_instances(relation_instances)
330            .build();
331
332        // let flow_instance = flow_instance_builder.build();
333        trace!("{flow_instance:?}");
334        match ReactiveFlow::try_from(flow_instance) {
335            Ok(reactive_flow_instance) => {
336                self.register_flow_instance_and_reactive_instances(reactive_flow_instance.clone());
337
338                // Set or create properties given with the flow type instantiation
339                if let Some(wrapper_entity_instance) = reactive_flow_instance.get_wrapper_entity_instance() {
340                    for property in properties.iter() {
341                        let property_name = property.key();
342                        let property_value = property.value();
343                        if !wrapper_entity_instance.has_property(property_name) {
344                            trace!("Adding parameter property {property_name} with value {property_value} from parameter");
345                            wrapper_entity_instance.add_property(property_name, Mutable, property_value.clone());
346                        } else {
347                            trace!("Set parameter property {property_name} with value {property_value} from parameter");
348                            wrapper_entity_instance.set(property_name, property_value.clone());
349                        }
350                    }
351                }
352
353                Ok(reactive_flow_instance)
354            }
355            Err(e) => Err(ReactiveFlowCreationError::ReactiveFlowConstructionError(e)),
356        }
357    }
358
359    fn register_flow_instance_and_reactive_instances(&self, reactive_flow_instance: ReactiveFlow) {
360        if !self.has(reactive_flow_instance.id) {
361            {
362                // Step 1: Register all entity instances (if not already registered by uuid)
363                let mut entity_instances = reactive_flow_instance.entity_instances.write().unwrap();
364                let mut replaced_entity_instances = HashMap::<Uuid, ReactiveEntity>::new();
365                for (uuid, entity_instance) in entity_instances.iter() {
366                    // if let Some(entity_type) = self.entity_type_manager.get(&entity_instance.type_name) {
367                    //     for property in entity_type.properties.iter() {}
368                    // }
369                    match self.reactive_entity_manager.register_or_merge_reactive_instance(entity_instance.clone()) {
370                        Ok(entity_instance) => {
371                            // Replace the entity instance with the actual registered instance instead
372                            replaced_entity_instances.insert(*uuid, entity_instance);
373                        }
374                        Err(e) => {
375                            // This happens when a entity instance doesn't exist and cannot be created
376                            debug!("Failed to register entity instance {uuid}: {e:?}");
377                        }
378                    }
379                    // let entity_instance = self
380                    //     .reactive_entity_manager
381                    //     .register_or_merge_reactive_instance(entity_instance.clone());
382                    // // Replace the entity instance with the actual registered instance instead
383                    // replaced_entity_instances.insert(*uuid, entity_instance);
384                }
385
386                // Step 2: Replace the entity instances of the flow instance with the actual registered entity instances
387                entity_instances.clear();
388                for (uuid, entity_instance) in replaced_entity_instances.iter() {
389                    entity_instances.insert(*uuid, entity_instance.clone());
390                }
391
392                // Step 3: Recreate the reactive relation instances
393                // Because the entity instances might have been replaced by the actual registered entity instances
394                let mut relation_instances = reactive_flow_instance.relation_instances.write().unwrap();
395                let mut replaced_relation_instances = HashMap::<RelationInstanceId, ReactiveRelation>::new();
396                for (relation_instance_id, relation_instance) in relation_instances.iter() {
397                    let inbound_id = relation_instance.inbound.id;
398                    let outbound_id = relation_instance.outbound.id;
399
400                    let recreated_relation_instance = ReactiveRelation::new_from_instance(
401                        entity_instances.get(&outbound_id).unwrap().clone(),
402                        entity_instances.get(&inbound_id).unwrap().clone(),
403                        RelationInstance::from(relation_instance.clone()),
404                    );
405                    replaced_relation_instances.insert(relation_instance_id.clone(), recreated_relation_instance);
406                    // relation_instance.inbound = entity_instances.get(&inbound_id).unwrap().clone();
407                    // relation_instance.outbound = entity_instances.get(&outbound_id).unwrap().clone();
408                }
409
410                // Step 4: Replace the relation instances of the flow instance with the recreated relation instances
411                relation_instances.clear();
412                for (relation_instance_id, relation_instance) in replaced_relation_instances.iter() {
413                    relation_instances.insert(relation_instance_id.clone(), relation_instance.clone());
414                }
415
416                // Step 5: Register all (recreated) relation instances (if not already registered by relation_instance_id)
417                let mut replaced_relation_instances = HashMap::<RelationInstanceId, ReactiveRelation>::new();
418                for (relation_instance_id, relation_instance) in relation_instances.iter() {
419                    match self.reactive_relation_manager.register_or_merge_reactive_instance(relation_instance.clone()) {
420                        Ok(relation_instance) => {
421                            // Replace the relation instance with the actual registered instance
422                            replaced_relation_instances.insert(relation_instance_id.clone(), relation_instance);
423                        }
424                        Err(e) => {
425                            // This happens when a relation instance doesn't exist and cannot be created
426                            debug!("Failed to register relation instance {relation_instance_id:?}: {e:?}");
427                        }
428                    }
429                }
430
431                // Step 6: Replace the relation instances of the flow instance with the actual registered relation instances
432                relation_instances.clear();
433                for (relation_instance_id, relation_instance) in replaced_relation_instances.iter() {
434                    relation_instances.insert(relation_instance_id.clone(), relation_instance.clone());
435                }
436            } // Drop rwlock
437            self.register_flow_instance(reactive_flow_instance);
438        }
439    }
440
441    fn register_flow_instance(&self, reactive_flow_instance: ReactiveFlow) {
442        if !self.reactive_entity_manager.has(reactive_flow_instance.id) {
443            if let Some(wrapper_entity_instance) = reactive_flow_instance.get_entity(reactive_flow_instance.id) {
444                if let Err(e) = self.reactive_entity_manager.register_reactive_instance(wrapper_entity_instance) {
445                    error!("Failed to register wrapper entity instance of flow {}: {:?}", reactive_flow_instance.id, e);
446                }
447            }
448        }
449        self.reactive_flow_instances.insert(reactive_flow_instance.id, reactive_flow_instance.clone());
450        // self.reactive_flow_instances
451        //     .write()
452        //     .unwrap()
453        //     .insert(reactive_flow_instance.id, reactive_flow_instance.clone());
454        // Register label
455        if let Some(value) = reactive_flow_instance.get(LABEL.property_name()) {
456            if let Some(label) = value.as_str() {
457                let mut writer = self.label_path_tree.write().unwrap();
458                let _ = writer.insert(label, reactive_flow_instance.id);
459            }
460        }
461        self.reactive_instance_event_manager
462            .emit_event(ReactiveInstanceEvent::FlowInstanceCreated(reactive_flow_instance.id))
463    }
464
465    // // TODO: how to detect if the flow instance has removed an entity? => remove behaviour
466    // // TODO: how to detect if the flow instance has removed an relation? => remove behaviour
467    // fn commit(&self, id: Uuid) {
468    //     if let Some(reactive_flow_instance) = self.get(id) {
469    //         // Unregister removed relations
470    //         for relation_instance_id in reactive_flow_instance.relations_removed.read().unwrap().iter() {
471    //             self.reactive_relation_manager.unregister_reactive_instance(relation_instance_id);
472    //         }
473    //         reactive_flow_instance.relations_removed.write().unwrap().clear();
474    //
475    //         // Unregister removed entities
476    //         for id in reactive_flow_instance.entities_removed.read().unwrap().iter() {
477    //             self.reactive_entity_manager.unregister_reactive_instance(*id);
478    //         }
479    //         reactive_flow_instance.entities_removed.write().unwrap().clear();
480    //
481    //         // Register added entities
482    //         for id in reactive_flow_instance.entities_added.read().unwrap().iter() {
483    //             if let Some(entity_instance) = reactive_flow_instance.get_entity(*id) {
484    //                 // TODO: How to handle reactive if registering an entity instance wasn't successful?
485    //                 let _ = self.reactive_entity_manager.register_reactive_instance(entity_instance.clone());
486    //             }
487    //         }
488    //         reactive_flow_instance.entities_added.write().unwrap().clear();
489    //
490    //         // Register added relations
491    //         for relation_instance_id in reactive_flow_instance.relations_added.read().unwrap().iter() {
492    //             if let Some(relation_instance) = reactive_flow_instance.get_relation(relation_instance_id) {
493    //                 // TODO: How to handle reactive if registering a relation instance wasn't successful?
494    //                 let _ = self.reactive_relation_manager.register_reactive_instance(relation_instance.clone());
495    //             }
496    //         }
497    //         reactive_flow_instance.relations_added.write().unwrap().clear();
498    //
499    //         // for (_, entity_instance) in reactive_flow.entity_instances.read().unwrap().iter() {
500    //         //     if !self.reactive_entity_manager.has(entity_instance.id) {
501    //         //         self.reactive_entity_manager.register_reactive_instance(entity_instance.clone());
502    //         //     }
503    //         // }
504    //         // for (_, relation_instance) in reactive_flow.relation_instances.read().unwrap().iter() {
505    //         //     let relation_instance_id = relation_instance.get_key();
506    //         //     if relation_instance_id.is_some() {
507    //         //         let relation_instance_id = relation_instance_id.unwrap();
508    //         //         if !self.reactive_relation_manager.has(relation_instance_id.clone()) {
509    //         //             self.reactive_relation_manager.register_reactive_instance(relation_instance.clone());
510    //         //         }
511    //         //     }
512    //         // }
513    //
514    //         if let Ok(flow_instance) = FlowInstance::try_from(reactive_flow_instance) {
515    //             self.flow_instance_manager.commit(flow_instance);
516    //         }
517    //     }
518    // }
519
520    fn delete(&self, id: Uuid) -> bool {
521        let Some(reactive_flow_instance) = self.get(id) else {
522            return false;
523        };
524        for (_, entity_instance) in reactive_flow_instance.entity_instances.read().unwrap().iter() {
525            self.reactive_entity_manager.unregister_reactive_instance(entity_instance.id);
526        }
527        for (_, relation_instance) in reactive_flow_instance.relation_instances.read().unwrap().iter() {
528            self.reactive_relation_manager.unregister_reactive_instance(&relation_instance.id());
529        }
530        let result = self.reactive_flow_instances.remove(&id).is_some();
531        // let result = self.reactive_flow_instances.write().unwrap().remove(&id).is_some();
532        // TODO: remove label
533        self.reactive_instance_event_manager.emit_event(ReactiveInstanceEvent::FlowInstanceDeleted(id));
534        result
535    }
536
537    // fn import(&self, path: &str) -> Result<ReactiveFlow, ReactiveFlowImportError> {
538    //     if let Ok(flow_instance) = self.flow_instance_manager.import(path) {
539    //         if let Ok(reactive_flow_instance) = self.create(flow_instance) {
540    //             return Ok(reactive_flow_instance);
541    //         }
542    //     }
543    //     Err(ReactiveFlowImportError)
544    // }
545    //
546    // fn export(&self, id: Uuid, path: &str) {
547    //     if self.has(id) {
548    //         self.commit(id);
549    //         if let Ok(flow_instance) = FlowInstance::try_from(self.get(id).unwrap()) {
550    //             self.flow_instance_manager.export(flow_instance, path);
551    //         }
552    //     }
553    // }
554
555    fn register_provider(&self, id: Uuid, provider: Arc<dyn FlowInstanceProvider>) {
556        self.flow_instance_providers.insert(id, provider);
557    }
558
559    fn unregister_provider(&self, id: &Uuid) {
560        self.flow_instance_providers.remove(id);
561    }
562}
563
564#[async_trait]
565impl Lifecycle for ReactiveFlowManagerImpl {
566    async fn init(&self) {
567        debug!("Importing provided flow instances");
568        for flow_instance_provider in self.flow_instance_providers.iter() {
569            for flow_instance in flow_instance_provider.get_flow_instances() {
570                debug!("Creating provided flow instance {}", flow_instance.id);
571                let reactive_flow_instance = self.create_reactive_flow(flow_instance.clone());
572                match reactive_flow_instance {
573                    Ok(reactive_flow_instance) => {
574                        let created_flow_instance: Result<FlowInstance, _> = reactive_flow_instance.try_into();
575                        match created_flow_instance {
576                            Ok(created_flow_instance) => {
577                                let json = serde_json::to_string_pretty(&created_flow_instance).unwrap();
578                                debug!("Successfully created reactive flow instance:\r\n{json}");
579                            }
580                            Err(err) => {
581                                debug!("Successfully created reactive flow instance {}, but failed to serialize: {:?}", flow_instance.id, err);
582                            }
583                        }
584                    }
585                    Err(err) => {
586                        error!("Failed to create provided flow instance {}: {}", flow_instance.id, err);
587                    }
588                }
589            }
590        }
591    }
592
593    async fn shutdown(&self) {
594        // self.reactive_flow_instances.write().unwrap().clear();
595        // self.flow_instance_providers.write().unwrap().clear();
596    }
597}