reactive_graph_reactive_service_impl/
reactive_relation_manager_impl.rs

1use std::ops::Deref;
2use std::sync::Arc;
3use std::sync::atomic::AtomicBool;
4use std::sync::atomic::Ordering;
5
6use async_trait::async_trait;
7use dashmap::DashMap;
8use dashmap::DashSet;
9use reactive_graph_reactive_service_api::EventChannels;
10use reactive_graph_reactive_service_api::ReactiveInstanceEvent;
11use reactive_graph_reactive_service_api::ReactiveInstanceEventManager;
12use reactive_graph_reactive_service_api::ReactiveRelationComponentAddError;
13use reactive_graph_reactive_service_api::ReactiveRelationComponentRemoveError;
14use reactive_graph_reactive_service_api::ReactiveRelationCreationError;
15use reactive_graph_reactive_service_api::ReactiveRelationPropertyAddError;
16use reactive_graph_reactive_service_api::ReactiveRelationPropertyRemoveError;
17use reactive_graph_reactive_service_api::ReactiveRelationRegistrationError;
18use serde_json::Value;
19use springtime_di::Component;
20use tokio::time::Duration;
21use tokio::time::sleep;
22use uuid::Uuid;
23
24use reactive_graph_behaviour_model_api::BehaviourTypeId;
25use reactive_graph_behaviour_model_api::BehaviourTypesContainer;
26use reactive_graph_behaviour_model_api::ComponentBehaviourTypeId;
27use reactive_graph_behaviour_model_api::RelationBehaviourTypeId;
28use reactive_graph_behaviour_service_api::RelationBehaviourManager;
29use reactive_graph_behaviour_service_api::RelationComponentBehaviourManager;
30use reactive_graph_graph::ComponentContainer;
31use reactive_graph_graph::ComponentOrEntityTypeId;
32use reactive_graph_graph::ComponentTypeId;
33use reactive_graph_graph::Mutability;
34use reactive_graph_graph::NamespacedTypeGetter;
35use reactive_graph_graph::PropertyInstances;
36use reactive_graph_graph::PropertyTypeContainer;
37use reactive_graph_graph::PropertyTypeDefinition;
38use reactive_graph_graph::RelationInstance;
39use reactive_graph_graph::RelationInstanceId;
40use reactive_graph_graph::RelationTypeId;
41use reactive_graph_graph::TypeDefinitionComponent;
42use reactive_graph_graph::TypeDefinitionGetter;
43use reactive_graph_graph::TypeDefinitionProperty;
44use reactive_graph_lifecycle::Lifecycle;
45use reactive_graph_reactive_model_api::ReactiveInstance;
46use reactive_graph_reactive_model_api::ReactivePropertyContainer;
47use reactive_graph_reactive_model_impl::ReactiveRelation;
48use reactive_graph_reactive_service_api::ReactiveEntityManager;
49use reactive_graph_reactive_service_api::ReactiveRelationManager;
50use reactive_graph_runtime_model::EventProperties::EVENT;
51use reactive_graph_type_system_api::ComponentManager;
52use reactive_graph_type_system_api::RelationTypeManager;
53use reactive_graph_type_system_api::TypeSystemEventManager;
54use reactive_graph_type_system_api::TypeSystemEventSubscriber;
55use reactive_graph_type_system_api::TypeSystemEventTypes;
56use springtime_di::component_alias;
57
// Handle ids under which this manager registers its event channels (see
// `create_event_channels`) and later looks up the matching receivers in the
// `handle_*_events` methods.

