// reactive_graph_reactive_model_impl/frp/mod.rs

//! The bidule FRP crate.
//!
//! This crate provides a few simple primitives for writing FRP-driven programs. Everything
//! revolves around the concept of a [`Stream`].
//!
//! # Streams
//!
//! A [`Stream`] is a *stream of typed signals*. A stream gets a *signal* as input and broadcasts
//! it downstream. You can compose streams with each other using very simple combinators, such as
//! `map`, `filter`, `filter_map`, `zip`, `unzip`, `merge` and `fold` (non-exhaustive list).
//!
//! ## Creating streams and sending signals
//!
//! Streams are typed. You can rely on type inference or give them an explicit type:
//!
//! ```rust
//! use reactive_graph_reactive_model_impl::Stream;
//!
//! let my_stream: Stream<i32> = Stream::new();
//! ```
//!
//! That’s all you need to create a stream. A stream represents a value that will flow in *at
//! some point in time*.
//!
//! > Even though it’s not strictly the same thing, you can see a similarity with
//! > [futures](https://crates.io/crates/futures).
//!
//! When you’re ready to send signals, just call the [`Stream::send`] method:
//!
//! ```rust
//! use reactive_graph_reactive_model_impl::Stream;
//!
//! let my_stream: Stream<i32> = Stream::new();
//!
//! my_stream.send(&1);
//! my_stream.send(&2);
//! my_stream.send(&3);
//! ```
//!
//! ## Observing signals
//!
//! A single stream like that one won’t do much; actually, it will do nothing. The first thing we
//! might want to do is subscribe a closure that runs whenever a signal is emitted. This is
//! done with the [`Stream::observe`] method.
//!
//! ```rust
//! use reactive_graph_reactive_model_impl::Stream;
//!
//! let my_stream: Stream<i32> = Stream::new();
//!
//! my_stream.observe(|sig| {
//!   // print the signal on stdout each time one flows in
//!   println!("signal: {:?}", sig);
//! });
//!
//! my_stream.send(&1);
//! my_stream.send(&2);
//! my_stream.send(&3);
//! ```
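//!
//! To make the mechanics concrete, here is a minimal, single-threaded sketch of the idea behind
//! `observe`, `send` and `map`, using only the standard library. `MiniStream`, `Callback` and
//! `collect_doubled` are hypothetical names made up for this illustration; they are not part of
//! this crate, and the real [`Stream`] additionally deals with thread safety and subscriber
//! handles.
//!
//! ```rust
//! use std::cell::RefCell;
//! use std::rc::Rc;
//!
//! /// Hypothetical alias: a boxed closure called with each signal.
//! type Callback<'a, Sig> = Box<dyn FnMut(&Sig) + 'a>;
//!
//! /// Hypothetical, single-threaded stand-in for `Stream`.
//! struct MiniStream<'a, Sig> {
//!     subscribers: Rc<RefCell<Vec<Callback<'a, Sig>>>>,
//! }
//!
//! impl<'a, Sig: 'a> MiniStream<'a, Sig> {
//!     fn new() -> Self {
//!         MiniStream { subscribers: Rc::new(RefCell::new(Vec::new())) }
//!     }
//!
//!     /// Register a closure to call on every signal.
//!     fn observe<F>(&self, f: F)
//!     where
//!         F: FnMut(&Sig) + 'a,
//!     {
//!         self.subscribers.borrow_mut().push(Box::new(f));
//!     }
//!
//!     /// Broadcast a signal to every registered closure.
//!     fn send(&self, sig: &Sig) {
//!         for subscriber in self.subscribers.borrow_mut().iter_mut() {
//!             subscriber(sig);
//!         }
//!     }
//!
//!     /// A combinator is an observer that forwards transformed signals to a new stream.
//!     fn map<F, Out>(&self, f: F) -> MiniStream<'a, Out>
//!     where
//!         F: Fn(&Sig) -> Out + 'a,
//!         Out: 'a,
//!     {
//!         let mapped = MiniStream::new();
//!         // Second handle on the same subscriber list, moved into the observer.
//!         let handle = MiniStream { subscribers: Rc::clone(&mapped.subscribers) };
//!         self.observe(move |sig: &Sig| handle.send(&f(sig)));
//!         mapped
//!     }
//! }
//!
//! /// Send `inputs` through a doubling `map` and collect what the observer sees.
//! fn collect_doubled(inputs: &[i32]) -> Vec<i32> {
//!     let seen = RefCell::new(Vec::new());
//!     {
//!         let source: MiniStream<i32> = MiniStream::new();
//!         let doubled = source.map(|x| x * 2);
//!         doubled.observe(|x: &i32| {
//!             seen.borrow_mut().push(*x);
//!         });
//!         for i in inputs {
//!             source.send(i);
//!         }
//!     } // both streams are dropped here, releasing the borrow of `seen`
//!     seen.into_inner()
//! }
//! ```
//!
//! In this sketch, `collect_doubled(&[1, 2, 3])` returns `[2, 4, 6]`: each `send` on the source
//! runs the forwarding closure installed by `map`, which in turn runs the observer.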
//!
//! ## FRP basics
//!
//! However, FRP is not about callbacks. It’s actually the opposite: we try to reduce the use of
//! callbacks as much as possible. FRP does this by inverting the way you work: instead of
//! subscribing callbacks to react to something, you transform that something to create new
//! values or objects. This is akin to the kind of transformations you do with `Future`.
//!
//! Let’s get our feet wet by creating a new stream that only emits signals for even values:
//!
//! ```rust
//! use reactive_graph_reactive_model_impl::Stream;
//!
//! let int_stream: Stream<i32> = Stream::new();
//! let even_stream = int_stream.filter(|x| x % 2 == 0);
//! ```
//!
//! `even_stream` has type `Stream<i32>` and only emits a signal when the input signal is even.
//!
//! Let’s try something more complicated: for each of those signals, if the value is less than or
//! equal to 10, output `"Hello, world!"`; otherwise, output `"See you!"`.
//!
//! ```rust
//! use reactive_graph_reactive_model_impl::Stream;
//!
//! let int_stream: Stream<i32> = Stream::new();
//! let even_stream = int_stream.filter(|x| x % 2 == 0);
//! let str_stream = even_stream.map(|x| if *x <= 10 { "Hello, world!" } else { "See you!" });
//! ```
//!
//! This is really easy; no trap involved.
//!
//! Ok, let’s try something else: a kind of *Hello world* for FRP.
//!
//! ```rust
//! use reactive_graph_reactive_model_impl::Stream;
//!
//! enum Button {
//!   Pressed,
//!   Released
//! }
//!
//! fn unbuttonify(button: &Button, v: i32) -> Option<i32> {
//!   match *button {
//!     Button::Released => Some(v),
//!     _ => None
//!   }
//! }
//!
//! let minus = Stream::new();
//! let plus = Stream::new();
//! let counter =
//!   minus.filter_map(|b| unbuttonify(b, -1))
//!        .merge(&plus.filter_map(|b| unbuttonify(b, 1)))
//!        .fold(0, |a, x| a + x);
//! ```
//!
//! In this snippet, we have two buttons: `minus` and `plus`. If we hit the `minus` button, we want
//! a counter to be decremented, and if we hit the `plus` button, the counter must be incremented.
//!
//! FRP solves that problem by expressing `counter` in terms of both `minus` and `plus`. The first
//! thing we do is turn each stream of button signals into a stream of numbers. Whenever a signal
//! is a `Button::Released`, we emit a given number: `-1` for `minus` and `1` for `plus`. Any other
//! button signal is dropped. That gives us two new streams. Let’s look at the types to get a
//! deeper understanding:
//!
//! - `minus: Stream<Button>`
//! - `plus: Stream<Button>`
//! - `minus.filter_map(|b| unbuttonify(b, -1)): Stream<i32>`
//! - `plus.filter_map(|b| unbuttonify(b, 1)): Stream<i32>`
//!
//! The `merge` method is very simple: it takes two streams that emit the same type of signals and
//! merges them into a single stream that broadcasts the signals of both:
//!
//! - `minus.filter_map(|b| unbuttonify(b, -1)).merge(&plus.filter_map(|b| unbuttonify(b, 1))): Stream<i32>`
//!
//! The next and final step is to `fold` those `i32` values into the current value of the counter
//! by applying successive additions. The `fold` method takes the initial value (`0` in the case
//! of a counter) and the accumulating function, which receives the accumulator as its first
//! argument and a reference to the incoming signal as its second.
//!
//! The resulting stream, whose type is `Stream<i32>`, emits the updated value of the counter after
//! each input signal. You can test it by sending `Button` signals on both `minus` and `plus`: the
//! values emitted by `counter` will decrement or increment accordingly.
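//!
//! Note that, unlike `Iterator::fold`, which returns a single final value, the `fold` combinator
//! emits every intermediate accumulator. The closest standard-library analogy is
//! `Iterator::scan`. The following is only a sketch of that analogy on a plain slice;
//! `running_counter` is a hypothetical helper, not part of this crate:
//!
//! ```rust
//! /// Hypothetical helper: the successive counter values produced by a sequence of deltas.
//! fn running_counter(deltas: &[i32]) -> Vec<i32> {
//!     deltas
//!         .iter()
//!         .scan(0, |acc, &x| {
//!             // Update the accumulator, then emit its new value.
//!             *acc += x;
//!             Some(*acc)
//!         })
//!         .collect()
//! }
//! ```
//!
//! With the deltas `[1, 1, -1]` (plus, plus, minus), this yields `[1, 2, 1]`, which mirrors the
//! sequence of signals `counter` would emit for the same button releases.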
//!
//! There are several more interesting combinators to work with your streams. For instance, if two
//! streams have different types and you don’t want to map a function over them to make them
//! compatible with each other, you can still perform a kind of merge. That operation is called
//! `zip`: the resulting stream yields either the value from the first (left) stream or the value
//! from the other (right) stream as soon as a signal is emitted. Its dual method, `unzip`, splits
//! a zipped stream back into two streams. See [`Either`] for further details.
//!
//! ## Sinking
//!
//! *Sinking* is the act of consuming the signals of a stream and collecting them. The current
//! implementation buffers signals without blocking when they are sent, and reading is up to you:
//! [`Stream::recv`] gives you a channel receiver that buffers the output signals, which you can
//! read via the [Iterator](https://doc.rust-lang.org/std/iter/trait.Iterator.html) trait. For
//! instance, for non-blocking reads:
//!
//! ```rust
//! use reactive_graph_reactive_model_impl::Stream;
//!
//! enum Button {
//!   Pressed,
//!   Released
//! }
//!
//! fn unbuttonify(button: &Button, v: i32) -> Option<i32> {
//!   match *button {
//!     Button::Released => Some(v),
//!     _ => None
//!   }
//! }
//!
//! let minus = Stream::new();
//! let plus = Stream::new();
//! let counter =
//!   minus.filter_map(|b| unbuttonify(b, -1))
//!        .merge(&plus.filter_map(|b| unbuttonify(b, 1)))
//!        .fold(0, |a, x| a + x);
//!
//! let rx = counter.recv();
//!
//! // do something with minus and plus
//! // …
//!
//! for v in rx.try_iter() {
//!   println!("read a new value of the counter: {}", v);
//! }
//! ```
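//!
//! The receiver is a standard `std::sync::mpsc::Receiver`, so the non-blocking behaviour of
//! `try_iter` can be sketched with the standard library alone. `drain_buffered` is a hypothetical
//! helper written for this illustration; it stands in for a stream that has already sent a few
//! signals:
//!
//! ```rust
//! use std::sync::mpsc::channel;
//!
//! /// Hypothetical helper: buffer some values, then drain them without blocking.
//! fn drain_buffered(values: &[i32]) -> Vec<i32> {
//!     let (tx, rx) = channel();
//!     for v in values {
//!         // Sending on an unbounded channel does not block; it only fails if the
//!         // receiver has been dropped.
//!         let _ = tx.send(*v);
//!     }
//!     // `try_iter` yields what is already buffered and then stops instead of waiting,
//!     // even though the sender `tx` is still alive.
//!     rx.try_iter().collect()
//! }
//! ```
//!
//! By contrast, `rx.iter()` would block waiting for more values until every sender is dropped.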

use dashmap::DashMap;
// use rayon::prelude::*;
use std::hash::Hash;
use std::ops::Deref;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::Weak;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::channel;
use uuid::Uuid;

/// A subscriber is a closure that is called with each signal flowing out of a stream.
type Subscriber<'a, Sig> = dyn FnMut(&Sig) + Send + 'a;

/// A heap-allocated subscriber.
type BoxedSubscriber<'a, Sig> = Box<Subscriber<'a, Sig>>;

/// Subscribers of a stream, keyed by their handle ID.
///
/// When a stream emits a value, it passes the value down to all subscribers.
type Subscribers<'a, Sig> = DashMap<u128, BoxedSubscriber<'a, Sig>>;

/// Dependent streams that will receive values.
///
/// Most streams own their subscribers. However, in some cases a stream must not own the
/// subscribers, in order to break `Arc` reference cycles.
enum DependentStreams<'a, Sig> {
    Own(Arc<Subscribers<'a, Sig>>),
    Weak(Weak<Subscribers<'a, Sig>>),
}

/// Either one or another type.
///
/// This type is especially useful for zipping and unzipping streams. If a stream has a type like
/// `Stream<Either<A, B>>`, it means you can unzip it and get two streams: `Stream<A>` and
/// `Stream<B>`.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum Either<A, B> {
    /// Left part of the merger.
    Left(A),
    /// Right part of the merger.
    Right(B),
}

/// A stream of signals.
///
/// A stream represents a composable signal producer. When you send a signal down a stream, any
/// other streams composed with that first stream also receive the signal. This lets you build
/// more interesting and complex streams by composing them via, for instance,
/// [`Stream::map`], [`Stream::filter_map`], [`Stream::filter`], [`Stream::fold`],
/// [`Stream::merge`], [`Stream::zip`], etc.
pub struct Stream<'a, Sig> {
    subscribers: RwLock<DependentStreams<'a, Sig>>,
}

/// TODO: This solves many problems, but is it really OK?
unsafe impl<Sig> Send for Stream<'_, Sig> {}

unsafe impl<Sig> Sync for Stream<'_, Sig> {}

impl<'a, Sig> Stream<'a, Sig>
where
    Sig: 'a + Send + Sync,
{
    /// Create a new stream.
    pub fn new() -> Self {
        let subscribers = DependentStreams::Own(Arc::new(DashMap::new()));
        Stream {
            subscribers: RwLock::new(subscribers),
        }
    }

    /// Create a new handle to this stream that behaves the same way as the input reference (if
    /// it holds an owning pointer, ownership is shared; if it holds a weak pointer, the weak
    /// pointer is cloned).
    fn new_same(&self) -> Self {
        let guard = self.subscribers.read().unwrap();
        let subscribers = RwLock::new(match guard.deref() {
            DependentStreams::Own(rc) => DependentStreams::Own(rc.clone()),
            DependentStreams::Weak(weak) => DependentStreams::Weak(weak.clone()),
        });

        Stream { subscribers }
    }

    /// Create a new, non-owning handle to this stream.
    fn new_weak(&self) -> Self {
        let guard = self.subscribers.read().unwrap();
        let subscribers = RwLock::new(match guard.deref() {
            DependentStreams::Own(rc) => DependentStreams::Weak(Arc::downgrade(rc)),
            DependentStreams::Weak(weak) => DependentStreams::Weak(weak.clone()),
        });

        Stream { subscribers }
    }

    /// Observe the signals flowing out of the stream.
    ///
    /// The subscriber is registered under a freshly generated handle ID, which is returned and
    /// can later be passed to [`Stream::remove`].
    ///
    /// Do not abuse this function, as its primary use is to build other combinators.
    #[allow(clippy::readonly_write_lock)]
    pub fn observe<F>(&self, subscriber: F) -> u128
    where
        F: 'a + FnMut(&Sig) + Send,
    {
        let handle_id = Uuid::new_v4().as_u128();
        let guard = self.subscribers.write().unwrap();
        match guard.deref() {
            DependentStreams::Own(subscribers) => {
                subscribers.insert(handle_id, Box::new(subscriber));
            }
            DependentStreams::Weak(weak) => {
                if let Some(subscribers) = weak.upgrade() {
                    subscribers.insert(handle_id, Box::new(subscriber));
                }
            }
        }
        handle_id
    }

    /// Observe the signals flowing out of the stream, using a caller-provided handle ID.
    ///
    /// A subscriber already registered under `handle_id` is replaced.
    ///
    /// Do not abuse this function, as its primary use is to build other combinators.
    #[allow(clippy::readonly_write_lock)]
    pub fn observe_with_handle<F>(&self, subscriber: F, handle_id: u128)
    where
        F: 'a + FnMut(&Sig) + Send,
    {
        let guard = self.subscribers.write().unwrap();
        match guard.deref() {
            DependentStreams::Own(subscribers) => {
                subscribers.insert(handle_id, Box::new(subscriber));
            }
            DependentStreams::Weak(weak) => {
                if let Some(subscribers) = weak.upgrade() {
                    subscribers.insert(handle_id, Box::new(subscriber));
                }
            }
        }
    }

    /// Removes the subscriber with the given `handle_id`.
    #[allow(clippy::readonly_write_lock)]
    pub fn remove(&self, handle_id: u128) {
        let guard = self.subscribers.write().unwrap();
        match guard.deref() {
            DependentStreams::Own(subscribers) => {
                subscribers.remove(&handle_id);
            }

            DependentStreams::Weak(weak) => {
                if let Some(subscribers) = weak.upgrade() {
                    subscribers.remove(&handle_id);
                }
            }
        }
    }

    /// Removes all subscribers without dropping the stream.
    #[allow(clippy::readonly_write_lock)]
    pub fn clear(&self) {
        let guard = self.subscribers.write().unwrap();
        match guard.deref() {
            DependentStreams::Own(subscribers) => {
                subscribers.clear();
            }

            DependentStreams::Weak(weak) => {
                if let Some(subscribers) = weak.upgrade() {
                    subscribers.clear();
                }
            }
        }
    }

    /// Send a signal down the stream.
    ///
    /// Every subscriber is called synchronously, on the calling thread.
    #[allow(clippy::readonly_write_lock)]
    pub fn send(&self, signal: &Sig) {
        let guard = self.subscribers.write().unwrap();
        match guard.deref() {
            DependentStreams::Own(subscribers) => {
                // subscribers.par_iter_mut().for_each(|mut subscriber| {
                //     subscriber(&signal);
                // });
                subscribers.iter_mut().for_each(|mut subscriber| {
                    subscriber(signal);
                });
            }

            DependentStreams::Weak(weak) => {
                if let Some(subscribers) = weak.upgrade() {
                    // subscribers.deref().par_iter_mut().for_each(|mut subscriber| {
                    //     subscriber(&signal);
                    // });
                    subscribers.iter_mut().for_each(|mut subscriber| {
                        subscriber(signal);
                    });
                }
            }
        }
    }

    /// Map any signals flowing out of a stream.
    ///
    /// Please note that this function is total: you cannot ignore signals. Even if you map
    /// *uninteresting signals* to `None`, a signal is still emitted for each of them. If you’re
    /// interested in filtering signals while mapping, have a look at the [`Stream::filter_map`]
    /// function.
    pub fn map<F, OutSig>(&self, f: F) -> Stream<'a, OutSig>
    where
        F: 'a + Fn(&Sig) -> OutSig + Send,
        OutSig: 'a + Send + Sync,
    {
        let mapped_stream = Stream::new();
        let mapped_stream_ = mapped_stream.new_same();

        self.observe(move |sig| {
            mapped_stream_.send(&f(sig));
        });

        mapped_stream
    }

    /// Filter and map signals flowing out of a stream.
    ///
    /// If you’re not interested in a specific signal, you can return [`None`]: no signal will be
    /// sent.
    pub fn filter_map<F, OutSig>(&self, f: F) -> Stream<'a, OutSig>
    where
        F: 'a + Fn(&Sig) -> Option<OutSig> + Send,
        OutSig: 'a + Send + Sync,
    {
        let mapped_stream = Stream::new();
        let mapped_stream_ = mapped_stream.new_same();

        self.observe(move |sig| {
            if let Some(ref mapped_sig) = f(sig) {
                mapped_stream_.send(mapped_sig);
            }
        });

        mapped_stream
    }

    /// Filter the signals flowing out of a stream with a predicate.
    pub fn filter<F>(&self, pred: F) -> Self
    where
        F: 'a + Fn(&Sig) -> bool + Send,
    {
        let filtered = Stream::new();
        let filtered_ = filtered.new_same();

        self.observe(move |sig| {
            if pred(sig) {
                filtered_.send(sig);
            }
        });

        filtered
    }

    /// Fold all signals flowing out of a stream into a stream of values.
    ///
    /// The resulting stream emits the updated accumulator after every input signal.
    pub fn fold<F, A>(&self, value: A, f: F) -> Stream<'a, A>
    where
        F: 'a + Fn(A, &Sig) -> A + Send,
        A: 'a + Send + Sync,
    {
        let folded_stream = Stream::new();
        let folded_stream_ = folded_stream.new_same();
        let mut boxed = Some(value);

        self.observe(move |sig| {
            if let Some(value) = boxed.take() {
                let output = f(value, sig);
                folded_stream_.send(&output);
                boxed = Some(output);
            }
        });

        folded_stream
    }

    /// Merge two streams into one.
    ///
    /// Merging streams enables you to perform useful compositions later, such as folding the
    /// merged results.
    pub fn merge(&self, rhs: &Self) -> Self {
        let merged = Stream::new();
        let merged_self = merged.new_same();
        let merged_rhs = merged.new_same();

        self.observe(move |sig| {
            merged_self.send(sig);
        });

        rhs.observe(move |sig| {
            merged_rhs.send(sig);
        });

        merged
    }

    /// Merge two streams with incompatible types into one.
    ///
    /// This method performs the same logical operation as:
    ///
    /// ```rust
    /// use reactive_graph_reactive_model_impl::Stream;
    ///
    /// let stream_a: Stream<usize> = Stream::new();
    /// let stream_b: Stream<String> = Stream::new();
    /// let merged = stream_a.merge(&stream_b.map(String::len));
    /// ```
    ///
    /// However, because `merge_with` does not create the intermediate mapped stream, you are
    /// advised to use this combinator instead.
    pub fn merge_with<F, SigRHS>(&self, rhs: &Stream<'a, SigRHS>, adapter: F) -> Self
    where
        F: 'a + Fn(&SigRHS) -> Sig + Send,
        SigRHS: 'a + Send + Sync,
    {
        let merged = Stream::new();
        let merged_self = merged.new_same();
        let merged_rhs = merged.new_same();

        self.observe(move |sig| {
            merged_self.send(sig);
        });

        rhs.observe(move |sig| {
            merged_rhs.send(&adapter(sig));
        });

        merged
    }

    /// Zip two streams with each other.
    ///
    /// The resulting stream will output [`Either`] from one or the other stream.
    pub fn zip<SigRHS>(&self, rhs: &Stream<'a, SigRHS>) -> Stream<'a, Either<Sig, SigRHS>>
    where
        Sig: Clone + Send,
        SigRHS: 'a + Clone + Send + Sync,
    {
        let zipped = Stream::new();
        let zipped_self = zipped.new_same();
        let zipped_rhs = zipped.new_same();

        self.observe(move |sig| {
            zipped_self.send(&Either::Left(sig.clone()));
        });

        rhs.observe(move |sig| {
            zipped_rhs.send(&Either::Right(sig.clone()));
        });

        zipped
    }

    /// Create a pair of entangled streams.
    ///
    /// If any of the streams sends a signal, the other one receives it. However, be careful: since
    /// the signals are defined in terms of each other, it’s quite easy to cause infinite loops if you
    /// don’t have a well-defined bottom to your recursion. This is why both functions return
    /// `Option<_>`: returning `None` stops the propagation.
    pub fn entangled<F, G, GSig>(f: F, g: G) -> (Self, Stream<'a, GSig>)
    where
        F: 'a + Fn(&Sig) -> Option<GSig> + Send,
        G: 'a + Fn(&GSig) -> Option<Sig> + Send,
        GSig: 'a + Send + Sync,
    {
        let fs = Stream::new();
        let gs = Stream::new();
        let fs_ = fs.new_weak();
        let gs_ = gs.new_weak();

        fs.observe(move |sig| {
            if let Some(sig_) = f(sig) {
                gs_.send(&sig_);
            }
        });

        gs.observe(move |sig| {
            if let Some(sig_) = g(sig) {
                fs_.send(&sig_);
            }
        });

        (fs, gs)
    }

    /// Sink a stream.
    ///
    /// This method lets you *receive* the signals and extract them out of the stream via a
    /// channel.
    pub fn recv(&self) -> Receiver<Sig>
    where
        Sig: Clone,
    {
        let (sx, rx) = channel();

        self.observe(move |sig| {
            let _ = sx.send(sig.clone());
        });

        rx
    }
}

impl<'a, Sig> Default for Stream<'a, Sig>
where
    Sig: 'a + Send + Sync,
{
    fn default() -> Self {
        Self::new()
    }
}

impl<'a, SigA, SigB> Stream<'a, Either<SigA, SigB>>
where
    SigA: 'static + Send + Sync,
    SigB: 'static + Send + Sync,
{
    /// Split a stream of zipped values into two streams.
    ///
    /// If the [`Either::Left`] part of the stream emits a signal, it is sent to the first stream.
    /// If the [`Either::Right`] part of the stream emits, it is sent to the second stream.
    pub fn unzip(&self) -> (Stream<'a, SigA>, Stream<'a, SigB>) {
        let a = Stream::new();
        let a_ = a.new_same();
        let b = Stream::new();
        let b_ = b.new_same();

        self.observe(move |sig| match *sig {
            Either::Left(ref l) => a_.send(l),
            Either::Right(ref r) => b_.send(r),
        });

        (a, b)
    }
}

#[cfg(test)]
#[cfg(not(tarpaulin_include))]
mod tests;