1use std::ops::Deref;
2use std::ops::DerefMut;
3use std::sync::Arc;
4use std::sync::RwLock;
5
6use dashmap::DashMap;
7use dashmap::iter::OwningIter;
8use serde_json::Map;
9use serde_json::Value;
10
11use crate::Stream;
12
13use reactive_graph_graph::ContainerPropertyInstance;
14use reactive_graph_graph::Mutability;
15use reactive_graph_graph::Mutability::Mutable;
16use reactive_graph_graph::PropertyInstances;
17
18pub struct ReactiveProperty<IdType: Clone> {
19 pub id: IdType,
21
22 pub name: String,
24
25 pub mutability: Mutability,
27
28 pub stream: Arc<RwLock<Stream<'static, Value>>>,
30
31 pub value: RwLock<Value>,
33}
34
35impl<IdType: Clone> ReactiveProperty<IdType> {
36 pub fn new<S: Into<String>>(id: IdType, name: S, mutability: Mutability, value: Value) -> ReactiveProperty<IdType> {
37 ReactiveProperty {
38 id,
39 name: name.into(),
40 mutability,
41 stream: Arc::new(RwLock::new(Stream::new())),
42 value: RwLock::new(value),
43 }
44 }
45
46 pub fn get(&self) -> Value {
47 self.value.read().unwrap().clone()
48 }
49
50 pub fn set_checked(&self, value: Value) {
51 if self.mutability == Mutable {
52 self.set(value);
53 }
54 }
55
56 pub fn set(&self, value: Value) {
57 let mut writer = self.value.write().unwrap();
58 *writer.deref_mut() = value.clone();
59 self.stream.read().unwrap().send(&value);
60 }
61
62 pub fn set_no_propagate_checked(&self, value: Value) {
63 if self.mutability == Mutable {
64 self.set_no_propagate(value);
65 }
66 }
67
68 pub fn set_no_propagate(&self, value: Value) {
69 let mut writer = self.value.write().unwrap();
70 *writer.deref_mut() = value;
71 }
72
73 pub fn send(&self, signal: &Value) {
75 self.stream.read().unwrap().send(signal);
76 }
77
78 pub fn tick_checked(&self) {
80 if self.mutability == Mutable {
81 self.tick();
82 }
83 }
84
85 pub fn tick(&self) {
87 let value = self.value.read().unwrap().deref().clone();
89 self.stream.read().unwrap().send(&value);
90 }
91
92 pub fn set_mutability(&mut self, mutability: Mutability) {
93 self.mutability = mutability;
94 }
95
96 pub fn as_bool(&self) -> Option<bool> {
97 self.get().as_bool()
98 }
99
100 pub fn as_u64(&self) -> Option<u64> {
101 self.get().as_u64()
102 }
103
104 pub fn as_i64(&self) -> Option<i64> {
105 self.get().as_i64()
106 }
107
108 pub fn as_f64(&self) -> Option<f64> {
109 self.get().as_f64()
110 }
111
112 pub fn as_string(&self) -> Option<String> {
113 self.get().as_str().map(String::from)
114 }
115
116 pub fn as_array(&self) -> Option<Vec<Value>> {
117 self.get().as_array().cloned()
118 }
119
120 pub fn as_object(&self) -> Option<Map<String, Value>> {
121 self.get().as_object().cloned()
122 }
123}
124
125impl<IdType: Clone> PartialEq for ReactiveProperty<IdType> {
126 fn eq(&self, other: &Self) -> bool {
127 self.value.read().unwrap().deref() == other.value.read().unwrap().deref()
128 }
129}
130
131impl<IdType: Clone> From<ReactiveProperty<IdType>> for ContainerPropertyInstance<IdType> {
132 fn from(property: ReactiveProperty<IdType>) -> Self {
133 let reader = property.value.read().unwrap();
134 ContainerPropertyInstance::new(property.id, property.name, reader.clone())
135 }
136}
137
138impl<IdType: Clone> From<&ReactiveProperty<IdType>> for ContainerPropertyInstance<IdType> {
139 fn from(property: &ReactiveProperty<IdType>) -> Self {
140 let reader = property.value.read().unwrap();
141 ContainerPropertyInstance::new(property.id.clone(), property.name.clone(), reader.clone())
142 }
143}
144
145pub struct ReactiveProperties<IdType: Clone>(DashMap<String, ReactiveProperty<IdType>>);
147
148impl<IdType: Clone> ReactiveProperties<IdType> {
149 pub fn new() -> Self {
151 ReactiveProperties(DashMap::new())
152 }
153
154 pub fn new_with_id_from_properties<I: Into<IdType>, P: Into<PropertyInstances>>(id: I, properties: P) -> ReactiveProperties<IdType> {
156 let id = id.into();
157 let reactive_properties = ReactiveProperties::new();
158 for (property_name, value) in properties.into().into_iter() {
159 reactive_properties.insert(property_name.clone(), ReactiveProperty::new(id.clone(), property_name.clone(), Mutable, value));
160 }
161 reactive_properties
162 }
163
164 pub fn property<P: Into<ReactiveProperty<IdType>>>(self, property: P) -> Self {
165 let property = property.into();
166 self.insert(property.name.clone(), property);
167 self
168 }
169}
170
171impl<IdType: Clone> Default for ReactiveProperties<IdType> {
172 fn default() -> Self {
173 Self::new()
174 }
175}
176
177impl<IdType: Clone> Deref for ReactiveProperties<IdType> {
178 type Target = DashMap<String, ReactiveProperty<IdType>>;
179
180 fn deref(&self) -> &Self::Target {
181 &self.0
182 }
183}
184
185impl<IdType: Clone> DerefMut for ReactiveProperties<IdType> {
186 fn deref_mut(&mut self) -> &mut Self::Target {
187 &mut self.0
188 }
189}
190
191impl<IdType: Clone> IntoIterator for ReactiveProperties<IdType> {
192 type Item = (String, ReactiveProperty<IdType>);
193 type IntoIter = OwningIter<String, ReactiveProperty<IdType>>;
194
195 fn into_iter(self) -> Self::IntoIter {
196 self.0.into_iter()
197 }
198}
199
200impl<IdType: Clone> FromIterator<ReactiveProperty<IdType>> for ReactiveProperties<IdType> {
201 fn from_iter<I: IntoIterator<Item = ReactiveProperty<IdType>>>(iter: I) -> Self {
202 let properties = ReactiveProperties::new();
203 for property in iter {
204 properties.insert(property.name.clone(), property);
205 }
206 properties
207 }
208}
209
210impl<IdType: Clone> From<ReactiveProperties<IdType>> for PropertyInstances {
211 fn from(properties: ReactiveProperties<IdType>) -> Self {
212 let property_instances = PropertyInstances::new();
213 for (property_name, property) in properties.into_iter() {
214 property_instances.insert(property_name, property.get());
215 }
216 property_instances
217 }
218}
219
220impl<IdType: Clone> From<&ReactiveProperties<IdType>> for PropertyInstances {
221 fn from(properties: &ReactiveProperties<IdType>) -> Self {
222 let property_instances = PropertyInstances::new();
223 for property in properties.0.iter() {
224 property_instances.insert(property.key().clone(), property.get());
225 }
226 property_instances
227 }
228}
229
#[cfg(test)]
pub mod tests {
    use std::ops::DerefMut;
    use std::ops::Index;
    use std::sync::Arc;
    use std::sync::RwLock;
    use std::sync::atomic::AtomicU64;
    use std::sync::atomic::Ordering;
    use std::thread;

