reactive_graph_dynamic_graph_impl/field/
flow.rs1use crate::field::create_properties_from_field_arguments;
2use crate::field::to_field_value;
3use crate::field::to_input_type_ref;
4use crate::field::to_type_ref;
5use crate::interface::flow::INTERFACE_FLOW_FIELD_ID;
6use crate::object::types::DynamicGraphTypeDefinition;
7use async_graphql::Error;
8use async_graphql::ID;
9use async_graphql::dynamic::Field;
10use async_graphql::dynamic::FieldFuture;
11use async_graphql::dynamic::FieldValue;
12use async_graphql::dynamic::InputValue;
13use async_graphql::dynamic::ResolverContext;
14use async_graphql::dynamic::TypeRef;
15use reactive_graph_dynamic_graph_api::FlowInstanceIsNotOfType;
16use reactive_graph_dynamic_graph_api::FlowInstanceNotFound;
17use reactive_graph_graph::DataType;
18use reactive_graph_graph::FlowType;
19use reactive_graph_graph::PropertyInstanceGetter;
20use reactive_graph_graph::PropertyType;
21use reactive_graph_graph::PropertyTypeDefinition;
22use reactive_graph_graph::TypeDefinitionGetter;
23use reactive_graph_reactive_model_impl::ReactiveFlow;
24use reactive_graph_reactive_service_api::ReactiveFlowManager;
25use reactive_graph_runtime_model::LabeledProperties::LABEL;
26use reactive_graph_type_system_api::EntityTypeManager;
27use serde_json::Value;
28use std::str::FromStr;
29use std::sync::Arc;
30use uuid::Uuid;
31
32pub fn flow_id_field() -> Field {
33 Field::new(INTERFACE_FLOW_FIELD_ID, TypeRef::named_nn(TypeRef::ID), |ctx| {
34 FieldFuture::new(async move {
35 let flow_instance = ctx.parent_value.try_downcast_ref::<ReactiveFlow>()?;
36 Ok(Some(FieldValue::value(ID(flow_instance.id.to_string()))))
37 })
38 })
39}
40
41pub fn flow_property_field(property_type: &PropertyType) -> Field {
42 let property_type_inner = property_type.clone();
43 Field::new(&property_type.name, to_type_ref(&property_type.data_type), move |ctx| {
44 let property_type = property_type_inner.clone();
45 FieldFuture::new(async move {
46 let flow_instance = ctx.parent_value.try_downcast_ref::<ReactiveFlow>()?;
47 Ok(flow_instance.get(&property_type.name).and_then(to_field_value))
48 })
49 })
50 .description(&property_type.description)
51}
52
53pub fn flow_query_field(flow_type: &FlowType) -> Field {
54 let ty = flow_type.ty.clone();
55 let flow_type_inner = flow_type.clone();
56 let dy_ty = DynamicGraphTypeDefinition::from(&ty);
57 let mut field = Field::new(dy_ty.field_name(), TypeRef::named_nn_list_nn(dy_ty.to_string()), move |ctx| {
58 let ty = ty.clone();
59 let flow_type = flow_type_inner.clone();
60 let entity_ty = flow_type.wrapper_type();
61 FieldFuture::new(async move {
62 let flow_instance_manager = ctx.data::<Arc<dyn ReactiveFlowManager + Send + Sync>>()?;
63 if let Ok(id) = ctx.args.try_get("id") {
64 let id = Uuid::from_str(id.string()?)?;
65 let flow_instance = flow_instance_manager.get(id).ok_or(Error::new("Uuid not found"))?;
66 if flow_instance.ty != entity_ty {
67 return Err(Error::new(format!("Flow {} is not a {}", id, &ty)));
68 }
69 return Ok(Some(FieldValue::list(vec![FieldValue::owned_any(flow_instance)])));
70 }
71 if let Ok(label) = ctx.args.try_get("label") {
72 let flow_instance = flow_instance_manager.get_by_label(label.string()?).ok_or(Error::new("Label not found"))?;
73 if flow_instance.ty != entity_ty {
74 return Err(Error::new(format!("Flow {} is not a {}", flow_instance.id, &ty)));
75 }
76 return Ok(Some(FieldValue::list(vec![FieldValue::owned_any(flow_instance)])));
77 }
78 let instances = get_flow_instances_by_type_filter_by_properties(&ctx, &flow_type)?;
79 Ok(Some(FieldValue::list(instances.into_iter().map(FieldValue::owned_any))))
80 })
81 })
82 .description(flow_type.description.clone())
83 .argument(InputValue::new("id", TypeRef::named(TypeRef::STRING)))
84 .argument(InputValue::new("label", TypeRef::named(TypeRef::STRING)));
85 field = add_flow_type_variables_as_field_arguments(field, flow_type, true, true);
86 field
87}
88
89pub fn flow_creation_field(flow_type: &FlowType) -> Option<Field> {
91 let flow_type_inner = flow_type.clone();
92 let dy_ty = DynamicGraphTypeDefinition::from(&flow_type.ty);
93 let mut field = Field::new(dy_ty.mutation_field_name("create"), TypeRef::named_nn(dy_ty.to_string()), move |ctx| {
94 let ty = flow_type_inner.ty.clone();
95 let flow_type = flow_type_inner.clone();
96 FieldFuture::new(async move {
97 let flow_instance_manager = ctx.data::<Arc<dyn ReactiveFlowManager + Send + Sync>>()?;
98 let entity_type_manager = ctx.data::<Arc<dyn EntityTypeManager + Send + Sync>>()?;
99
100 let entity_ty = flow_type.wrapper_type();
101 let Some(entity_type) = entity_type_manager.get(&entity_ty) else {
102 return Err(Error::new(format!("Missing entity type {}", entity_ty.type_definition())));
103 };
104 let id = ctx.args.get("id").and_then(|id| id.string().ok().and_then(|s| Uuid::from_str(s).ok()));
105 if let Some(id) = id {
106 if flow_instance_manager.has(id) {
107 return Err(Error::new(format!("Uuid {id} is already taken")));
108 }
109 }
110
111 let properties = create_properties_from_field_arguments(&ctx, &entity_type.properties)?;
121 let variables = create_properties_from_field_arguments(&ctx, &flow_type.variables)?;
124 match flow_instance_manager.create_from_type(&ty, id, variables, properties) {
127 Ok(reactive_flow) => Ok(Some(FieldValue::owned_any(reactive_flow))),
128 Err(e) => Err(Error::new(format!("Failed to create reactive flow: {e:?}"))),
129 }
130 })
142 })
143 .argument(InputValue::new("id", TypeRef::named(TypeRef::ID)));
144 field = add_flow_type_variables_as_field_arguments(field, flow_type, false, false);
145 Some(field)
146}
147
148pub fn flow_mutation_field(flow_type: &FlowType) -> Option<Field> {
149 let ty = flow_type.ty.clone();
150 let flow_type_inner = flow_type.clone();
151 let dy_ty = DynamicGraphTypeDefinition::from(&flow_type.ty);
152 let mut field = Field::new(dy_ty.field_name(), TypeRef::named_nn(dy_ty.mutation_type_name()), move |ctx| {
153 let ty = ty.clone();
154 let flow_type = flow_type_inner.clone();
155 FieldFuture::new(async move {
156 let flow_instance_manager = ctx.data::<Arc<dyn ReactiveFlowManager + Send + Sync>>()?;
157 if let Ok(ids) = ctx.args.try_get("ids") {
159 let mut flow_instances = Vec::new();
160 for id in ids
161 .list()?
162 .iter()
163 .filter_map(|id| id.string().map(str::to_string).ok())
164 .filter_map(|id| Uuid::from_str(&id).ok())
165 {
166 if let Some(flow_instance) = flow_instance_manager.get(id) {
167 if flow_instance.ty != flow_type.wrapper_type() {
168 return Err(FlowInstanceIsNotOfType(id, ty.clone(), flow_type.wrapper_type()).into());
169 }
170 flow_instances.push(flow_instance);
171 }
172 }
173 let field_value = FieldValue::owned_any(flow_instances);
174 return Ok(Some(field_value));
175 }
176 if let Ok(id) = ctx.args.try_get("id") {
178 let id = Uuid::from_str(id.string()?)?;
179 let flow_instance = flow_instance_manager.get(id).ok_or(FlowInstanceNotFound(id))?;
180
181 if flow_instance.ty != flow_type.wrapper_type() {
182 return Err(FlowInstanceIsNotOfType(id, ty.clone(), flow_type.wrapper_type()).into());
183 }
184 let flow_instances = vec![flow_instance];
185 let field_value = FieldValue::owned_any(flow_instances);
186 return Ok(Some(field_value));
187 }
188 let instances = get_flow_instances_by_type_filter_by_properties(&ctx, &flow_type)?;
190 let field_value = FieldValue::owned_any(instances);
191 Ok(Some(field_value))
192 })
193 })
194 .description(flow_type.description.clone())
195 .argument(InputValue::new("ids", TypeRef::named_nn_list(TypeRef::ID)))
196 .argument(InputValue::new("id", TypeRef::named(TypeRef::ID)))
197 .argument(InputValue::new("label", TypeRef::named(TypeRef::STRING)));
199 field = add_flow_type_variables_as_field_arguments(field, flow_type, true, true);
200 Some(field)
201}
202
203fn get_flow_instances_by_type_filter_by_properties(ctx: &ResolverContext, flow_type: &FlowType) -> async_graphql::Result<Vec<ReactiveFlow>> {
204 let reactive_flow_manager = ctx.data::<Arc<dyn ReactiveFlowManager + Send + Sync>>()?;
205 let mut instances = reactive_flow_manager.get_by_type(&flow_type.ty);
206 for property in flow_type.variables.iter() {
207 let Some(expected_value) = ctx.args.get(&property.name) else {
208 continue;
209 };
210 instances.retain(|instance| match instance.get(&property.name) {
211 Some(actual_value) => match &property.data_type {
212 DataType::Null => false,
213 DataType::Bool => expected_value
214 .boolean()
215 .map(|expected_value| actual_value.as_bool().map(|actual_value| expected_value == actual_value).unwrap_or(false))
216 .unwrap_or(false),
217 DataType::Number => {
218 if let Ok(expected_value) = expected_value.i64() {
219 actual_value.as_i64().map(|actual_value| expected_value == actual_value).unwrap_or(false)
220 } else if let Ok(expected_value) = expected_value.u64() {
221 actual_value.as_u64().map(|actual_value| expected_value == actual_value).unwrap_or(false)
222 } else if let Ok(expected_value) = expected_value.f64() {
223 actual_value.as_f64().map(|actual_value| expected_value == actual_value).unwrap_or(false)
224 } else {
225 false
226 }
227 }
228 DataType::String => expected_value
229 .string()
230 .map(|expected_value| actual_value.as_str().map(|actual_value| expected_value == actual_value).unwrap_or(false))
231 .unwrap_or(false),
232 DataType::Array => {
233 if let Ok(_l) = expected_value.list() {
234 if let Ok(expected_value) = expected_value.deserialize::<Value>() {
235 if expected_value.is_array() && actual_value.is_array() {
236 expected_value == actual_value
237 } else {
238 false
239 }
240 } else {
241 false
242 }
243 } else {
244 false
245 }
246 }
247 DataType::Object => {
248 if let Ok(_o) = expected_value.object() {
249 if let Ok(expected_value) = expected_value.deserialize::<Value>() {
250 if expected_value.is_object() && actual_value.is_object() {
251 expected_value == actual_value
252 } else {
253 false
254 }
255 } else {
256 false
257 }
258 } else {
259 false
260 }
261 }
262 DataType::Any => match expected_value.deserialize::<Value>() {
263 Ok(expected_value) => expected_value == actual_value,
264 Err(_) => false,
265 },
266 },
267 None => false,
268 });
269 }
270 Ok(instances)
271}
272
273fn add_flow_type_variables_as_field_arguments(mut field: Field, flow_type: &FlowType, is_optional: bool, exclude_label: bool) -> Field {
274 for property in flow_type.variables.iter() {
275 if exclude_label && property.name == LABEL.property_name() {
276 continue;
277 }
278 if let Some(type_ref) = to_input_type_ref(property.value(), is_optional) {
279 field = field.argument(InputValue::new(&property.name, type_ref));
280 }
281 }
282 field
283}