reactive_graph_graphql_schema/subscription/
mod.rs

1use std::sync::Arc;
2use std::time::Duration;
3
4use async_graphql::Context;
5use async_graphql::Result;
6use async_graphql::Subscription;
7use async_graphql::async_stream;
8use futures_util::Stream;
9use futures_util::StreamExt;
10use serde::Serialize;
11use serde_json::Value;
12use uuid::Uuid;
13
14pub use entity_instance::*;
15pub use relation_instance::*;
16
17use crate::mutation::GraphQLRelationInstanceId;
18use crate::query::GraphQLPropertyInstance;
19use reactive_graph_reactive_service_api::ReactiveEntityManager;
20use reactive_graph_reactive_service_api::ReactiveRelationManager;
21
22pub mod entity_instance;
23pub mod relation_instance;
24
25pub struct ReactiveGraphSubscription;
26
27/// Subscriptions for the reactive property instances.
28#[Subscription(name = "Subscription")]
29impl ReactiveGraphSubscription {
30    async fn entity(
31        &self,
32        context: &Context<'_>,
33        #[graphql(desc = "The uuid of the entity instance")] id: Option<Uuid>,
34        #[graphql(desc = "The label of the entity instance")] label: Option<String>,
35        #[graphql(desc = "The name of the property")] property_name: String,
36    ) -> Result<impl Stream<Item = GraphQLPropertyInstance>> {
37        match context.data::<Arc<dyn ReactiveEntityManager + Send + Sync>>() {
38            Ok(entity_instance_manager) => {
39                let entity_instance;
40                if let Some(id) = id {
41                    entity_instance = entity_instance_manager.get(id);
42                } else if let Some(label) = label {
43                    entity_instance = entity_instance_manager.get_by_label(label.as_str());
44                } else {
45                    return Err("Either id or label must be given!".into());
46                }
47                match entity_instance {
48                    Some(entity_instance) => {
49                        if !entity_instance.properties.contains_key(&property_name) {
50                            return Err("Error: property by name not found".into());
51                        }
52                        let entity_ty = entity_instance.ty.clone();
53                        let mut stream = EntityPropertyInstanceStream::new(entity_instance, property_name.clone());
54
55                        Ok(async_stream::stream! {
56                            loop {
57                                match stream.next().await {
58                                    Some(value) => {
59                                        futures_timer::Delay::new(Duration::from_millis(10)).await;
60                                        yield GraphQLPropertyInstance::new_entity_property(entity_ty.clone(), property_name.clone(), value.clone());
61                                    }
62                                    None => {
63                                        futures_timer::Delay::new(Duration::from_millis(100)).await;
64                                    }
65                                };
66                            }
67                        })
68                    }
69                    None => Err("Error: id not found".into()),
70                }
71            }
72            Err(_) => Err("Error: REIM".into()),
73        }
74    }
75
76    async fn relation(
77        &self,
78        context: &Context<'_>,
79        relation_instance_id: GraphQLRelationInstanceId,
80        #[graphql(desc = "The name of the property")] property_name: String,
81    ) -> Result<impl Stream<Item = GraphQLPropertyInstance>> {
82        match context.data::<Arc<dyn ReactiveRelationManager + Send + Sync>>() {
83            Ok(relation_instance_manager) => match relation_instance_manager.get(&relation_instance_id.into()) {
84                Some(relation_instance) => {
85                    if !relation_instance.properties.contains_key(&property_name) {
86                        return Err("Error: property by name not found".into());
87                    }
88                    let relation_ty = relation_instance.relation_type_id();
89                    let mut stream = RelationPropertyInstanceStream::new(relation_instance, property_name.clone());
90
91                    Ok(async_stream::stream! {
92                        loop {
93                            match stream.next().await {
94                                Some(value) => {
95                                    futures_timer::Delay::new(Duration::from_millis(10)).await;
96                                    yield GraphQLPropertyInstance::new_relation_property(relation_ty.clone(), property_name.clone(), value.clone());
97                                }
98                                None => {
99                                    futures_timer::Delay::new(Duration::from_millis(100)).await;
100                                }
101                            };
102                        }
103                    })
104                }
105                None => Err("Error: id not found".into()),
106            },
107            Err(_) => Err("Error: REIM".into()),
108        }
109    }
110}
111
112#[derive(Serialize)]
113pub struct GraphQLPropertyValueChanged {
114    property_name: String,
115    value: Value,
116}
117
118impl GraphQLPropertyValueChanged {
119    pub fn new(property_name: String, value: Value) -> Self {
120        GraphQLPropertyValueChanged { property_name, value }
121    }
122}
123
124impl From<GraphQLPropertyValueChanged> for Value {
125    fn from(value_changed: GraphQLPropertyValueChanged) -> Self {
126        serde_json::to_value(value_changed).unwrap()
127    }
128}