reactive_graph_reactive_model_impl/flows/
reactive_flow.rs

1use std::collections::HashMap;
2use std::fmt;
3use std::fmt::Display;
4use std::fmt::Formatter;
5use std::ops::Deref;
6use std::sync::Arc;
7use std::sync::RwLock;
8
9use crate::ReactiveEntity;
10use crate::ReactiveRelation;
11use reactive_graph_behaviour_model_api::BehaviourTypeId;
12use reactive_graph_behaviour_model_api::BehaviourTypesContainer;
13use reactive_graph_graph::Component;
14use reactive_graph_graph::ComponentContainer;
15use reactive_graph_graph::ComponentTypeId;
16use reactive_graph_graph::ComponentTypeIds;
17use reactive_graph_graph::CreateFlowInstanceError;
18use reactive_graph_graph::EntityInstance;
19use reactive_graph_graph::EntityTypeId;
20use reactive_graph_graph::FlowInstance;
21use reactive_graph_graph::Mutability;
22use reactive_graph_graph::NamespacedTypeGetter;
23use reactive_graph_graph::PropertyInstanceGetter;
24use reactive_graph_graph::PropertyInstanceSetter;
25use reactive_graph_graph::PropertyType;
26use reactive_graph_graph::RelationInstance;
27use reactive_graph_graph::RelationInstanceId;
28use reactive_graph_graph::TypeDefinition;
29use reactive_graph_graph::TypeDefinitionGetter;
30use reactive_graph_graph::instances::named::NamedInstanceContainer;
31use reactive_graph_reactive_model_api::ReactiveFlowConstructionError;
32use reactive_graph_reactive_model_api::ReactiveInstance;
33use reactive_graph_reactive_model_api::ReactivePropertyContainer;
34use serde_json::Map;
35use serde_json::Value;
36use uuid::Uuid;
37
/// A flow: reactive entity and relation instances grouped around one wrapper
/// entity instance.
///
/// The instance maps and the change lists are wrapped in [`RwLock`]s, so they
/// can be modified through a shared reference.
pub struct ReactiveFlowInstance {
    /// The id of the flow. It is identical to the id of the wrapper entity
    /// instance.
    pub id: Uuid,

    /// The entity type id of the wrapper entity instance.
    pub ty: EntityTypeId,

    /// The name of the flow instance.
    pub name: String,

    /// Textual description of the flow instance.
    pub description: String,

    /// The entity instances contained in the flow, keyed by their id. An
    /// entity instance may also be contained in other flows.
    pub entity_instances: RwLock<HashMap<Uuid, ReactiveEntity>>,

    /// The relation instances contained in the flow, keyed by their id. A
    /// relation instance may also be contained in other flows.
    pub relation_instances: RwLock<HashMap<RelationInstanceId, ReactiveRelation>>,

    /// Ids of the entities that have been added since creation of the flow.
    pub entities_added: RwLock<Vec<Uuid>>,

    /// Ids of the entities that have been removed since creation of the flow.
    pub entities_removed: RwLock<Vec<Uuid>>,

    /// Ids of the relations that have been added since creation of the flow.
    pub relations_added: RwLock<Vec<RelationInstanceId>>,

