reactive_graph_reactive_model_impl/properties/
reactive_property.rs

1use std::ops::Deref;
2use std::ops::DerefMut;
3use std::sync::Arc;
4use std::sync::RwLock;
5
6use dashmap::DashMap;
7use dashmap::iter::OwningIter;
8use serde_json::Map;
9use serde_json::Value;
10
11use crate::Stream;
12
13use reactive_graph_graph::ContainerPropertyInstance;
14use reactive_graph_graph::Mutability;
15use reactive_graph_graph::Mutability::Mutable;
16use reactive_graph_graph::PropertyInstances;
17
18pub struct ReactiveProperty<IdType: Clone> {
19    /// The parent identifier (entity: uuid, relation: RelationInstanceId)
20    pub id: IdType,
21
22    /// Property name
23    pub name: String,
24
25    /// The property instance is mutable or immutable.
26    pub mutability: Mutability,
27
28    /// The reactive stream
29    pub stream: Arc<RwLock<Stream<'static, Value>>>,
30
31    /// Store the current value
32    pub value: RwLock<Value>,
33}
34
35impl<IdType: Clone> ReactiveProperty<IdType> {
36    pub fn new<S: Into<String>>(id: IdType, name: S, mutability: Mutability, value: Value) -> ReactiveProperty<IdType> {
37        ReactiveProperty {
38            id,
39            name: name.into(),
40            mutability,
41            stream: Arc::new(RwLock::new(Stream::new())),
42            value: RwLock::new(value),
43        }
44    }
45
46    pub fn get(&self) -> Value {
47        self.value.read().unwrap().clone()
48    }
49
50    pub fn set_checked(&self, value: Value) {
51        if self.mutability == Mutable {
52            self.set(value);
53        }
54    }
55
56    pub fn set(&self, value: Value) {
57        let mut writer = self.value.write().unwrap();
58        *writer.deref_mut() = value.clone();
59        self.stream.read().unwrap().send(&value);
60    }
61
62    pub fn set_no_propagate_checked(&self, value: Value) {
63        if self.mutability == Mutable {
64            self.set_no_propagate(value);
65        }
66    }
67
68    pub fn set_no_propagate(&self, value: Value) {
69        let mut writer = self.value.write().unwrap();
70        *writer.deref_mut() = value;
71    }
72
73    /// Send a value down the stream, but does not change the current value
74    pub fn send(&self, signal: &Value) {
75        self.stream.read().unwrap().send(signal);
76    }
77
78    /// Resend the current value manually if mutable
79    pub fn tick_checked(&self) {
80        if self.mutability == Mutable {
81            self.tick();
82        }
83    }
84
85    /// Resend the current value manually
86    pub fn tick(&self) {
87        // println!("tick {}::{}", self.id, self.name);
88        let value = self.value.read().unwrap().deref().clone();
89        self.stream.read().unwrap().send(&value);
90    }
91
92    pub fn set_mutability(&mut self, mutability: Mutability) {
93        self.mutability = mutability;
94    }
95
96    pub fn as_bool(&self) -> Option<bool> {
97        self.get().as_bool()
98    }
99
100    pub fn as_u64(&self) -> Option<u64> {
101        self.get().as_u64()
102    }
103
104    pub fn as_i64(&self) -> Option<i64> {
105        self.get().as_i64()
106    }
107
108    pub fn as_f64(&self) -> Option<f64> {
109        self.get().as_f64()
110    }
111
112    pub fn as_string(&self) -> Option<String> {
113        self.get().as_str().map(String::from)
114    }
115
116    pub fn as_array(&self) -> Option<Vec<Value>> {
117        self.get().as_array().cloned()
118    }
119
120    pub fn as_object(&self) -> Option<Map<String, Value>> {
121        self.get().as_object().cloned()
122    }
123}
124
125impl<IdType: Clone> PartialEq for ReactiveProperty<IdType> {
126    fn eq(&self, other: &Self) -> bool {
127        self.value.read().unwrap().deref() == other.value.read().unwrap().deref()
128    }
129}
130
131impl<IdType: Clone> From<ReactiveProperty<IdType>> for ContainerPropertyInstance<IdType> {
132    fn from(property: ReactiveProperty<IdType>) -> Self {
133        let reader = property.value.read().unwrap();
134        ContainerPropertyInstance::new(property.id, property.name, reader.clone())
135    }
136}
137
138impl<IdType: Clone> From<&ReactiveProperty<IdType>> for ContainerPropertyInstance<IdType> {
139    fn from(property: &ReactiveProperty<IdType>) -> Self {
140        let reader = property.value.read().unwrap();
141        ContainerPropertyInstance::new(property.id.clone(), property.name.clone(), reader.clone())
142    }
143}
144
145// #[derive(Default)]
146pub struct ReactiveProperties<IdType: Clone>(DashMap<String, ReactiveProperty<IdType>>);
147
148impl<IdType: Clone> ReactiveProperties<IdType> {
149    /// Constructs an empty reactive properties container.
150    pub fn new() -> Self {
151        ReactiveProperties(DashMap::new())
152    }
153
154    /// Constructs a reactive properties container with the given properties which gets bound.
155    pub fn new_with_id_from_properties<I: Into<IdType>, P: Into<PropertyInstances>>(id: I, properties: P) -> ReactiveProperties<IdType> {
156        let id = id.into();
157        let reactive_properties = ReactiveProperties::new();
158        for (property_name, value) in properties.into().into_iter() {
159            reactive_properties.insert(property_name.clone(), ReactiveProperty::new(id.clone(), property_name.clone(), Mutable, value));
160        }
161        reactive_properties
162    }
163
164    pub fn property<P: Into<ReactiveProperty<IdType>>>(self, property: P) -> Self {
165        let property = property.into();
166        self.insert(property.name.clone(), property);
167        self
168    }
169}
170
171impl<IdType: Clone> Default for ReactiveProperties<IdType> {
172    fn default() -> Self {
173        Self::new()
174    }
175}
176
177impl<IdType: Clone> Deref for ReactiveProperties<IdType> {
178    type Target = DashMap<String, ReactiveProperty<IdType>>;
179
180    fn deref(&self) -> &Self::Target {
181        &self.0
182    }
183}
184
185impl<IdType: Clone> DerefMut for ReactiveProperties<IdType> {
186    fn deref_mut(&mut self) -> &mut Self::Target {
187        &mut self.0
188    }
189}
190
191impl<IdType: Clone> IntoIterator for ReactiveProperties<IdType> {
192    type Item = (String, ReactiveProperty<IdType>);
193    type IntoIter = OwningIter<String, ReactiveProperty<IdType>>;
194
195    fn into_iter(self) -> Self::IntoIter {
196        self.0.into_iter()
197    }
198}
199
200impl<IdType: Clone> FromIterator<ReactiveProperty<IdType>> for ReactiveProperties<IdType> {
201    fn from_iter<I: IntoIterator<Item = ReactiveProperty<IdType>>>(iter: I) -> Self {
202        let properties = ReactiveProperties::new();
203        for property in iter {
204            properties.insert(property.name.clone(), property);
205        }
206        properties
207    }
208}
209
210impl<IdType: Clone> From<ReactiveProperties<IdType>> for PropertyInstances {
211    fn from(properties: ReactiveProperties<IdType>) -> Self {
212        let property_instances = PropertyInstances::new();
213        for (property_name, property) in properties.into_iter() {
214            property_instances.insert(property_name, property.get());
215        }
216        property_instances
217    }
218}
219
220impl<IdType: Clone> From<&ReactiveProperties<IdType>> for PropertyInstances {
221    fn from(properties: &ReactiveProperties<IdType>) -> Self {
222        let property_instances = PropertyInstances::new();
223        for property in properties.0.iter() {
224            property_instances.insert(property.key().clone(), property.get());
225        }
226        property_instances
227    }
228}
229
230// impl Add for ReactiveProperty {
231//     type Output = Self;
232//
233//     fn add(self, rhs: Self) -> Self::Output {
234//         self.value.read().unwrap().deref() + rhs.value.read().unwrap().deref()
235//     }
236// }
237
238// impl AddAssign for ReactiveProperty {
239//     fn add_assign(&mut self, rhs: Self) {
240//         let v = *self.value.read().unwrap().deref() + *rhs.value.read().unwrap().deref();
241//     }
242// }
243
// TODO: implement the PartialEq trait for bool, u64, i64, f64, String and &str
// This would make it possible to simplify comparisons:
// if entity.get(name) == 32_i64 { /* ... */ }
// 1. as_i64() -> Option
// 2. if None -> false
// 3. if Some -> compare -> true or false
250
// TODO: Implement operators
// https://doc.rust-lang.org/std/ops/index.html
// Add, AddAssign
// Sub
255
// TODO: Implement the is_* type checks
// self.value.read().unwrap().is_boolean()
// is_f64 is_array is_boolean is_i64 is_null is_number is_object is_string is_u64
259
260#[cfg(test)]
261pub mod tests {
262    use std::ops::DerefMut;
263    use std::ops::Index;
264    use std::sync::Arc;
265    use std::sync::RwLock;
266    use std::sync::atomic::AtomicU64;
267    use std::sync::atomic::Ordering;
268    use std::thread;
269
270    use rand::Rng;
271    use serde_json::json;
272    use stopwatch2::Stopwatch;
273    use uuid::Uuid;
274
275    use crate::Stream;
276
277    use crate::ReactiveProperty;
278    use reactive_graph_graph::Mutability::Mutable;
279    use reactive_graph_utils_test::r_string;
280
281    #[test]
282    fn reactive_property_instance_test() {
283        let uuid = Uuid::new_v4();
284
285        let property_name = r_string();
286
287        let initial_property_value = r_string();
288
289        let initial_property_value_json = json!(initial_property_value);
290
291        let reactive_property_instance = ReactiveProperty {
292            id: uuid,
293            name: property_name.clone(),
294            stream: Arc::new(RwLock::new(Stream::new())),
295            mutability: Mutable,
296            value: RwLock::new(initial_property_value_json),
297        };
298
299        // Check that the meta data is correct
300        assert_eq!(uuid, reactive_property_instance.id);
301        assert_eq!(property_name.clone(), reactive_property_instance.name);
302        assert_eq!(initial_property_value.as_str(), reactive_property_instance.value.read().unwrap().as_str().unwrap());
303
304        // Set: Send to "stream", write inner "value"
305
306        let new_property_value = r_string();
307        let new_property_value_json = json!(new_property_value);
308
309        reactive_property_instance.set(new_property_value_json);
310
311        // Check that the inner value has changed
312        assert_eq!(new_property_value.as_str(), reactive_property_instance.value.read().unwrap().as_str().unwrap());
313        assert_eq!(new_property_value.as_str(), reactive_property_instance.get().as_str().unwrap());
314
315        // Send: Send to "stream", do not change the inner "value" (!)
316
317        let send_property_value = r_string();
318        let send_property_value_json = json!(send_property_value);
319
320        reactive_property_instance.send(&send_property_value_json);
321
322        // Check that the inner value has not changed
323        assert_eq!(new_property_value.as_str(), reactive_property_instance.value.read().unwrap().as_str().unwrap());
324        assert_eq!(new_property_value.as_str(), reactive_property_instance.get().as_str().unwrap());
325
326        // Check that the inner value is the same
327        assert_ne!(send_property_value.as_str(), reactive_property_instance.value.read().unwrap().as_str().unwrap());
328        assert_ne!(send_property_value.as_str(), reactive_property_instance.get().as_str().unwrap());
329
330        // Create an observer which sinks on a variable
331
332        let observed_value_json = Arc::new(RwLock::new(reactive_property_instance.get()));
333        let inner_observed_value_json = Arc::clone(&observed_value_json);
334        reactive_property_instance.stream.read().unwrap().observe(move |value| {
335            let mut writer = inner_observed_value_json.write().unwrap();
336            *writer.deref_mut() = value.clone();
337        });
338
339        reactive_property_instance.send(&send_property_value_json);
340
341        // Check that the observer gets the sent value
342        assert_eq!(send_property_value.as_str(), observed_value_json.read().unwrap().as_str().unwrap());
343        // Check that the value hasn't changed
344        assert_eq!(new_property_value.as_str(), reactive_property_instance.get().as_str().unwrap());
345
346        // Resend the last value
347
348        let tick_value_json = Arc::new(RwLock::new(json!("")));
349        let i_tick_value_json = Arc::clone(&tick_value_json);
350        reactive_property_instance.stream.read().unwrap().observe(move |value| {
351            let mut writer = i_tick_value_json.write().unwrap();
352            *writer.deref_mut() = value.clone();
353        });
354
355        reactive_property_instance.tick();
356
357        // Check that the inner value has been sent to the observer
358        assert_eq!(new_property_value.as_str(), tick_value_json.read().unwrap().as_str().unwrap());
359    }
360
361    #[test]
362    fn create_reactive_property_instance_test() {
363        let uuid = Uuid::new_v4();
364        let property_name = r_string();
365        let initial_property_value = r_string();
366        let initial_property_value_json = json!(initial_property_value);
367        let reactive_property_instance = ReactiveProperty::new(uuid, property_name.clone(), Mutable, initial_property_value_json);
368
369        assert_eq!(uuid, reactive_property_instance.id);
370        assert_eq!(property_name.clone(), reactive_property_instance.name);
371        assert_eq!(initial_property_value.as_str(), reactive_property_instance.value.read().unwrap().as_str().unwrap());
372
373        // Set: Send to "stream", write "value"
374
375        let new_property_value = r_string();
376        let new_property_value_json = json!(new_property_value);
377        reactive_property_instance.set_no_propagate(new_property_value_json);
378
379        assert_eq!(new_property_value.as_str(), reactive_property_instance.value.read().unwrap().as_str().unwrap());
380        assert_eq!(new_property_value.as_str(), reactive_property_instance.get().as_str().unwrap());
381
382        // Send: Send to "stream", do not change "value"
383
384        let send_property_value = r_string();
385        let send_property_value_json = json!(send_property_value);
386        reactive_property_instance.send(&send_property_value_json);
387
388        assert_eq!(new_property_value.as_str(), reactive_property_instance.value.read().unwrap().as_str().unwrap());
389        assert_eq!(new_property_value.as_str(), reactive_property_instance.get().as_str().unwrap());
390        assert_ne!(send_property_value.as_str(), reactive_property_instance.value.read().unwrap().as_str().unwrap());
391        assert_ne!(send_property_value.as_str(), reactive_property_instance.get().as_str().unwrap());
392    }
393
394    #[test]
395    fn reactive_property_instance_typed_getter_test() {
396        let property_name = r_string();
397
398        let bool_value = json!(true);
399        assert_eq!(
400            bool_value.as_bool().unwrap(),
401            ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, bool_value)
402                .as_bool()
403                .unwrap()
404        );
405
406        let u64 = json!(123);
407        assert_eq!(123, ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, u64).as_u64().unwrap());
408
409        let i64 = json!(-123);
410        assert_eq!(-123, ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, i64).as_i64().unwrap());
411
412        let f64 = json!(-1.23);
413        assert_eq!(-1.23, ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, f64).as_f64().unwrap());
414
415        let rand_str = r_string();
416        let s = json!(rand_str.clone());
417        assert_eq!(
418            rand_str.clone(),
419            ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, s).as_string().unwrap()
420        );
421
422        let a = json!([1, 2, 3]);
423        let i = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, a);
424        assert_eq!(json!(1), i.as_array().unwrap().index(0).clone());
425        assert_eq!(json!(2), i.as_array().unwrap().index(1).clone());
426        assert_eq!(json!(3), i.as_array().unwrap().index(2).clone());
427
428        let o = json!({
429            "k": "v"
430        });
431        let i = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, o);
432        assert_eq!(json!("v"), i.as_object().unwrap().index("k").clone());
433    }
434
435    #[test]
436    fn reactive_property_instance_eq_bool_test() {
437        let property_name = r_string();
438
439        let instance1 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(true));
440        let instance2 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(true));
441        assert!(instance1 == instance2);
442
443        let instance1 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(false));
444        let instance2 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(true));
445        assert!(instance1 != instance2);
446
447        let instance1 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(true));
448        let instance2 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(false));
449        assert!(instance1 != instance2);
450
451        let instance1 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(false));
452        let instance2 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(false));
453        assert!(instance1 == instance2);
454    }
455
456    #[test]
457    fn reactive_property_instance_eq_number_test() {
458        let property_name = r_string();
459
460        let instance1 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(1));
461        let instance2 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(1));
462        assert!(instance1 == instance2);
463
464        let instance1 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(2));
465        let instance2 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(3));
466        assert!(instance1 != instance2);
467    }
468
469    #[test]
470    fn reactive_property_instance_eq_float_test() {
471        let property_name = r_string();
472
473        let instance1 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(0.0));
474        let instance2 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(0.0));
475        assert!(instance1 == instance2);
476
477        let instance1 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(1.0));
478        let instance2 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(1.1));
479        assert!(instance1 != instance2);
480    }
481
482    #[test]
483    fn reactive_property_instance_eq_string_test() {
484        let property_name = r_string();
485        let property_value = r_string();
486
487        let instance1 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(property_value.clone()));
488        let instance2 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(property_value.clone()));
489        assert!(instance1 == instance2);
490
491        let instance1 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(r_string()));
492        let instance2 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(r_string()));
493        assert!(instance1 != instance2);
494    }
495
496    #[test]
497    fn reactive_property_instance_stream_test() {
498        let instance1 = ReactiveProperty::new(Uuid::new_v4(), r_string(), Mutable, json!(0));
499        let instance2 = ReactiveProperty::new(Uuid::new_v4(), r_string(), Mutable, json!(0));
500
501        let v = Arc::new(AtomicU64::new(0));
502
503        {
504            let v = v.clone();
505            let writer = instance2.stream.write().unwrap();
506            let handle_id = Uuid::new_v4().as_u128();
507            writer.observe_with_handle(
508                move |value| {
509                    v.store(value.as_u64().unwrap(), Ordering::Relaxed);
510                },
511                handle_id,
512            );
513        }
514
515        {
516            let writer = instance1.stream.write().unwrap();
517            let handle_id = Uuid::new_v4().as_u128();
518            writer.observe_with_handle(
519                move |value| {
520                    instance2.set(value.clone());
521                },
522                handle_id,
523            );
524        }
525
526        instance1.set(json!(1));
527        assert_eq!(1, v.load(Ordering::Relaxed));
528    }
529
530    // TODO: implement cycle loop protection!
531    #[test]
532    #[ignore]
533    fn reactive_property_instance_stream_loop_test() {
534        let instance1 = ReactiveProperty::new(Uuid::new_v4(), r_string(), Mutable, json!(0));
535        let instance2 = ReactiveProperty::new(Uuid::new_v4(), r_string(), Mutable, json!(0));
536
537        {
538            let writer = instance2.stream.write().unwrap();
539            let handle_id = Uuid::new_v4().as_u128();
540            let instance1_stream = instance1.stream.clone();
541            writer.observe_with_handle(
542                move |value| {
543                    instance1_stream.write().unwrap().send(value);
544                },
545                handle_id,
546            );
547        }
548
549        {
550            let writer = instance1.stream.write().unwrap();
551            let handle_id = Uuid::new_v4().as_u128();
552            let instance2_stream = instance2.stream.clone();
553            writer.observe_with_handle(
554                move |value| {
555                    instance2_stream.write().unwrap().send(value);
556                },
557                handle_id,
558            );
559        }
560
561        let mut rng = rand::rng();
562
563        let number: u64 = rng.random();
564        instance1.set(json!(number));
565    }
566
567    #[test]
568    #[ignore]
569    fn reactive_property_instance_stream_mt_benchmark() {
570        let instance1 = ReactiveProperty::new(Uuid::new_v4(), r_string(), Mutable, json!(0));
571        let instance2 = ReactiveProperty::new(Uuid::new_v4(), r_string(), Mutable, json!(0));
572
573        let v = Arc::new(AtomicU64::new(0));
574
575        {
576            let v = v.clone();
577            let writer = instance2.stream.write().unwrap();
578            let handle_id = Uuid::new_v4().as_u128();
579            writer.observe_with_handle(
580                move |value| {
581                    v.store(value.as_u64().unwrap(), Ordering::Relaxed);
582                },
583                handle_id,
584            );
585        }
586
587        {
588            let writer = instance1.stream.write().unwrap();
589            let handle_id = Uuid::new_v4().as_u128();
590            writer.observe_with_handle(
591                move |value| {
592                    instance2.set(value.clone());
593                },
594                handle_id,
595            );
596        }
597
598        const NUM_THREADS: i32 = 256;
599        const NUM_ITERATIONS: i32 = 10000;
600
601        println!("starting");
602        let mut handles = Vec::new();
603        let mut s1 = Stopwatch::default();
604        s1.start();
605        for thread_no in 1..NUM_THREADS {
606            let v = v.clone();
607            let stream = instance1.stream.clone();
608            let handle = thread::spawn(move || {
609                let mut rng = rand::rng();
610                let mut s = Stopwatch::default();
611                s.start();
612                for _iteration in 1..NUM_ITERATIONS {
613                    let number: u64 = rng.random();
614                    let value = json!(number);
615                    let writer = stream.write().unwrap();
616                    writer.send(&value);
617                    let new_value = v.load(Ordering::Relaxed);
618                    assert_eq!(number, new_value);
619                }
620                s.stop();
621                println!("finished thread [{}] in {:?}", thread_no, s.elapsed());
622            });
623            handles.push(handle)
624        }
625        println!("started {} parallel threads in {:?}", NUM_THREADS, s1.elapsed());
626        s1.start();
627        println!("running");
628        handles.into_iter().for_each(move |handle| {
629            handle.join().unwrap();
630        });
631        println!(
632            "finished {} parallel threads with each {} stream propagations (total {}) in {:?}",
633            NUM_THREADS,
634            NUM_ITERATIONS,
635            NUM_THREADS * NUM_ITERATIONS,
636            s1.elapsed()
637        );
638    }
639}