reactive_graph_reactive_service_impl/
reactive_relation_manager_impl.rs1use std::ops::Deref;
2use std::sync::Arc;
3use std::sync::atomic::AtomicBool;
4use std::sync::atomic::Ordering;
5
6use async_trait::async_trait;
7use dashmap::DashMap;
8use dashmap::DashSet;
9use reactive_graph_reactive_service_api::EventChannels;
10use reactive_graph_reactive_service_api::ReactiveInstanceEvent;
11use reactive_graph_reactive_service_api::ReactiveInstanceEventManager;
12use reactive_graph_reactive_service_api::ReactiveRelationComponentAddError;
13use reactive_graph_reactive_service_api::ReactiveRelationComponentRemoveError;
14use reactive_graph_reactive_service_api::ReactiveRelationCreationError;
15use reactive_graph_reactive_service_api::ReactiveRelationPropertyAddError;
16use reactive_graph_reactive_service_api::ReactiveRelationPropertyRemoveError;
17use reactive_graph_reactive_service_api::ReactiveRelationRegistrationError;
18use serde_json::Value;
19use springtime_di::Component;
20use tokio::time::Duration;
21use tokio::time::sleep;
22use uuid::Uuid;
23
24use reactive_graph_behaviour_model_api::BehaviourTypeId;
25use reactive_graph_behaviour_model_api::BehaviourTypesContainer;
26use reactive_graph_behaviour_model_api::ComponentBehaviourTypeId;
27use reactive_graph_behaviour_model_api::RelationBehaviourTypeId;
28use reactive_graph_behaviour_service_api::RelationBehaviourManager;
29use reactive_graph_behaviour_service_api::RelationComponentBehaviourManager;
30use reactive_graph_graph::ComponentContainer;
31use reactive_graph_graph::ComponentOrEntityTypeId;
32use reactive_graph_graph::ComponentTypeId;
33use reactive_graph_graph::Mutability;
34use reactive_graph_graph::NamespacedTypeGetter;
35use reactive_graph_graph::PropertyInstances;
36use reactive_graph_graph::PropertyTypeContainer;
37use reactive_graph_graph::PropertyTypeDefinition;
38use reactive_graph_graph::RelationInstance;
39use reactive_graph_graph::RelationInstanceId;
40use reactive_graph_graph::RelationTypeId;
41use reactive_graph_graph::TypeDefinitionComponent;
42use reactive_graph_graph::TypeDefinitionGetter;
43use reactive_graph_graph::TypeDefinitionProperty;
44use reactive_graph_lifecycle::Lifecycle;
45use reactive_graph_reactive_model_api::ReactiveInstance;
46use reactive_graph_reactive_model_api::ReactivePropertyContainer;
47use reactive_graph_reactive_model_impl::ReactiveRelation;
48use reactive_graph_reactive_service_api::ReactiveEntityManager;
49use reactive_graph_reactive_service_api::ReactiveRelationManager;
50use reactive_graph_runtime_model::EventProperties::EVENT;
51use reactive_graph_type_system_api::ComponentManager;
52use reactive_graph_type_system_api::RelationTypeManager;
53use reactive_graph_type_system_api::TypeSystemEventManager;
54use reactive_graph_type_system_api::TypeSystemEventSubscriber;
55use reactive_graph_type_system_api::TypeSystemEventTypes;
56use springtime_di::component_alias;
57
/// Observer handle id used when subscribing to `RelationTypeComponentAdded` type system events.
///
/// The same value is the key of the matching channel in the manager's `EventChannels`.
static HANDLE_ID_RELATION_TYPE_COMPONENT_ADDED: u128 = 0x6ba7b921_0e15_13d3_50b3_00c04fe530c7;

/// Observer handle id / channel key for `RelationTypeComponentRemoved` type system events.
static HANDLE_ID_RELATION_TYPE_COMPONENT_REMOVED: u128 = 0x6ba8b811_9e15_13ee_59b3_00c04fe630c7;

