reactive_graph_graphql_schema/subscription/
mod.rs1use std::sync::Arc;
2use std::time::Duration;
3
4use async_graphql::Context;
5use async_graphql::Result;
6use async_graphql::Subscription;
7use async_graphql::async_stream;
8use futures_util::Stream;
9use futures_util::StreamExt;
10use serde::Serialize;
11use serde_json::Value;
12use uuid::Uuid;
13
14pub use entity_instance::*;
15pub use relation_instance::*;
16
17use crate::mutation::GraphQLRelationInstanceId;
18use crate::query::GraphQLPropertyInstance;
19use reactive_graph_reactive_service_api::ReactiveEntityManager;
20use reactive_graph_reactive_service_api::ReactiveRelationManager;
21
22pub mod entity_instance;
23pub mod relation_instance;
24
25pub struct ReactiveGraphSubscription;
26
27#[Subscription(name = "Subscription")]
29impl ReactiveGraphSubscription {
30 async fn entity(
31 &self,
32 context: &Context<'_>,
33 #[graphql(desc = "The uuid of the entity instance")] id: Option<Uuid>,
34 #[graphql(desc = "The label of the entity instance")] label: Option<String>,
35 #[graphql(desc = "The name of the property")] property_name: String,
36 ) -> Result<impl Stream<Item = GraphQLPropertyInstance>> {
37 match context.data::<Arc<dyn ReactiveEntityManager + Send + Sync>>() {
38 Ok(entity_instance_manager) => {
39 let entity_instance;
40 if let Some(id) = id {
41 entity_instance = entity_instance_manager.get(id);
42 } else if let Some(label) = label {
43 entity_instance = entity_instance_manager.get_by_label(label.as_str());
44 } else {
45 return Err("Either id or label must be given!".into());
46 }
47 match entity_instance {
48 Some(entity_instance) => {
49 if !entity_instance.properties.contains_key(&property_name) {
50 return Err("Error: property by name not found".into());
51 }
52 let entity_ty = entity_instance.ty.clone();
53 let mut stream = EntityPropertyInstanceStream::new(entity_instance, property_name.clone());
54
55 Ok(async_stream::stream! {
56 loop {
57 match stream.next().await {
58 Some(value) => {
59 futures_timer::Delay::new(Duration::from_millis(10)).await;
60 yield GraphQLPropertyInstance::new_entity_property(entity_ty.clone(), property_name.clone(), value.clone());
61 }
62 None => {
63 futures_timer::Delay::new(Duration::from_millis(100)).await;
64 }
65 };
66 }
67 })
68 }
69 None => Err("Error: id not found".into()),
70 }
71 }
72 Err(_) => Err("Error: REIM".into()),
73 }
74 }
75
76 async fn relation(
77 &self,
78 context: &Context<'_>,
79 relation_instance_id: GraphQLRelationInstanceId,
80 #[graphql(desc = "The name of the property")] property_name: String,
81 ) -> Result<impl Stream<Item = GraphQLPropertyInstance>> {
82 match context.data::<Arc<dyn ReactiveRelationManager + Send + Sync>>() {
83 Ok(relation_instance_manager) => match relation_instance_manager.get(&relation_instance_id.into()) {
84 Some(relation_instance) => {
85 if !relation_instance.properties.contains_key(&property_name) {
86 return Err("Error: property by name not found".into());
87 }
88 let relation_ty = relation_instance.relation_type_id();
89 let mut stream = RelationPropertyInstanceStream::new(relation_instance, property_name.clone());
90
91 Ok(async_stream::stream! {
92 loop {
93 match stream.next().await {
94 Some(value) => {
95 futures_timer::Delay::new(Duration::from_millis(10)).await;
96 yield GraphQLPropertyInstance::new_relation_property(relation_ty.clone(), property_name.clone(), value.clone());
97 }
98 None => {
99 futures_timer::Delay::new(Duration::from_millis(100)).await;
100 }
101 };
102 }
103 })
104 }
105 None => Err("Error: id not found".into()),
106 },
107 Err(_) => Err("Error: REIM".into()),
108 }
109 }
110}
111
112#[derive(Serialize)]
113pub struct GraphQLPropertyValueChanged {
114 property_name: String,
115 value: Value,
116}
117
118impl GraphQLPropertyValueChanged {
119 pub fn new(property_name: String, value: Value) -> Self {
120 GraphQLPropertyValueChanged { property_name, value }
121 }
122}
123
124impl From<GraphQLPropertyValueChanged> for Value {
125 fn from(value_changed: GraphQLPropertyValueChanged) -> Self {
126 serde_json::to_value(value_changed).unwrap()
127 }
128}