reactive_graph_reactive_model_impl/frp/mod.rs
1//! The bidule FRP crate.
2//!
3//! This crate provides few simple primitives to write FRP-driven programs. Everything revolves
4//! around the concept of a [`Stream`].
5//!
6//! # Streams
7//!
8//! A [`Stream`] is a *stream of typed signals*. A stream of signals will get a *signal* as input
9//! and will broadcast it downwards. You can compose streams with each other with very simple
10//! combinators, such as `map`, `filter`, `filter_map`, `zip`, `unzip`, `merge`, `fold`, etc. (non-exhaustive list).
11//!
12//! ## Creating streams and send signals
13//!
14//! Streams are typed. You can use type inference or give them an explicit type:
15//!
16//! ```rust
17//! use reactive_graph_reactive_model_impl::Stream;
18//!
19//! let my_stream: Stream<i32> = Stream::new();
20//! ```
21//!
22//! That’s all you need to create a stream. A stream represent a value that will be flowing in *at
23//! some time*.
24//!
25//! > Even though it’s not strictly the same thing, you can see a similitude with
26//! > [futures](https://crates.io/crates/futures).
27//!
28//! When you’re ready to send signals, just call the `send` function:
29//!
30//! ```rust
31//! use reactive_graph_reactive_model_impl::Stream;
32//!
33//! let my_stream: Stream<i32> = Stream::new();
34//!
35//! my_stream.send(&1);
36//! my_stream.send(&2);
37//! my_stream.send(&3);
38//! ```
39//!
40//! ## Observing signals
41//!
42//! A single stream like that one won’t do much – actually, it’ll do nothing. The first thing we
43//! might want to do is to subscribe a closure to do something when a signal is emitted. This is
44//! done with the [`Stream::observe`] function.
45//!
46//! ```rust
47//! use reactive_graph_reactive_model_impl::Stream;
48//!
49//! let my_stream: Stream<i32> = Stream::new();
50//!
51//! my_stream.observe(|sig| {
52//! // print the signal on stdout each time it’s flowing in
53//! println!("signal: {:?}", sig);
54//! });
55//!
56//! my_stream.send(&1);
57//! my_stream.send(&2);
58//! my_stream.send(&3);
59//! ```
60//!
61//! ## FRP basics
62//!
63//! However, FRP is not about callbacks. It’s actually the opposite, to be honest. We try to reduce
64//! the use of callbacks as much as possible. FRP solves this by inversing the way you must work:
65//! instead of subscribing callbacks to react to something, you transform that something to create
66//! new values or objects. This is akin to the kind of transformations you do with `Future`.
67//!
68//! Let’s get our feet wet: let’s create a new stream that will only emit signals for even values:
69//!
70//! ```rust
71//! use reactive_graph_reactive_model_impl::Stream;
72//!
73//! let int_stream: Stream<i32> = Stream::new();
74//! let even_stream = int_stream.filter(|x| x % 2 == 0);
75//! ```
76//!
77//! `even_stream` has type `Stream<i32>` and will only emit signals when the input signal is `even`.
78//!
79//! Let’s try something more complicated: on those signals, if the value is less or equal to 10,
80//! output `"Hello, world!"`; otherwise, output `"See you!"`.
81//!
82//! ```rust
83//! use reactive_graph_reactive_model_impl::Stream;
84//!
85//! let int_stream: Stream<i32> = Stream::new();
86//! let even_stream = int_stream.filter(|x| x % 2 == 0);
87//! let str_stream = even_stream.map(|x| if *x <= 10 { "Hello, world!" } else { "See you!" });
88//! ```
89//!
90//! This is really easy; no trap involved.
91//!
92//! Ok, let’s try something else. Some kind of a *Hello world* for FRP.
93//!
94//! ```rust
95//! use reactive_graph_reactive_model_impl::Stream;
96//!
97//! enum Button {
98//! Pressed,
99//! Released
100//! }
101//!
102//! fn unbuttonify(button: &Button, v: i32) -> Option<i32> {
103//! match *button {
104//! Button::Released => Some(v),
105//! _ => None
106//! }
107//! }
108//!
109//! let minus = Stream::new();
110//! let plus = Stream::new();
111//! let counter =
112//! minus.filter_map(|b| unbuttonify(b, -1))
113//! .merge(&plus.filter_map(|b| unbuttonify(b, 1)))
114//! .fold(0, |a, x| a + x);
115//! ```
116//!
117//! In this snippet, we have two buttons: `minus` and `plus`. If we hit the `minus` button, we want
118//! a counter to be decremented and if we hit the `plus` button, the counter must increment.
119//!
120//! FRP solves that problem by expressing `counter` in terms of both `minus` and `plus`. The first
121//! thing we do is to map a number on the stream that broadcasts button signals. Whenever that
122//! signal is a `Button::Released`, we return a given number. For `minus`, we return `-1` and for
123//! `plus`, we return `1` – or `+1`, it’s the same thing. That gives us two new streams. Let’s see
124//! the types to have a deeper understanding:
125//!
126//! - `minus: Stream<Button>`
127//! - `plus: Stream<Button>`
128//! - `minus.filter_map(|b| unbuttonify(b, -1)): Stream<i32>`
129//! - `plus.filter_map(|b| unbuttonify(b, 1)): Stream<i32>`
130//!
131//! The `merge` method is very simple: it takes two streams that emit the same type of signals and
132//! merges them into a single stream that will broadcasts both the signals:
133//!
134//! - `minus.filter_map(|b| unbuttonify(b, -1)).merge(plus.filter_map(|b| unbuttonify(b, 1)): Stream<i32>): Stream<i32>`
135//!
136//! The next and final step is to `fold` those `i32` into the final value of the counter by applying
137//! successive additions. This is done with the `fold` method, that takes the initial value – in the
138//! case of a counter, it’s `0` – and the function to accumulate, with the accumulator as first
139//! argument and the iterated value as second argument.
140//!
141//! The resulting stream, which type is `Stream<i32>`, will then contain the value of the counter.
142//! You can test it by sending `Button` signals on both `minus` and `plus`: the resulting signal
143//! in `counter` will be correctly decrementing or incrementing.
144//!
145//! There exist several more, interesting combinators to work with your streams. For instance, if
146//! you don’t want to map a function over two streams to make them compatible with each other – they
147//! have different types, you can still perform some kind of a merge. That operation is called a
148//! `zip` and the resulting stream will yield either the value from the first – left – stream or the
149//! value of the other – right – as soon as a signal is emitted. Its dual method is called `unzip`
150//! and will split a stream apart into two streams if it’s a zipped stream. See `Either` for further
151//! details.
152//!
153//! ## Sinking
154//!
155//! *Sinking* is the action to consume the signals of a stream and collect them. The current
156//! implementation uses non-blocking buffering when sending signals, and reading is up to you: the
157//! stream will collect the output signals in a buffer you can read via the
158//! [Iterator](https://doc.rust-lang.org/std/iter/trait.Iterator.html) trait. For instance,
159//! non-blocking reads:
160//!
161//! ```rust
162//! use reactive_graph_reactive_model_impl::Stream;
163//!
164//! enum Button {
165//! Pressed,
166//! Released
167//! }
168//!
169//! fn unbuttonify(button: &Button, v: i32) -> Option<i32> {
170//! match *button {
171//! Button::Released => Some(v),
172//! _ => None
173//! }
174//! }
175//!
176//! let minus = Stream::new();
177//! let plus = Stream::new();
178//! let counter =
179//! minus.filter_map(|b| unbuttonify(b, -1))
180//! .merge(&plus.filter_map(|b| unbuttonify(b, 1)))
181//! .fold(0, |a, x| a + x);
182//!
183//! let rx = counter.recv();
184//!
185//! // do something with minus and plus
186//! // …
187//!
188//! for v in rx.try_iter() {
189//! println!("read a new value of the counter: {}", v);
190//! }
191//! ```
192
193use dashmap::DashMap;
194// use rayon::prelude::*;
195use std::hash::Hash;
196use std::ops::Deref;
197use std::sync::Arc;
198use std::sync::RwLock;
199use std::sync::Weak;
200use std::sync::mpsc::Receiver;
201use std::sync::mpsc::channel;
202use uuid::Uuid;
203
204/// The subscriber stores the handle_id and the closure.
205type Subscriber<'a, Sig> = dyn FnMut(&Sig) + Send + 'a;
206
207/// List of subscribers to a stream.
208///
209/// When a stream emit a value, it streams the value down to all subscribers.
210type BoxedSubscriber<'a, Sig> = Box<Subscriber<'a, Sig>>;
211type Subscribers<'a, Sig> = DashMap<u128, BoxedSubscriber<'a, Sig>>;
212
213/// Dependent streams that will receive values.
214///
215/// Most streams will own their subscribers. However, in some cases, it’s required not to own
216/// the subscribers to break rc cycles.
217enum DependentStreams<'a, Sig> {
218 Own(Arc<Subscribers<'a, Sig>>),
219 Weak(Weak<Subscribers<'a, Sig>>),
220}
221
222/// Either one or another type.
223///
224/// This type is especially useful for zipping and unzipping streams. If a stream has a type like
225/// `Stream<Either<A, B>>`, it means you can unzip it and get two streams: `Stream<A>` and
226/// `Stream<B>`.
227#[derive(Clone, Debug, Eq, Hash, PartialEq)]
228pub enum Either<A, B> {
229 /// Left part of the merger.
230 Left(A),
231 /// Right part of the merger.
232 Right(B),
233}
234
235/// A stream of signals.
236///
237/// A stream represents a composable signal producer. When you decide to send a signal down a
238/// stream, any other streams composed with that first stream will also receive the signal. This
239/// enables to construct more interesting and complex streams by composing them via, for instance,
240/// [`Stream::map`], [`Stream::filter_map`], [`Stream::filter`], [`Stream::fold`],
241/// [`Stream::merge`], [`Stream::zip`], etc.
242pub struct Stream<'a, Sig> {
243 subscribers: RwLock<DependentStreams<'a, Sig>>,
244}
245
246/// TODO: This solves many problems, but is it really OK?
247unsafe impl<Sig> Send for Stream<'_, Sig> {}
248
249unsafe impl<Sig> Sync for Stream<'_, Sig> {}
250
251impl<'a, Sig> Stream<'a, Sig>
252where
253 Sig: 'a + Send + Sync,
254{
255 /// Create a new stream.
256 pub fn new() -> Self {
257 let subscribers = DependentStreams::Own(Arc::new(DashMap::new()));
258 Stream {
259 subscribers: RwLock::new(subscribers),
260 }
261 }
262
263 /// Create a new version of this stream by behaving the same way as the input reference (if it’s
264 /// an owned pointer, it clones ownership; if it’s a weak pointer, it clone the weak pointer).
265 fn new_same(&self) -> Self {
266 let guard = self.subscribers.read().unwrap();
267 let subscribers = RwLock::new(match guard.deref() {
268 DependentStreams::Own(rc) => DependentStreams::Own(rc.clone()),
269 DependentStreams::Weak(weak) => DependentStreams::Weak(weak.clone()),
270 });
271
272 Stream { subscribers }
273 }
274
275 /// Create new, non-owning version of this stream.
276 fn new_weak(&self) -> Self {
277 let guard = self.subscribers.read().unwrap();
278 let subscribers = RwLock::new(match guard.deref() {
279 DependentStreams::Own(rc) => DependentStreams::Weak(Arc::downgrade(rc)),
280 DependentStreams::Weak(weak) => DependentStreams::Weak(weak.clone()),
281 });
282
283 Stream { subscribers }
284 }
285
286 /// Observe a stream signal output flowing out of the stream.
287 ///
288 /// Do not abuse this function, as its primary use is to build other combinators.
289 #[allow(clippy::readonly_write_lock)]
290 pub fn observe<F>(&self, subscriber: F) -> u128
291 where
292 F: 'a + FnMut(&Sig) + Send,
293 {
294 let handle_id = Uuid::new_v4().as_u128();
295 let guard = self.subscribers.write().unwrap();
296 match guard.deref() {
297 DependentStreams::Own(subscribers) => {
298 subscribers.insert(handle_id, Box::new(subscriber));
299 }
300 DependentStreams::Weak(weak) => {
301 if let Some(subscribers) = weak.upgrade() {
302 subscribers.insert(handle_id, Box::new(subscriber));
303 }
304 }
305 }
306 handle_id
307 }
308
309 /// Observe a stream signal output flowing out of the stream.
310 ///
311 /// Do not abuse this function, as its primary use is to build other combinators.
312 #[allow(clippy::readonly_write_lock)]
313 pub fn observe_with_handle<F>(&self, subscriber: F, handle_id: u128)
314 where
315 F: 'a + FnMut(&Sig) + Send,
316 {
317 let guard = self.subscribers.write().unwrap();
318 match guard.deref() {
319 DependentStreams::Own(subscribers) => {
320 subscribers.insert(handle_id, Box::new(subscriber));
321 }
322 DependentStreams::Weak(weak) => {
323 if let Some(subscribers) = weak.upgrade() {
324 subscribers.insert(handle_id, Box::new(subscriber));
325 }
326 }
327 }
328 }
329
330 /// Removes the subscriber with the given handle_id.
331 #[allow(clippy::readonly_write_lock)]
332 pub fn remove(&self, handle_id: u128) {
333 let guard = self.subscribers.write().unwrap();
334 match guard.deref() {
335 DependentStreams::Own(subscribers) => {
336 subscribers.remove(&handle_id);
337 }
338
339 DependentStreams::Weak(weak) => {
340 if let Some(subscribers) = weak.upgrade() {
341 subscribers.remove(&handle_id);
342 }
343 }
344 }
345 }
346
347 /// Removes all subscribers without dropping the stream.
348 #[allow(clippy::readonly_write_lock)]
349 pub fn clear(&self) {
350 let guard = self.subscribers.write().unwrap();
351 match guard.deref() {
352 DependentStreams::Own(subscribers) => {
353 subscribers.clear();
354 }
355
356 DependentStreams::Weak(weak) => {
357 if let Some(subscribers) = weak.upgrade() {
358 subscribers.clear();
359 }
360 }
361 }
362 }
363
364 /// Send a signal down the stream.
365 #[allow(clippy::readonly_write_lock)]
366 pub fn send(&self, signal: &Sig) {
367 let guard = self.subscribers.write().unwrap();
368 match guard.deref() {
369 DependentStreams::Own(subscribers) => {
370 // subscribers.par_iter_mut().for_each(|mut subscriber| {
371 // subscriber(&signal);
372 // });
373 subscribers.iter_mut().for_each(|mut subscriber| {
374 subscriber(signal);
375 });
376 }
377
378 DependentStreams::Weak(weak) => {
379 if let Some(subscribers) = weak.upgrade() {
380 // subscribers.deref().par_iter_mut().for_each(|mut subscriber| {
381 // subscriber(&signal);
382 // });
383 subscribers.iter_mut().for_each(|mut subscriber| {
384 subscriber(signal);
385 });
386 }
387 }
388 }
389 }
390
391 /// Map any signals flowing out of a stream.
392 ///
393 /// Please note that this function is total: you cannot ignore signals. Even if you map
394 /// *uninteresting signals* to `None`, you’ll still compose signals for those. If you're interested
395 /// in filtering signals while mapping, have a look at the [`Stream::filter_map`] function.
396 pub fn map<F, OutSig>(&self, f: F) -> Stream<'a, OutSig>
397 where
398 F: 'a + Fn(&Sig) -> OutSig + Send,
399 OutSig: 'a + Send + Sync,
400 {
401 let mapped_stream = Stream::new();
402 let mapped_stream_ = mapped_stream.new_same();
403
404 self.observe(move |sig| {
405 mapped_stream_.send(&f(sig));
406 });
407
408 mapped_stream
409 }
410
411 /// Filter and map signals flowing out of a stream.
412 ///
413 /// If you’re not interested in a specific signal, you can emit [`None`]: no signal will be sent.
414 pub fn filter_map<F, OutSig>(&self, f: F) -> Stream<'a, OutSig>
415 where
416 F: 'a + Fn(&Sig) -> Option<OutSig> + Send,
417 OutSig: 'a + Send + Sync,
418 {
419 let mapped_stream = Stream::new();
420 let mapped_stream_ = mapped_stream.new_same();
421
422 self.observe(move |sig| {
423 if let Some(ref mapped_sig) = f(sig) {
424 mapped_stream_.send(mapped_sig);
425 }
426 });
427
428 mapped_stream
429 }
430
431 /// Filter the signals flowing out of a stream with a predicate.
432 pub fn filter<F>(&self, pred: F) -> Self
433 where
434 F: 'a + Fn(&Sig) -> bool + Send,
435 {
436 let filtered = Stream::new();
437 let filtered_ = filtered.new_same();
438
439 self.observe(move |sig| {
440 if pred(sig) {
441 filtered_.send(sig);
442 }
443 });
444
445 filtered
446 }
447
448 /// Fold all signals flowing out of a stream into a stream of values.
449 pub fn fold<F, A>(&self, value: A, f: F) -> Stream<'a, A>
450 where
451 F: 'a + Fn(A, &Sig) -> A + Send,
452 A: 'a + Send + Sync,
453 {
454 let folded_stream = Stream::new();
455 let folded_stream_ = folded_stream.new_same();
456 let mut boxed = Some(value);
457
458 self.observe(move |sig| {
459 if let Some(value) = boxed.take() {
460 let output = f(value, sig);
461 folded_stream_.send(&output);
462 boxed = Some(output);
463 }
464 });
465
466 folded_stream
467 }
468
469 /// Merge two streams into one.
470 ///
471 /// Merging streams enables you to perform later useful compositions, such as folding the merged
472 /// results.
473 pub fn merge(&self, rhs: &Self) -> Self {
474 let merged = Stream::new();
475 let merged_self = merged.new_same();
476 let merged_rhs = merged.new_same();
477
478 self.observe(move |sig| {
479 merged_self.send(sig);
480 });
481
482 rhs.observe(move |sig| {
483 merged_rhs.send(sig);
484 });
485
486 merged
487 }
488
489 /// Merge two streams into one with incompatible types.
490 ///
491 /// This method performs the same logical operation as:
492 ///
493 /// ```rust
494 /// use reactive_graph_reactive_model_impl::Stream;
495 ///
496 /// let stream_a = Stream::new();
497 /// let stream_b = Stream::new();
498 /// let merged = stream_a.merge(&stream_b.map(String::len));
499 /// ```
500 ///
501 /// However, because it does it in a more optimal way, you are advised to use this combinator
502 /// instead.
503 pub fn merge_with<F, SigRHS>(&self, rhs: &Stream<'a, SigRHS>, adapter: F) -> Self
504 where
505 F: 'a + Fn(&SigRHS) -> Sig + Send,
506 SigRHS: 'a + Send + Sync,
507 {
508 let merged = Stream::new();
509 let merged_self = merged.new_same();
510 let merged_rhs = merged.new_same();
511
512 self.observe(move |sig| {
513 merged_self.send(sig);
514 });
515
516 rhs.observe(move |sig| {
517 merged_rhs.send(&adapter(sig));
518 });
519
520 merged
521 }
522
523 /// Zip two streams with each other.
524 ///
525 /// The resulting stream will output [`Either`] from one or the other stream.
526 pub fn zip<SigRHS>(&self, rhs: &Stream<'a, SigRHS>) -> Stream<'a, Either<Sig, SigRHS>>
527 where
528 Sig: Clone + Send,
529 SigRHS: 'a + Clone + Send + Sync,
530 {
531 let zipped = Stream::new();
532 let zipped_self = zipped.new_same();
533 let zipped_rhs = zipped.new_same();
534
535 self.observe(move |sig| {
536 zipped_self.send(&Either::Left(sig.clone()));
537 });
538
539 rhs.observe(move |sig| {
540 zipped_rhs.send(&Either::Right(sig.clone()));
541 });
542
543 zipped
544 }
545
546 /// Create a pair of entangled streams.
547 ///
548 /// If any of the streams sends a signal, the other one receives it. However, be careful: since
549 /// the signals are defined in terms of each other, it’s quite easy to cause infinite loops if you
550 /// don’t have a well-defined bottom to your recursion. This is why you’re expected to return
551 /// `Option<_>` signals.
552 pub fn entangled<F, G, GSig>(f: F, g: G) -> (Self, Stream<'a, GSig>)
553 where
554 F: 'a + Fn(&Sig) -> Option<GSig> + Send,
555 G: 'a + Fn(&GSig) -> Option<Sig> + Send,
556 GSig: 'a + Send + Sync,
557 {
558 let fs = Stream::new();
559 let gs = Stream::new();
560 let fs_ = fs.new_weak();
561 let gs_ = gs.new_weak();
562
563 fs.observe(move |sig| {
564 if let Some(sig_) = f(sig) {
565 gs_.send(&sig_);
566 }
567 });
568
569 gs.observe(move |sig| {
570 if let Some(sig_) = g(sig) {
571 fs_.send(&sig_);
572 }
573 });
574
575 (fs, gs)
576 }
577
578 /// Sink a stream.
579 ///
580 /// This method allows to _receive_ the signal and extract it out of the stream via a channel.
581 pub fn recv(&self) -> Receiver<Sig>
582 where
583 Sig: Clone,
584 {
585 let (sx, rx) = channel();
586
587 self.observe(move |sig| {
588 let _ = sx.send(sig.clone());
589 });
590
591 rx
592 }
593}
594
595impl<'a, Sig> Default for Stream<'a, Sig>
596where
597 Sig: 'a + Send + Sync,
598{
599 fn default() -> Self {
600 Self::new()
601 }
602}
603
604impl<'a, SigA, SigB> Stream<'a, Either<SigA, SigB>>
605where
606 SigA: 'static + Send + Sync,
607 SigB: 'static + Send + Sync,
608{
609 /// Split a stream of zipped values into two streams.
610 ///
611 /// If the [`Either::Left`] part of the stream emits a signal, it is sent to the first stream.
612 /// If the [`Either::Right`] part of the tream emits, it is sent to the second stream.
613 pub fn unzip(&self) -> (Stream<'a, SigA>, Stream<'a, SigB>) {
614 let a = Stream::new();
615 let a_ = a.new_same();
616 let b = Stream::new();
617 let b_ = b.new_same();
618
619 self.observe(move |sig| match *sig {
620 Either::Left(ref l) => a_.send(l),
621 Either::Right(ref r) => b_.send(r),
622 });
623
624 (a, b)
625 }
626}
627
628#[cfg(test)]
629#[cfg(not(tarpaulin_include))]
630mod tests;