reactive_graph_graphql_schema/subscription/
relation_instance.rs1use std::pin::Pin;
2use std::task::Poll;
3use std::time::Duration;
4
5use crossbeam::channel::Receiver;
6use futures_util::Stream;
7use log::debug;
8use rand::Rng;
9use serde_json::Value;
10
11use reactive_graph_graph::TypeDefinitionGetter;
12use reactive_graph_reactive_model_impl::ReactiveRelation;
13
14pub struct RelationPropertyInstanceStream {
15 relation_instance: ReactiveRelation,
16 property_name: String,
17 handle_id: u128,
18 receiver: Receiver<Value>,
19}
20
21impl RelationPropertyInstanceStream {
22 pub fn new(relation_instance: ReactiveRelation, property_name: String) -> RelationPropertyInstanceStream {
23 debug!(
24 "Opened subscription relation({}__{}__{})[{}]",
25 relation_instance.inbound.id,
26 relation_instance.type_definition(),
27 relation_instance.outbound.id,
28 property_name
29 );
30 let mut rng = rand::rng();
31 let handle_id = rng.random::<u128>();
32 let relation_instance2 = relation_instance.clone();
33 let property_instance = relation_instance2.properties.get(&property_name).unwrap();
34 let (sender, receiver) = crossbeam::channel::unbounded();
35 property_instance.stream.read().unwrap().observe_with_handle(
36 move |value: &Value| {
37 let _ = sender.send(value.clone());
38 },
39 handle_id,
40 );
41 RelationPropertyInstanceStream {
42 relation_instance,
43 property_name,
44 handle_id,
45 receiver,
46 }
47 }
48}
49
50impl Stream for RelationPropertyInstanceStream {
51 type Item = Value;
52
53 fn poll_next(self: Pin<&mut Self>, _context: &mut std::task::Context<'_>) -> Poll<Option<Self::Item>> {
54 match self.receiver.try_recv() {
55 Ok(value) => {
56 std::thread::sleep(Duration::from_millis(10));
57 Poll::Ready(Some(value))
58 }
59 Err(_) => {
60 std::thread::sleep(Duration::from_millis(100));
61 Poll::Ready(None)
62 }
63 }
64 }
65}
66
67impl Drop for RelationPropertyInstanceStream {
68 fn drop(&mut self) {
69 debug!(
70 "Closing subscription relation({}__{}__{})[{}]",
71 self.relation_instance.inbound.id,
72 self.relation_instance.type_definition(),
73 self.relation_instance.outbound.id,
74 self.property_name.clone()
75 );
76 let property_instance = self.relation_instance.properties.get(self.property_name.as_str()).unwrap();
77 property_instance.stream.read().unwrap().remove(self.handle_id);
78 }
79}