reactive_graph_dynamic_graph_impl/field/
flow.rs

1use crate::field::create_properties_from_field_arguments;
2use crate::field::to_field_value;
3use crate::field::to_input_type_ref;
4use crate::field::to_type_ref;
5use crate::interface::flow::INTERFACE_FLOW_FIELD_ID;
6use crate::object::types::DynamicGraphTypeDefinition;
7use async_graphql::Error;
8use async_graphql::ID;
9use async_graphql::dynamic::Field;
10use async_graphql::dynamic::FieldFuture;
11use async_graphql::dynamic::FieldValue;
12use async_graphql::dynamic::InputValue;
13use async_graphql::dynamic::ResolverContext;
14use async_graphql::dynamic::TypeRef;
15use reactive_graph_dynamic_graph_api::FlowInstanceIsNotOfType;
16use reactive_graph_dynamic_graph_api::FlowInstanceNotFound;
17use reactive_graph_graph::DataType;
18use reactive_graph_graph::FlowType;
19use reactive_graph_graph::PropertyInstanceGetter;
20use reactive_graph_graph::PropertyType;
21use reactive_graph_graph::PropertyTypeDefinition;
22use reactive_graph_graph::TypeDefinitionGetter;
23use reactive_graph_reactive_model_impl::ReactiveFlow;
24use reactive_graph_reactive_service_api::ReactiveFlowManager;
25use reactive_graph_runtime_model::LabeledProperties::LABEL;
26use reactive_graph_type_system_api::EntityTypeManager;
27use serde_json::Value;
28use std::str::FromStr;
29use std::sync::Arc;
30use uuid::Uuid;
31
32pub fn flow_id_field() -> Field {
33    Field::new(INTERFACE_FLOW_FIELD_ID, TypeRef::named_nn(TypeRef::ID), |ctx| {
34        FieldFuture::new(async move {
35            let flow_instance = ctx.parent_value.try_downcast_ref::<ReactiveFlow>()?;
36            Ok(Some(FieldValue::value(ID(flow_instance.id.to_string()))))
37        })
38    })
39}
40
41pub fn flow_property_field(property_type: &PropertyType) -> Field {
42    let property_type_inner = property_type.clone();
43    Field::new(&property_type.name, to_type_ref(&property_type.data_type), move |ctx| {
44        let property_type = property_type_inner.clone();
45        FieldFuture::new(async move {
46            let flow_instance = ctx.parent_value.try_downcast_ref::<ReactiveFlow>()?;
47            Ok(flow_instance.get(&property_type.name).and_then(to_field_value))
48        })
49    })
50    .description(&property_type.description)
51}
52
53pub fn flow_query_field(flow_type: &FlowType) -> Field {
54    let ty = flow_type.ty.clone();
55    let flow_type_inner = flow_type.clone();
56    let dy_ty = DynamicGraphTypeDefinition::from(&ty);
57    let mut field = Field::new(dy_ty.field_name(), TypeRef::named_nn_list_nn(dy_ty.to_string()), move |ctx| {
58        let ty = ty.clone();
59        let flow_type = flow_type_inner.clone();
60        let entity_ty = flow_type.wrapper_type();
61        FieldFuture::new(async move {
62            let flow_instance_manager = ctx.data::<Arc<dyn ReactiveFlowManager + Send + Sync>>()?;
63            if let Ok(id) = ctx.args.try_get("id") {
64                let id = Uuid::from_str(id.string()?)?;
65                let flow_instance = flow_instance_manager.get(id).ok_or(Error::new("Uuid not found"))?;
66                if flow_instance.ty != entity_ty {
67                    return Err(Error::new(format!("Flow {} is not a {}", id, &ty)));
68                }
69                return Ok(Some(FieldValue::list(vec![FieldValue::owned_any(flow_instance)])));
70            }
71            if let Ok(label) = ctx.args.try_get("label") {
72                let flow_instance = flow_instance_manager.get_by_label(label.string()?).ok_or(Error::new("Label not found"))?;
73                if flow_instance.ty != entity_ty {
74                    return Err(Error::new(format!("Flow {} is not a {}", flow_instance.id, &ty)));
75                }
76                return Ok(Some(FieldValue::list(vec![FieldValue::owned_any(flow_instance)])));
77            }
78            let instances = get_flow_instances_by_type_filter_by_properties(&ctx, &flow_type)?;
79            Ok(Some(FieldValue::list(instances.into_iter().map(FieldValue::owned_any))))
80        })
81    })
82    .description(flow_type.description.clone())
83    .argument(InputValue::new("id", TypeRef::named(TypeRef::STRING)))
84    .argument(InputValue::new("label", TypeRef::named(TypeRef::STRING)));
85    field = add_flow_type_variables_as_field_arguments(field, flow_type, true, true);
86    field
87}
88
89// , flow_instance_manager: Arc<dyn ReactiveFlowManager + Send + Sync>
90pub fn flow_creation_field(flow_type: &FlowType) -> Option<Field> {
91    let flow_type_inner = flow_type.clone();
92    let dy_ty = DynamicGraphTypeDefinition::from(&flow_type.ty);
93    let mut field = Field::new(dy_ty.mutation_field_name("create"), TypeRef::named_nn(dy_ty.to_string()), move |ctx| {
94        let ty = flow_type_inner.ty.clone();
95        let flow_type = flow_type_inner.clone();
96        FieldFuture::new(async move {
97            let flow_instance_manager = ctx.data::<Arc<dyn ReactiveFlowManager + Send + Sync>>()?;
98            let entity_type_manager = ctx.data::<Arc<dyn EntityTypeManager + Send + Sync>>()?;
99
100            let entity_ty = flow_type.wrapper_type();
101            let Some(entity_type) = entity_type_manager.get(&entity_ty) else {
102                return Err(Error::new(format!("Missing entity type {}", entity_ty.type_definition())));
103            };
104            let id = ctx.args.get("id").and_then(|id| id.string().ok().and_then(|s| Uuid::from_str(s).ok()));
105            if let Some(id) = id {
106                if flow_instance_manager.has(id) {
107                    return Err(Error::new(format!("Uuid {id} is already taken")));
108                }
109            }
110
111            // let id = if let Some(id) = ctx.args.get("id") {
112            //     let id = Uuid::from_str(id.string()?)?;
113            //     if flow_instance_manager.has(id) {
114            //         return Err(Error::new(format!("Uuid {} is already taken", id)));
115            //     }
116            //     Some(id)
117            // } else {
118            //     None
119            // };
120            let properties = create_properties_from_field_arguments(&ctx, &entity_type.properties)?;
121            // let properties = ReactiveProperties::new_with_id_from_properties(id, properties);
122
123            let variables = create_properties_from_field_arguments(&ctx, &flow_type.variables)?;
124            // let variables = ReactiveProperties::new_with_id_from_properties(id, variables);
125
126            match flow_instance_manager.create_from_type(&ty, id, variables, properties) {
127                Ok(reactive_flow) => Ok(Some(FieldValue::owned_any(reactive_flow))),
128                Err(e) => Err(Error::new(format!("Failed to create reactive flow: {e:?}"))),
129            }
130            // let Ok(reactive_flow) = flow_instance_manager.create_from_type(&ty, variables, properties) else {
131            //     return Err(Error::new(format!("Failed to create reactive flow: {}",)));
132            // };
133            //
134            // let reactive_flow = ReactiveFlow::builder().ty(&ty).id(id).properties(properties).build();
135            // // TODO: flow_instance_manager.create
136            // let x = flow_instance_manager.register_flow_instance_and_reactive_instances(reactive_flow);
137            // if let Ok(reactive_flow) = flow_instance_manager.register_reactive_instance(reactive_flow) {
138            //     return Ok(Some(FieldValue::owned_any(reactive_flow)));
139            // }
140            // Ok(None)
141        })
142    })
143    .argument(InputValue::new("id", TypeRef::named(TypeRef::ID)));
144    field = add_flow_type_variables_as_field_arguments(field, flow_type, false, false);
145    Some(field)
146}
147
148pub fn flow_mutation_field(flow_type: &FlowType) -> Option<Field> {
149    let ty = flow_type.ty.clone();
150    let flow_type_inner = flow_type.clone();
151    let dy_ty = DynamicGraphTypeDefinition::from(&flow_type.ty);
152    let mut field = Field::new(dy_ty.field_name(), TypeRef::named_nn(dy_ty.mutation_type_name()), move |ctx| {
153        let ty = ty.clone();
154        let flow_type = flow_type_inner.clone();
155        FieldFuture::new(async move {
156            let flow_instance_manager = ctx.data::<Arc<dyn ReactiveFlowManager + Send + Sync>>()?;
157            // Multiple ids
158            if let Ok(ids) = ctx.args.try_get("ids") {
159                let mut flow_instances = Vec::new();
160                for id in ids
161                    .list()?
162                    .iter()
163                    .filter_map(|id| id.string().map(str::to_string).ok())
164                    .filter_map(|id| Uuid::from_str(&id).ok())
165                {
166                    if let Some(flow_instance) = flow_instance_manager.get(id) {
167                        if flow_instance.ty != flow_type.wrapper_type() {
168                            return Err(FlowInstanceIsNotOfType(id, ty.clone(), flow_type.wrapper_type()).into());
169                        }
170                        flow_instances.push(flow_instance);
171                    }
172                }
173                let field_value = FieldValue::owned_any(flow_instances);
174                return Ok(Some(field_value));
175            }
176            // Single ids
177            if let Ok(id) = ctx.args.try_get("id") {
178                let id = Uuid::from_str(id.string()?)?;
179                let flow_instance = flow_instance_manager.get(id).ok_or(FlowInstanceNotFound(id))?;
180
181                if flow_instance.ty != flow_type.wrapper_type() {
182                    return Err(FlowInstanceIsNotOfType(id, ty.clone(), flow_type.wrapper_type()).into());
183                }
184                let flow_instances = vec![flow_instance];
185                let field_value = FieldValue::owned_any(flow_instances);
186                return Ok(Some(field_value));
187            }
188            // TODO: implement label matching
189            let instances = get_flow_instances_by_type_filter_by_properties(&ctx, &flow_type)?;
190            let field_value = FieldValue::owned_any(instances);
191            Ok(Some(field_value))
192        })
193    })
194    .description(flow_type.description.clone())
195    .argument(InputValue::new("ids", TypeRef::named_nn_list(TypeRef::ID)))
196    .argument(InputValue::new("id", TypeRef::named(TypeRef::ID)))
197    // TODO: implement label matching
198    .argument(InputValue::new("label", TypeRef::named(TypeRef::STRING)));
199    field = add_flow_type_variables_as_field_arguments(field, flow_type, true, true);
200    Some(field)
201}
202
203fn get_flow_instances_by_type_filter_by_properties(ctx: &ResolverContext, flow_type: &FlowType) -> async_graphql::Result<Vec<ReactiveFlow>> {
204    let reactive_flow_manager = ctx.data::<Arc<dyn ReactiveFlowManager + Send + Sync>>()?;
205    let mut instances = reactive_flow_manager.get_by_type(&flow_type.ty);
206    for property in flow_type.variables.iter() {
207        let Some(expected_value) = ctx.args.get(&property.name) else {
208            continue;
209        };
210        instances.retain(|instance| match instance.get(&property.name) {
211            Some(actual_value) => match &property.data_type {
212                DataType::Null => false,
213                DataType::Bool => expected_value
214                    .boolean()
215                    .map(|expected_value| actual_value.as_bool().map(|actual_value| expected_value == actual_value).unwrap_or(false))
216                    .unwrap_or(false),
217                DataType::Number => {
218                    if let Ok(expected_value) = expected_value.i64() {
219                        actual_value.as_i64().map(|actual_value| expected_value == actual_value).unwrap_or(false)
220                    } else if let Ok(expected_value) = expected_value.u64() {
221                        actual_value.as_u64().map(|actual_value| expected_value == actual_value).unwrap_or(false)
222                    } else if let Ok(expected_value) = expected_value.f64() {
223                        actual_value.as_f64().map(|actual_value| expected_value == actual_value).unwrap_or(false)
224                    } else {
225                        false
226                    }
227                }
228                DataType::String => expected_value
229                    .string()
230                    .map(|expected_value| actual_value.as_str().map(|actual_value| expected_value == actual_value).unwrap_or(false))
231                    .unwrap_or(false),
232                DataType::Array => {
233                    if let Ok(_l) = expected_value.list() {
234                        if let Ok(expected_value) = expected_value.deserialize::<Value>() {
235                            if expected_value.is_array() && actual_value.is_array() {
236                                expected_value == actual_value
237                            } else {
238                                false
239                            }
240                        } else {
241                            false
242                        }
243                    } else {
244                        false
245                    }
246                }
247                DataType::Object => {
248                    if let Ok(_o) = expected_value.object() {
249                        if let Ok(expected_value) = expected_value.deserialize::<Value>() {
250                            if expected_value.is_object() && actual_value.is_object() {
251                                expected_value == actual_value
252                            } else {
253                                false
254                            }
255                        } else {
256                            false
257                        }
258                    } else {
259                        false
260                    }
261                }
262                DataType::Any => match expected_value.deserialize::<Value>() {
263                    Ok(expected_value) => expected_value == actual_value,
264                    Err(_) => false,
265                },
266            },
267            None => false,
268        });
269    }
270    Ok(instances)
271}
272
273fn add_flow_type_variables_as_field_arguments(mut field: Field, flow_type: &FlowType, is_optional: bool, exclude_label: bool) -> Field {
274    for property in flow_type.variables.iter() {
275        if exclude_label && property.name == LABEL.property_name() {
276            continue;
277        }
278        if let Some(type_ref) = to_input_type_ref(property.value(), is_optional) {
279            field = field.argument(InputValue::new(&property.name, type_ref));
280        }
281    }
282    field
283}