/// Observer handle id / channel key for `RelationTypePropertyAdded` type system events.
static HANDLE_ID_RELATION_TYPE_PROPERTY_ADDED: u128 = 0x6bb9b923_2e15_13d3_50b3_00c04fe530c7;

/// Observer handle id / channel key for `RelationTypePropertyRemoved` type system events.
static HANDLE_ID_RELATION_TYPE_PROPERTY_REMOVED: u128 = 0x6ba8b833_9e15_35ee_5bd3_00c0410630c7;
62
63pub struct OutboundInstances(DashMap<Uuid, DashSet<RelationInstanceId>>);
64
65impl OutboundInstances {
66 pub fn new() -> Self {
67 OutboundInstances(DashMap::new())
68 }
69
70 pub fn insert(&self, id: &RelationInstanceId) {
71 match self.0.get(&id.outbound_id) {
72 Some(outbound_instances) => {
73 outbound_instances.insert(id.clone());
74 }
75 None => {
76 let outbound_instances = DashSet::new();
77 outbound_instances.insert(id.clone());
78 self.0.insert(id.outbound_id, outbound_instances);
79 }
80 }
81 }
82
83 pub fn remove(&self, id: &RelationInstanceId) {
84 self.0.get(&id.outbound_id).and_then(|outbound_instances| outbound_instances.remove(id));
85 }
86
87 pub fn get(&self, id: Uuid) -> Option<DashSet<RelationInstanceId>> {
88 self.0.get(&id).map(|entry| entry.value().clone())
89 }
90}
91
92impl Default for OutboundInstances {
93 fn default() -> Self {
94 Self::new()
95 }
96}
97
98pub struct InboundInstances(DashMap<Uuid, DashSet<RelationInstanceId>>);
99
100impl InboundInstances {
101 pub fn new() -> Self {
102 InboundInstances(DashMap::new())
103 }
104
105 pub fn insert(&self, id: &RelationInstanceId) {
106 match self.0.get(&id.inbound_id) {
107 Some(inbound_instances) => {
108 inbound_instances.insert(id.clone());
109 }
110 None => {
111 let inbound_instances = DashSet::new();
112 inbound_instances.insert(id.clone());
113 self.0.insert(id.inbound_id, inbound_instances);
114 }
115 }
116 }
117
118 pub fn remove(&self, id: &RelationInstanceId) {
119 self.0.get(&id.inbound_id).and_then(|inbound_instances| inbound_instances.remove(id));
120 }
121
122 pub fn get(&self, id: Uuid) -> Option<DashSet<RelationInstanceId>> {
123 self.0.get(&id).map(|entry| entry.value().clone())
124 }
125}
126
127impl Default for InboundInstances {
128 fn default() -> Self {
129 Self::new()
130 }
131}
132
/// Builds the shared "running" flag, initially set to `true`.
fn create_running_state() -> Arc<AtomicBool> {
    let running = AtomicBool::new(true);
    Arc::new(running)
}
136
137fn create_event_channels() -> EventChannels {
138 let event_channels = EventChannels::new();
139 event_channels.insert(HANDLE_ID_RELATION_TYPE_COMPONENT_ADDED, crossbeam::channel::unbounded());
140 event_channels.insert(HANDLE_ID_RELATION_TYPE_COMPONENT_REMOVED, crossbeam::channel::unbounded());
141 event_channels.insert(HANDLE_ID_RELATION_TYPE_PROPERTY_ADDED, crossbeam::channel::unbounded());
142 event_channels.insert(HANDLE_ID_RELATION_TYPE_PROPERTY_REMOVED, crossbeam::channel::unbounded());
143 event_channels
144}
145
146#[derive(Component)]
147pub struct ReactiveRelationManagerImpl {
148 reactive_instance_event_manager: Arc<dyn ReactiveInstanceEventManager + Send + Sync>,
149
150 type_system_event_manager: Arc<dyn TypeSystemEventManager + Send + Sync>,
151
152 component_manager: Arc<dyn ComponentManager + Send + Sync>,
153
154 relation_type_manager: Arc<dyn RelationTypeManager + Send + Sync>,
155
156 reactive_entity_manager: Arc<dyn ReactiveEntityManager + Send + Sync>,
157
158 relation_behaviour_manager: Arc<dyn RelationBehaviourManager + Send + Sync>,
159
160 relation_component_behaviour_manager: Arc<dyn RelationComponentBehaviourManager + Send + Sync>,
161
162 #[component(default = "DashMap::new")]
163 reactive_relation_instances: DashMap<RelationInstanceId, ReactiveRelation>, #[component(default = "OutboundInstances::new")]
166 outbound_instances: OutboundInstances, #[component(default = "InboundInstances::new")]
169 inbound_instances: InboundInstances, #[component(default = "create_running_state")]
172 running: Arc<AtomicBool>, #[component(default = "create_event_channels")]
175 event_channels: EventChannels,
176}
177
178#[async_trait]
179#[component_alias]
180impl ReactiveRelationManager for ReactiveRelationManagerImpl {
181 fn has(&self, id: &RelationInstanceId) -> bool {
182 self.reactive_relation_instances.contains_key(id)
183 }
184
185 fn get(&self, id: &RelationInstanceId) -> Option<ReactiveRelation> {
186 self.reactive_relation_instances.get(id).map(|r| r.value().clone())
187 }
188
189 fn get_by_outbound_entity(&self, outbound_entity_id: Uuid) -> Vec<ReactiveRelation> {
190 self.outbound_instances
191 .get(outbound_entity_id)
192 .and_then(|outbound_instances| outbound_instances.iter().map(|id| self.get(id.deref())).collect())
193 .unwrap_or_default()
194 }
195
196 fn get_by_inbound_entity(&self, inbound_entity_id: Uuid) -> Vec<ReactiveRelation> {
197 self.inbound_instances
198 .get(inbound_entity_id)
199 .and_then(|inbound_instances| inbound_instances.iter().map(|id| self.get(id.deref())).collect())
200 .unwrap_or_default()
201 }
202
203 fn get_all(&self) -> Vec<ReactiveRelation> {
204 self.reactive_relation_instances.iter().map(|r| r.value().clone()).collect()
205 }
206
207 fn get_by_type(&self, ty: &RelationTypeId) -> Vec<ReactiveRelation> {
208 self.reactive_relation_instances
209 .iter()
210 .filter(|r| &r.relation_type_id() == ty)
211 .map(|r| r.value().clone())
212 .collect()
213 }
214
215 fn get_by_component(&self, ty: &ComponentTypeId) -> Vec<ReactiveRelation> {
216 self.reactive_relation_instances
217 .iter()
218 .filter(|e| e.is_a(ty))
219 .map(|e| e.value().clone())
220 .collect()
221 }
222
223 fn get_by_behaviour(&self, behaviour_ty: &BehaviourTypeId) -> Vec<ReactiveRelation> {
224 self.reactive_relation_instances
225 .iter()
226 .filter(|e| e.behaves_as(behaviour_ty))
227 .map(|e| e.value().clone())
228 .collect()
229 }
230
231 fn get_by_namespace(&self, namespace: &str) -> Vec<ReactiveRelation> {
232 self.reactive_relation_instances
233 .iter()
234 .filter(|r| r.namespace() == namespace)
235 .map(|r| r.value().clone())
236 .collect()
237 }
238
239 fn count(&self) -> usize {
240 self.reactive_relation_instances.len()
241 }
242
243 fn count_by_type(&self, ty: &RelationTypeId) -> usize {
244 self.reactive_relation_instances.iter().filter(|r| &r.relation_type_id() == ty).count()
245 }
246
247 fn count_by_component(&self, component_ty: &ComponentTypeId) -> usize {
248 self.reactive_relation_instances.iter().filter(|r| r.is_a(component_ty)).count()
249 }
250
251 fn count_by_behaviour(&self, behaviour_ty: &BehaviourTypeId) -> usize {
252 self.reactive_relation_instances.iter().filter(|r| r.behaves_as(behaviour_ty)).count()
253 }
254
255 fn get_relation_instance_ids(&self) -> Vec<RelationInstanceId> {
256 self.reactive_relation_instances.iter().map(|e| e.key().clone()).collect()
257 }
258
259 fn create_reactive_relation(&self, id: &RelationInstanceId, properties: PropertyInstances) -> Result<ReactiveRelation, ReactiveRelationCreationError> {
260 let relation_instance = RelationInstance::builder()
261 .outbound_id(id.outbound_id)
262 .ty(id.ty.clone())
263 .inbound_id(id.inbound_id)
264 .properties(properties)
265 .build();
266 self.create_reactive_instance(relation_instance)
267 }
275
276 fn create_reactive_instance(&self, reactive_relation_instance: RelationInstance) -> Result<ReactiveRelation, ReactiveRelationCreationError> {
277 let outbound = self
278 .reactive_entity_manager
279 .get(reactive_relation_instance.outbound_id)
280 .ok_or(ReactiveRelationCreationError::MissingOutboundEntityInstance(reactive_relation_instance.outbound_id))?;
281 let inbound = self
282 .reactive_entity_manager
283 .get(reactive_relation_instance.inbound_id)
284 .ok_or(ReactiveRelationCreationError::MissingInboundEntityInstance(reactive_relation_instance.inbound_id))?;
285 let ty = reactive_relation_instance.ty.clone();
286 let relation_ty = ty.relation_type_id();
287 let relation_type = self
288 .relation_type_manager
289 .get(&relation_ty)
290 .ok_or_else(|| ReactiveRelationCreationError::UnknownRelationType(relation_ty.clone()))?;
291
292 if !relation_type.outbound_type.type_name().eq("*") {
293 match &relation_type.outbound_type {
294 ComponentOrEntityTypeId::Component(component_ty) => {
295 if !outbound.components.contains(component_ty) {
296 return Err(ReactiveRelationCreationError::OutboundEntityDoesNotHaveComponent(outbound.id, component_ty.clone()));
297 }
298 }
299 ComponentOrEntityTypeId::EntityType(entity_ty) => {
300 if &outbound.ty != entity_ty {
301 return Err(ReactiveRelationCreationError::OutboundEntityIsNotOfType(outbound.id, outbound.ty.clone(), entity_ty.clone()));
302 }
303 }
304 }
305 }
306 if !relation_type.inbound_type.type_name().eq("*") {
317 match &relation_type.inbound_type {
318 ComponentOrEntityTypeId::Component(component_ty) => {
319 if !inbound.components.contains(component_ty) {
320 return Err(ReactiveRelationCreationError::InboundEntityDoesNotHaveComponent(inbound.id, component_ty.clone()));
321 }
322 }
323 ComponentOrEntityTypeId::EntityType(entity_ty) => {
324 if &inbound.ty != entity_ty {
325 return Err(ReactiveRelationCreationError::InboundEntityIsNotOfType(inbound.id, inbound.ty.clone(), entity_ty.clone()));
326 }
327 }
328 }
329 }
330
331 let relation_instance = ReactiveRelation::new_from_instance(outbound, inbound, reactive_relation_instance);
342
343 if let Some(entity_type) = self.relation_type_manager.get(&relation_instance.relation_type_id()) {
345 for component_ty in entity_type.components {
346 if let Some(component) = self.component_manager.get(&component_ty) {
347 for property_type in component.properties.iter() {
348 if let Some(mut property) = relation_instance.properties.get_mut(&property_type.name) {
349 property.set_mutability(property_type.mutability);
350 }
351 }
352 }
353 }
354 for property_type in entity_type.properties.iter() {
355 if let Some(mut property) = relation_instance.properties.get_mut(&property_type.name) {
356 property.set_mutability(property_type.mutability);
357 }
358 }
359 }
360
361 self.register_reactive_instance(relation_instance)
362 .map_err(ReactiveRelationCreationError::ReactiveRelationRegistrationError)
363 }
364
365 fn register_reactive_instance(&self, reactive_relation: ReactiveRelation) -> Result<ReactiveRelation, ReactiveRelationRegistrationError> {
366 let id = reactive_relation.id();
367 if self.reactive_relation_instances.contains_key(&id) {
368 return Err(ReactiveRelationRegistrationError::RelationInstanceAlreadyExists(id.clone()));
369 }
370 self.reactive_relation_instances.insert(id.clone(), reactive_relation.clone());
372 self.outbound_instances.insert(&id);
373 self.inbound_instances.insert(&id);
374 let relation_ty = reactive_relation.relation_type_id();
376 if let Some(components) = self.relation_type_manager.get(&relation_ty).map(|relation_type| relation_type.components) {
377 components.iter().for_each(|component_ty| {
378 reactive_relation.components.insert(component_ty.clone());
379 });
380 }
381 self.relation_component_behaviour_manager.add_behaviours_to_relation(reactive_relation.clone());
383 self.relation_behaviour_manager.add_behaviours(reactive_relation.clone());
385 self.reactive_instance_event_manager
386 .emit_event(ReactiveInstanceEvent::RelationInstanceCreated(id));
387 Ok(reactive_relation)
388
389 }
413
414 fn register_or_merge_reactive_instance(&self, relation_instance: ReactiveRelation) -> Result<ReactiveRelation, ReactiveRelationRegistrationError> {
415 let id = relation_instance.id();
416 match self.get(&id) {
417 None => self.register_reactive_instance(relation_instance),
419 Some(reactive_relation_instance) => Ok(reactive_relation_instance),
421 }
422 }
423
424 fn add_component(&self, id: &RelationInstanceId, component_ty: &ComponentTypeId) -> Result<(), ReactiveRelationComponentAddError> {
425 let Some(component) = self.component_manager.get(component_ty) else {
426 return Err(ReactiveRelationComponentAddError::ComponentNotRegistered(component_ty.clone()));
427 };
428 let Some(reactive_relation) = self.get(id) else {
429 return Err(ReactiveRelationComponentAddError::MissingInstance(id.clone()));
430 };
431 if reactive_relation.is_a(component_ty) {
432 return Err(ReactiveRelationComponentAddError::IsAlreadyA(component_ty.clone()));
433 }
434 reactive_relation.add_component_with_properties(&component);
436 self.relation_component_behaviour_manager
438 .add_behaviours_to_relation_component(reactive_relation, component);
439 Ok(())
440 }
441
442 fn remove_component(&self, id: &RelationInstanceId, component_ty: &ComponentTypeId) -> Result<(), ReactiveRelationComponentRemoveError> {
443 let Some(reactive_relation) = self.get(id) else {
444 return Err(ReactiveRelationComponentRemoveError::MissingInstance(id.clone()));
445 };
446 if !reactive_relation.is_a(component_ty) {
447 return Err(ReactiveRelationComponentRemoveError::IsNotA(component_ty.clone()));
448 }
449 let Some(component) = self.component_manager.get(component_ty) else {
450 return Err(ReactiveRelationComponentRemoveError::ComponentNotRegistered(component_ty.clone()));
451 };
452 reactive_relation.remove_component(component_ty);
454 self.relation_component_behaviour_manager
460 .remove_behaviours_from_relation_component(reactive_relation, component);
461 Ok(())
462 }
463
464 fn add_property(
465 &self,
466 relation_instance_id: &RelationInstanceId,
467 property_name: &str,
468 mutability: Mutability,
469 value: Value,
470 ) -> Result<(), ReactiveRelationPropertyAddError> {
471 match self.get(relation_instance_id) {
472 Some(relation_instance) => {
473 if relation_instance.has_property(property_name) {
474 return Err(ReactiveRelationPropertyAddError::PropertyAlreadyExists(property_name.to_string()));
475 }
476 relation_instance.add_property(property_name, mutability, value);
477 Ok(())
478 }
479 None => Err(ReactiveRelationPropertyAddError::MissingInstance(relation_instance_id.clone())),
480 }
481 }
482
483 fn remove_property(&self, relation_instance_id: &RelationInstanceId, property_name: &str) -> Result<(), ReactiveRelationPropertyRemoveError> {
484 match self.get(relation_instance_id) {
485 Some(relation_instance) => {
486 if !relation_instance.has_property(property_name) {
487 return Err(ReactiveRelationPropertyRemoveError::MissingProperty(property_name.to_string()));
488 }
489 for component_ty in relation_instance.get_components() {
490 if let Some(component) = self.component_manager.get(&component_ty) {
491 if component.has_own_property(property_name) {
492 return Err(ReactiveRelationPropertyRemoveError::PropertyInUseByComponent(component_ty.clone()));
493 }
494 }
495 }
496 relation_instance.remove_property(property_name);
497 Ok(())
498 }
499 None => Err(ReactiveRelationPropertyRemoveError::MissingInstance(relation_instance_id.clone())),
500 }
501 }
502
503 fn add_behaviour_to_all_relation_instances(&self, relation_behaviour_ty: &RelationBehaviourTypeId) {
504 for relation_instance in self.reactive_relation_instances.iter() {
505 if relation_instance.relation_type_id() == relation_behaviour_ty.relation_ty {
506 self.relation_behaviour_manager
507 .add_behaviour(relation_instance.clone(), &relation_behaviour_ty.behaviour_ty);
508 }
509 }
510 }
511
512 fn add_behaviour_to_all_relation_components(&self, component_behaviour_ty: &ComponentBehaviourTypeId) {
513 for relation_instance in self.reactive_relation_instances.iter() {
514 if relation_instance.components.contains(&component_behaviour_ty.component_ty) {
515 self.relation_component_behaviour_manager
516 .add_behaviour_to_relation_component(relation_instance.clone(), component_behaviour_ty);
517 }
518 }
519 }
520
521 fn delete(&self, id: &RelationInstanceId) -> bool {
528 if self.has(id) {
529 self.unregister_reactive_instance(id);
530 self.reactive_instance_event_manager
531 .emit_event(ReactiveInstanceEvent::RelationInstanceDeleted(id.clone()));
532 true
533 } else {
534 false
535 }
536 }
537
538 fn unregister_reactive_instance(&self, id: &RelationInstanceId) {
539 match self.get(id) {
540 Some(relation_instance) => {
541 self.relation_behaviour_manager.remove_behaviours(relation_instance.clone());
543 self.relation_component_behaviour_manager.remove_behaviours_from_relation(relation_instance);
545 }
546 None => {
547 self.relation_behaviour_manager.remove_behaviours_by_key(id);
549 self.relation_component_behaviour_manager.remove_behaviours_by_key(id);
551 }
552 }
553 self.outbound_instances.remove(id);
554 self.inbound_instances.remove(id);
555 self.reactive_relation_instances.remove(id);
556 }
557
558 fn handle_component_added_events(&self) {
559 let component_manager = self.component_manager.clone();
560 let relation_component_behaviour_manager = self.relation_component_behaviour_manager.clone();
561 let reactive_relation_instances = self.reactive_relation_instances.clone();
562 let running = self.running.clone();
563 if let Some(receiver) = self.event_channels.receiver(&HANDLE_ID_RELATION_TYPE_COMPONENT_ADDED) {
564 tokio::task::spawn(async move {
565 while running.load(Ordering::Relaxed) {
566 match receiver.try_recv() {
567 Ok(type_definition_component_event) => {
568 if let Ok(type_definition_component) = TypeDefinitionComponent::try_from(type_definition_component_event.clone()) {
569 if let Some(component) = component_manager.get(&type_definition_component.component_ty) {
570 for reactive_relation_instance in reactive_relation_instances
571 .iter()
572 .filter(|relation_instance| {
573 relation_instance.relation_type_id().type_definition() == type_definition_component.type_definition
574 })
575 .map(|relation_instance| relation_instance.value().clone())
576 {
577 reactive_relation_instance.add_component_with_properties(&component);
578 relation_component_behaviour_manager
579 .add_behaviours_to_relation_component(reactive_relation_instance, component.clone());
580 }
581 }
582 }
583 }
584 Err(_) => {
585 sleep(Duration::from_millis(100)).await;
586 }
587 }
588 }
589 });
590 }
591 }
592
593 fn handle_component_removed_events(&self) {
594 let component_manager = self.component_manager.clone();
595 let relation_component_behaviour_manager = self.relation_component_behaviour_manager.clone();
596 let reactive_relation_instances = self.reactive_relation_instances.clone();
597 let running = self.running.clone();
598 if let Some(receiver) = self.event_channels.receiver(&HANDLE_ID_RELATION_TYPE_COMPONENT_REMOVED) {
599 tokio::spawn(async move {
600 while running.load(Ordering::Relaxed) {
601 match receiver.try_recv() {
602 Ok(type_definition_component_event) => {
603 if let Ok(type_definition_component) = TypeDefinitionComponent::try_from(type_definition_component_event.clone()) {
604 if let Some(component) = component_manager.get(&type_definition_component.component_ty) {
605 for reactive_relation_instance in reactive_relation_instances
606 .iter()
607 .filter(|relation_instance| {
608 relation_instance.relation_type_id().type_definition() == type_definition_component.type_definition
609 })
610 .map(|relation_instance| relation_instance.value().clone())
611 {
612 reactive_relation_instance.remove_component(&component.ty);
613 relation_component_behaviour_manager
614 .remove_behaviours_from_relation_component(reactive_relation_instance, component.clone());
615 }
616 }
617 }
618 }
619 Err(_) => {
620 sleep(Duration::from_millis(100)).await;
621 }
622 }
623 }
624 });
625 }
626 }
627
628 fn handle_property_added_events(&self) {
629 let relation_type_manager = self.relation_type_manager.clone();
630 let reactive_relation_instances = self.reactive_relation_instances.clone();
631 let running = self.running.clone();
632 if let Some(receiver) = self.event_channels.receiver(&HANDLE_ID_RELATION_TYPE_PROPERTY_ADDED) {
633 tokio::spawn(async move {
634 while running.load(Ordering::Relaxed) {
635 match receiver.try_recv() {
636 Ok(type_definition_property_event) => {
637 if let Ok(type_definition_property) = TypeDefinitionProperty::try_from(type_definition_property_event.clone()) {
638 if let Ok(relation_ty) = RelationTypeId::try_from(&type_definition_property.type_definition) {
639 if let Some(relation_type) = relation_type_manager.get(&relation_ty) {
640 for reactive_relation_instance in reactive_relation_instances
641 .iter()
642 .filter(|relation_instance| relation_instance.relation_type_id() == relation_ty)
643 .map(|relation_instance| relation_instance.value().clone())
644 {
645 if let Some(property_type) = relation_type.get_own_property(&type_definition_property.property) {
646 reactive_relation_instance.add_property_by_type(&property_type);
647 }
648 }
649 }
650 }
651 }
652 }
653 Err(_) => {
654 sleep(Duration::from_millis(100)).await;
655 }
656 }
657 }
658 });
659 }
660 }
661
662 fn handle_property_removed_events(&self) {
663 let reactive_relation_instances = self.reactive_relation_instances.clone();
664 let running = self.running.clone();
665 if let Some(receiver) = self.event_channels.receiver(&HANDLE_ID_RELATION_TYPE_PROPERTY_REMOVED) {
666 tokio::spawn(async move {
667 while running.load(Ordering::Relaxed) {
668 match receiver.try_recv() {
669 Ok(type_definition_property_event) => {
670 if let Ok(type_definition_property) = TypeDefinitionProperty::try_from(type_definition_property_event.clone()) {
671 for reactive_relation_instance in reactive_relation_instances
672 .iter()
673 .filter(|relation_instance| {
674 relation_instance.relation_type_id().type_definition() == type_definition_property.type_definition
675 })
676 .map(|relation_instance| relation_instance.value().clone())
677 {
678 reactive_relation_instance.remove_property(&type_definition_property.property);
679 }
680 }
681 }
682 Err(_) => {
683 sleep(Duration::from_millis(100)).await;
684 }
685 }
686 }
687 });
688 }
689 }
690}
691
692impl TypeSystemEventSubscriber for ReactiveRelationManagerImpl {
693 fn subscribe_type_system_event(&self, system_event_type: TypeSystemEventTypes, handle_id: u128) {
694 if let Some(entity_instance) = self.type_system_event_manager.get_type_system_event_instance(system_event_type) {
695 if let Some(sender) = self.event_channels.sender(&handle_id) {
696 entity_instance.observe_with_handle(
697 &EVENT.property_name(),
698 move |v| {
699 let _ = sender.send(v.clone());
700 },
701 handle_id,
702 );
703 }
704 }
705 }
706
707 fn unsubscribe_type_system_event(&self, system_event_type: TypeSystemEventTypes, handle_id: u128) {
708 if let Some(entity_instance) = self.type_system_event_manager.get_type_system_event_instance(system_event_type) {
709 entity_instance.remove_observer(&EVENT.property_name(), handle_id);
710 }
711 }
712}
713
714#[async_trait]
715impl Lifecycle for ReactiveRelationManagerImpl {
716 async fn post_init(&self) {
717 self.subscribe_type_system_event(TypeSystemEventTypes::RelationTypeComponentAdded, HANDLE_ID_RELATION_TYPE_COMPONENT_ADDED);
718 self.subscribe_type_system_event(TypeSystemEventTypes::RelationTypeComponentRemoved, HANDLE_ID_RELATION_TYPE_COMPONENT_REMOVED);
719 self.subscribe_type_system_event(TypeSystemEventTypes::RelationTypePropertyAdded, HANDLE_ID_RELATION_TYPE_PROPERTY_ADDED);
720 self.subscribe_type_system_event(TypeSystemEventTypes::RelationTypePropertyRemoved, HANDLE_ID_RELATION_TYPE_PROPERTY_REMOVED);
721
722 self.handle_component_added_events();
723 self.handle_component_removed_events();
724 self.handle_property_added_events();
725 self.handle_property_removed_events();
726 }
727
728 async fn pre_shutdown(&self) {
729 self.running.store(false, Ordering::Relaxed);
730
731 self.unsubscribe_type_system_event(TypeSystemEventTypes::RelationTypePropertyRemoved, HANDLE_ID_RELATION_TYPE_PROPERTY_REMOVED);
732 self.unsubscribe_type_system_event(TypeSystemEventTypes::RelationTypePropertyAdded, HANDLE_ID_RELATION_TYPE_PROPERTY_ADDED);
733 self.unsubscribe_type_system_event(TypeSystemEventTypes::RelationTypeComponentRemoved, HANDLE_ID_RELATION_TYPE_COMPONENT_REMOVED);
734 self.unsubscribe_type_system_event(TypeSystemEventTypes::RelationTypeComponentAdded, HANDLE_ID_RELATION_TYPE_COMPONENT_ADDED);
735 }
736}