    /// Ids of the relations that have been removed since creation of the flow.
    pub relations_removed: RwLock<Vec<RelationInstanceId>>,
}
71
72impl ReactiveFlowInstance {
73    pub fn new(wrapper_entity_instance: ReactiveEntity) -> Self {
74        let mut entity_instances = HashMap::new();
75        entity_instances.insert(wrapper_entity_instance.id, wrapper_entity_instance.clone());
76        ReactiveFlowInstance {
77            id: wrapper_entity_instance.id,
78            ty: wrapper_entity_instance.ty.clone(),
79            name: String::new(),
80            description: String::new(),
81            entity_instances: RwLock::new(entity_instances),
82            relation_instances: RwLock::new(HashMap::new()),
83            entities_added: RwLock::new(Vec::new()),
84            entities_removed: RwLock::new(Vec::new()),
85            relations_added: RwLock::new(Vec::new()),
86            relations_removed: RwLock::new(Vec::new()),
87        }
88    }
89
90    pub fn has_entity(&self, entity_instance: ReactiveEntity) -> bool {
91        self.entity_instances.read().unwrap().contains_key(&entity_instance.id)
92    }
93
94    pub fn has_entity_by_id(&self, id: Uuid) -> bool {
95        self.entity_instances.read().unwrap().contains_key(&id)
96    }
97
98    pub fn get_entity(&self, id: Uuid) -> Option<ReactiveEntity> {
99        let reader = self.entity_instances.read().unwrap();
100        reader.get(&id).cloned()
101    }
102
103    pub fn get_wrapper_entity_instance(&self) -> Option<ReactiveEntity> {
104        self.get_entity(self.id)
105    }
106
107    pub fn add_entity(&self, entity_instance: ReactiveEntity) {
108        let id = entity_instance.id;
109        if !self.has_entity_by_id(id) {
110            self.entity_instances.write().unwrap().insert(id, entity_instance);
111            self.entities_added.write().unwrap().push(id);
112            // self.entities_removed.write().unwrap().remove(entity_instance.id);
113        }
114    }
115
116    pub fn remove_entity(&self, id: Uuid) {
117        self.entity_instances.write().unwrap().remove(&id);
118        self.entities_removed.write().unwrap().push(id);
119    }
120
121    pub fn has_relation(&self, relation_instance: ReactiveRelation) -> bool {
122        self.relation_instances.read().unwrap().contains_key(&relation_instance.id())
123    }
124
125    pub fn has_relation_by_key(&self, relation_instance_id: &RelationInstanceId) -> bool {
126        self.relation_instances.read().unwrap().contains_key(relation_instance_id)
127    }
128
129    pub fn get_relation(&self, relation_instance_id: &RelationInstanceId) -> Option<ReactiveRelation> {
130        let reader = self.relation_instances.read().unwrap();
131        reader.get(relation_instance_id).cloned()
132    }
133
134    pub fn add_relation(&self, relation_instance: ReactiveRelation) {
135        let id = relation_instance.id();
136        if !self.has_relation_by_key(&id) {
137            self.relation_instances.write().unwrap().insert(id.clone(), relation_instance);
138            self.relations_added.write().unwrap().push(id);
139        }
140    }
141
142    pub fn remove_relation(&self, relation_instance_id: &RelationInstanceId) {
143        self.relation_instances.write().unwrap().remove(relation_instance_id);
144        self.relations_removed.write().unwrap().push(relation_instance_id.clone());
145    }
146
147    pub fn tick(&self) {
148        let reader = self.entity_instances.read().unwrap();
149        for (_, entity_instance) in reader.iter() {
150            entity_instance.tick();
151        }
152    }
153}
154
155impl From<ReactiveEntity> for ReactiveFlowInstance {
156    fn from(wrapper_entity_instance: ReactiveEntity) -> Self {
157        ReactiveFlowInstance::new(wrapper_entity_instance)
158    }
159}
160
161impl TryFrom<FlowInstance> for ReactiveFlowInstance {
162    type Error = ReactiveFlowConstructionError;
163
164    fn try_from(flow_instance: FlowInstance) -> Result<Self, ReactiveFlowConstructionError> {
165        let flow_id = flow_instance.id;
166        let mut entity_instances = HashMap::new();
167        let mut wrapper = None;
168        for (id, entity_instance) in flow_instance.entity_instances {
169            let reactive_entity = ReactiveEntity::from(entity_instance);
170            entity_instances.insert(id, reactive_entity.clone());
171            if id == flow_id {
172                wrapper = Some(reactive_entity.clone());
173            }
174        }
175        if wrapper.is_none() {
176            return Err(ReactiveFlowConstructionError::MissingWrapperInstance);
177        }
178        let mut relation_instances = HashMap::new();
179        for (id, relation_instance) in flow_instance.relation_instances {
180            // let id = relation_instance.id();
181            let outbound = entity_instances.get(&relation_instance.outbound_id);
182            if outbound.is_none() {
183                // outbound entity missing
184                return Err(ReactiveFlowConstructionError::MissingOutboundEntityInstance(relation_instance.outbound_id));
185            }
186            let inbound = entity_instances.get(&relation_instance.inbound_id);
187            if inbound.is_none() {
188                // inbound entity missing
189                return Err(ReactiveFlowConstructionError::MissingInboundEntityInstance(relation_instance.inbound_id));
190            }
191            let outbound = outbound.unwrap().clone();
192            let inbound = inbound.unwrap().clone();
193            let reactive_relation = ReactiveRelation::new_from_instance(outbound, inbound, relation_instance.clone());
194            relation_instances.insert(id, reactive_relation);
195        }
196        Ok(ReactiveFlowInstance {
197            id: flow_id,
198            ty: flow_instance.ty,
199            name: flow_instance.name.clone(),
200            description: flow_instance.description.clone(),
201            entity_instances: RwLock::new(entity_instances),
202            relation_instances: RwLock::new(relation_instances),
203            // wrapper: wrapper.unwrap(),
204            entities_added: RwLock::new(Vec::new()),
205            entities_removed: RwLock::new(Vec::new()),
206            relations_added: RwLock::new(Vec::new()),
207            relations_removed: RwLock::new(Vec::new()),
208        })
209    }
210}
211
212impl PropertyInstanceGetter for ReactiveFlowInstance {
213    fn get<S: Into<String>>(&self, property_name: S) -> Option<Value> {
214        self.get_entity(self.id).and_then(|e| e.properties.get(&property_name.into()).map(|p| p.get()))
215    }
216
217    fn as_bool<S: Into<String>>(&self, property_name: S) -> Option<bool> {
218        self.get_entity(self.id)
219            .and_then(|e| e.properties.get(&property_name.into()).and_then(|p| p.as_bool()))
220    }
221
222    fn as_u64<S: Into<String>>(&self, property_name: S) -> Option<u64> {
223        self.get_entity(self.id)
224            .and_then(|e| e.properties.get(&property_name.into()).and_then(|p| p.as_u64()))
225    }
226
227    fn as_i64<S: Into<String>>(&self, property_name: S) -> Option<i64> {
228        self.get_entity(self.id)
229            .and_then(|e| e.properties.get(&property_name.into()).and_then(|p| p.as_i64()))
230    }
231
232    fn as_f64<S: Into<String>>(&self, property_name: S) -> Option<f64> {
233        self.get_entity(self.id)
234            .and_then(|e| e.properties.get(&property_name.into()).and_then(|p| p.as_f64()))
235    }
236
237    fn as_string<S: Into<String>>(&self, property_name: S) -> Option<String> {
238        self.get_entity(self.id)
239            .and_then(|e| e.properties.get(&property_name.into()).and_then(|p| p.as_string()))
240    }
241
242    fn as_array<S: Into<String>>(&self, property_name: S) -> Option<Vec<Value>> {
243        self.get_entity(self.id)
244            .and_then(|e| e.properties.get(&property_name.into()).and_then(|p| p.as_array()))
245    }
246
247    fn as_object<S: Into<String>>(&self, property_name: S) -> Option<Map<String, Value>> {
248        self.get_entity(self.id)
249            .and_then(|e| e.properties.get(&property_name.into()).and_then(|p| p.as_object()))
250    }
251}
252
253impl PropertyInstanceSetter for ReactiveFlowInstance {
254    fn set_checked<S: Into<String>>(&self, property_name: S, value: Value) {
255        if let Some(instance) = self.get_entity(self.id) {
256            if let Some(instance) = instance.properties.get(&property_name.into()) {
257                instance.set_checked(value);
258            }
259        }
260    }
261
262    fn set<S: Into<String>>(&self, property_name: S, value: Value) {
263        if let Some(instance) = self.get_entity(self.id) {
264            if let Some(instance) = instance.properties.get(&property_name.into()) {
265                instance.set(value);
266            }
267        }
268    }
269
270    fn set_no_propagate_checked<S: Into<String>>(&self, property_name: S, value: Value) {
271        if let Some(instance) = self.get_entity(self.id) {
272            if let Some(instance) = instance.properties.get(&property_name.into()) {
273                instance.set_no_propagate_checked(value);
274            }
275        }
276    }
277
278    fn set_no_propagate<S: Into<String>>(&self, property_name: S, value: Value) {
279        if let Some(instance) = self.get_entity(self.id) {
280            if let Some(instance) = instance.properties.get(&property_name.into()) {
281                instance.set_no_propagate(value);
282            }
283        }
284    }
285
286    fn mutability<S: Into<String>>(&self, property_name: S) -> Option<Mutability> {
287        self.get_entity(self.id)
288            .and_then(|instance| instance.properties.get(&property_name.into()).map(|p| p.value().mutability))
289    }
290
291    fn set_mutability<S: Into<String>>(&self, property_name: S, mutability: Mutability) {
292        if let Some(instance) = self.get_entity(self.id) {
293            if let Some(mut property_instance) = instance.properties.get_mut(&property_name.into()) {
294                property_instance.set_mutability(mutability);
295            }
296        }
297    }
298
299    // TODO: fn set(&self, Map<String, Value>
300    // TODO: Set values transactional: first set all values internally, then send all affected streams
301}
302
/// Namespace and type name of a flow are those of its entity type id (`ty`).
impl NamespacedTypeGetter for ReactiveFlowInstance {
    /// Returns the namespace reported by the entity type id of the flow.
    fn namespace(&self) -> String {
        self.ty.namespace()
    }

