reactive_graph_plugin_service_impl/
plugin_resolver_impl.rs1use std::sync::Arc;
2use std::sync::RwLock;
3use std::time::Duration;
4
5use async_trait::async_trait;
6use log::info;
7use log::trace;
8use log::warn;
9use springtime_di::Component;
10use springtime_di::component_alias;
11use tokio::task::yield_now;
12use uuid::Uuid;
13
14use reactive_graph_config_api::ConfigManager;
15use reactive_graph_lifecycle::Lifecycle;
16use reactive_graph_plugin_api::PluginRefreshingState;
17use reactive_graph_plugin_api::PluginResolveState;
18use reactive_graph_plugin_api::PluginStartingState;
19use reactive_graph_plugin_api::PluginState;
20use reactive_graph_plugin_api::PluginStoppingState;
21use reactive_graph_plugin_api::PluginUninstallingState;
22use reactive_graph_plugin_service_api::PluginContainerManager;
23use reactive_graph_plugin_service_api::PluginContextFactory;
24use reactive_graph_plugin_service_api::PluginResolver;
25use reactive_graph_plugin_service_api::PluginResolverMode;
26use reactive_graph_plugin_service_api::PluginTransitionResult;
27use reactive_graph_plugin_service_api::PluginTransitionResult::Changed;
28use reactive_graph_plugin_service_api::PluginTransitionResult::NoChange;
29
30const MAX_ITERATIONS: u32 = 1000;
31
32fn create_plugin_resolver_mode() -> RwLock<PluginResolverMode> {
35 RwLock::new(PluginResolverMode::Neutral)
36}
37
38#[derive(Component)]
39pub struct PluginResolverImpl {
40 plugin_container_manager: Arc<dyn PluginContainerManager + Send + Sync>,
41
42 plugin_context_factory: Arc<dyn PluginContextFactory + Send + Sync>,
43
44 config_manager: Arc<dyn ConfigManager + Send + Sync>,
45
46 #[component(default = "create_plugin_resolver_mode")]
48 pub mode: RwLock<PluginResolverMode>,
49}
50
51impl PluginResolverImpl {
52 fn is_disabled(&self) -> bool {
53 self.config_manager.get_plugins_config().disabled.unwrap_or(false)
54 }
55
56 fn is_plugin_disabled(&self, id: Uuid) -> bool {
57 let stem = self.plugin_container_manager.get_stem(&id);
58 let name = self.plugin_container_manager.name(&id);
59 let short_name = self.plugin_container_manager.name_canonicalized(&id);
60
61 if let Some(enabled_plugins) = self.config_manager.get_plugins_config().enabled_plugins {
62 if let (Some(name), Some(short_name)) = (name.clone(), short_name.clone()) {
63 if !enabled_plugins.contains(&name) && !enabled_plugins.contains(&short_name) {
64 return true;
65 }
66 }
67 return false;
68 }
69
70 if let Some(disabled_plugins) = self.config_manager.get_plugins_config().disabled_plugins {
71 if let Some(stem) = stem {
72 if disabled_plugins.contains(&stem) {
73 return true;
74 }
75 }
76 if let Some(name) = name {
77 if disabled_plugins.contains(&name) {
78 return true;
79 }
80 }
81 if let Some(short_name) = short_name {
82 if disabled_plugins.contains(&short_name) {
83 return true;
84 }
85 }
86 }
87 false
88 }
89
90 fn log_unsatisfied_dependencies(&self) {
91 for id in self.plugin_container_manager.get_plugins_not_having_state(PluginState::Active) {
92 let name = self.plugin_container_manager.name_canonicalized(&id).unwrap_or(id.to_string());
93 for d in self.plugin_container_manager.get_unsatisfied_dependencies(&id) {
94 trace!("Plugin {} {} has unsatisfied dependency: {}", id, &name, d.name_version());
95 match self.plugin_container_manager.get_plugin_by_dependency(&d) {
96 Some(dependency_id) => {
97 let dependency_name_version = self.plugin_container_manager.name_version(&dependency_id).unwrap_or(dependency_id.to_string());
98 let dependency_state = self
101 .plugin_container_manager
102 .get_plugin_state(&dependency_id)
103 .unwrap_or(PluginState::Uninstalled);
104 warn!(
105 "Plugin {} has unsatisfied dependency: {} - which exists ({}) but has state {:?}",
106 &name,
107 d.name_version(),
108 dependency_name_version,
109 dependency_state
110 );
111 }
112 None => {
113 warn!("Plugin {} has unsatisfied dependency: {} - which doesn't exist", &name, d.name_version());
114 }
115 }
116 }
117 }
118 }
119}
120
121#[async_trait]
122#[component_alias]
123impl PluginResolver for PluginResolverImpl {
124 async fn resolve_until_idle(&self) {
125 if self.is_disabled() {
126 trace!("Skipping all plugins");
127 return;
128 }
129 let mut i = 0;
130 while self.resolve().await == Changed && i < MAX_ITERATIONS {
131 i += 1;
132 if i % 50 == 0 {
133 yield_now().await
134 }
135 }
136
137 if i >= MAX_ITERATIONS {
138 warn!("Plugin resolver force stopped after {i} iterations");
139 } else {
140 trace!("Plugin resolver finished after {i} iterations");
141 }
142 }
143
144 async fn stop_until_all_stopped(&self) {
145 self.transition_to_fallback_states().await;
146 let mut i = 0;
147 while !self.plugin_container_manager.are_all_stopped() && i < MAX_ITERATIONS {
148 self.resolve_until_idle().await;
149 tokio::time::sleep(Duration::from_millis(10)).await;
150 i += 1;
151 if i % 50 == 0 {
152 yield_now().await
153 }
154 }
156 if i >= MAX_ITERATIONS {
157 warn!("Plugin resolver force stopped after {i} iterations");
158 } else {
159 trace!("Plugin resolver finished after {i} iterations");
160 }
161 }
162
163 async fn resolve(&self) -> PluginTransitionResult {
164 let mode = self.get_mode();
165 trace!("Resolving plugins (mode: {:?})", mode);
166 for id in self.plugin_container_manager.get_plugins_with_states(
168 PluginState::Uninstalling(PluginUninstallingState::UnloadDll),
169 PluginState::Refreshing(PluginRefreshingState::Uninstalling(PluginUninstallingState::UnloadDll)),
170 ) {
171 if self.plugin_container_manager.unload_dll(&id) == Changed {
172 return Changed;
173 };
174 }
175 if let Some(id) = self
177 .plugin_container_manager
178 .get_plugins_with_states(
179 PluginState::Uninstalling(PluginUninstallingState::UninstallDll),
180 PluginState::Refreshing(PluginRefreshingState::Uninstalling(PluginUninstallingState::UninstallDll)),
181 )
182 .into_iter()
183 .next()
184 {
185 self.plugin_container_manager.uninstall_dll(&id);
186 return Changed;
187 }
188 if let Some(id) = self
190 .plugin_container_manager
191 .get_plugins_with_state(PluginState::Uninstalled)
192 .into_iter()
193 .next()
194 {
195 self.plugin_container_manager.remove_plugin_container(&id);
196 return Changed;
197 }
198 for id in self.plugin_container_manager.get_plugins_not_having_state(PluginState::Disabled) {
200 if self.is_plugin_disabled(id) && self.plugin_container_manager.disable(&id).is_ok() {
201 return Changed;
202 }
203 }
204 for id in self.plugin_container_manager.get_plugins_with_states(
206 PluginState::Resolving(PluginResolveState::CompilerVersionMismatch),
207 PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::CompilerVersionMismatch)),
208 ) {
209 if self
211 .plugin_container_manager
212 .set_state(&id, PluginState::Uninstalling(PluginUninstallingState::UnloadDll))
213 == Changed
214 {
215 return Changed;
216 }
217 }
218 for id in self.plugin_container_manager.get_plugins_with_states(
220 PluginState::Resolving(PluginResolveState::PluginApiVersionMismatch),
221 PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::PluginApiVersionMismatch)),
222 ) {
223 if self
225 .plugin_container_manager
226 .set_state(&id, PluginState::Uninstalling(PluginUninstallingState::UnloadDll))
227 == Changed
228 {
229 return Changed;
230 }
231 }
232 for id in self
234 .plugin_container_manager
235 .get_plugins_with_state(PluginState::Refreshing(PluginRefreshingState::Deploying))
236 {
237 if self.plugin_container_manager.deploy_dll(&id) == Changed {
238 return Changed;
239 }
240 }
241 for id in self
244 .plugin_container_manager
245 .get_plugins_with_states(PluginState::Installed, PluginState::Refreshing(PluginRefreshingState::Installed))
246 {
247 if self.plugin_container_manager.load_dll(&id) == Changed {
248 return Changed;
249 }
250 }
251 for id in self.plugin_container_manager.get_plugins_with_states(
253 PluginState::Resolving(PluginResolveState::Loaded),
254 PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::Loaded)),
255 ) {
256 if self.plugin_container_manager.load_plugin_declaration(&id) == Changed {
257 return Changed;
258 }
259 }
260 for id in self.plugin_container_manager.get_plugins_with_states(
264 PluginState::Resolving(PluginResolveState::PluginDeclarationLoaded),
265 PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::PluginDeclarationLoaded)),
266 ) {
267 if self.plugin_container_manager.check_plugin_compatibility(&id) == Changed {
268 return Changed;
269 }
270 }
271 for id in self.plugin_container_manager.get_plugins_with_states(
273 PluginState::Resolving(PluginResolveState::PluginCompatible),
274 PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::PluginCompatible)),
275 ) {
276 if self.plugin_container_manager.load_plugin_dependencies(&id) == Changed {
277 return Changed;
278 }
279 }
280 for id in self
282 .plugin_container_manager
283 .get_plugins_with_state(PluginState::Resolving(PluginResolveState::DependenciesNotActive))
284 {
285 if self.plugin_container_manager.resolve_dependencies_state(&id, false) == Changed {
286 return Changed;
287 }
288 }
289 for id in self
291 .plugin_container_manager
292 .get_plugins_with_state(PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::DependenciesNotActive)))
293 {
294 if self.plugin_container_manager.resolve_dependencies_state(&id, true) == Changed {
295 return Changed;
296 }
297 }
298 match mode {
301 PluginResolverMode::Starting => {
302 for id in self.plugin_container_manager.get_plugins_with_state(PluginState::Resolved) {
303 if self.plugin_container_manager.start(&id).map_err(|_| ()).is_ok() {
304 return Changed;
305 }
306 }
307 }
308 PluginResolverMode::Neutral => {
309 for id in self.plugin_container_manager.get_plugins_with_state(PluginState::Resolved) {
310 if self.plugin_container_manager.resolve_dependencies_state(&id, false) == Changed {
311 return Changed;
312 }
313 }
314 }
315 PluginResolverMode::Stopping => {}
316 }
317 for id in self.plugin_container_manager.get_plugins_with_states(
319 PluginState::Starting(PluginStartingState::ConstructingProxy),
320 PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::ConstructingProxy)),
321 ) {
322 if let Some(plugin_context) = self.plugin_context_factory.get() {
323 if self.plugin_container_manager.construct_proxy(&id, plugin_context) == Changed {
324 return Changed;
325 }
326 }
327 }
328 for id in self.plugin_container_manager.get_plugins_with_states(
330 PluginState::Starting(PluginStartingState::Registering),
331 PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::Registering)),
332 ) {
333 if self.plugin_container_manager.register(&id) == Changed {
334 return Changed;
335 }
336 }
337 for id in self.plugin_container_manager.get_plugins_with_states(
339 PluginState::Starting(PluginStartingState::Activating),
340 PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::Activating)),
341 ) {
342 if self.plugin_container_manager.activate(&id).await == Changed {
343 return Changed;
344 }
345 }
346 for id in self.plugin_container_manager.get_plugins_with_states(
348 PluginState::Stopping(PluginStoppingState::Deactivating),
349 PluginState::Refreshing(PluginRefreshingState::Stopping(PluginStoppingState::Deactivating)),
350 ) {
351 if self.plugin_container_manager.deactivate(&id).await == Changed {
352 return Changed;
353 }
354 }
355 for id in self.plugin_container_manager.get_plugins_with_states(
357 PluginState::Stopping(PluginStoppingState::Unregistering),
358 PluginState::Refreshing(PluginRefreshingState::Stopping(PluginStoppingState::Unregistering)),
359 ) {
360 if self.plugin_container_manager.unregister(&id) == Changed {
361 return Changed;
362 }
363 }
364 for id in self.plugin_container_manager.get_plugins_with_states(
366 PluginState::Stopping(PluginStoppingState::RemoveProxy),
367 PluginState::Refreshing(PluginRefreshingState::Stopping(PluginStoppingState::RemoveProxy)),
368 ) {
369 if self.plugin_container_manager.remove_proxy(&id) == Changed {
370 return Changed;
371 }
372 }
373 for id in self.plugin_container_manager.get_plugins_with_state(PluginState::Active) {
375 if mode == PluginResolverMode::Stopping {
376 return match self.plugin_container_manager.stop(&id) {
377 Ok(_) => Changed,
378 Err(_) => NoChange,
379 };
380 }
381 }
382 info!("Plugin resolver finished\n{}\n", self.plugin_container_manager.count_by_states());
384 NoChange
385 }
386
387 async fn transition_to_fallback_states(&self) {
388 for id in self.plugin_container_manager.get_plugins() {
390 if let Some(PluginState::Starting(_) | PluginState::Refreshing(PluginRefreshingState::Starting(_))) =
391 self.plugin_container_manager.get_plugin_state(&id)
392 {
393 info!("Plugin {id} Starting -> Resolved");
394 self.plugin_container_manager.set_state(&id, PluginState::Resolved);
395 }
396 }
397 }
398
399 fn set_mode(&self, mode: PluginResolverMode) {
400 let mut writer = self.mode.write().unwrap();
401 *writer = mode;
402 }
403
404 fn get_mode(&self) -> PluginResolverMode {
405 let reader = self.mode.read().unwrap();
406 *reader
407 }
408}
409
410#[async_trait]
411impl Lifecycle for PluginResolverImpl {
412 async fn init(&self) {
413 self.set_mode(PluginResolverMode::Starting);
414 self.resolve_until_idle().await;
415 self.log_unsatisfied_dependencies();
416 self.set_mode(PluginResolverMode::Neutral);
417 }
418
419 async fn shutdown(&self) {
420 self.set_mode(PluginResolverMode::Stopping);
421 self.plugin_container_manager.stop_all();
422 self.stop_until_all_stopped().await;
423 }
424}