reactive_graph_plugin_service_impl/
plugin_container_manager_impl.rs

1use std::path::PathBuf;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use dashmap::DashMap;
6use dashmap::DashSet;
7use log::debug;
8use log::trace;
9use semver::Version;
10use semver::VersionReq;
11use springtime_di::Component;
12use springtime_di::component_alias;
13use uuid::Uuid;
14
15use reactive_graph_lifecycle::Lifecycle;
16use reactive_graph_plugin_api::PluginContext;
17use reactive_graph_plugin_api::PluginDependency;
18use reactive_graph_plugin_api::PluginDeployError;
19use reactive_graph_plugin_api::PluginDisableError;
20use reactive_graph_plugin_api::PluginRefreshingState;
21use reactive_graph_plugin_api::PluginResolveState;
22use reactive_graph_plugin_api::PluginStartError;
23use reactive_graph_plugin_api::PluginStartingState;
24use reactive_graph_plugin_api::PluginState;
25use reactive_graph_plugin_api::PluginStopError;
26use reactive_graph_plugin_api::PluginStoppingState;
27use reactive_graph_plugin_api::PluginUninstallError;
28use reactive_graph_plugin_api::PluginUninstallingState;
29use reactive_graph_plugin_service_api::PluginContainerManager;
30use reactive_graph_plugin_service_api::PluginTransitionResult;
31use reactive_graph_plugin_service_api::PluginTransitionResult::Changed;
32use reactive_graph_plugin_service_api::PluginTransitionResult::NoChange;
33
34use crate::PluginContainer;
35
36#[derive(Component)]
37pub struct PluginContainerManagerImpl {
38    /// The plugin containers.
39    #[component(default = "DashMap::new")]
40    pub plugin_containers: DashMap<Uuid, PluginContainer>,
41}
42
43impl PluginContainerManagerImpl {}
44
45#[async_trait]
46#[component_alias]
47impl PluginContainerManager for PluginContainerManagerImpl {
48    fn create_and_register_plugin_container(&self, stem: String, path: PathBuf) -> Option<Uuid> {
49        if self.has(&stem) {
50            return None;
51        }
52        trace!("Creating plugin container for plugin {} located at {}", &stem, path.display());
53        let plugin_container = PluginContainer::new(stem.clone(), path);
54        let id = plugin_container.id;
55        trace!("Registering plugin container {} located at {}", &id, &stem);
56        self.plugin_containers.insert(id, plugin_container);
57        Some(id)
58    }
59
60    fn remove_plugin_container(&self, id: &Uuid) {
61        self.plugin_containers.remove(id);
62    }
63
64    fn has(&self, stem: &str) -> bool {
65        self.plugin_containers.iter().any(|p| p.stem.eq(stem))
66    }
67
68    fn get_id(&self, stem: &str) -> Option<Uuid> {
69        self.plugin_containers
70            .iter()
71            .find(|p| p.stem.eq(stem) || p.name().map(|name| name.eq(stem)).unwrap_or(false))
72            .map(|p| p.id)
73    }
74
75    fn get_stem(&self, id: &Uuid) -> Option<String> {
76        self.plugin_containers.get(id).map(|p| p.value().stem.clone())
77    }
78
79    fn name(&self, id: &Uuid) -> Option<String> {
80        self.plugin_containers.get(id).and_then(|p| p.value().name())
81    }
82
83    fn name_canonicalized(&self, id: &Uuid) -> Option<String> {
84        self.plugin_containers.get(id).and_then(|p| p.value().name_canonicalized())
85    }
86
87    fn name_version(&self, id: &Uuid) -> Option<String> {
88        self.plugin_containers.get(id).and_then(|p| p.value().name_version())
89    }
90
91    fn description(&self, id: &Uuid) -> Option<String> {
92        self.plugin_containers.get(id).and_then(|p| p.value().description())
93    }
94
95    fn version(&self, id: &Uuid) -> Option<String> {
96        self.plugin_containers.get(id).and_then(|p| p.value().version())
97    }
98
99    fn rustc_version(&self, id: &Uuid) -> Option<String> {
100        self.plugin_containers.get(id).and_then(|p| p.value().rustc_version())
101    }
102
103    fn plugin_api_version(&self, id: &Uuid) -> Option<String> {
104        self.plugin_containers.get(id).and_then(|p| p.value().plugin_api_version())
105    }
106
107    fn count(&self) -> usize {
108        self.plugin_containers.len()
109    }
110
111    fn count_by_state(&self, state: &PluginState) -> usize {
112        self.plugin_containers.iter().filter(|p| &p.state == state).count()
113    }
114
115    fn count_by_states(&self) -> String {
116        let states = [
117            PluginState::Installed,
118            PluginState::Resolving(PluginResolveState::Loaded),
119            PluginState::Resolving(PluginResolveState::PluginDeclarationLoaded),
120            PluginState::Resolving(PluginResolveState::CompilerVersionMismatch),
121            PluginState::Resolving(PluginResolveState::PluginApiVersionMismatch),
122            PluginState::Resolving(PluginResolveState::DependenciesNotActive),
123            PluginState::Resolving(PluginResolveState::PluginCompatible),
124            PluginState::Resolved,
125            PluginState::Starting(PluginStartingState::ConstructingProxy),
126            PluginState::Starting(PluginStartingState::Registering),
127            PluginState::Starting(PluginStartingState::Activating),
128            PluginState::Starting(PluginStartingState::ActivationFailed),
129            PluginState::Active,
130            PluginState::Stopping(PluginStoppingState::Deactivating),
131            PluginState::Stopping(PluginStoppingState::Unregistering),
132            PluginState::Stopping(PluginStoppingState::RemoveProxy),
133            PluginState::Uninstalling(PluginUninstallingState::UnloadDll),
134            PluginState::Uninstalling(PluginUninstallingState::UninstallDll),
135            PluginState::Uninstalled,
136            PluginState::Refreshing(PluginRefreshingState::Stopping(PluginStoppingState::Deactivating)),
137            PluginState::Refreshing(PluginRefreshingState::Stopping(PluginStoppingState::Unregistering)),
138            PluginState::Refreshing(PluginRefreshingState::Stopping(PluginStoppingState::RemoveProxy)),
139            PluginState::Refreshing(PluginRefreshingState::Uninstalling(PluginUninstallingState::UnloadDll)),
140            PluginState::Refreshing(PluginRefreshingState::Uninstalling(PluginUninstallingState::UninstallDll)),
141            PluginState::Refreshing(PluginRefreshingState::Deploying),
142            PluginState::Refreshing(PluginRefreshingState::Installed),
143            PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::Loaded)),
144            PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::PluginDeclarationLoaded)),
145            PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::CompilerVersionMismatch)),
146            PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::PluginApiVersionMismatch)),
147            PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::DependenciesNotActive)),
148            PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::PluginCompatible)),
149            PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::ConstructingProxy)),
150            PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::Registering)),
151            PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::Activating)),
152            PluginState::Disabled,
153        ];
154        states.iter().map(|state| self.count_by_state_str(state)).collect()
155    }
156
157    fn count_by_state_str(&self, state: &PluginState) -> String {
158        let count = self.count_by_state(state);
159        if count > 0 {
160            format!("\n  {:?}: {}", state, self.count_by_state(state))
161        } else {
162            "".to_owned()
163        }
164    }
165
166    fn get_plugin_path(&self, id: &Uuid) -> Option<String> {
167        self.plugin_containers.get(id).map(|e| String::from(e.path.to_string_lossy()))
168    }
169
170    fn get_plugin_state(&self, id: &Uuid) -> Option<PluginState> {
171        self.plugin_containers.get(id).map(|e| e.state)
172    }
173
174    fn get_plugins(&self) -> Vec<Uuid> {
175        self.plugin_containers.iter().map(|p| *p.key()).collect()
176    }
177
178    fn get_plugins_with_state(&self, state: PluginState) -> Vec<Uuid> {
179        self.plugin_containers.iter().filter(|p| p.state == state).map(|p| p.key().to_owned()).collect()
180    }
181
182    fn get_plugins_with_states(&self, state1: PluginState, state2: PluginState) -> Vec<Uuid> {
183        self.plugin_containers
184            .iter()
185            .filter(|p| p.state == state1 || p.state == state2)
186            .map(|p| p.key().to_owned())
187            .collect()
188    }
189
190    fn get_plugins_not_having_state(&self, state: PluginState) -> Vec<Uuid> {
191        self.plugin_containers.iter().filter(|p| p.state != state).map(|p| p.key().to_owned()).collect()
192    }
193
194    fn get_plugin_by_dependency(&self, plugin_dependency: &PluginDependency) -> Option<Uuid> {
195        let version_requirement = VersionReq::parse(plugin_dependency.version).ok()?;
196        self.plugin_containers
197            .iter()
198            .find(|e| {
199                let reader = e.plugin_declaration.read().unwrap();
200                match *reader {
201                    Some(plugin_declaration) => {
202                        plugin_declaration.name == plugin_dependency.name
203                            && Version::parse(plugin_declaration.version)
204                                .map(|version| version_requirement.matches(&version))
205                                .unwrap_or(false)
206                    }
207                    None => false,
208                }
209            })
210            .map(|e| *e.key())
211    }
212
213    fn deploy_dll(&self, id: &Uuid) -> PluginTransitionResult {
214        match self.plugin_containers.get_mut(id) {
215            Some(mut plugin_container) => {
216                trace!("Plugin {id} is deploying the dynamic linked library");
217                plugin_container.deploy_dll()
218            }
219            None => NoChange,
220        }
221    }
222
223    fn load_dll(&self, id: &Uuid) -> PluginTransitionResult {
224        match self.plugin_containers.get_mut(id) {
225            Some(mut plugin_container) => {
226                trace!("Plugin {id} is loading the dynamic linked library");
227                plugin_container.load_dll()
228            }
229            None => NoChange,
230        }
231    }
232
233    fn load_plugin_declaration(&self, id: &Uuid) -> PluginTransitionResult {
234        match self.plugin_containers.get_mut(id) {
235            Some(mut plugin_container) => {
236                trace!("Plugin {id} is loading the plugin declaration");
237                plugin_container.value_mut().load_plugin_declaration()
238            }
239            None => NoChange,
240        }
241    }
242
243    fn check_plugin_compatibility(&self, id: &Uuid) -> PluginTransitionResult {
244        match self.plugin_containers.get_mut(id) {
245            Some(mut plugin_container) => {
246                trace!("Plugin {id} is checked for compatibility");
247                plugin_container.value_mut().check_compatibility()
248            }
249            None => NoChange,
250        }
251    }
252
253    fn load_plugin_dependencies(&self, id: &Uuid) -> PluginTransitionResult {
254        match self.plugin_containers.get_mut(id) {
255            Some(mut plugin_container) => {
256                trace!("Plugin {id} is loading the list of dependencies");
257                plugin_container.value_mut().load_plugin_dependencies()
258            }
259            None => NoChange,
260        }
261    }
262
263    fn get_dependency_state(&self, dependency: &PluginDependency) -> PluginState {
264        self.get_plugin_by_dependency(dependency)
265            .and_then(|id| self.get_plugin_state(&id))
266            .unwrap_or(PluginState::Uninstalled)
267    }
268
269    fn has_dependencies(&self, id: &Uuid) -> bool {
270        self.plugin_containers.get(id).iter().any(|e| !e.dependencies.is_empty())
271    }
272
273    fn get_dependencies(&self, id: &Uuid) -> DashSet<PluginDependency> {
274        self.plugin_containers.get(id).map(|e| e.dependencies.clone()).unwrap_or_default()
275    }
276
277    fn has_unsatisfied_dependencies(&self, id: &Uuid) -> bool {
278        if !self.has_dependencies(id) {
279            return false;
280        }
281        return !self.get_dependencies(id).iter().all(|d| self.get_dependency_state(&d) == PluginState::Active);
282    }
283
284    fn get_unsatisfied_dependencies(&self, id: &Uuid) -> DashSet<PluginDependency> {
285        self.get_dependencies(id)
286            .iter()
287            .filter(|d| self.get_dependency_state(d) != PluginState::Active)
288            .map(|d| *d)
289            .collect()
290    }
291
292    fn get_dependents(&self, id: &Uuid) -> Vec<Uuid> {
293        let mut dependents = Vec::new();
294        for plugin_container in self.plugin_containers.iter() {
295            for dependency in plugin_container.dependencies.iter() {
296                if let Some(dependency_id) = self.get_plugin_by_dependency(&dependency) {
297                    if &dependency_id == id {
298                        dependents.push(plugin_container.id);
299                    }
300                }
301            }
302        }
303        dependents
304    }
305
306    fn set_state(&self, id: &Uuid, new_state: PluginState) -> PluginTransitionResult {
307        match self.plugin_containers.get_mut(id) {
308            Some(mut plugin_container) => {
309                if plugin_container.state != new_state {
310                    plugin_container.state = new_state;
311                    return Changed;
312                }
313                NoChange
314            }
315            None => NoChange,
316        }
317    }
318
319    fn resolve_dependencies_state(&self, id: &Uuid, refreshing: bool) -> PluginTransitionResult {
320        if !self.has_unsatisfied_dependencies(id) {
321            debug!("Plugin {id} has no unsatisfied dependencies");
322            let new_state = if refreshing {
323                PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::ConstructingProxy))
324            } else {
325                PluginState::Resolved
326            };
327            self.set_state(id, new_state)
328        } else {
329            trace!("Plugin {id} has unsatisfied dependencies");
330            let new_state = if refreshing {
331                PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::DependenciesNotActive))
332            } else {
333                PluginState::Resolving(PluginResolveState::DependenciesNotActive)
334            };
335            self.set_state(id, new_state)
336        }
337    }
338
339    fn construct_proxy(&self, id: &Uuid, plugin_context: Arc<dyn PluginContext + Send + Sync>) -> PluginTransitionResult {
340        match self.plugin_containers.get_mut(id) {
341            Some(mut plugin_container) => plugin_container.construct_proxy(plugin_context.clone()),
342            None => NoChange,
343        }
344    }
345
346    fn register(&self, id: &Uuid) -> PluginTransitionResult {
347        match self.plugin_containers.get_mut(id) {
348            Some(mut plugin_container) => {
349                if plugin_container.state != PluginState::Starting(PluginStartingState::Registering)
350                    && plugin_container.state != PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::Registering))
351                {
352                    return NoChange;
353                }
354                let refreshing = plugin_container.state == PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::Registering));
355                if refreshing {
356                    plugin_container.state = PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::Activating));
357                } else {
358                    plugin_container.state = PluginState::Starting(PluginStartingState::Activating);
359                }
360                Changed
361            }
362            None => NoChange,
363        }
364    }
365
366    async fn activate(&self, id: &Uuid) -> PluginTransitionResult {
367        match self.plugin_containers.get_mut(id) {
368            Some(mut plugin_container) => plugin_container.activate().await,
369            None => NoChange,
370        }
371    }
372
373    fn are_all_stopped(&self) -> bool {
374        self.plugin_containers.iter().all(|p| match p.state {
375            PluginState::Installed => true,
376            PluginState::Resolving(_) => true,
377            PluginState::Resolved => true,
378            PluginState::Starting(_) => false,
379            PluginState::Active => false,
380            PluginState::Stopping(_) => false,
381            PluginState::Refreshing(PluginRefreshingState::Stopping(_)) => false,
382            PluginState::Refreshing(PluginRefreshingState::Uninstalling(_)) => true,
383            PluginState::Refreshing(PluginRefreshingState::Deploying) => true,
384            PluginState::Refreshing(PluginRefreshingState::Installed) => true,
385            PluginState::Refreshing(PluginRefreshingState::Resolving(_)) => true,
386            PluginState::Refreshing(PluginRefreshingState::Starting(_)) => false,
387            PluginState::Uninstalling(_) => true,
388            PluginState::Uninstalled => true,
389            PluginState::Disabled => true,
390        })
391    }
392
393    async fn deactivate(&self, id: &Uuid) -> PluginTransitionResult {
394        match self.plugin_containers.get_mut(id) {
395            Some(mut plugin_container) => plugin_container.deactivate().await,
396            None => NoChange,
397        }
398    }
399
400    fn unregister(&self, id: &Uuid) -> PluginTransitionResult {
401        match self.plugin_containers.get_mut(id) {
402            Some(mut plugin_container) => {
403                if plugin_container.state != PluginState::Stopping(PluginStoppingState::Unregistering)
404                    && plugin_container.state != PluginState::Refreshing(PluginRefreshingState::Stopping(PluginStoppingState::Unregistering))
405                {
406                    return NoChange;
407                }
408                let refreshing = plugin_container.state == PluginState::Refreshing(PluginRefreshingState::Stopping(PluginStoppingState::Unregistering));
409                if refreshing {
410                    plugin_container.state = PluginState::Refreshing(PluginRefreshingState::Stopping(PluginStoppingState::RemoveProxy));
411                } else {
412                    plugin_container.state = PluginState::Stopping(PluginStoppingState::RemoveProxy);
413                }
414                Changed
415            }
416            None => NoChange,
417        }
418    }
419
420    fn remove_proxy(&self, id: &Uuid) -> PluginTransitionResult {
421        match self.plugin_containers.get_mut(id) {
422            Some(mut plugin_container) => plugin_container.remove_proxy(),
423            None => NoChange,
424        }
425    }
426
427    fn unload_dll(&self, id: &Uuid) -> PluginTransitionResult {
428        match self.plugin_containers.get_mut(id) {
429            Some(mut plugin_container) => plugin_container.value_mut().unload_dll(),
430            None => NoChange,
431        }
432    }
433
434    fn uninstall_dll(&self, id: &Uuid) -> PluginTransitionResult {
435        match self.plugin_containers.get_mut(id) {
436            Some(mut plugin_container) => match plugin_container.value_mut().uninstall_dll() {
437                Changed => {
438                    self.plugin_containers.remove(id);
439                    Changed
440                }
441                NoChange => NoChange,
442            },
443            None => NoChange,
444        }
445    }
446
447    fn start(&self, id: &Uuid) -> Result<(), PluginStartError> {
448        match self.plugin_containers.get_mut(id) {
449            Some(mut plugin_container) => plugin_container.start(),
450            None => Err(PluginStartError::Uninstalled),
451        }
452    }
453
454    fn start_by_stem(&self, stem: &str) -> Result<(), PluginStartError> {
455        if let Some(id) = self.get_id(stem) {
456            return self.start(&id);
457        }
458        Err(PluginStartError::Uninstalled)
459    }
460
461    fn start_dependent_with_satisfied_dependencies(&self, id: &Uuid) -> bool {
462        let mut starting_at_least_one = false;
463        for dependent_id in self.get_dependents(id) {
464            if let Some(dependency_state) = self.get_plugin_state(&dependent_id) {
465                match dependency_state {
466                    PluginState::Resolved => {
467                        // Starting dependent plugin which is now resolved
468                        trace!("Starting {:?} dependent plugin {}", dependency_state, &dependent_id);
469                        if self.start(&dependent_id).is_ok() {
470                            starting_at_least_one = true;
471                        }
472                    }
473                    PluginState::Active => {
474                        // Recursively starting dependent plugins which are now active
475                        trace!("Recursively resolving {:?} dependent plugins of {}", dependency_state, &dependent_id);
476                        if self.start_dependent_with_satisfied_dependencies(&dependent_id) {
477                            starting_at_least_one = true;
478                        }
479                    }
480                    _ => {}
481                }
482            }
483        }
484        starting_at_least_one
485    }
486
487    fn stop(&self, id: &Uuid) -> Result<(), PluginStopError> {
488        let result = match self.plugin_containers.get_mut(id) {
489            Some(mut plugin_container) => plugin_container.stop(),
490            None => Err(PluginStopError::Uninstalled),
491        };
492        while self.stop_active_with_unsatisfied_dependencies() {}
493        result
494    }
495
496    fn stop_by_stem(&self, stem: &str) -> Result<(), PluginStopError> {
497        if let Some(id) = self.get_id(stem) {
498            return self.stop(&id);
499        }
500        Err(PluginStopError::Uninstalled)
501    }
502
503    fn stop_all(&self) {
504        for id in self.get_plugins() {
505            // TODO
506            let _ = self.stop(&id);
507        }
508    }
509
510    fn stop_active_with_unsatisfied_dependencies(&self) -> bool {
511        let mut stopping_at_least_one = false;
512        for id in self.get_plugins() {
513            if let Some(state) = self.get_plugin_state(&id) {
514                if state == PluginState::Active && self.has_unsatisfied_dependencies(&id) && self.stop(&id).is_ok() {
515                    stopping_at_least_one = true;
516                }
517            }
518        }
519        stopping_at_least_one
520    }
521
522    fn uninstall(&self, id: &Uuid) -> Result<(), PluginUninstallError> {
523        match self.plugin_containers.get_mut(id) {
524            Some(mut plugin_container) => plugin_container.uninstall(),
525            None => Err(PluginUninstallError::AlreadyUninstalled),
526        }
527    }
528
529    fn redeploy(&self, id: &Uuid) -> Result<(), PluginDeployError> {
530        match self.plugin_containers.get_mut(id) {
531            Some(mut plugin_container) => plugin_container.redeploy(),
532            None => Err(PluginDeployError::NotFound),
533        }
534    }
535
536    fn disable(&self, id: &Uuid) -> Result<(), PluginDisableError> {
537        match self.plugin_containers.get_mut(id) {
538            Some(mut plugin_container) => plugin_container.disable(),
539            None => Err(PluginDisableError::NotFound),
540        }
541    }
542}
543
544#[async_trait]
545impl Lifecycle for PluginContainerManagerImpl {}