reactive_graph_graphql_schema/subscription/
entity_instance.rs

1use std::pin::Pin;
2use std::task::Poll;
3use std::time::Duration;
4
5use crossbeam::channel::Receiver;
6use futures_util::Stream;
7use log::debug;
8use rand::Rng;
9use serde_json::Value;
10
11use reactive_graph_reactive_model_impl::ReactiveEntity;
12
13pub struct EntityPropertyInstanceStream {
14    entity_instance: ReactiveEntity,
15    property_name: String,
16    handle_id: u128,
17    receiver: Receiver<Value>,
18}
19
20impl EntityPropertyInstanceStream {
21    pub fn new(entity_instance: ReactiveEntity, property_name: String) -> EntityPropertyInstanceStream {
22        debug!("Opened subscription entity({})[{}]", entity_instance.id, property_name);
23        let mut rng = rand::rng();
24        let handle_id = rng.random::<u128>();
25        let entity_instance2 = entity_instance.clone();
26        let property_instance = entity_instance2.properties.get(&property_name).unwrap();
27        let (sender, receiver) = crossbeam::channel::unbounded();
28        property_instance.stream.read().unwrap().observe_with_handle(
29            move |value: &Value| {
30                let _ = sender.send(value.clone());
31            },
32            handle_id,
33        );
34        EntityPropertyInstanceStream {
35            entity_instance,
36            property_name,
37            handle_id,
38            receiver,
39        }
40    }
41}
42
43impl Stream for EntityPropertyInstanceStream {
44    type Item = Value;
45
46    fn poll_next(self: Pin<&mut Self>, _context: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
47        match self.receiver.try_recv() {
48            Ok(value) => {
49                std::thread::sleep(Duration::from_millis(10));
50                Poll::Ready(Some(value))
51            }
52            Err(_) => {
53                std::thread::sleep(Duration::from_millis(100));
54                Poll::Ready(None)
55            }
56        }
57    }
58}
59
60impl Drop for EntityPropertyInstanceStream {
61    fn drop(&mut self) {
62        debug!("Closing subscription entity({})[{}]", self.entity_instance.id, self.property_name.clone());
63        let property_instance = self.entity_instance.properties.get(self.property_name.as_str()).unwrap();
64        property_instance.stream.read().unwrap().remove(self.handle_id);
65    }
66}