    /// Returns the type name reported by the entity type id of the flow.
    fn type_name(&self) -> String {
        self.ty.type_name()
    }
}
312
impl TypeDefinitionGetter for ReactiveFlowInstance {
    /// Returns the type definition reported by the entity type id of the flow.
    fn type_definition(&self) -> TypeDefinition {
        self.ty.type_definition()
    }
}
318
/// Same as the implementation for `ReactiveFlowInstance`, but usable where
/// only a reference to the flow instance is at hand.
impl TypeDefinitionGetter for &ReactiveFlowInstance {
    /// Returns the type definition reported by the entity type id of the flow.
    fn type_definition(&self) -> TypeDefinition {
        self.ty.type_definition()
    }
}
324
325impl Display for ReactiveFlowInstance {
326    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
327        write!(f, "{}__{}", &self.ty, self.id)
328    }
329}
330
331impl TryFrom<ReactiveFlowInstance> for FlowInstance {
332    type Error = CreateFlowInstanceError;
333
334    fn try_from(reactive_flow: ReactiveFlowInstance) -> Result<Self, CreateFlowInstanceError> {
335        let wrapper = reactive_flow.get_entity(reactive_flow.id);
336        if wrapper.is_none() {
337            return Err(CreateFlowInstanceError::CantGetWrapperEntity(reactive_flow.id));
338        }
339        let wrapper = wrapper.unwrap();
340        let entity_instance: EntityInstance = wrapper.clone().into();
341        let mut flow_instance = FlowInstance::from(entity_instance);
342        flow_instance.description = wrapper.description.clone();
343        reactive_flow.entity_instances.read().unwrap().iter().for_each(|(_, reactive_entity)| {
344            if reactive_entity.id != reactive_flow.id {
345                let entity_instance: EntityInstance = reactive_entity.into();
346                flow_instance.entity_instances.push(entity_instance);
347            }
348        });
349        reactive_flow.relation_instances.read().unwrap().iter().for_each(|(_, reactive_instance)| {
350            let relation_instance = RelationInstance::from(reactive_instance);
351            flow_instance.relation_instances.push(relation_instance);
352        });
353        Ok(flow_instance)
354    }
355}
356
357impl TryFrom<&ReactiveFlowInstance> for FlowInstance {
358    type Error = CreateFlowInstanceError;
359
360    fn try_from(reactive_flow: &ReactiveFlowInstance) -> Result<Self, CreateFlowInstanceError> {
361        let wrapper = reactive_flow.get_entity(reactive_flow.id);
362        if wrapper.is_none() {
363            return Err(CreateFlowInstanceError::CantGetWrapperEntity(reactive_flow.id));
364        }
365        let wrapper = wrapper.unwrap();
366        let entity_instance: EntityInstance = wrapper.clone().into();
367        let mut flow_instance = FlowInstance::from(entity_instance);
368        flow_instance.description = wrapper.description.clone();
369        reactive_flow.entity_instances.read().unwrap().iter().for_each(|(_, reactive_entity)| {
370            if reactive_entity.id != reactive_flow.id {
371                let entity_instance: EntityInstance = reactive_entity.into();
372                flow_instance.entity_instances.push(entity_instance);
373            }
374        });
375        reactive_flow.relation_instances.read().unwrap().iter().for_each(|(_, reactive_relation)| {
376            let relation_instance: RelationInstance = reactive_relation.into();
377            flow_instance.relation_instances.push(relation_instance);
378        });
379        Ok(flow_instance)
380    }
381}
382
/// Shared handle to a [`ReactiveFlowInstance`].
///
/// The flow instance is stored in an [`Arc`], so cloning a `ReactiveFlow`
/// yields another handle to the same flow instance.
#[derive(Clone)]
pub struct ReactiveFlow(Arc<ReactiveFlowInstance>);
385
386impl ReactiveFlow {
387    pub fn new(wrapper_entity_instance: ReactiveEntity) -> Self {
388        ReactiveFlowInstance::new(wrapper_entity_instance).into()
389    }
390}
391
392impl NamedInstanceContainer for ReactiveFlow {
393    fn name(&self) -> String {
394        self.name.clone()
395    }
396
397    fn description(&self) -> String {
398        self.description.clone()
399    }
400}
401
/// Gives access to the wrapped `Arc<ReactiveFlowInstance>`, so that fields and
/// methods of the flow instance can be used directly on a `ReactiveFlow`.
impl Deref for ReactiveFlow {
    type Target = Arc<ReactiveFlowInstance>;