    use rand::Rng;
    use serde_json::json;
    use stopwatch2::Stopwatch;
    use uuid::Uuid;

    use crate::Stream;

    use crate::ReactiveProperty;
    use reactive_graph_graph::Mutability::Mutable;
    use reactive_graph_utils_test::r_string;

    /// Exercises a property built with a struct literal: `set` changes the
    /// value, `send` does not, and observers receive what `send`/`tick` emit.
    #[test]
    fn reactive_property_instance_test() {
        let uuid = Uuid::new_v4();

        let property_name = r_string();

        let initial_property_value = r_string();

        let initial_property_value_json = json!(initial_property_value);

        let reactive_property_instance = ReactiveProperty {
            id: uuid,
            name: property_name.clone(),
            stream: Arc::new(RwLock::new(Stream::new())),
            mutability: Mutable,
            value: RwLock::new(initial_property_value_json),
        };

        // The fields hold exactly what was passed in.
        assert_eq!(uuid, reactive_property_instance.id);
        assert_eq!(property_name.clone(), reactive_property_instance.name);
        assert_eq!(initial_property_value.as_str(), reactive_property_instance.value.read().unwrap().as_str().unwrap());

        // `set` replaces the stored value.
        let new_property_value = r_string();
        let new_property_value_json = json!(new_property_value);

        reactive_property_instance.set(new_property_value_json);

        assert_eq!(new_property_value.as_str(), reactive_property_instance.value.read().unwrap().as_str().unwrap());
        assert_eq!(new_property_value.as_str(), reactive_property_instance.get().as_str().unwrap());

        // `send` must leave the stored value untouched.
        let send_property_value = r_string();
        let send_property_value_json = json!(send_property_value);

        reactive_property_instance.send(&send_property_value_json);

        assert_eq!(new_property_value.as_str(), reactive_property_instance.value.read().unwrap().as_str().unwrap());
        assert_eq!(new_property_value.as_str(), reactive_property_instance.get().as_str().unwrap());

        assert_ne!(send_property_value.as_str(), reactive_property_instance.value.read().unwrap().as_str().unwrap());
        assert_ne!(send_property_value.as_str(), reactive_property_instance.get().as_str().unwrap());

        // An observer sees the sent value while the stored value stays as it was.
        let observed_value_json = Arc::new(RwLock::new(reactive_property_instance.get()));
        let inner_observed_value_json = Arc::clone(&observed_value_json);
        reactive_property_instance.stream.read().unwrap().observe(move |value| {
            let mut writer = inner_observed_value_json.write().unwrap();
            *writer.deref_mut() = value.clone();
        });

        reactive_property_instance.send(&send_property_value_json);

        assert_eq!(send_property_value.as_str(), observed_value_json.read().unwrap().as_str().unwrap());
        assert_eq!(new_property_value.as_str(), reactive_property_instance.get().as_str().unwrap());

        // `tick` re-sends the stored value to observers.
        let tick_value_json = Arc::new(RwLock::new(json!("")));
        let i_tick_value_json = Arc::clone(&tick_value_json);
        reactive_property_instance.stream.read().unwrap().observe(move |value| {
            let mut writer = i_tick_value_json.write().unwrap();
            *writer.deref_mut() = value.clone();
        });

        reactive_property_instance.tick();

        assert_eq!(new_property_value.as_str(), tick_value_json.read().unwrap().as_str().unwrap());
    }

