reactive_graph_plugin_service_impl/
plugin_repository_manager_impl.rs

1use std::collections::HashMap;
2use std::env::consts::DLL_EXTENSION;
3use std::ffi::OsStr;
4use std::fs;
5use std::fs::File;
6use std::fs::create_dir_all;
7use std::path::Path;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::sync::RwLock;
11
12use async_trait::async_trait;
13use log::debug;
14use log::error;
15use log::info;
16use log::trace;
17use log::warn;
18use notify::Event;
19use notify::EventKind::Access;
20use notify::RecommendedWatcher;
21use notify::RecursiveMode;
22use notify::Watcher;
23use notify::event::AccessKind::Close;
24use notify::event::AccessMode::Write;
25use springtime_di::Component;
26use springtime_di::component_alias;
27use tokio::sync::mpsc;
28use uuid::Uuid;
29use walkdir::WalkDir;
30
31use reactive_graph_config_api::ConfigManager;
32use reactive_graph_lifecycle::Lifecycle;
33use reactive_graph_plugin_api::HotDeployError;
34use reactive_graph_plugin_api::PLUGIN_NAME_PREFIX;
35use reactive_graph_plugin_api::PluginState;
36use reactive_graph_plugin_service_api::PluginContainerManager;
37use reactive_graph_plugin_service_api::PluginRepositoryManager;
38use reactive_graph_plugin_service_api::PluginResolver;
39
40use flate2::read::GzDecoder;
41use reactive_graph_plugin_service_api::get_deploy_folder;
42use reactive_graph_plugin_service_api::get_install_path;
43use reactive_graph_plugin_service_api::get_stem;
44use tar::Archive;
45use zip::ZipArchive;
46
47pub type HotDeployWatcher = RwLock<Option<RecommendedWatcher>>;
48
/// Archive formats which are unpacked when found in the hot deploy folder.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ArchiveType {
    /// A gzip compressed tarball (file name ends with `.tar.gz`).
    TarGz,
    /// A zip archive (file name ends with `.zip`).
    Zip,
}
53
54fn create_hot_deploy_watcher() -> HotDeployWatcher {
55    RwLock::new(None)
56}
57
58#[derive(Component)]
59pub struct PluginRepositoryManagerImpl {
60    plugin_container_manager: Arc<dyn PluginContainerManager + Send + Sync>,
61
62    plugin_resolver: Arc<dyn PluginResolver + Send + Sync>,
63
64    config_manager: Arc<dyn ConfigManager + Send + Sync>,
65
66    #[component(default = "create_hot_deploy_watcher")]
67    hot_deploy_watcher: HotDeployWatcher,
68}
69
70impl PluginRepositoryManagerImpl {
71    fn create_and_register_plugin_container(&self, path: PathBuf) -> Option<Uuid> {
72        if !is_dll(&path) {
73            return None;
74        }
75        if let Some(stem) = get_stem(&path) {
76            return self.plugin_container_manager.create_and_register_plugin_container(stem, path);
77        }
78        None
79    }
80
81    async fn create_hot_deploy_watcher(&self) {
82        let plugin_container_manager = self.plugin_container_manager.clone();
83        let plugin_resolver = self.plugin_resolver.clone();
84        let (tx, mut rx) = mpsc::channel::<notify::Result<Event>>(32);
85        tokio::spawn(async move {
86            trace!("Hot Deploy Watcher started");
87            while let Some(r) = rx.recv().await {
88                match r {
89                    Ok(event) => {
90                        if event.kind != Access(Close(Write)) {
91                            continue;
92                        }
93                        trace!("Hot Deploy Watcher: Detected file system activity: {event:?}");
94                        for path in event.paths.clone() {
95                            let Some(stem) = get_stem(&path) else {
96                                continue;
97                            };
98                            if let Some(archive_type) = is_archive(&path) {
99                                match archive_type {
100                                    ArchiveType::TarGz => {
101                                        let _ = extract_tar_gz(&path);
102                                    }
103                                    ArchiveType::Zip => {
104                                        let _ = extract_zip(&path);
105                                    }
106                                }
107                                continue;
108                            }
109                            if !is_dll(&path) {
110                                continue;
111                            }
112                            if plugin_container_manager.has(&stem) {
113                                // If plugin with the same stem is already installed, redeploy and start resolver
114                                if let Some(id) = plugin_container_manager.get_id(&stem) {
115                                    match plugin_container_manager.redeploy(&id) {
116                                        Ok(_) => {
117                                            plugin_resolver.resolve_until_idle().await;
118                                            // Start dependent plugins
119                                            while plugin_container_manager.start_dependent_with_satisfied_dependencies(&id) {
120                                                // Resolve until all dependent plugins are started
121                                                plugin_resolver.resolve_until_idle().await;
122                                            }
123                                            plugin_resolver.transition_to_fallback_states().await;
124                                        }
125                                        Err(e) => {
126                                            error!("Failed to redeploy plugin {} {}: {:?}", &stem, &id, e);
127                                        }
128                                    }
129                                }
130                            } else {
131                                // Deploy new plugins to the installation folder
132                                if let Ok(install_path) = deploy_plugin(path) {
133                                    // And register a new plugin container and start resolver
134                                    if let Some(id) = plugin_container_manager.create_and_register_plugin_container(stem, install_path) {
135                                        plugin_resolver.resolve_until_idle().await;
136                                        if plugin_container_manager.start(&id).is_ok() {
137                                            plugin_resolver.resolve_until_idle().await;
138                                            // Start dependent plugins
139                                            while plugin_container_manager.start_dependent_with_satisfied_dependencies(&id) {
140                                                // Resolve until all dependent plugins are started
141                                                plugin_resolver.resolve_until_idle().await;
142                                            }
143                                        }
144                                        plugin_resolver.transition_to_fallback_states().await;
145                                    }
146                                }
147                            }
148                        }
149                        for path in event.paths {
150                            let Some(stem) = get_stem(&path) else {
151                                continue;
152                            };
153                            if !is_dll(&path) {
154                                continue;
155                            }
156                            let Some(id) = plugin_container_manager.get_id(&stem) else {
157                                continue;
158                            };
159                            let name = plugin_container_manager.name(&id).unwrap_or_default().replace(PLUGIN_NAME_PREFIX, "");
160                            let version = plugin_container_manager.version(&id).unwrap_or(String::from("?.?.?"));
161                            // Warn about disabled plugins
162                            if let Some(state) = plugin_container_manager.get_plugin_state(&id) {
163                                if state == PluginState::Disabled {
164                                    info!("[DISABLED] {name} {version}");
165                                }
166                            }
167                            // Warn about unsatisfied dependencies
168                            for d in plugin_container_manager.get_unsatisfied_dependencies(&id) {
169                                trace!("Plugin {} {} has unsatisfied dependency: {}:{}", id, &name, d.name.replace(PLUGIN_NAME_PREFIX, ""), d.version);
170                                match plugin_container_manager.get_plugin_by_dependency(&d) {
171                                    Some(dependency_id) => {
172                                        let dependency_name = plugin_container_manager
173                                            .name(&dependency_id)
174                                            .unwrap_or_default()
175                                            .replace(PLUGIN_NAME_PREFIX, "");
176                                        let dependency_version = plugin_container_manager.version(&dependency_id).unwrap_or(String::from("?.?.?"));
177                                        let dependency_state = plugin_container_manager.get_plugin_state(&dependency_id).unwrap_or(PluginState::Uninstalled);
178                                        warn!(
179                                            "Plugin {} has unsatisfied dependency: {}:{} - which exists ({} {}) but has state {:?}",
180                                            &name,
181                                            d.name.replace(PLUGIN_NAME_PREFIX, ""),
182                                            d.version,
183                                            dependency_name,
184                                            dependency_version,
185                                            dependency_state
186                                        );
187                                    }
188                                    None => {
189                                        warn!(
190                                            "Plugin {} has unsatisfied dependency: {}:{} - which doesn't exist",
191                                            &name,
192                                            d.name.replace(PLUGIN_NAME_PREFIX, ""),
193                                            d.version
194                                        );
195                                    }
196                                }
197                            }
198                        }
199                    }
200                    Err(e) => {
201                        error!("Hot Deploy Watcher: Error: {e}");
202                    }
203                }
204            }
205            trace!("Hot Deploy Watcher: Finished");
206        });
207        let watcher = notify::recommended_watcher(move |r: notify::Result<Event>| {
208            let tx = tx.clone();
209            futures::executor::block_on(async {
210                match tx.send(r).await {
211                    Ok(_) => {}
212                    Err(e) => {
213                        trace!("SendError {e}");
214                    }
215                }
216            });
217        })
218        .ok();
219        let mut writer = self.hot_deploy_watcher.write().unwrap();
220        *writer = watcher;
221    }
222
223    fn destroy_hot_deploy_watcher(&self) {
224        let mut writer = self.hot_deploy_watcher.write().unwrap();
225        *writer = None;
226    }
227}
228
229#[async_trait]
230#[component_alias]
231impl PluginRepositoryManager for PluginRepositoryManagerImpl {
232    fn scan_deploy_repository(&self) {
233        let plugins_config = self.config_manager.get_plugins_config();
234        let Some(hot_deploy_location) = plugins_config.get_hot_deploy_location() else {
235            return;
236        };
237        trace!("Scanning plugin hot deploy folder {hot_deploy_location:?}");
238        let Ok(dir) = fs::read_dir(hot_deploy_location) else {
239            return;
240        };
241        for entry in dir.flatten() {
242            if let Ok(file_type) = entry.file_type() {
243                if !file_type.is_file() {
244                    continue;
245                }
246                let path = entry.path();
247                if let Some(archive_type) = is_archive(&path) {
248                    match archive_type {
249                        ArchiveType::TarGz => {
250                            let _ = extract_tar_gz(&path);
251                        }
252                        ArchiveType::Zip => {
253                            let _ = extract_zip(&path);
254                        }
255                    }
256                    continue;
257                }
258                let _ = deploy_plugin(path);
259            }
260        }
261    }
262
263    fn remove_duplicates(&self) {
264        let plugins_config = self.config_manager.get_plugins_config();
265        let Some(install_location) = plugins_config.get_install_location() else {
266            return;
267        };
268        let mut installed_plugins: HashMap<String, (u64, PathBuf)> = HashMap::new();
269        let mut plugins_to_remove: Vec<PathBuf> = Vec::new();
270
271        for entry in WalkDir::new(install_location)
272            .into_iter()
273            .filter_map(Result::ok)
274            .filter(|e| !e.file_type().is_dir())
275        {
276            let p = entry.path();
277            if let Some((stem, timestamp)) = p
278                .file_stem()
279                .and_then(|stem| {
280                    stem.to_string_lossy()
281                        .rsplit_once('.')
282                        .map(|(stem, timestamp)| (String::from(stem), String::from(timestamp)))
283                })
284                .and_then(|(stem, timestamp)| timestamp.parse::<u64>().ok().map(|timestamp| (stem, timestamp)))
285            {
286                // let timestamp = timestamp.parse::<u64>();
287                match installed_plugins.get_mut(&stem) {
288                    Some(entry) => {
289                        // (timestamp2, p2)
290                        if entry.0 < timestamp {
291                            plugins_to_remove.push(entry.1.clone());
292                            entry.0 = timestamp;
293                            entry.1 = PathBuf::from(p);
294                            // filenames.insert(stem, (timestamp, PathBuf::from(p)));
295                        } else {
296                            plugins_to_remove.push(PathBuf::from(p));
297                        }
298                    }
299                    None => {
300                        installed_plugins.insert(stem, (timestamp, PathBuf::from(p)));
301                    }
302                }
303            }
304        }
305        for plugin_to_remove in plugins_to_remove {
306            if fs::remove_file(&plugin_to_remove).is_ok() {
307                trace!("Removed duplicate plugin: {}", plugin_to_remove.display());
308            }
309        }
310    }
311
312    fn scan_plugin_repository(&self) {
313        let plugins_config = self.config_manager.get_plugins_config();
314        let Some(install_location) = plugins_config.get_install_location() else {
315            return;
316        };
317        trace!("Scanning plugin installation folder {install_location:?}");
318        let Ok(dir) = fs::read_dir(install_location) else {
319            return;
320        };
321        for entry in dir.flatten() {
322            if entry.file_type().map(|f| f.is_file()).unwrap_or(false) {
323                self.create_and_register_plugin_container(entry.path());
324            }
325        }
326    }
327
328    fn watch_hot_deploy(&self) {
329        let Some(hot_deploy_location) = self.config_manager.get_plugins_config().get_hot_deploy_location() else {
330            return;
331        };
332        let mut writer = self.hot_deploy_watcher.write().unwrap();
333        if let Some(recommended_watcher) = writer.as_mut() {
334            // Add a path to be watched. All files and directories at that path and
335            // below will be monitored for changes.
336            match recommended_watcher.watch(&hot_deploy_location, RecursiveMode::NonRecursive) {
337                Ok(_) => {
338                    trace!("Watching hot deploy folder {hot_deploy_location:?}");
339                }
340                Err(e) => {
341                    error!("Failed to watch hot deploy folder {hot_deploy_location:?}: {e}");
342                }
343            }
344        }
345    }
346
347    fn unwatch_hot_deploy(&self) {
348        let Some(hot_deploy_location) = self.config_manager.get_plugins_config().get_hot_deploy_location() else {
349            return;
350        };
351        let mut writer = self.hot_deploy_watcher.write().unwrap();
352        if let Some(recommended_watcher) = writer.as_mut() {
353            let _ = recommended_watcher.unwatch(&hot_deploy_location);
354        }
355    }
356}
357
358#[async_trait]
359impl Lifecycle for PluginRepositoryManagerImpl {
360    async fn init(&self) {
361        // Initially, the deploy folder will be scanned. Detected plugins will be copied to the
362        // install folder before the install folder will be scanned. Eventually existing plugins
363        // will be overwritten by the version in the deploy folder.
364        self.scan_deploy_repository();
365
366        self.remove_duplicates();
367
368        // Initially, scans the plugin installation folder and creates and registers plugin
369        // containers for each plugin.
370        self.scan_plugin_repository();
371
372        // Create a deploy watcher.
373        self.create_hot_deploy_watcher().await;
374    }
375
376    async fn post_init(&self) {
377        // Initiates watching the hot deployment folder.
378        self.watch_hot_deploy();
379    }
380
381    async fn pre_shutdown(&self) {
382        self.unwatch_hot_deploy();
383    }
384
385    async fn shutdown(&self) {
386        self.destroy_hot_deploy_watcher();
387    }
388}
389
/// Returns `true` if the extension of `path` equals the platform's dynamic library extension
/// ([`DLL_EXTENSION`]).
///
/// Paths without an extension or with an extension which is not valid UTF-8 yield `false`.
/// The comparison is case-sensitive.
fn is_dll(path: &Path) -> bool {
    path.extension().and_then(OsStr::to_str) == Some(DLL_EXTENSION)
}
396
397fn is_archive(path: &Path) -> Option<ArchiveType> {
398    path.file_name().and_then(OsStr::to_str).and_then(|file_name| {
399        if file_name.ends_with(".tar.gz") {
400            Some(ArchiveType::TarGz)
401        } else if file_name.ends_with(".zip") {
402            Some(ArchiveType::Zip)
403        } else {
404            None
405        }
406    })
407}
408
409fn extract_tar_gz(archive_path: &Path) -> Result<(), HotDeployError> {
410    let deploy_folder = get_deploy_folder(archive_path).ok_or(HotDeployError::ArchiveError)?;
411    let archive_file = File::open(archive_path).map_err(|_| HotDeployError::ArchiveError)?;
412    let decoder = GzDecoder::new(archive_file);
413    let mut archive = Archive::new(decoder);
414    archive.unpack(deploy_folder).map_err(|_| HotDeployError::ArchiveError)?;
415    fs::remove_file(archive_path).map_err(|_| HotDeployError::ArchiveError)?;
416    Ok(())
417}
418
419fn extract_zip(archive_path: &Path) -> Result<(), HotDeployError> {
420    let deploy_folder = get_deploy_folder(archive_path).ok_or(HotDeployError::ArchiveError)?;
421    let archive_file = File::open(archive_path).map_err(|_| HotDeployError::ArchiveError)?;
422    let mut zip_archive = ZipArchive::new(archive_file).map_err(|_| HotDeployError::ArchiveError)?;
423    for i in 0..zip_archive.len() {
424        let mut zip_file = zip_archive.by_index(i).map_err(|_| HotDeployError::ArchiveError)?;
425        let plugin_path = match zip_file.enclosed_name() {
426            Some(path) => Path::new(&deploy_folder).join(path),
427            None => continue,
428        };
429        if (*zip_file.name()).ends_with('/') {
430            create_dir_all(&plugin_path).map_err(|_| HotDeployError::ArchiveError)?;
431        } else {
432            if let Some(plugin_path_parent) = plugin_path.parent() {
433                if !plugin_path_parent.exists() {
434                    create_dir_all(plugin_path_parent).map_err(|_| HotDeployError::ArchiveError)?;
435                }
436            }
437            let mut outfile = File::create(&plugin_path).map_err(|_| HotDeployError::ArchiveError)?;
438            std::io::copy(&mut zip_file, &mut outfile).map_err(|_| HotDeployError::ArchiveError)?;
439        }
440    }
441    fs::remove_file(archive_path).map_err(|_| HotDeployError::ArchiveError)?;
442    Ok(())
443}
444
445fn deploy_plugin(deploy_path: PathBuf) -> Result<PathBuf, HotDeployError> {
446    debug!("Detected new plugin {}", deploy_path.display());
447    if !is_dll(&deploy_path) {
448        return Err(HotDeployError::NoDynamicLinkLibrary);
449    }
450    let Some(install_path) = get_install_path(&deploy_path) else {
451        return Err(HotDeployError::InvalidInstallPath);
452    };
453    let install_path = match fs::copy(&deploy_path, &install_path) {
454        Ok(_) => {
455            debug!("Copied plugin from {} to {}", deploy_path.display(), &install_path.display());
456            Ok(install_path)
457        }
458        Err(e) => {
459            error!(
460                "Failed to deploy plugin: Failed to copy plugin from {} to {}: {:?}",
461                deploy_path.display(),
462                install_path.display(),
463                e
464            );
465            Err(HotDeployError::MoveError)
466        }
467    }?;
468    match fs::remove_file(&deploy_path) {
469        Ok(_) => {
470            debug!("Removed plugin from {}", deploy_path.display());
471            Ok(install_path)
472        }
473        Err(e) => {
474            error!("Failed to deploy plugin: Failed to remove plugin from {}: {:?}", deploy_path.display(), e);
475            Err(HotDeployError::MoveError)
476        }
477    }
478}