reactive_graph_graphql_schema/mutation/instances/
flow_instance.rs

1use std::sync::Arc;
2
3use async_graphql::*;
4use uuid::Uuid;
5
6use reactive_graph_graph::EntityTypeId;
7use reactive_graph_graph::RelationInstanceId;
8use reactive_graph_reactive_model_impl::ReactiveFlow;
9use reactive_graph_reactive_service_api::ReactiveEntityManager;
10use reactive_graph_reactive_service_api::ReactiveFlowManager;
11use reactive_graph_reactive_service_api::ReactiveRelationManager;
12use reactive_graph_type_system_api::EntityTypeManager;
13use reactive_graph_type_system_api::FlowTypeManager;
14use reactive_graph_type_system_api::RelationTypeManager;
15
16use crate::error::flow::FlowMutationError;
17use crate::mutation::EntityTypeIdDefinition;
18use crate::mutation::FlowTypeIdDefinition;
19use crate::mutation::GraphQLFlowInstanceDefinition;
20use crate::mutation::GraphQLRelationInstanceId;
21use crate::query::GraphQLFlowInstance;
22use crate::query::GraphQLPropertyInstance;
23
24#[derive(Default)]
25pub struct MutationFlowInstances;
26
27/// Mutations for flows and their contained instances.
28#[Object]
29impl MutationFlowInstances {
30    /// Creates a new flow and a corresponding wrapper entity instance.
31    ///
32    /// The given entity type must exist. It provides the properties for the wrapper entity instance
33    /// and therefore defines which properties of the flow are the inputs and outputs.
34    ///
35    /// Optionally, a UUID can be specified.
36    ///
37    /// Optionally, the initial values of the properties can be specified. Specified properties
38    /// which are not provided by the given entity type are lacking of a definition (data type,
39    /// socket type).
40    async fn create(
41        &self,
42        context: &Context<'_>,
43        #[graphql(name = "type", desc = "The entity type")] entity_ty: EntityTypeIdDefinition,
44        // #[graphql(desc = "The namespace of the entity type")] namespace: String,
45        // #[graphql(name = "type", desc = "The type name of the entity type")] type_name: String,
46        #[graphql(desc = "The unique identifier of the flow instance and the wrapper entity instance")] flow_id: Option<Uuid>,
47        #[graphql(desc = "The properties of the flow instance and the wrapper entity instance")] properties: Option<Vec<GraphQLPropertyInstance>>,
48    ) -> Result<GraphQLFlowInstance> {
49        let reactive_flow_manager = context.data::<Arc<dyn ReactiveFlowManager + Send + Sync>>()?;
50        let entity_type_manager = context.data::<Arc<dyn EntityTypeManager + Send + Sync>>()?;
51        let reactive_entity_manager = context.data::<Arc<dyn ReactiveEntityManager + Send + Sync>>()?;
52
53        let ty = entity_ty.into();
54        let entity_type = entity_type_manager.get(&ty);
55        if entity_type.is_none() {
56            return Err(FlowMutationError::MissingEntityType(ty).into());
57        }
58
59        if let Some(flow_id) = flow_id {
60            if reactive_flow_manager.has(flow_id) {
61                return Err(FlowMutationError::FlowAlreadyExists(flow_id).into());
62            }
63            if reactive_entity_manager.has(flow_id) {
64                return Err(FlowMutationError::WrapperEntityInstanceAlreadyExists(flow_id).into());
65            }
66        }
67
68        let properties = GraphQLPropertyInstance::to_property_instances_with_defaults(properties, entity_type.unwrap().properties);
69
70        let wrapper_entity_instance = match flow_id {
71            Some(id) => reactive_entity_manager.create_with_id(&ty, id, properties),
72            None => reactive_entity_manager.create_reactive_entity(&ty, properties),
73        };
74
75        if wrapper_entity_instance.is_err() {
76            return Err(Error::new(wrapper_entity_instance.err().unwrap().to_string()));
77        }
78        let wrapper_entity_instance = wrapper_entity_instance.unwrap();
79
80        let flow_instance: ReactiveFlow = ReactiveFlow::new(wrapper_entity_instance); // Arc::new(wrapper_entity_instance.into());
81        reactive_flow_manager.register_flow_instance(flow_instance.clone());
82
83        Ok(flow_instance.into())
84    }
85
86    /// Creates a new flow from the given type.
87    ///
88    /// The corresponding wrapper entity instance will be created with the type.
89    ///
90    /// The given entity type must exist. It provides the properties for the wrapper entity instance
91    /// and therefore defines which properties of the flow are the inputs and outputs.
92    ///
93    /// Optionally, an UUID can be specified.
94    ///
95    /// Optionally, the initial values of the properties can be specified. Specified properties
96    /// which are not provided by the given entity type are lacking of a definition (data type,
97    /// socket type).
98    async fn create_from_type(
99        &self,
100        context: &Context<'_>,
101        #[graphql(name = "type", desc = "The flow type")] flow_ty: FlowTypeIdDefinition,
102        id: Option<Uuid>,
103        #[graphql(desc = "Parametrized construction of a flow instance using variables of a flow type.")] variables: Option<Vec<GraphQLPropertyInstance>>,
104        #[graphql(desc = "A list of properties of the wrapper entity instance.")] properties: Option<Vec<GraphQLPropertyInstance>>,
105    ) -> Result<GraphQLFlowInstance> {
106        let entity_type_manager = context.data::<Arc<dyn EntityTypeManager + Send + Sync>>()?;
107        let flow_type_manager = context.data::<Arc<dyn FlowTypeManager + Send + Sync>>()?;
108        let reactive_flow_manager = context.data::<Arc<dyn ReactiveFlowManager + Send + Sync>>()?;
109
110        let flow_ty = flow_ty.into();
111
112        let Some(flow_type) = flow_type_manager.get(&flow_ty) else {
113            return Err(FlowMutationError::MissingFlowType(flow_ty.clone()).into());
114        };
115
116        let entity_ty = flow_type.wrapper_type();
117        let Some(entity_type) = entity_type_manager.get(&entity_ty) else {
118            return Err(FlowMutationError::MissingEntityType(entity_ty).into());
119        };
120
121        let variables = GraphQLPropertyInstance::to_property_instances_with_defaults(variables, flow_type.variables);
122        let properties = GraphQLPropertyInstance::to_property_instances_with_defaults(properties, entity_type.properties);
123
124        match reactive_flow_manager.create_from_type(&flow_ty, id, variables, properties) {
125            Ok(flow_instance) => Ok(flow_instance.into()),
126            Err(e) => Err(Error::new(e)),
127        }
128    }
129
130    // /// Manually ticks all entity instances and relation instances of this flow. This means, for
131    // /// each property of each entity instance and relation instance the corresponding reactive
132    // /// stream will be activated with it's last value.
133    // ///
134    // /// This leads to a recalculation if the instance is controlled by an behaviour which
135    // /// consumes the reactive streams.
136    // ///
137    // /// In case of entity instances, it furthermore leads to a new value propagation if the output
138    // /// property is connected to other properties.
139    // async fn commit(&self, context: &Context<'_>, id: Uuid) -> Result<GraphQLFlowInstance> {
140    //     let reactive_flow_manager = context.data::<Arc<dyn ReactiveFlowManager + Send + Sync>>()?;
141    //     let reactive_flow = reactive_flow_manager.get(id);
142    //     if reactive_flow.is_none() {
143    //         return Err(FlowMutationError::MissingFlow(id).into());
144    //     }
145    //     let reactive_flow: ReactiveFlow = reactive_flow.unwrap();
146    //     reactive_flow_manager.commit(reactive_flow.id);
147    //     Ok(reactive_flow.into())
148    // }
149
150    /// Creates a new entity instance and adds the entity instance to the given flow by id.
151    async fn create_entity(
152        &self,
153        context: &Context<'_>,
154        flow_id: Uuid,
155        namespace: String,
156        #[graphql(name = "type")] type_name: String,
157        entity_id: Option<Uuid>,
158        properties: Option<Vec<GraphQLPropertyInstance>>,
159    ) -> Result<GraphQLFlowInstance> {
160        let reactive_flow_manager = context.data::<Arc<dyn ReactiveFlowManager + Send + Sync>>()?;
161        let entity_type_manager = context.data::<Arc<dyn EntityTypeManager + Send + Sync>>()?;
162        let reactive_entity_manager = context.data::<Arc<dyn ReactiveEntityManager + Send + Sync>>()?;
163
164        let Some(reactive_flow) = reactive_flow_manager.get(flow_id) else {
165            return Err(FlowMutationError::MissingFlow(flow_id).into());
166        };
167
168        let ty = EntityTypeId::new_from_type(namespace, type_name);
169        let Some(entity_type) = entity_type_manager.get(&ty) else {
170            return Err(FlowMutationError::MissingEntityType(ty).into());
171        };
172
173        let properties = GraphQLPropertyInstance::to_property_instances_with_defaults(properties, entity_type.properties);
174
175        let Ok(reactive_entity) = (match entity_id {
176            Some(id) => reactive_entity_manager.create_with_id(&ty, id, properties),
177            None => reactive_entity_manager.create_reactive_entity(&ty, properties),
178        }) else {
179            return Err(FlowMutationError::EntityInstanceCreationError().into());
180        };
181        reactive_flow.add_entity(reactive_entity);
182        Ok(reactive_flow.into())
183    }
184
185    /// Adds an existing entity instance by id to the given flow by id
186    async fn add_entity(&self, context: &Context<'_>, flow_id: Uuid, entity_id: Uuid) -> Result<GraphQLFlowInstance> {
187        let reactive_flow_manager = context.data::<Arc<dyn ReactiveFlowManager + Send + Sync>>()?;
188        let reactive_entity_manager = context.data::<Arc<dyn ReactiveEntityManager + Send + Sync>>()?;
189        let Some(reactive_flow) = reactive_flow_manager.get(flow_id) else {
190            return Err(FlowMutationError::MissingFlow(flow_id).into());
191        };
192        let Some(reactive_entity) = reactive_entity_manager.get(entity_id) else {
193            return Err(FlowMutationError::MissingEntityInstance(entity_id).into());
194        };
195        reactive_flow.add_entity(reactive_entity);
196        // No commit necessary _> The reactive entity  is registered in the reactive_entity_manager
197        Ok(reactive_flow.into())
198    }
199
200    /// Removes an entity instance from flow.
201    async fn remove_entity(&self, context: &Context<'_>, flow_id: Uuid, entity_id: Uuid) -> Result<GraphQLFlowInstance> {
202        let reactive_flow_manager = context.data::<Arc<dyn ReactiveFlowManager + Send + Sync>>()?;
203        let reactive_entity_manager = context.data::<Arc<dyn ReactiveEntityManager + Send + Sync>>()?;
204
205        let flow_instance = reactive_flow_manager.get(flow_id);
206        if flow_instance.is_none() {
207            return Err(FlowMutationError::MissingFlow(flow_id).into());
208        }
209        let flow_instance = flow_instance.unwrap();
210
211        let entity_instance = reactive_entity_manager.get(entity_id);
212        if entity_instance.is_none() {
213            return Err(FlowMutationError::MissingEntityInstance(entity_id).into());
214        }
215        let entity_instance = entity_instance.unwrap();
216
217        if !flow_instance.has_entity_by_id(entity_id) {
218            return Err(FlowMutationError::FlowInstanceDoesNotContainEntityInstance(entity_id).into());
219        }
220
221        flow_instance.remove_entity(entity_instance.id);
222        // The entity is removed from the flow but not yet deleted
223        // TODO: How to handle this? It may be that a entity is used in multiple flows?
224        // Orphaned instances / Do not delete instances used in other flows?
225
226        Ok(flow_instance.into())
227    }
228
229    /// Creates a new relation instance and adds the relation instance to the given flow by id.
230    async fn create_relation(
231        &self,
232        context: &Context<'_>,
233        flow_id: Uuid,
234        relation_instance_id: GraphQLRelationInstanceId,
235        properties: Option<Vec<GraphQLPropertyInstance>>,
236    ) -> Result<GraphQLFlowInstance> {
237        let reactive_flow_manager = context.data::<Arc<dyn ReactiveFlowManager + Send + Sync>>()?;
238        let relation_type_manager = context.data::<Arc<dyn RelationTypeManager + Send + Sync>>()?;
239        let reactive_relation_manager = context.data::<Arc<dyn ReactiveRelationManager + Send + Sync>>()?;
240
241        let ty = relation_instance_id.ty();
242        let relation_ty = ty.relation_type_id();
243
244        let relation_type = relation_type_manager
245            .get(&relation_ty)
246            .ok_or::<FlowMutationError>(FlowMutationError::MissingRelationType(relation_ty))?;
247
248        let flow_instance = reactive_flow_manager
249            .get(flow_id)
250            .ok_or::<FlowMutationError>(FlowMutationError::MissingFlow(flow_id))?;
251
252        if !flow_instance.has_entity_by_id(relation_instance_id.outbound_id) {
253            return Err(FlowMutationError::MissingOutboundEntityInstance(relation_instance_id.outbound_id).into());
254        }
255
256        if !flow_instance.has_entity_by_id(relation_instance_id.inbound_id) {
257            return Err(FlowMutationError::MissingInboundEntityInstance(relation_instance_id.inbound_id).into());
258        }
259
260        // TODO: optionally we could check if the reactive_entity_manager contains the outbound_id and inbound_id
261
262        let properties = GraphQLPropertyInstance::to_property_instances_with_defaults(properties, relation_type.properties);
263
264        let relation_instance = reactive_relation_manager.create_reactive_relation(&relation_instance_id.into(), properties);
265
266        if relation_instance.is_err() {
267            return Err(FlowMutationError::RelationInstanceCreationError().into());
268        }
269
270        let relation_instance = relation_instance.unwrap();
271
272        // Add relation to flow
273        flow_instance.add_relation(relation_instance);
274
275        Ok(flow_instance.into())
276    }
277
278    /// Adds an existing relation instance by relation_instance_id to the given flow by id
279    async fn add_relation(&self, context: &Context<'_>, flow_id: Uuid, relation_instance_id: GraphQLRelationInstanceId) -> Result<GraphQLFlowInstance> {
280        let reactive_flow_manager = context.data::<Arc<dyn ReactiveFlowManager + Send + Sync>>()?;
281        let reactive_relation_manager = context.data::<Arc<dyn ReactiveRelationManager + Send + Sync>>()?;
282
283        let flow_instance = reactive_flow_manager.get(flow_id);
284        if flow_instance.is_none() {
285            return Err(FlowMutationError::MissingFlow(flow_id).into());
286        }
287        let flow_instance = flow_instance.unwrap();
288
289        let relation_instance_id: RelationInstanceId = relation_instance_id.into();
290        let relation_instance = reactive_relation_manager.get(&relation_instance_id);
291        if relation_instance.is_none() {
292            return Err(FlowMutationError::MissingRelationInstance(relation_instance_id).into());
293        }
294        let relation_instance = relation_instance.unwrap();
295
296        flow_instance.add_relation(relation_instance);
297
298        Ok(flow_instance.into())
299    }
300
301    /// Removes an existing relation instance by relation_instance_id from the given flow by id
302    async fn remove_relation(&self, context: &Context<'_>, flow_id: Uuid, relation_instance_id: GraphQLRelationInstanceId) -> Result<GraphQLFlowInstance> {
303        let reactive_flow_manager = context.data::<Arc<dyn ReactiveFlowManager + Send + Sync>>()?;
304
305        let flow_instance = reactive_flow_manager.get(flow_id);
306        if flow_instance.is_none() {
307            return Err(FlowMutationError::MissingFlow(flow_id).into());
308        }
309        let flow_instance = flow_instance.unwrap();
310
311        let relation_instance_id: RelationInstanceId = relation_instance_id.into();
312
313        if !flow_instance.has_relation_by_key(&relation_instance_id) {
314            return Err(FlowMutationError::FlowInstanceDoesNotContainRelationInstance(relation_instance_id).into());
315        }
316
317        flow_instance.remove_relation(&relation_instance_id);
318        // The relation is removed from flow, but not yet deleted
319        // TODO: How to handle this? It may be that a relation is used in multiple flows?
320        // Orphaned instances / Do not delete instances used in other flows?
321
322        Ok(flow_instance.into())
323    }
324
325    async fn delete(&self, context: &Context<'_>, #[graphql(desc = "The id of the entity instance")] id: Uuid) -> Result<bool> {
326        let reactive_flow_manager = context.data::<Arc<dyn ReactiveFlowManager + Send + Sync>>()?;
327        Ok(reactive_flow_manager.delete(id))
328    }
329
330    /// Imports the given flow. Creates entity instances and relation instances which are contained
331    /// in the given flow.
332    async fn import(&self, context: &Context<'_>, flow: GraphQLFlowInstanceDefinition) -> Result<GraphQLFlowInstance> {
333        let reactive_flow_manager = context.data::<Arc<dyn ReactiveFlowManager + Send + Sync>>()?;
334        let flow_instance = reactive_flow_manager.create_reactive_flow(flow.into())?;
335        Ok(flow_instance.into())
336    }
337}