reactive_graph_plugin_service_impl/
plugin_resolver_impl.rs

1use std::sync::Arc;
2use std::sync::RwLock;
3use std::time::Duration;
4
5use async_trait::async_trait;
6use log::info;
7use log::trace;
8use log::warn;
9use springtime_di::Component;
10use springtime_di::component_alias;
11use tokio::task::yield_now;
12use uuid::Uuid;
13
14use reactive_graph_config_api::ConfigManager;
15use reactive_graph_lifecycle::Lifecycle;
16use reactive_graph_plugin_api::PluginRefreshingState;
17use reactive_graph_plugin_api::PluginResolveState;
18use reactive_graph_plugin_api::PluginStartingState;
19use reactive_graph_plugin_api::PluginState;
20use reactive_graph_plugin_api::PluginStoppingState;
21use reactive_graph_plugin_api::PluginUninstallingState;
22use reactive_graph_plugin_service_api::PluginContainerManager;
23use reactive_graph_plugin_service_api::PluginContextFactory;
24use reactive_graph_plugin_service_api::PluginResolver;
25use reactive_graph_plugin_service_api::PluginResolverMode;
26use reactive_graph_plugin_service_api::PluginTransitionResult;
27use reactive_graph_plugin_service_api::PluginTransitionResult::Changed;
28use reactive_graph_plugin_service_api::PluginTransitionResult::NoChange;
29
/// Upper bound on the number of passes a resolver loop may run before it gives up.
const MAX_ITERATIONS: u32 = 1_000;
31
32// pub struct PluginResolverModeState(RwLock<PluginResolverMode>);
33
34fn create_plugin_resolver_mode() -> RwLock<PluginResolverMode> {
35    RwLock::new(PluginResolverMode::Neutral)
36}
37
38#[derive(Component)]
39pub struct PluginResolverImpl {
40    plugin_container_manager: Arc<dyn PluginContainerManager + Send + Sync>,
41
42    plugin_context_factory: Arc<dyn PluginContextFactory + Send + Sync>,
43
44    config_manager: Arc<dyn ConfigManager + Send + Sync>,
45
46    /// The resolver can be in three modes: Starting, Neutral and Stopping.
47    #[component(default = "create_plugin_resolver_mode")]
48    pub mode: RwLock<PluginResolverMode>,
49}
50
51impl PluginResolverImpl {
52    fn is_disabled(&self) -> bool {
53        self.config_manager.get_plugins_config().disabled.unwrap_or(false)
54    }
55
56    fn is_plugin_disabled(&self, id: Uuid) -> bool {
57        let stem = self.plugin_container_manager.get_stem(&id);
58        let name = self.plugin_container_manager.name(&id);
59        let short_name = self.plugin_container_manager.name_canonicalized(&id);
60
61        if let Some(enabled_plugins) = self.config_manager.get_plugins_config().enabled_plugins {
62            if let (Some(name), Some(short_name)) = (name.clone(), short_name.clone()) {
63                if !enabled_plugins.contains(&name) && !enabled_plugins.contains(&short_name) {
64                    return true;
65                }
66            }
67            return false;
68        }
69
70        if let Some(disabled_plugins) = self.config_manager.get_plugins_config().disabled_plugins {
71            if let Some(stem) = stem {
72                if disabled_plugins.contains(&stem) {
73                    return true;
74                }
75            }
76            if let Some(name) = name {
77                if disabled_plugins.contains(&name) {
78                    return true;
79                }
80            }
81            if let Some(short_name) = short_name {
82                if disabled_plugins.contains(&short_name) {
83                    return true;
84                }
85            }
86        }
87        false
88    }
89
90    fn log_unsatisfied_dependencies(&self) {
91        for id in self.plugin_container_manager.get_plugins_not_having_state(PluginState::Active) {
92            let name = self.plugin_container_manager.name_canonicalized(&id).unwrap_or(id.to_string());
93            for d in self.plugin_container_manager.get_unsatisfied_dependencies(&id) {
94                trace!("Plugin {} {} has unsatisfied dependency: {}", id, &name, d.name_version());
95                match self.plugin_container_manager.get_plugin_by_dependency(&d) {
96                    Some(dependency_id) => {
97                        let dependency_name_version = self.plugin_container_manager.name_version(&dependency_id).unwrap_or(dependency_id.to_string());
98                        // let dependency_name = self.plugin_container_manager.name_canonicalized(&dependency_id).unwrap_or_default();
99                        // let dependency_version = self.plugin_container_manager.version(&dependency_id).unwrap_or_default();
100                        let dependency_state = self
101                            .plugin_container_manager
102                            .get_plugin_state(&dependency_id)
103                            .unwrap_or(PluginState::Uninstalled);
104                        warn!(
105                            "Plugin {} has unsatisfied dependency: {} - which exists ({}) but has state {:?}",
106                            &name,
107                            d.name_version(),
108                            dependency_name_version,
109                            dependency_state
110                        );
111                    }
112                    None => {
113                        warn!("Plugin {} has unsatisfied dependency: {} - which doesn't exist", &name, d.name_version());
114                    }
115                }
116            }
117        }
118    }
119}
120
121#[async_trait]
122#[component_alias]
123impl PluginResolver for PluginResolverImpl {
124    async fn resolve_until_idle(&self) {
125        if self.is_disabled() {
126            trace!("Skipping all plugins");
127            return;
128        }
129        let mut i = 0;
130        while self.resolve().await == Changed && i < MAX_ITERATIONS {
131            i += 1;
132            if i % 50 == 0 {
133                yield_now().await
134            }
135        }
136
137        if i >= MAX_ITERATIONS {
138            warn!("Plugin resolver force stopped after {i} iterations");
139        } else {
140            trace!("Plugin resolver finished after {i} iterations");
141        }
142    }
143
144    async fn stop_until_all_stopped(&self) {
145        self.transition_to_fallback_states().await;
146        let mut i = 0;
147        while !self.plugin_container_manager.are_all_stopped() && i < MAX_ITERATIONS {
148            self.resolve_until_idle().await;
149            tokio::time::sleep(Duration::from_millis(10)).await;
150            i += 1;
151            if i % 50 == 0 {
152                yield_now().await
153            }
154            // TODO: force stop after timeout
155        }
156        if i >= MAX_ITERATIONS {
157            warn!("Plugin resolver force stopped after {i} iterations");
158        } else {
159            trace!("Plugin resolver finished after {i} iterations");
160        }
161    }
162
163    async fn resolve(&self) -> PluginTransitionResult {
164        let mode = self.get_mode();
165        trace!("Resolving plugins (mode: {:?})", mode);
166        // PluginUninstallingState::UnloadDll --> PluginUninstallingState::UninstallDll
167        for id in self.plugin_container_manager.get_plugins_with_states(
168            PluginState::Uninstalling(PluginUninstallingState::UnloadDll),
169            PluginState::Refreshing(PluginRefreshingState::Uninstalling(PluginUninstallingState::UnloadDll)),
170        ) {
171            if self.plugin_container_manager.unload_dll(&id) == Changed {
172                return Changed;
173            };
174        }
175        // PluginUninstallingState::UninstallDll --> Uninstalled
176        if let Some(id) = self
177            .plugin_container_manager
178            .get_plugins_with_states(
179                PluginState::Uninstalling(PluginUninstallingState::UninstallDll),
180                PluginState::Refreshing(PluginRefreshingState::Uninstalling(PluginUninstallingState::UninstallDll)),
181            )
182            .into_iter()
183            .next()
184        {
185            self.plugin_container_manager.uninstall_dll(&id);
186            return Changed;
187        }
188        // Uninstalled --> Removed
189        if let Some(id) = self
190            .plugin_container_manager
191            .get_plugins_with_state(PluginState::Uninstalled)
192            .into_iter()
193            .next()
194        {
195            self.plugin_container_manager.remove_plugin_container(&id);
196            return Changed;
197        }
198        // * --> PluginState::Disabled
199        for id in self.plugin_container_manager.get_plugins_not_having_state(PluginState::Disabled) {
200            if self.is_plugin_disabled(id) && self.plugin_container_manager.disable(&id).is_ok() {
201                return Changed;
202            }
203        }
204        // PluginResolveState::CompilerVersionMismatch --> Uninstalling
205        for id in self.plugin_container_manager.get_plugins_with_states(
206            PluginState::Resolving(PluginResolveState::CompilerVersionMismatch),
207            PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::CompilerVersionMismatch)),
208        ) {
209            // TODO: Add configuration property: auto_uninstall_incompatible_plugins
210            if self
211                .plugin_container_manager
212                .set_state(&id, PluginState::Uninstalling(PluginUninstallingState::UnloadDll))
213                == Changed
214            {
215                return Changed;
216            }
217        }
218        // PluginResolveState::PluginApiVersionMismatch --> Uninstalling
219        for id in self.plugin_container_manager.get_plugins_with_states(
220            PluginState::Resolving(PluginResolveState::PluginApiVersionMismatch),
221            PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::PluginApiVersionMismatch)),
222        ) {
223            // TODO: Add configuration property: auto_uninstall_incompatible_plugins
224            if self
225                .plugin_container_manager
226                .set_state(&id, PluginState::Uninstalling(PluginUninstallingState::UnloadDll))
227                == Changed
228            {
229                return Changed;
230            }
231        }
232        // Deploying --> Installed
233        for id in self
234            .plugin_container_manager
235            .get_plugins_with_state(PluginState::Refreshing(PluginRefreshingState::Deploying))
236        {
237            if self.plugin_container_manager.deploy_dll(&id) == Changed {
238                return Changed;
239            }
240        }
241        // Installed --> PluginResolveState::Loaded
242        //           --> Uninstalling
243        for id in self
244            .plugin_container_manager
245            .get_plugins_with_states(PluginState::Installed, PluginState::Refreshing(PluginRefreshingState::Installed))
246        {
247            if self.plugin_container_manager.load_dll(&id) == Changed {
248                return Changed;
249            }
250        }
251        // PluginResolveState::Loaded --> PluginResolveState::PluginDeclarationLoaded
252        for id in self.plugin_container_manager.get_plugins_with_states(
253            PluginState::Resolving(PluginResolveState::Loaded),
254            PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::Loaded)),
255        ) {
256            if self.plugin_container_manager.load_plugin_declaration(&id) == Changed {
257                return Changed;
258            }
259        }
260        // PluginResolveState::PluginDeclarationLoaded --> PluginResolveState::PluginCompatible
261        //                                             --> PluginResolveState::CompilerVersionMismatch
262        //                                             --> PluginResolveState::PluginApiVersionMismatch
263        for id in self.plugin_container_manager.get_plugins_with_states(
264            PluginState::Resolving(PluginResolveState::PluginDeclarationLoaded),
265            PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::PluginDeclarationLoaded)),
266        ) {
267            if self.plugin_container_manager.check_plugin_compatibility(&id) == Changed {
268                return Changed;
269            }
270        }
271        // PluginResolveState::PluginCompatible --> PluginResolveState::DependenciesNotActive
272        for id in self.plugin_container_manager.get_plugins_with_states(
273            PluginState::Resolving(PluginResolveState::PluginCompatible),
274            PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::PluginCompatible)),
275        ) {
276            if self.plugin_container_manager.load_plugin_dependencies(&id) == Changed {
277                return Changed;
278            }
279        }
280        // PluginResolveState::DependenciesNotActive --> Resolved
281        for id in self
282            .plugin_container_manager
283            .get_plugins_with_state(PluginState::Resolving(PluginResolveState::DependenciesNotActive))
284        {
285            if self.plugin_container_manager.resolve_dependencies_state(&id, false) == Changed {
286                return Changed;
287            }
288        }
289        // Refreshing::PluginResolveState::DependenciesNotActive --> Starting(ConstructingProxy)
290        for id in self
291            .plugin_container_manager
292            .get_plugins_with_state(PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::DependenciesNotActive)))
293        {
294            if self.plugin_container_manager.resolve_dependencies_state(&id, true) == Changed {
295                return Changed;
296            }
297        }
298        // Resolved --> Starting(ConstructingProxy)
299        //          --> PluginResolveState::DependenciesNotActive
300        match mode {
301            PluginResolverMode::Starting => {
302                for id in self.plugin_container_manager.get_plugins_with_state(PluginState::Resolved) {
303                    if self.plugin_container_manager.start(&id).map_err(|_| ()).is_ok() {
304                        return Changed;
305                    }
306                }
307            }
308            PluginResolverMode::Neutral => {
309                for id in self.plugin_container_manager.get_plugins_with_state(PluginState::Resolved) {
310                    if self.plugin_container_manager.resolve_dependencies_state(&id, false) == Changed {
311                        return Changed;
312                    }
313                }
314            }
315            PluginResolverMode::Stopping => {}
316        }
317        // Starting(ConstructingProxy) --> Starting(Registering)
318        for id in self.plugin_container_manager.get_plugins_with_states(
319            PluginState::Starting(PluginStartingState::ConstructingProxy),
320            PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::ConstructingProxy)),
321        ) {
322            if let Some(plugin_context) = self.plugin_context_factory.get() {
323                if self.plugin_container_manager.construct_proxy(&id, plugin_context) == Changed {
324                    return Changed;
325                }
326            }
327        }
328        // Starting(Registering) --> Starting(Activating)
329        for id in self.plugin_container_manager.get_plugins_with_states(
330            PluginState::Starting(PluginStartingState::Registering),
331            PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::Registering)),
332        ) {
333            if self.plugin_container_manager.register(&id) == Changed {
334                return Changed;
335            }
336        }
337        // Starting(Activating) --> Active
338        for id in self.plugin_container_manager.get_plugins_with_states(
339            PluginState::Starting(PluginStartingState::Activating),
340            PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::Activating)),
341        ) {
342            if self.plugin_container_manager.activate(&id).await == Changed {
343                return Changed;
344            }
345        }
346        // Stopping(Deactivating) --> Stopping(Unregistering)
347        for id in self.plugin_container_manager.get_plugins_with_states(
348            PluginState::Stopping(PluginStoppingState::Deactivating),
349            PluginState::Refreshing(PluginRefreshingState::Stopping(PluginStoppingState::Deactivating)),
350        ) {
351            if self.plugin_container_manager.deactivate(&id).await == Changed {
352                return Changed;
353            }
354        }
355        // Stopping(Unregistering) --> Stopping(RemoveProxy)
356        for id in self.plugin_container_manager.get_plugins_with_states(
357            PluginState::Stopping(PluginStoppingState::Unregistering),
358            PluginState::Refreshing(PluginRefreshingState::Stopping(PluginStoppingState::Unregistering)),
359        ) {
360            if self.plugin_container_manager.unregister(&id) == Changed {
361                return Changed;
362            }
363        }
364        // Stopping(RemoveProxy) --> Resolved
365        for id in self.plugin_container_manager.get_plugins_with_states(
366            PluginState::Stopping(PluginStoppingState::RemoveProxy),
367            PluginState::Refreshing(PluginRefreshingState::Stopping(PluginStoppingState::RemoveProxy)),
368        ) {
369            if self.plugin_container_manager.remove_proxy(&id) == Changed {
370                return Changed;
371            }
372        }
373        // Active --> Deactivating
374        for id in self.plugin_container_manager.get_plugins_with_state(PluginState::Active) {
375            if mode == PluginResolverMode::Stopping {
376                return match self.plugin_container_manager.stop(&id) {
377                    Ok(_) => Changed,
378                    Err(_) => NoChange,
379                };
380            }
381        }
382        // No more actions possible
383        info!("Plugin resolver finished\n{}\n", self.plugin_container_manager.count_by_states());
384        NoChange
385    }
386
387    async fn transition_to_fallback_states(&self) {
388        // Stop any failed transitions
389        for id in self.plugin_container_manager.get_plugins() {
390            if let Some(PluginState::Starting(_) | PluginState::Refreshing(PluginRefreshingState::Starting(_))) =
391                self.plugin_container_manager.get_plugin_state(&id)
392            {
393                info!("Plugin {id} Starting -> Resolved");
394                self.plugin_container_manager.set_state(&id, PluginState::Resolved);
395            }
396        }
397    }
398
399    fn set_mode(&self, mode: PluginResolverMode) {
400        let mut writer = self.mode.write().unwrap();
401        *writer = mode;
402    }
403
404    fn get_mode(&self) -> PluginResolverMode {
405        let reader = self.mode.read().unwrap();
406        *reader
407    }
408}
409
410#[async_trait]
411impl Lifecycle for PluginResolverImpl {
412    async fn init(&self) {
413        self.set_mode(PluginResolverMode::Starting);
414        self.resolve_until_idle().await;
415        self.log_unsatisfied_dependencies();
416        self.set_mode(PluginResolverMode::Neutral);
417    }
418
419    async fn shutdown(&self) {
420        self.set_mode(PluginResolverMode::Stopping);
421        self.plugin_container_manager.stop_all();
422        self.stop_until_all_stopped().await;
423    }
424}