reactive_graph_plugin_service_impl/
plugin_repository_manager_impl.rs

1use std::collections::HashMap;
2use std::env::consts::DLL_EXTENSION;
3use std::fs;
4use std::path::Path;
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::sync::RwLock;
8
9use async_trait::async_trait;
10use log::debug;
11use log::error;
12use log::info;
13use log::trace;
14use log::warn;
15use notify::Event;
16use notify::EventKind::Access;
17use notify::RecommendedWatcher;
18use notify::RecursiveMode;
19use notify::Watcher;
20use notify::event::AccessKind::Close;
21use notify::event::AccessMode::Write;
22use springtime_di::Component;
23use springtime_di::component_alias;
24use tokio::sync::mpsc;
25use uuid::Uuid;
26use walkdir::WalkDir;
27
28use reactive_graph_config_api::ConfigManager;
29use reactive_graph_lifecycle::Lifecycle;
30use reactive_graph_plugin_api::HotDeployError;
31use reactive_graph_plugin_api::PLUGIN_NAME_PREFIX;
32use reactive_graph_plugin_api::PluginState;
33use reactive_graph_plugin_service_api::PluginContainerManager;
34use reactive_graph_plugin_service_api::PluginRepositoryManager;
35use reactive_graph_plugin_service_api::PluginResolver;
36
37use crate::plugin_paths::get_install_path;
38use crate::plugin_paths::get_stem;
39
40pub type HotDeployWatcher = RwLock<Option<RecommendedWatcher>>;
41
42fn create_hot_deploy_watcher() -> HotDeployWatcher {
43    RwLock::new(None)
44}
45
46#[derive(Component)]
47pub struct PluginRepositoryManagerImpl {
48    plugin_container_manager: Arc<dyn PluginContainerManager + Send + Sync>,
49
50    plugin_resolver: Arc<dyn PluginResolver + Send + Sync>,
51
52    config_manager: Arc<dyn ConfigManager + Send + Sync>,
53
54    #[component(default = "create_hot_deploy_watcher")]
55    hot_deploy_watcher: HotDeployWatcher,
56}
57
58impl PluginRepositoryManagerImpl {
59    fn create_and_register_plugin_container(&self, path: PathBuf) -> Option<Uuid> {
60        if !is_dll(&path) {
61            return None;
62        }
63        if let Some(stem) = get_stem(&path) {
64            return self.plugin_container_manager.create_and_register_plugin_container(stem, path);
65        }
66        None
67    }
68
69    async fn create_hot_deploy_watcher(&self) {
70        let plugin_container_manager = self.plugin_container_manager.clone();
71        let plugin_resolver = self.plugin_resolver.clone();
72        let (tx, mut rx) = mpsc::channel::<notify::Result<Event>>(32);
73        tokio::spawn(async move {
74            trace!("Hot Deploy Watcher started");
75            while let Some(r) = rx.recv().await {
76                match r {
77                    Ok(event) => {
78                        if event.kind != Access(Close(Write)) {
79                            continue;
80                        }
81                        trace!("Hot Deploy Watcher: Detected file system activity: {event:?}");
82                        for path in event.paths.clone() {
83                            let Some(stem) = get_stem(&path) else {
84                                continue;
85                            };
86                            if !is_dll(&path) {
87                                continue;
88                            }
89                            if plugin_container_manager.has(&stem) {
90                                // If plugin with the same stem is already installed, redeploy and start resolver
91                                if let Some(id) = plugin_container_manager.get_id(&stem) {
92                                    match plugin_container_manager.redeploy(&id) {
93                                        Ok(_) => {
94                                            plugin_resolver.resolve_until_idle().await;
95                                            // Start dependent plugins
96                                            while plugin_container_manager.start_dependent_with_satisfied_dependencies(&id) {
97                                                // Resolve until all dependent plugins are started
98                                                plugin_resolver.resolve_until_idle().await;
99                                            }
100                                            plugin_resolver.transition_to_fallback_states().await;
101                                        }
102                                        Err(e) => {
103                                            error!("Failed to redeploy plugin {} {}: {:?}", &stem, &id, e);
104                                        }
105                                    }
106                                }
107                            } else {
108                                // Deploy new plugins to the installation folder
109                                if let Ok(install_path) = deploy_plugin(path) {
110                                    // And register a new plugin container and start resolver
111                                    if let Some(id) = plugin_container_manager.create_and_register_plugin_container(stem, install_path) {
112                                        plugin_resolver.resolve_until_idle().await;
113                                        if plugin_container_manager.start(&id).is_ok() {
114                                            plugin_resolver.resolve_until_idle().await;
115                                            // Start dependent plugins
116                                            while plugin_container_manager.start_dependent_with_satisfied_dependencies(&id) {
117                                                // Resolve until all dependent plugins are started
118                                                plugin_resolver.resolve_until_idle().await;
119                                            }
120                                        }
121                                        plugin_resolver.transition_to_fallback_states().await;
122                                    }
123                                }
124                            }
125                        }
126                        for path in event.paths {
127                            let Some(stem) = get_stem(&path) else {
128                                continue;
129                            };
130                            if !is_dll(&path) {
131                                continue;
132                            }
133                            let Some(id) = plugin_container_manager.get_id(&stem) else {
134                                continue;
135                            };
136                            let name = plugin_container_manager.name(&id).unwrap_or_default().replace(PLUGIN_NAME_PREFIX, "");
137                            let version = plugin_container_manager.version(&id).unwrap_or(String::from("?.?.?"));
138                            // Warn about disabled plugins
139                            if let Some(state) = plugin_container_manager.get_plugin_state(&id) {
140                                if state == PluginState::Disabled {
141                                    info!("[DISABLED] {name} {version}");
142                                }
143                            }
144                            // Warn about unsatisfied dependencies
145                            for d in plugin_container_manager.get_unsatisfied_dependencies(&id) {
146                                trace!("Plugin {} {} has unsatisfied dependency: {}:{}", id, &name, d.name.replace(PLUGIN_NAME_PREFIX, ""), d.version);
147                                match plugin_container_manager.get_plugin_by_dependency(&d) {
148                                    Some(dependency_id) => {
149                                        let dependency_name = plugin_container_manager
150                                            .name(&dependency_id)
151                                            .unwrap_or_default()
152                                            .replace(PLUGIN_NAME_PREFIX, "");
153                                        let dependency_version = plugin_container_manager.version(&dependency_id).unwrap_or(String::from("?.?.?"));
154                                        let dependency_state = plugin_container_manager.get_plugin_state(&dependency_id).unwrap_or(PluginState::Uninstalled);
155                                        warn!(
156                                            "Plugin {} has unsatisfied dependency: {}:{} - which exists ({} {}) but has state {:?}",
157                                            &name,
158                                            d.name.replace(PLUGIN_NAME_PREFIX, ""),
159                                            d.version,
160                                            dependency_name,
161                                            dependency_version,
162                                            dependency_state
163                                        );
164                                    }
165                                    None => {
166                                        warn!(
167                                            "Plugin {} has unsatisfied dependency: {}:{} - which doesn't exist",
168                                            &name,
169                                            d.name.replace(PLUGIN_NAME_PREFIX, ""),
170                                            d.version
171                                        );
172                                    }
173                                }
174                            }
175                        }
176                    }
177                    Err(e) => {
178                        error!("Hot Deploy Watcher: Error: {e}");
179                    }
180                }
181            }
182            trace!("Hot Deploy Watcher: Finished");
183        });
184        let watcher = notify::recommended_watcher(move |r: notify::Result<Event>| {
185            let tx = tx.clone();
186            futures::executor::block_on(async {
187                match tx.send(r).await {
188                    Ok(_) => {}
189                    Err(e) => {
190                        trace!("SendError {e}");
191                    }
192                }
193            });
194        })
195        .ok();
196        let mut writer = self.hot_deploy_watcher.write().unwrap();
197        *writer = watcher;
198    }
199
200    fn destroy_hot_deploy_watcher(&self) {
201        let mut writer = self.hot_deploy_watcher.write().unwrap();
202        *writer = None;
203    }
204}
205
206#[async_trait]
207#[component_alias]
208impl PluginRepositoryManager for PluginRepositoryManagerImpl {
209    fn scan_deploy_repository(&self) {
210        let plugins_config = self.config_manager.get_plugins_config();
211        let Some(hot_deploy_location) = plugins_config.get_hot_deploy_location() else {
212            return;
213        };
214        trace!("Scanning plugin hot deploy folder {hot_deploy_location:?}");
215        let Ok(dir) = fs::read_dir(hot_deploy_location) else {
216            return;
217        };
218        for entry in dir.flatten() {
219            if let Ok(file_type) = entry.file_type() {
220                if !file_type.is_file() {
221                    continue;
222                }
223                let _ = deploy_plugin(entry.path());
224            }
225        }
226    }
227
228    fn remove_duplicates(&self) {
229        let plugins_config = self.config_manager.get_plugins_config();
230        let Some(install_location) = plugins_config.get_install_location() else {
231            return;
232        };
233        let mut installed_plugins: HashMap<String, (u64, PathBuf)> = HashMap::new();
234        let mut plugins_to_remove: Vec<PathBuf> = Vec::new();
235
236        for entry in WalkDir::new(install_location)
237            .into_iter()
238            .filter_map(Result::ok)
239            .filter(|e| !e.file_type().is_dir())
240        {
241            let p = entry.path();
242            if let Some((stem, timestamp)) = p
243                .file_stem()
244                .and_then(|stem| {
245                    stem.to_string_lossy()
246                        .rsplit_once('.')
247                        .map(|(stem, timestamp)| (String::from(stem), String::from(timestamp)))
248                })
249                .and_then(|(stem, timestamp)| timestamp.parse::<u64>().ok().map(|timestamp| (stem, timestamp)))
250            {
251                // let timestamp = timestamp.parse::<u64>();
252                match installed_plugins.get_mut(&stem) {
253                    Some(entry) => {
254                        // (timestamp2, p2)
255                        if entry.0 < timestamp {
256                            plugins_to_remove.push(entry.1.clone());
257                            entry.0 = timestamp;
258                            entry.1 = PathBuf::from(p);
259                            // filenames.insert(stem, (timestamp, PathBuf::from(p)));
260                        } else {
261                            plugins_to_remove.push(PathBuf::from(p));
262                        }
263                    }
264                    None => {
265                        installed_plugins.insert(stem, (timestamp, PathBuf::from(p)));
266                    }
267                }
268            }
269        }
270        for plugin_to_remove in plugins_to_remove {
271            if fs::remove_file(&plugin_to_remove).is_ok() {
272                trace!("Removed duplicate plugin: {}", plugin_to_remove.display());
273            }
274        }
275    }
276
277    fn scan_plugin_repository(&self) {
278        let plugins_config = self.config_manager.get_plugins_config();
279        let Some(install_location) = plugins_config.get_install_location() else {
280            return;
281        };
282        trace!("Scanning plugin installation folder {install_location:?}");
283        let Ok(dir) = fs::read_dir(install_location) else {
284            return;
285        };
286        for entry in dir.flatten() {
287            if entry.file_type().map(|f| f.is_file()).unwrap_or(false) {
288                self.create_and_register_plugin_container(entry.path());
289            }
290        }
291    }
292
293    fn watch_hot_deploy(&self) {
294        let Some(hot_deploy_location) = self.config_manager.get_plugins_config().get_hot_deploy_location() else {
295            return;
296        };
297        let mut writer = self.hot_deploy_watcher.write().unwrap();
298        if let Some(recommended_watcher) = writer.as_mut() {
299            // Add a path to be watched. All files and directories at that path and
300            // below will be monitored for changes.
301            match recommended_watcher.watch(&hot_deploy_location, RecursiveMode::NonRecursive) {
302                Ok(_) => {
303                    trace!("Watching hot deploy folder {hot_deploy_location:?}");
304                }
305                Err(e) => {
306                    error!("Failed to watch hot deploy folder {hot_deploy_location:?}: {e}");
307                }
308            }
309        }
310    }
311
312    fn unwatch_hot_deploy(&self) {
313        let Some(hot_deploy_location) = self.config_manager.get_plugins_config().get_hot_deploy_location() else {
314            return;
315        };
316        let mut writer = self.hot_deploy_watcher.write().unwrap();
317        if let Some(recommended_watcher) = writer.as_mut() {
318            let _ = recommended_watcher.unwatch(&hot_deploy_location);
319        }
320    }
321}
322
323#[async_trait]
324impl Lifecycle for PluginRepositoryManagerImpl {
325    async fn init(&self) {
326        // Initially, the deploy folder will be scanned. Detected plugins will be copied to the
327        // install folder before the install folder will be scanned. Eventually existing plugins
328        // will be overwritten by the version in the deploy folder.
329        self.scan_deploy_repository();
330
331        self.remove_duplicates();
332
333        // Initially, scans the plugin installation folder and creates and registers plugin
334        // containers for each plugin.
335        self.scan_plugin_repository();
336
337        // Create a deploy watcher.
338        self.create_hot_deploy_watcher().await;
339    }
340
341    async fn post_init(&self) {
342        // Initiates watching the hot deployment folder.
343        self.watch_hot_deploy();
344    }
345
346    async fn pre_shutdown(&self) {
347        self.unwatch_hot_deploy();
348    }
349
350    async fn shutdown(&self) {
351        self.destroy_hot_deploy_watcher();
352    }
353}
354
355fn is_dll(path: &Path) -> bool {
356    if let Some(extension) = path.extension().and_then(|e| e.to_str()) {
357        return extension == DLL_EXTENSION;
358    }
359    false
360}
361
362fn deploy_plugin(deploy_path: PathBuf) -> Result<PathBuf, HotDeployError> {
363    debug!("Detected new plugin {}", deploy_path.display());
364    if !is_dll(&deploy_path) {
365        return Err(HotDeployError::NoDynamicLinkLibrary);
366    }
367    let Some(install_path) = get_install_path(&deploy_path) else {
368        return Err(HotDeployError::InvalidInstallPath);
369    };
370    match fs::rename(&deploy_path, &install_path) {
371        Ok(_) => {
372            debug!("Moved plugin from {} to {}", deploy_path.display(), install_path.display());
373            Ok(install_path)
374        }
375        Err(_) => {
376            error!("Failed to moved plugin from {} to {}", deploy_path.display(), install_path.display());
377            Err(HotDeployError::MoveError)
378        }
379    }
380}