/// Handle id of the channel consumed by `handle_component_added_events`.
static HANDLE_ID_RELATION_TYPE_COMPONENT_ADDED: u128 = 0x6ba7b9210e1513d350b300c04fe530c7;
/// Handle id of the channel consumed by `handle_component_removed_events`.
static HANDLE_ID_RELATION_TYPE_COMPONENT_REMOVED: u128 = 0x6ba8b8119e1513ee59b300c04fe630c7;
/// Handle id of the channel consumed by `handle_property_added_events`.
static HANDLE_ID_RELATION_TYPE_PROPERTY_ADDED: u128 = 0x6bb9b9232e1513d350b300c04fe530c7;
/// Handle id of the channel consumed by `handle_property_removed_events`.
static HANDLE_ID_RELATION_TYPE_PROPERTY_REMOVED: u128 = 0x6ba8b8339e1535ee5bd300c0410630c7;
62
63pub struct OutboundInstances(DashMap<Uuid, DashSet<RelationInstanceId>>);
64
65impl OutboundInstances {
66    pub fn new() -> Self {
67        OutboundInstances(DashMap::new())
68    }
69
70    pub fn insert(&self, id: &RelationInstanceId) {
71        match self.0.get(&id.outbound_id) {
72            Some(outbound_instances) => {
73                outbound_instances.insert(id.clone());
74            }
75            None => {
76                let outbound_instances = DashSet::new();
77                outbound_instances.insert(id.clone());
78                self.0.insert(id.outbound_id, outbound_instances);
79            }
80        }
81    }
82
83    pub fn remove(&self, id: &RelationInstanceId) {
84        self.0.get(&id.outbound_id).and_then(|outbound_instances| outbound_instances.remove(id));
85    }
86
87    pub fn get(&self, id: Uuid) -> Option<DashSet<RelationInstanceId>> {
88        self.0.get(&id).map(|entry| entry.value().clone())
89    }
90}
91
92impl Default for OutboundInstances {
93    fn default() -> Self {
94        Self::new()
95    }
96}
97
98pub struct InboundInstances(DashMap<Uuid, DashSet<RelationInstanceId>>);
99
100impl InboundInstances {
101    pub fn new() -> Self {
102        InboundInstances(DashMap::new())
103    }
104
105    pub fn insert(&self, id: &RelationInstanceId) {
106        match self.0.get(&id.inbound_id) {
107            Some(inbound_instances) => {
108                inbound_instances.insert(id.clone());
109            }
110            None => {
111                let inbound_instances = DashSet::new();
112                inbound_instances.insert(id.clone());
113                self.0.insert(id.inbound_id, inbound_instances);
114            }
115        }
116    }
117
118    pub fn remove(&self, id: &RelationInstanceId) {
119        self.0.get(&id.inbound_id).and_then(|inbound_instances| inbound_instances.remove(id));
120    }
121
122    pub fn get(&self, id: Uuid) -> Option<DashSet<RelationInstanceId>> {
123        self.0.get(&id).map(|entry| entry.value().clone())
124    }
125}
126
127impl Default for InboundInstances {
128    fn default() -> Self {
129        Self::new()
130    }
131}
132
/// Builds the shared "running" flag, initially set to `true`.
fn create_running_state() -> Arc<AtomicBool> {
    let initially_running = AtomicBool::new(true);
    Arc::new(initially_running)
}
136
137fn create_event_channels() -> EventChannels {
138    let event_channels = EventChannels::new();
139    event_channels.insert(HANDLE_ID_RELATION_TYPE_COMPONENT_ADDED, crossbeam::channel::unbounded());
140    event_channels.insert(HANDLE_ID_RELATION_TYPE_COMPONENT_REMOVED, crossbeam::channel::unbounded());
141    event_channels.insert(HANDLE_ID_RELATION_TYPE_PROPERTY_ADDED, crossbeam::channel::unbounded());
142    event_channels.insert(HANDLE_ID_RELATION_TYPE_PROPERTY_REMOVED, crossbeam::channel::unbounded());
143    event_channels
144}
145
/// Registry of reactive relation instances.
///
/// Keeps the relations keyed by `RelationInstanceId`, maintains outbound and
/// inbound entity indexes alongside, and applies behaviours through the
/// injected behaviour managers. Fields without a `default` attribute are
/// presumably injected by `springtime_di` — confirm against the DI setup.
#[derive(Component)]
pub struct ReactiveRelationManagerImpl {
    /// Receives `RelationInstanceCreated` / `RelationInstanceDeleted` events.
    reactive_instance_event_manager: Arc<dyn ReactiveInstanceEventManager + Send + Sync>,

    /// Not referenced in the visible part of this file; presumably used to
    /// subscribe the event channels to type system events — TODO confirm.
    type_system_event_manager: Arc<dyn TypeSystemEventManager + Send + Sync>,

    /// Resolves components by their `ComponentTypeId`.
    component_manager: Arc<dyn ComponentManager + Send + Sync>,

    /// Resolves relation types by their `RelationTypeId`.
    relation_type_manager: Arc<dyn RelationTypeManager + Send + Sync>,

    /// Resolves the outbound and inbound entities of a relation being created.
    reactive_entity_manager: Arc<dyn ReactiveEntityManager + Send + Sync>,

    /// Adds and removes behaviours bound to relation types.
    relation_behaviour_manager: Arc<dyn RelationBehaviourManager + Send + Sync>,

    /// Adds and removes behaviours bound to components of relations.
    relation_component_behaviour_manager: Arc<dyn RelationComponentBehaviourManager + Send + Sync>,

