reactive_graph_graphql_schema/subscription/
relation_instance.rs

1use std::pin::Pin;
2use std::task::Poll;
3use std::time::Duration;
4
5use crossbeam::channel::Receiver;
6use futures_util::Stream;
7use log::debug;
8use rand::Rng;
9use serde_json::Value;
10
11use reactive_graph_graph::TypeDefinitionGetter;
12use reactive_graph_reactive_model_impl::ReactiveRelation;
13
14pub struct RelationPropertyInstanceStream {
15    relation_instance: ReactiveRelation,
16    property_name: String,
17    handle_id: u128,
18    receiver: Receiver<Value>,
19}
20
21impl RelationPropertyInstanceStream {
22    pub fn new(relation_instance: ReactiveRelation, property_name: String) -> RelationPropertyInstanceStream {
23        debug!(
24            "Opened subscription relation({}__{}__{})[{}]",
25            relation_instance.inbound.id,
26            relation_instance.type_definition(),
27            relation_instance.outbound.id,
28            property_name
29        );
30        let mut rng = rand::rng();
31        let handle_id = rng.random::<u128>();
32        let relation_instance2 = relation_instance.clone();
33        let property_instance = relation_instance2.properties.get(&property_name).unwrap();
34        let (sender, receiver) = crossbeam::channel::unbounded();
35        property_instance.stream.read().unwrap().observe_with_handle(
36            move |value: &Value| {
37                let _ = sender.send(value.clone());
38            },
39            handle_id,
40        );
41        RelationPropertyInstanceStream {
42            relation_instance,
43            property_name,
44            handle_id,
45            receiver,
46        }
47    }
48}
49
50impl Stream for RelationPropertyInstanceStream {
51    type Item = Value;
52
53    fn poll_next(self: Pin<&mut Self>, _context: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
54        match self.receiver.try_recv() {
55            Ok(value) => {
56                std::thread::sleep(Duration::from_millis(10));
57                Poll::Ready(Some(value))
58            }
59            Err(_) => {
60                std::thread::sleep(Duration::from_millis(100));
61                Poll::Ready(None)
62            }
63        }
64    }
65}
66
67impl Drop for RelationPropertyInstanceStream {
68    fn drop(&mut self) {
69        debug!(
70            "Closing subscription relation({}__{}__{})[{}]",
71            self.relation_instance.inbound.id,
72            self.relation_instance.type_definition(),
73            self.relation_instance.outbound.id,
74            self.property_name.clone()
75        );
76        let property_instance = self.relation_instance.properties.get(self.property_name.as_str()).unwrap();
77        property_instance.stream.read().unwrap().remove(self.handle_id);
78    }
79}