    /// Exercises a property built with `ReactiveProperty::new`:
    /// `set_no_propagate` changes the value, `send` does not.
    #[test]
    fn create_reactive_property_instance_test() {
        let uuid = Uuid::new_v4();
        let property_name = r_string();
        let initial_property_value = r_string();
        let initial_property_value_json = json!(initial_property_value);
        let reactive_property_instance = ReactiveProperty::new(uuid, property_name.clone(), Mutable, initial_property_value_json);

        assert_eq!(uuid, reactive_property_instance.id);
        assert_eq!(property_name.clone(), reactive_property_instance.name);
        assert_eq!(initial_property_value.as_str(), reactive_property_instance.value.read().unwrap().as_str().unwrap());

        let new_property_value = r_string();
        let new_property_value_json = json!(new_property_value);
        reactive_property_instance.set_no_propagate(new_property_value_json);

        assert_eq!(new_property_value.as_str(), reactive_property_instance.value.read().unwrap().as_str().unwrap());
        assert_eq!(new_property_value.as_str(), reactive_property_instance.get().as_str().unwrap());

        let send_property_value = r_string();
        let send_property_value_json = json!(send_property_value);
        reactive_property_instance.send(&send_property_value_json);

        assert_eq!(new_property_value.as_str(), reactive_property_instance.value.read().unwrap().as_str().unwrap());
        assert_eq!(new_property_value.as_str(), reactive_property_instance.get().as_str().unwrap());
        assert_ne!(send_property_value.as_str(), reactive_property_instance.value.read().unwrap().as_str().unwrap());
        assert_ne!(send_property_value.as_str(), reactive_property_instance.get().as_str().unwrap());
    }