    /// All registered reactive relations, keyed by relation instance id.
    #[component(default = "DashMap::new")]
    reactive_relation_instances: DashMap<RelationInstanceId, ReactiveRelation>, // ReactiveRelations,

    /// Index: outbound entity id -> ids of the relations starting there.
    #[component(default = "OutboundInstances::new")]
    outbound_instances: OutboundInstances, // DashMap<Uuid, DashSet<RelationInstanceId>>, //

    /// Index: inbound entity id -> ids of the relations ending there.
    #[component(default = "InboundInstances::new")]
    inbound_instances: InboundInstances, // DashMap<Uuid, DashSet<RelationInstanceId>>, //

    /// Flag polled by the spawned event handler loops; they stop once it is `false`.
    #[component(default = "create_running_state")]
    running: Arc<AtomicBool>, // RunningState,

    /// Channels from which the `handle_*_events` tasks receive their events.
    #[component(default = "create_event_channels")]
    event_channels: EventChannels,
}
177
178#[async_trait]
179#[component_alias]
180impl ReactiveRelationManager for ReactiveRelationManagerImpl {
181    fn has(&self, id: &RelationInstanceId) -> bool {
182        self.reactive_relation_instances.contains_key(id)
183    }
184
185    fn get(&self, id: &RelationInstanceId) -> Option<ReactiveRelation> {
186        self.reactive_relation_instances.get(id).map(|r| r.value().clone())
187    }
188
189    fn get_by_outbound_entity(&self, outbound_entity_id: Uuid) -> Vec<ReactiveRelation> {
190        self.outbound_instances
191            .get(outbound_entity_id)
192            .and_then(|outbound_instances| outbound_instances.iter().map(|id| self.get(id.deref())).collect())
193            .unwrap_or_default()
194    }
195
196    fn get_by_inbound_entity(&self, inbound_entity_id: Uuid) -> Vec<ReactiveRelation> {
197        self.inbound_instances
198            .get(inbound_entity_id)
199            .and_then(|inbound_instances| inbound_instances.iter().map(|id| self.get(id.deref())).collect())
200            .unwrap_or_default()
201    }
202
203    fn get_all(&self) -> Vec<ReactiveRelation> {
204        self.reactive_relation_instances.iter().map(|r| r.value().clone()).collect()
205    }
206
207    fn get_by_type(&self, ty: &RelationTypeId) -> Vec<ReactiveRelation> {
208        self.reactive_relation_instances
209            .iter()
210            .filter(|r| &r.relation_type_id() == ty)
211            .map(|r| r.value().clone())
212            .collect()
213    }
214
215    fn get_by_component(&self, ty: &ComponentTypeId) -> Vec<ReactiveRelation> {
216        self.reactive_relation_instances
217            .iter()
218            .filter(|e| e.is_a(ty))
219            .map(|e| e.value().clone())
220            .collect()
221    }
222
223    fn get_by_behaviour(&self, behaviour_ty: &BehaviourTypeId) -> Vec<ReactiveRelation> {
224        self.reactive_relation_instances
225            .iter()
226            .filter(|e| e.behaves_as(behaviour_ty))
227            .map(|e| e.value().clone())
228            .collect()
229    }
230
231    fn get_by_namespace(&self, namespace: &str) -> Vec<ReactiveRelation> {
232        self.reactive_relation_instances
233            .iter()
234            .filter(|r| r.namespace() == namespace)
235            .map(|r| r.value().clone())
236            .collect()
237    }
238
239    fn count(&self) -> usize {
240        self.reactive_relation_instances.len()
241    }
242
243    fn count_by_type(&self, ty: &RelationTypeId) -> usize {
244        self.reactive_relation_instances.iter().filter(|r| &r.relation_type_id() == ty).count()
245    }
246
247    fn count_by_component(&self, component_ty: &ComponentTypeId) -> usize {
248        self.reactive_relation_instances.iter().filter(|r| r.is_a(component_ty)).count()
249    }
250
251    fn count_by_behaviour(&self, behaviour_ty: &BehaviourTypeId) -> usize {
252        self.reactive_relation_instances.iter().filter(|r| r.behaves_as(behaviour_ty)).count()
253    }
254
255    fn get_relation_instance_ids(&self) -> Vec<RelationInstanceId> {
256        self.reactive_relation_instances.iter().map(|e| e.key().clone()).collect()
257    }
258
259    fn create_reactive_relation(&self, id: &RelationInstanceId, properties: PropertyInstances) -> Result<ReactiveRelation, ReactiveRelationCreationError> {
260        let relation_instance = RelationInstance::builder()
261            .outbound_id(id.outbound_id)
262            .ty(id.ty.clone())
263            .inbound_id(id.inbound_id)
264            .properties(properties)
265            .build();
266        self.create_reactive_instance(relation_instance)
267        // match self.relation_instance_manager.create(relation_instance_id, properties) {
268        //     Ok(relation_instance_id) => match self.relation_instance_manager.get(&relation_instance_id) {
269        //         Some(relation_instance) => self.create_reactive_instance(relation_instance),
270        //         None => Err(ReactiveRelationCreationError::MissingInstance(relation_instance_id)),
271        //     },
272        //     Err(e) => Err(ReactiveRelationCreationError::RelationInstanceCreationError(e)),
273        // }
274    }
275
276    fn create_reactive_instance(&self, reactive_relation_instance: RelationInstance) -> Result<ReactiveRelation, ReactiveRelationCreationError> {
277        let outbound = self
278            .reactive_entity_manager
279            .get(reactive_relation_instance.outbound_id)
280            .ok_or(ReactiveRelationCreationError::MissingOutboundEntityInstance(reactive_relation_instance.outbound_id))?;
281        let inbound = self
282            .reactive_entity_manager
283            .get(reactive_relation_instance.inbound_id)
284            .ok_or(ReactiveRelationCreationError::MissingInboundEntityInstance(reactive_relation_instance.inbound_id))?;
285        let ty = reactive_relation_instance.ty.clone();
286        let relation_ty = ty.relation_type_id();
287        let relation_type = self
288            .relation_type_manager
289            .get(&relation_ty)
290            .ok_or_else(|| ReactiveRelationCreationError::UnknownRelationType(relation_ty.clone()))?;
291
292        if !relation_type.outbound_type.type_name().eq("*") {
293            match &relation_type.outbound_type {
294                ComponentOrEntityTypeId::Component(component_ty) => {
295                    if !outbound.components.contains(component_ty) {
296                        return Err(ReactiveRelationCreationError::OutboundEntityDoesNotHaveComponent(outbound.id, component_ty.clone()));
297                    }
298                }
299                ComponentOrEntityTypeId::EntityType(entity_ty) => {
300                    if &outbound.ty != entity_ty {
301                        return Err(ReactiveRelationCreationError::OutboundEntityIsNotOfType(outbound.id, outbound.ty.clone(), entity_ty.clone()));
302                    }
303                }
304            }
305        }
306        // if !relation_type.outbound_type.eq("*")
307        //     && !outbound.type_name.eq(&relation_type.outbound_type)
308        //     && !outbound.components.contains(&relation_type.outbound_type)
309        // {
310        //     return Err(ReactiveRelationCreationError::OutboundEntityIsNotOfType(
311        //         outbound.type_name.clone(),
312        //         relation_type.outbound_type,
313        //     ));
314        // }
315
316        if !relation_type.inbound_type.type_name().eq("*") {
317            match &relation_type.inbound_type {
318                ComponentOrEntityTypeId::Component(component_ty) => {
319                    if !inbound.components.contains(component_ty) {
320                        return Err(ReactiveRelationCreationError::InboundEntityDoesNotHaveComponent(inbound.id, component_ty.clone()));
321                    }
322                }
323                ComponentOrEntityTypeId::EntityType(entity_ty) => {
324                    if &inbound.ty != entity_ty {
325                        return Err(ReactiveRelationCreationError::InboundEntityIsNotOfType(inbound.id, inbound.ty.clone(), entity_ty.clone()));
326                    }
327                }
328            }
329        }
330
331        // if !relation_type.inbound_type.eq("*")
332        //     && !inbound.type_name.eq(&relation_type.inbound_type)
333        //     && !inbound.components.contains(&relation_type.inbound_type)
334        // {
335        //     return Err(ReactiveRelationCreationError::InboundEntityIsNotOfType(
336        //         inbound.type_name.clone(),
337        //         relation_type.inbound_type,
338        //     ));
339        // }
340
341        let relation_instance = ReactiveRelation::new_from_instance(outbound, inbound, reactive_relation_instance);
342
343        // Initialize property mutability states
344        if let Some(entity_type) = self.relation_type_manager.get(&relation_instance.relation_type_id()) {
345            for component_ty in entity_type.components {
346                if let Some(component) = self.component_manager.get(&component_ty) {
347                    for property_type in component.properties.iter() {
348                        if let Some(mut property) = relation_instance.properties.get_mut(&property_type.name) {
349                            property.set_mutability(property_type.mutability);
350                        }
351                    }
352                }
353            }
354            for property_type in entity_type.properties.iter() {
355                if let Some(mut property) = relation_instance.properties.get_mut(&property_type.name) {
356                    property.set_mutability(property_type.mutability);
357                }
358            }
359        }
360
361        self.register_reactive_instance(relation_instance)
362            .map_err(ReactiveRelationCreationError::ReactiveRelationRegistrationError)
363    }
364
365    fn register_reactive_instance(&self, reactive_relation: ReactiveRelation) -> Result<ReactiveRelation, ReactiveRelationRegistrationError> {
366        let id = reactive_relation.id();
367        if self.reactive_relation_instances.contains_key(&id) {
368            return Err(ReactiveRelationRegistrationError::RelationInstanceAlreadyExists(id.clone()));
369        }
370        // TODO: check if id already exists
371        self.reactive_relation_instances.insert(id.clone(), reactive_relation.clone());
372        self.outbound_instances.insert(&id);
373        self.inbound_instances.insert(&id);
374        // Apply all components that are predefined in the relation type
375        let relation_ty = reactive_relation.relation_type_id();
376        if let Some(components) = self.relation_type_manager.get(&relation_ty).map(|relation_type| relation_type.components) {
377            components.iter().for_each(|component_ty| {
378                reactive_relation.components.insert(component_ty.clone());
379            });
380        }
381        // Add component behaviours
382        self.relation_component_behaviour_manager.add_behaviours_to_relation(reactive_relation.clone());
383        // Add relation behaviours
384        self.relation_behaviour_manager.add_behaviours(reactive_relation.clone());
385        self.reactive_instance_event_manager
386            .emit_event(ReactiveInstanceEvent::RelationInstanceCreated(id));
387        Ok(reactive_relation)
388
389        // match self
390        //     .relation_instance_manager
391        //     .create_from_instance_if_not_exist(reactive_relation_instance.clone().into())
392        // {
393        //     Ok(relation_instance_id) => {
394        //         self.reactive_relation_instances.insert(relation_instance_id.clone(), reactive_relation_instance.clone());
395        //         // Apply all components that are predefined in the relation type
396        //         let relation_ty = reactive_relation_instance.relation_type_id();
397        //         if let Some(components) = self.relation_type_manager.get(&relation_ty).map(|relation_type| relation_type.components) {
398        //             components.iter().for_each(|component_ty| {
399        //                 reactive_relation_instance.components.insert(component_ty.clone());
400        //             });
401        //         }
402        //         // Add component behaviours
403        //         self.relation_component_behaviour_manager
404        //             .add_behaviours_to_relation(reactive_relation_instance.clone());
405        //         // Add relation behaviours
406        //         self.relation_behaviour_manager.add_behaviours(reactive_relation_instance.clone());
407        //         self.event_manager.emit_event(SystemEvent::RelationInstanceCreated(relation_instance_id));
408        //         Ok(reactive_relation_instance)
409        //     }
410        //     Err(e) => Err(ReactiveRelationRegistrationError::RelationInstanceCreationError(e)),
411        // }
412    }
413
414    fn register_or_merge_reactive_instance(&self, relation_instance: ReactiveRelation) -> Result<ReactiveRelation, ReactiveRelationRegistrationError> {
415        let id = relation_instance.id();
416        match self.get(&id) {
417            // No instance with the given relation instance id exists yet, try to register the given reactive instance
418            None => self.register_reactive_instance(relation_instance),
419            // Instance with the given relation instance id exists. Don't register but return the existing reactive instance instead of the given instance
420            Some(reactive_relation_instance) => Ok(reactive_relation_instance),
421        }
422    }
423
424    fn add_component(&self, id: &RelationInstanceId, component_ty: &ComponentTypeId) -> Result<(), ReactiveRelationComponentAddError> {
425        let Some(component) = self.component_manager.get(component_ty) else {
426            return Err(ReactiveRelationComponentAddError::ComponentNotRegistered(component_ty.clone()));
427        };
428        let Some(reactive_relation) = self.get(id) else {
429            return Err(ReactiveRelationComponentAddError::MissingInstance(id.clone()));
430        };
431        if reactive_relation.is_a(component_ty) {
432            return Err(ReactiveRelationComponentAddError::IsAlreadyA(component_ty.clone()));
433        }
434        // Add components with properties
435        reactive_relation.add_component_with_properties(&component);
436        // Add component behaviours
437        self.relation_component_behaviour_manager
438            .add_behaviours_to_relation_component(reactive_relation, component);
439        Ok(())
440    }
441
442    fn remove_component(&self, id: &RelationInstanceId, component_ty: &ComponentTypeId) -> Result<(), ReactiveRelationComponentRemoveError> {
443        let Some(reactive_relation) = self.get(id) else {
444            return Err(ReactiveRelationComponentRemoveError::MissingInstance(id.clone()));
445        };
446        if !reactive_relation.is_a(component_ty) {
447            return Err(ReactiveRelationComponentRemoveError::IsNotA(component_ty.clone()));
448        }
449        let Some(component) = self.component_manager.get(component_ty) else {
450            return Err(ReactiveRelationComponentRemoveError::ComponentNotRegistered(component_ty.clone()));
451        };
452        // Remove component
453        reactive_relation.remove_component(component_ty);
454        //
455        // We do not remove properties because we cannot ensure that the removal is intended
456        // (At least yet)
457        //
458        // Remove component behaviours
459        self.relation_component_behaviour_manager
460            .remove_behaviours_from_relation_component(reactive_relation, component);
461        Ok(())
462    }
463
464    fn add_property(
465        &self,
466        relation_instance_id: &RelationInstanceId,
467        property_name: &str,
468        mutability: Mutability,
469        value: Value,
470    ) -> Result<(), ReactiveRelationPropertyAddError> {
471        match self.get(relation_instance_id) {
472            Some(relation_instance) => {
473                if relation_instance.has_property(property_name) {
474                    return Err(ReactiveRelationPropertyAddError::PropertyAlreadyExists(property_name.to_string()));
475                }
476                relation_instance.add_property(property_name, mutability, value);
477                Ok(())
478            }
479            None => Err(ReactiveRelationPropertyAddError::MissingInstance(relation_instance_id.clone())),
480        }
481    }
482
483    fn remove_property(&self, relation_instance_id: &RelationInstanceId, property_name: &str) -> Result<(), ReactiveRelationPropertyRemoveError> {
484        match self.get(relation_instance_id) {
485            Some(relation_instance) => {
486                if !relation_instance.has_property(property_name) {
487                    return Err(ReactiveRelationPropertyRemoveError::MissingProperty(property_name.to_string()));
488                }
489                for component_ty in relation_instance.get_components() {
490                    if let Some(component) = self.component_manager.get(&component_ty) {
491                        if component.has_own_property(property_name) {
492                            return Err(ReactiveRelationPropertyRemoveError::PropertyInUseByComponent(component_ty.clone()));
493                        }
494                    }
495                }
496                relation_instance.remove_property(property_name);
497                Ok(())
498            }
499            None => Err(ReactiveRelationPropertyRemoveError::MissingInstance(relation_instance_id.clone())),
500        }
501    }
502
503    fn add_behaviour_to_all_relation_instances(&self, relation_behaviour_ty: &RelationBehaviourTypeId) {
504        for relation_instance in self.reactive_relation_instances.iter() {
505            if relation_instance.relation_type_id() == relation_behaviour_ty.relation_ty {
506                self.relation_behaviour_manager
507                    .add_behaviour(relation_instance.clone(), &relation_behaviour_ty.behaviour_ty);
508            }
509        }
510    }
511
512    fn add_behaviour_to_all_relation_components(&self, component_behaviour_ty: &ComponentBehaviourTypeId) {
513        for relation_instance in self.reactive_relation_instances.iter() {
514            if relation_instance.components.contains(&component_behaviour_ty.component_ty) {
515                self.relation_component_behaviour_manager
516                    .add_behaviour_to_relation_component(relation_instance.clone(), component_behaviour_ty);
517            }
518        }
519    }
520
521    // fn commit(&self, relation_instance_id: &RelationInstanceId) {
522    //     if let Some(reactive_relation_instance) = self.get(relation_instance_id) {
523    //         self.relation_instance_manager.commit(reactive_relation_instance.into());
524    //     }
525    // }
526
527    fn delete(&self, id: &RelationInstanceId) -> bool {
528        if self.has(id) {
529            self.unregister_reactive_instance(id);
530            self.reactive_instance_event_manager
531                .emit_event(ReactiveInstanceEvent::RelationInstanceDeleted(id.clone()));
532            true
533        } else {
534            false
535        }
536    }
537
538    fn unregister_reactive_instance(&self, id: &RelationInstanceId) {
539        match self.get(id) {
540            Some(relation_instance) => {
541                // Remove relation behaviours
542                self.relation_behaviour_manager.remove_behaviours(relation_instance.clone());
543                // Remove component behaviours
544                self.relation_component_behaviour_manager.remove_behaviours_from_relation(relation_instance);
545            }
546            None => {
547                // Remove relation behaviours
548                self.relation_behaviour_manager.remove_behaviours_by_key(id);
549                // Remove component behaviours
550                self.relation_component_behaviour_manager.remove_behaviours_by_key(id);
551            }
552        }
553        self.outbound_instances.remove(id);
554        self.inbound_instances.remove(id);
555        self.reactive_relation_instances.remove(id);
556    }
557
558    fn handle_component_added_events(&self) {
559        let component_manager = self.component_manager.clone();
560        let relation_component_behaviour_manager = self.relation_component_behaviour_manager.clone();
561        let reactive_relation_instances = self.reactive_relation_instances.clone();
562        let running = self.running.clone();
563        if let Some(receiver) = self.event_channels.receiver(&HANDLE_ID_RELATION_TYPE_COMPONENT_ADDED) {
564            tokio::task::spawn(async move {
565                while running.load(Ordering::Relaxed) {
566                    match receiver.try_recv() {
567                        Ok(type_definition_component_event) => {
568                            if let Ok(type_definition_component) = TypeDefinitionComponent::try_from(type_definition_component_event.clone()) {
569                                if let Some(component) = component_manager.get(&type_definition_component.component_ty) {
570                                    for reactive_relation_instance in reactive_relation_instances
571                                        .iter()
572                                        .filter(|relation_instance| {
573                                            relation_instance.relation_type_id().type_definition() == type_definition_component.type_definition
574                                        })
575                                        .map(|relation_instance| relation_instance.value().clone())
576                                    {
577                                        reactive_relation_instance.add_component_with_properties(&component);
578                                        relation_component_behaviour_manager
579                                            .add_behaviours_to_relation_component(reactive_relation_instance, component.clone());
580                                    }
581                                }
582                            }
583                        }
584                        Err(_) => {
585                            sleep(Duration::from_millis(100)).await;
586                        }
587                    }
588                }
589            });
590        }
591    }
592
593    fn handle_component_removed_events(&self) {
594        let component_manager = self.component_manager.clone();
595        let relation_component_behaviour_manager = self.relation_component_behaviour_manager.clone();
596        let reactive_relation_instances = self.reactive_relation_instances.clone();
597        let running = self.running.clone();
598        if let Some(receiver) = self.event_channels.receiver(&HANDLE_ID_RELATION_TYPE_COMPONENT_REMOVED) {
599            tokio::spawn(async move {
600                while running.load(Ordering::Relaxed) {
601                    match receiver.try_recv() {
602                        Ok(type_definition_component_event) => {
603                            if let Ok(type_definition_component) = TypeDefinitionComponent::try_from(type_definition_component_event.clone()) {
604                                if let Some(component) = component_manager.get(&type_definition_component.component_ty) {
605                                    for reactive_relation_instance in reactive_relation_instances
606                                        .iter()
607                                        .filter(|relation_instance| {
608                                            relation_instance.relation_type_id().type_definition() == type_definition_component.type_definition
609                                        })
610                                        .map(|relation_instance| relation_instance.value().clone())
611                                    {
612                                        reactive_relation_instance.remove_component(&component.ty);
613                                        relation_component_behaviour_manager
614                                            .remove_behaviours_from_relation_component(reactive_relation_instance, component.clone());
615                                    }
616                                }
617                            }
618                        }
619                        Err(_) => {
620                            sleep(Duration::from_millis(100)).await;
621                        }
622                    }
623                }
624            });
625        }
626    }
627
628    fn handle_property_added_events(&self) {
629        let relation_type_manager = self.relation_type_manager.clone();
630        let reactive_relation_instances = self.reactive_relation_instances.clone();
631        let running = self.running.clone();
632        if let Some(receiver) = self.event_channels.receiver(&HANDLE_ID_RELATION_TYPE_PROPERTY_ADDED) {
633            tokio::spawn(async move {
634                while running.load(Ordering::Relaxed) {
635                    match receiver.try_recv() {
636                        Ok(type_definition_property_event) => {
637                            if let Ok(type_definition_property) = TypeDefinitionProperty::try_from(type_definition_property_event.clone()) {
638                                if let Ok(relation_ty) = RelationTypeId::try_from(&type_definition_property.type_definition) {
639                                    if let Some(relation_type) = relation_type_manager.get(&relation_ty) {
640                                        for reactive_relation_instance in reactive_relation_instances
641                                            .iter()
642                                            .filter(|relation_instance| relation_instance.relation_type_id() == relation_ty)
643                                            .map(|relation_instance| relation_instance.value().clone())
644                                        {
645                                            if let Some(property_type) = relation_type.get_own_property(&type_definition_property.property) {
646                                                reactive_relation_instance.add_property_by_type(&property_type);
647                                            }
648                                        }
649                                    }
650                                }
651                            }
652                        }
653                        Err(_) => {
654                            sleep(Duration::from_millis(100)).await;
655                        }
656                    }
657                }
658            });
659        }
660    }
661
662    fn handle_property_removed_events(&self) {
663        let reactive_relation_instances = self.reactive_relation_instances.clone();
664        let running = self.running.clone();
665        if let Some(receiver) = self.event_channels.receiver(&HANDLE_ID_RELATION_TYPE_PROPERTY_REMOVED) {
666            tokio::spawn(async move {
667                while running.load(Ordering::Relaxed) {
668                    match receiver.try_recv() {
669                        Ok(type_definition_property_event) => {
670                            if let Ok(type_definition_property) = TypeDefinitionProperty::try_from(type_definition_property_event.clone()) {
671                                for reactive_relation_instance in reactive_relation_instances
672                                    .iter()
673                                    .filter(|relation_instance| {
674                                        relation_instance.relation_type_id().type_definition() == type_definition_property.type_definition
675                                    })
676                                    .map(|relation_instance| relation_instance.value().clone())
677                                {
678                                    reactive_relation_instance.remove_property(&type_definition_property.property);
679                                }
680                            }
681                        }
682                        Err(_) => {
683                            sleep(Duration::from_millis(100)).await;
684                        }
685                    }
686                }
687            });
688        }
689    }
690}
691
692impl TypeSystemEventSubscriber for ReactiveRelationManagerImpl {
693    fn subscribe_type_system_event(&self, system_event_type: TypeSystemEventTypes, handle_id: u128) {
694        if let Some(entity_instance) = self.type_system_event_manager.get_type_system_event_instance(system_event_type) {
695            if let Some(sender) = self.event_channels.sender(&handle_id) {
696                entity_instance.observe_with_handle(
697                    &EVENT.property_name(),
698                    move |v| {
699                        let _ = sender.send(v.clone());
700                    },
701                    handle_id,
702                );
703            }
704        }
705    }
706
707    fn unsubscribe_type_system_event(&self, system_event_type: TypeSystemEventTypes, handle_id: u128) {
708        if let Some(entity_instance) = self.type_system_event_manager.get_type_system_event_instance(system_event_type) {
709            entity_instance.remove_observer(&EVENT.property_name(), handle_id);
710        }
711    }
712}
713
714#[async_trait]
715impl Lifecycle for ReactiveRelationManagerImpl {
716    async fn post_init(&self) {
717        self.subscribe_type_system_event(TypeSystemEventTypes::RelationTypeComponentAdded, HANDLE_ID_RELATION_TYPE_COMPONENT_ADDED);
718        self.subscribe_type_system_event(TypeSystemEventTypes::RelationTypeComponentRemoved, HANDLE_ID_RELATION_TYPE_COMPONENT_REMOVED);
719        self.subscribe_type_system_event(TypeSystemEventTypes::RelationTypePropertyAdded, HANDLE_ID_RELATION_TYPE_PROPERTY_ADDED);
720        self.subscribe_type_system_event(TypeSystemEventTypes::RelationTypePropertyRemoved, HANDLE_ID_RELATION_TYPE_PROPERTY_REMOVED);
721
722        self.handle_component_added_events();
723        self.handle_component_removed_events();
724        self.handle_property_added_events();
725        self.handle_property_removed_events();
726    }
727
728    async fn pre_shutdown(&self) {
729        self.running.store(false, Ordering::Relaxed);
730
731        self.unsubscribe_type_system_event(TypeSystemEventTypes::RelationTypePropertyRemoved, HANDLE_ID_RELATION_TYPE_PROPERTY_REMOVED);
732        self.unsubscribe_type_system_event(TypeSystemEventTypes::RelationTypePropertyAdded, HANDLE_ID_RELATION_TYPE_PROPERTY_ADDED);
733        self.unsubscribe_type_system_event(TypeSystemEventTypes::RelationTypeComponentRemoved, HANDLE_ID_RELATION_TYPE_COMPONENT_REMOVED);
734        self.unsubscribe_type_system_event(TypeSystemEventTypes::RelationTypeComponentAdded, HANDLE_ID_RELATION_TYPE_COMPONENT_ADDED);
735    }
736}