    fn deref(&self) -> &Self::Target {
        &self.0
    }
}
409
410impl ReactiveInstance<Uuid> for ReactiveFlow {
411    fn id(&self) -> Uuid {
412        self.id
413    }
414}
415
416impl ReactivePropertyContainer for ReactiveFlow {
417    fn tick_checked(&self) {
418        if let Some(wrapper_entity_instance) = self.get_wrapper_entity_instance() {
419            wrapper_entity_instance.tick_checked();
420        }
421    }
422
423    fn tick(&self) {
424        if let Some(wrapper_entity_instance) = self.get_wrapper_entity_instance() {
425            wrapper_entity_instance.tick();
426        }
427    }
428
429    fn has_property(&self, name: &str) -> bool {
430        self.get_wrapper_entity_instance()
431            .map(|wrapper_entity| wrapper_entity.has_property(name))
432            .unwrap_or_default()
433    }
434
435    fn add_property<S: Into<String>>(&self, name: S, mutability: Mutability, value: Value) {
436        if let Some(wrapper_entity_instance) = self.get_wrapper_entity_instance() {
437            wrapper_entity_instance.add_property(name, mutability, value);
438        }
439    }
440
441    fn add_property_by_type(&self, property: &PropertyType) {
442        if let Some(wrapper_entity_instance) = self.get_wrapper_entity_instance() {
443            wrapper_entity_instance.add_property_by_type(property);
444        }
445    }
446
447    fn remove_property<S: Into<String>>(&self, name: S) {
448        if let Some(wrapper_entity_instance) = self.get_wrapper_entity_instance() {
449            wrapper_entity_instance.remove_property(name);
450        }
451    }
452
453    fn observe_with_handle<F>(&self, name: &str, subscriber: F, handle_id: u128)
454    where
455        F: FnMut(&Value) + 'static + Send,
456    {
457        if let Some(wrapper_entity_instance) = self.get_wrapper_entity_instance() {
458            wrapper_entity_instance.observe_with_handle(name, subscriber, handle_id);
459        }
460    }
461
462    fn remove_observer(&self, name: &str, handle_id: u128) {
463        if let Some(wrapper_entity_instance) = self.get_wrapper_entity_instance() {
464            wrapper_entity_instance.remove_observer(name, handle_id);
465        }
466    }
467
468    fn remove_observers(&self, name: &str) {
469        if let Some(wrapper_entity_instance) = self.get_wrapper_entity_instance() {
470            wrapper_entity_instance.remove_observers(name);
471        }
472    }
473
474    fn remove_all_observers(&self) {
475        if let Some(wrapper_entity_instance) = self.get_wrapper_entity_instance() {
476            wrapper_entity_instance.remove_all_observers();
477        }
478    }
479}
480
481impl ComponentContainer for ReactiveFlow {
482    fn get_components(&self) -> ComponentTypeIds {
483        self.get_wrapper_entity_instance()
484            .map(|wrapper_entity| wrapper_entity.get_components())
485            .unwrap_or_default()
486    }
487
488    fn add_component(&self, ty: ComponentTypeId) {
489        if let Some(wrapper_entity_instance) = self.get_wrapper_entity_instance() {
490            wrapper_entity_instance.add_component(ty);
491        }
492    }
493
494    fn add_component_with_properties(&self, component: &Component) {
495        if let Some(wrapper_entity_instance) = self.get_wrapper_entity_instance() {
496            wrapper_entity_instance.add_component_with_properties(component);
497        }
498    }
499
500    fn remove_component(&self, ty: &ComponentTypeId) {
501        if let Some(wrapper_entity_instance) = self.get_wrapper_entity_instance() {
502            wrapper_entity_instance.remove_component(ty);
503        }
504    }
505
506    fn is_a(&self, ty: &ComponentTypeId) -> bool {
507        self.get_wrapper_entity_instance()
508            .map(|wrapper_entity| wrapper_entity.is_a(ty))
509            .unwrap_or_default()
510    }
511}
512
513impl BehaviourTypesContainer for ReactiveFlow {
514    fn get_behaviours(&self) -> Vec<BehaviourTypeId> {
515        self.get_wrapper_entity_instance()
516            .map(|wrapper_entity| wrapper_entity.get_behaviours())
517            .unwrap_or_default()
518    }
519
520    fn add_behaviour(&self, ty: BehaviourTypeId) {
521        if let Some(wrapper_entity_instance) = self.get_wrapper_entity_instance() {
522            wrapper_entity_instance.add_behaviour(ty);
523        }
524    }
525
526    fn remove_behaviour(&self, ty: &BehaviourTypeId) {
527        if let Some(wrapper_entity_instance) = self.get_wrapper_entity_instance() {
528            wrapper_entity_instance.remove_behaviour(ty);
529        }
530    }
531
532    fn behaves_as(&self, ty: &BehaviourTypeId) -> bool {
533        self.get_wrapper_entity_instance()
534            .map(|wrapper_entity| wrapper_entity.behaves_as(ty))
535            .unwrap_or_default()
536    }
537}
538
539impl PropertyInstanceGetter for ReactiveFlow {
540    fn get<S: Into<String>>(&self, property_name: S) -> Option<Value> {
541        self.get_wrapper_entity_instance().and_then(|wrapper_entity| wrapper_entity.get(property_name))
542    }
543
544    fn as_bool<S: Into<String>>(&self, property_name: S) -> Option<bool> {
545        self.get_wrapper_entity_instance()
546            .and_then(|wrapper_entity| wrapper_entity.as_bool(property_name))
547    }
548
549    fn as_u64<S: Into<String>>(&self, property_name: S) -> Option<u64> {
550        self.get_wrapper_entity_instance()
551            .and_then(|wrapper_entity| wrapper_entity.as_u64(property_name))
552    }
553
554    fn as_i64<S: Into<String>>(&self, property_name: S) -> Option<i64> {
555        self.get_wrapper_entity_instance()
556            .and_then(|wrapper_entity| wrapper_entity.as_i64(property_name))
557    }
558
559    fn as_f64<S: Into<String>>(&self, property_name: S) -> Option<f64> {
560        self.get_wrapper_entity_instance()
561            .and_then(|wrapper_entity| wrapper_entity.as_f64(property_name))
562    }
563
564    fn as_string<S: Into<String>>(&self, property_name: S) -> Option<String> {
565        self.get_wrapper_entity_instance()
566            .and_then(|wrapper_entity| wrapper_entity.as_string(property_name))
567    }
568
569    fn as_array<S: Into<String>>(&self, property_name: S) -> Option<Vec<Value>> {
570        self.get_wrapper_entity_instance()
571            .and_then(|wrapper_entity| wrapper_entity.as_array(property_name))
572    }
573
574    fn as_object<S: Into<String>>(&self, property_name: S) -> Option<Map<String, Value>> {
575        self.get_wrapper_entity_instance()
576            .and_then(|wrapper_entity| wrapper_entity.as_object(property_name))
577    }
578}
579
580impl PropertyInstanceSetter for ReactiveFlow {
581    fn set_checked<S: Into<String>>(&self, property_name: S, value: Value) {
582        if let Some(wrapper_entity_instance) = self.get_wrapper_entity_instance() {
583            wrapper_entity_instance.set_checked(property_name, value);
584        }
585    }
586
587    fn set<S: Into<String>>(&self, property_name: S, value: Value) {
588        if let Some(wrapper_entity_instance) = self.get_wrapper_entity_instance() {
589            wrapper_entity_instance.set(property_name, value);
590        }
591    }
592
593    fn set_no_propagate_checked<S: Into<String>>(&self, property_name: S, value: Value) {
594        if let Some(wrapper_entity_instance) = self.get_wrapper_entity_instance() {
595            wrapper_entity_instance.set_no_propagate_checked(property_name, value);
596        }
597    }
598
599    fn set_no_propagate<S: Into<String>>(&self, property_name: S, value: Value) {
600        if let Some(wrapper_entity_instance) = self.get_wrapper_entity_instance() {
601            wrapper_entity_instance.set_no_propagate(property_name, value);
602        }
603    }
604
605    fn mutability<S: Into<String>>(&self, property_name: S) -> Option<Mutability> {
606        self.get_wrapper_entity_instance()
607            .and_then(|wrapper_entity| wrapper_entity.mutability(property_name))
608    }
609
610    fn set_mutability<S: Into<String>>(&self, property_name: S, mutability: Mutability) {
611        if let Some(wrapper_entity_instance) = self.get_wrapper_entity_instance() {
612            wrapper_entity_instance.set_mutability(property_name, mutability);
613        }
614    }
615
616    // TODO: fn set(&self, Map<String, Value>
617    // TODO: Set values transactional: first set all values internally, then send all affected streams
618}
619
620impl NamespacedTypeGetter for ReactiveFlow {
621    fn namespace(&self) -> String {
622        self.ty.namespace()
623    }
624
625    fn type_name(&self) -> String {
626        self.ty.type_name()
627    }
628}
629
630impl TypeDefinitionGetter for ReactiveFlow {
631    fn type_definition(&self) -> TypeDefinition {
632        self.ty.type_definition()
633    }
634}
635
636impl Display for ReactiveFlow {
637    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
638        write!(f, "{}__{}", &self.ty, self.id)
639    }
640}
641
642impl From<ReactiveFlowInstance> for ReactiveFlow {
643    fn from(reactive_flow: ReactiveFlowInstance) -> Self {
644        ReactiveFlow(Arc::new(reactive_flow))
645    }
646}
647
648impl TryFrom<FlowInstance> for ReactiveFlow {
649    type Error = ReactiveFlowConstructionError;
650
651    fn try_from(flow_instance: FlowInstance) -> Result<Self, Self::Error> {
652        ReactiveFlowInstance::try_from(flow_instance).map(|reactive_flow| reactive_flow.into())
653    }
654}
655
656impl TryFrom<ReactiveFlow> for FlowInstance {
657    type Error = CreateFlowInstanceError;
658
659    fn try_from(reactive_flow: ReactiveFlow) -> Result<Self, CreateFlowInstanceError> {
660        FlowInstance::try_from(reactive_flow.0.deref())
661    }
662}