    /// Checks every typed getter against a value of the matching JSON type.
    #[test]
    fn reactive_property_instance_typed_getter_test() {
        let property_name = r_string();

        let bool_value = json!(true);
        assert_eq!(
            bool_value.as_bool().unwrap(),
            ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, bool_value)
                .as_bool()
                .unwrap()
        );

        let u64 = json!(123);
        assert_eq!(123, ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, u64).as_u64().unwrap());

        let i64 = json!(-123);
        assert_eq!(-123, ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, i64).as_i64().unwrap());

        let f64 = json!(-1.23);
        assert_eq!(-1.23, ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, f64).as_f64().unwrap());

        let rand_str = r_string();
        let s = json!(rand_str.clone());
        assert_eq!(
            rand_str.clone(),
            ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, s).as_string().unwrap()
        );

        let a = json!([1, 2, 3]);
        let i = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, a);
        assert_eq!(json!(1), i.as_array().unwrap().index(0).clone());
        assert_eq!(json!(2), i.as_array().unwrap().index(1).clone());
        assert_eq!(json!(3), i.as_array().unwrap().index(2).clone());

        let o = json!({
            "k": "v"
        });
        let i = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, o);
        assert_eq!(json!("v"), i.as_object().unwrap().index("k").clone());
    }

    /// Equality depends only on the value: all four boolean combinations.
    #[test]
    fn reactive_property_instance_eq_bool_test() {
        let property_name = r_string();

        let instance1 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(true));
        let instance2 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(true));
        assert!(instance1 == instance2);

        let instance1 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(false));
        let instance2 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(true));
        assert!(instance1 != instance2);

        let instance1 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(true));
        let instance2 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(false));
        assert!(instance1 != instance2);

        let instance1 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(false));
        let instance2 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(false));
        assert!(instance1 == instance2);
    }

    /// Equality depends only on the value: integers.
    #[test]
    fn reactive_property_instance_eq_number_test() {
        let property_name = r_string();

        let instance1 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(1));
        let instance2 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(1));
        assert!(instance1 == instance2);

        let instance1 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(2));
        let instance2 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(3));
        assert!(instance1 != instance2);
    }

    /// Equality depends only on the value: floats.
    #[test]
    fn reactive_property_instance_eq_float_test() {
        let property_name = r_string();

        let instance1 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(0.0));
        let instance2 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(0.0));
        assert!(instance1 == instance2);

        let instance1 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(1.0));
        let instance2 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(1.1));
        assert!(instance1 != instance2);
    }

    /// Equality depends only on the value: strings.
    #[test]
    fn reactive_property_instance_eq_string_test() {
        let property_name = r_string();
        let property_value = r_string();

        let instance1 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(property_value.clone()));
        let instance2 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(property_value.clone()));
        assert!(instance1 == instance2);

        let instance1 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(r_string()));
        let instance2 = ReactiveProperty::new(Uuid::new_v4(), property_name.clone(), Mutable, json!(r_string()));
        assert!(instance1 != instance2);
    }

    /// Chains two properties: setting `instance1` sets `instance2`, whose
    /// observer stores the value in an atomic that is checked afterwards.
    #[test]
    fn reactive_property_instance_stream_test() {
        let instance1 = ReactiveProperty::new(Uuid::new_v4(), r_string(), Mutable, json!(0));
        let instance2 = ReactiveProperty::new(Uuid::new_v4(), r_string(), Mutable, json!(0));

        let v = Arc::new(AtomicU64::new(0));

        {
            let v = v.clone();
            let writer = instance2.stream.write().unwrap();
            let handle_id = Uuid::new_v4().as_u128();
            writer.observe_with_handle(
                move |value| {
                    v.store(value.as_u64().unwrap(), Ordering::Relaxed);
                },
                handle_id,
            );
        }

        {
            let writer = instance1.stream.write().unwrap();
            let handle_id = Uuid::new_v4().as_u128();
            writer.observe_with_handle(
                move |value| {
                    instance2.set(value.clone());
                },
                handle_id,
            );
        }

        instance1.set(json!(1));
        assert_eq!(1, v.load(Ordering::Relaxed));
    }

    /// Wires two streams into a cycle (each forwards to the other) and sets a
    /// value. Ignored by default.
    #[test]
    #[ignore]
    fn reactive_property_instance_stream_loop_test() {
        let instance1 = ReactiveProperty::new(Uuid::new_v4(), r_string(), Mutable, json!(0));
        let instance2 = ReactiveProperty::new(Uuid::new_v4(), r_string(), Mutable, json!(0));

        {
            let writer = instance2.stream.write().unwrap();
            let handle_id = Uuid::new_v4().as_u128();
            let instance1_stream = instance1.stream.clone();
            writer.observe_with_handle(
                move |value| {
                    instance1_stream.write().unwrap().send(value);
                },
                handle_id,
            );
        }

        {
            let writer = instance1.stream.write().unwrap();
            let handle_id = Uuid::new_v4().as_u128();
            let instance2_stream = instance2.stream.clone();
            writer.observe_with_handle(
                move |value| {
                    instance2_stream.write().unwrap().send(value);
                },
                handle_id,
            );
        }

        let mut rng = rand::rng();

        let number: u64 = rng.random();
        instance1.set(json!(number));
    }

    /// Multi-threaded propagation benchmark over the same two-property chain
    /// as `reactive_property_instance_stream_test`. Ignored by default.
    #[test]
    #[ignore]
    fn reactive_property_instance_stream_mt_benchmark() {
        let instance1 = ReactiveProperty::new(Uuid::new_v4(), r_string(), Mutable, json!(0));
        let instance2 = ReactiveProperty::new(Uuid::new_v4(), r_string(), Mutable, json!(0));

        let v = Arc::new(AtomicU64::new(0));

        {
            let v = v.clone();
            let writer = instance2.stream.write().unwrap();
            let handle_id = Uuid::new_v4().as_u128();
            writer.observe_with_handle(
                move |value| {
                    v.store(value.as_u64().unwrap(), Ordering::Relaxed);
                },
                handle_id,
            );
        }

        {
            let writer = instance1.stream.write().unwrap();
            let handle_id = Uuid::new_v4().as_u128();
            writer.observe_with_handle(
                move |value| {
                    instance2.set(value.clone());
                },
                handle_id,
            );
        }

        const NUM_THREADS: i32 = 256;
        const NUM_ITERATIONS: i32 = 10000;

        println!("starting");
        let mut handles = Vec::new();
        let mut s1 = Stopwatch::default();
        s1.start();
        // Inclusive upper bound: spawn exactly NUM_THREADS threads so the
        // summary printed at the end reports what was actually run.
        for thread_no in 1..=NUM_THREADS {
            let v = v.clone();
            let stream = instance1.stream.clone();
            let handle = thread::spawn(move || {
                let mut rng = rand::rng();
                let mut s = Stopwatch::default();
                s.start();
                // Exactly NUM_ITERATIONS propagations per thread.
                for _iteration in 0..NUM_ITERATIONS {
                    let number: u64 = rng.random();
                    let value = json!(number);
                    // The write guard is held until the end of the iteration,
                    // so send + load + assert are serialized across threads.
                    let writer = stream.write().unwrap();
                    writer.send(&value);
                    let new_value = v.load(Ordering::Relaxed);
                    assert_eq!(number, new_value);
                }
                s.stop();
                println!("finished thread [{}] in {:?}", thread_no, s.elapsed());
            });
            handles.push(handle)
        }
        println!("started {} parallel threads in {:?}", NUM_THREADS, s1.elapsed());
        s1.start();
        println!("running");
        handles.into_iter().for_each(move |handle| {
            handle.join().unwrap();
        });
        println!(
            "finished {} parallel threads with each {} stream propagations (total {}) in {:?}",
            NUM_THREADS,
            NUM_ITERATIONS,
            NUM_THREADS * NUM_ITERATIONS,
            s1.elapsed()
        );
    }
}