reactive_graph_graphql_schema/subscription/
entity_instance.rs1use std::pin::Pin;
2use std::task::Poll;
3use std::time::Duration;
4
5use crossbeam::channel::Receiver;
6use futures_util::Stream;
7use log::debug;
8use rand::Rng;
9use serde_json::Value;
10
11use reactive_graph_reactive_model_impl::ReactiveEntity;
12
13pub struct EntityPropertyInstanceStream {
14 entity_instance: ReactiveEntity,
15 property_name: String,
16 handle_id: u128,
17 receiver: Receiver<Value>,
18}
19
20impl EntityPropertyInstanceStream {
21 pub fn new(entity_instance: ReactiveEntity, property_name: String) -> EntityPropertyInstanceStream {
22 debug!("Opened subscription entity({})[{}]", entity_instance.id, property_name);
23 let mut rng = rand::rng();
24 let handle_id = rng.random::<u128>();
25 let entity_instance2 = entity_instance.clone();
26 let property_instance = entity_instance2.properties.get(&property_name).unwrap();
27 let (sender, receiver) = crossbeam::channel::unbounded();
28 property_instance.stream.read().unwrap().observe_with_handle(
29 move |value: &Value| {
30 let _ = sender.send(value.clone());
31 },
32 handle_id,
33 );
34 EntityPropertyInstanceStream {
35 entity_instance,
36 property_name,
37 handle_id,
38 receiver,
39 }
40 }
41}
42
43impl Stream for EntityPropertyInstanceStream {
44 type Item = Value;
45
46 fn poll_next(self: Pin<&mut Self>, _context: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
47 match self.receiver.try_recv() {
48 Ok(value) => {
49 std::thread::sleep(Duration::from_millis(10));
50 Poll::Ready(Some(value))
51 }
52 Err(_) => {
53 std::thread::sleep(Duration::from_millis(100));
54 Poll::Ready(None)
55 }
56 }
57 }
58}
59
60impl Drop for EntityPropertyInstanceStream {
61 fn drop(&mut self) {
62 debug!("Closing subscription entity({})[{}]", self.entity_instance.id, self.property_name.clone());
63 let property_instance = self.entity_instance.properties.get(self.property_name.as_str()).unwrap();
64 property_instance.stream.read().unwrap().remove(self.handle_id);
65 }
66}