reactive_graph_plugin_service_impl/
plugin_repository_manager_impl.rs1use std::collections::HashMap;
2use std::env::consts::DLL_EXTENSION;
3use std::ffi::OsStr;
4use std::fs;
5use std::fs::File;
6use std::fs::create_dir_all;
7use std::path::Path;
8use std::path::PathBuf;
9use std::sync::Arc;
10use std::sync::RwLock;
11
12use async_trait::async_trait;
13use log::debug;
14use log::error;
15use log::info;
16use log::trace;
17use log::warn;
18use notify::Event;
19use notify::EventKind::Access;
20use notify::RecommendedWatcher;
21use notify::RecursiveMode;
22use notify::Watcher;
23use notify::event::AccessKind::Close;
24use notify::event::AccessMode::Write;
25use springtime_di::Component;
26use springtime_di::component_alias;
27use tokio::sync::mpsc;
28use uuid::Uuid;
29use walkdir::WalkDir;
30
31use reactive_graph_config_api::ConfigManager;
32use reactive_graph_lifecycle::Lifecycle;
33use reactive_graph_plugin_api::HotDeployError;
34use reactive_graph_plugin_api::PLUGIN_NAME_PREFIX;
35use reactive_graph_plugin_api::PluginState;
36use reactive_graph_plugin_service_api::PluginContainerManager;
37use reactive_graph_plugin_service_api::PluginRepositoryManager;
38use reactive_graph_plugin_service_api::PluginResolver;
39
40use flate2::read::GzDecoder;
41use reactive_graph_plugin_service_api::get_deploy_folder;
42use reactive_graph_plugin_service_api::get_install_path;
43use reactive_graph_plugin_service_api::get_stem;
44use tar::Archive;
45use zip::ZipArchive;
46
47pub type HotDeployWatcher = RwLock<Option<RecommendedWatcher>>;
48
/// Archive formats which are unpacked when they are found in the hot deploy
/// folder.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ArchiveType {
    /// Gzip compressed tarball (file name ends with `.tar.gz`).
    TarGz,
    /// Zip archive (file name ends with `.zip`).
    Zip,
}
53
54fn create_hot_deploy_watcher() -> HotDeployWatcher {
55 RwLock::new(None)
56}
57
58#[derive(Component)]
59pub struct PluginRepositoryManagerImpl {
60 plugin_container_manager: Arc<dyn PluginContainerManager + Send + Sync>,
61
62 plugin_resolver: Arc<dyn PluginResolver + Send + Sync>,
63
64 config_manager: Arc<dyn ConfigManager + Send + Sync>,
65
66 #[component(default = "create_hot_deploy_watcher")]
67 hot_deploy_watcher: HotDeployWatcher,
68}
69
70impl PluginRepositoryManagerImpl {
71 fn create_and_register_plugin_container(&self, path: PathBuf) -> Option<Uuid> {
72 if !is_dll(&path) {
73 return None;
74 }
75 if let Some(stem) = get_stem(&path) {
76 return self.plugin_container_manager.create_and_register_plugin_container(stem, path);
77 }
78 None
79 }
80
81 async fn create_hot_deploy_watcher(&self) {
82 let plugin_container_manager = self.plugin_container_manager.clone();
83 let plugin_resolver = self.plugin_resolver.clone();
84 let (tx, mut rx) = mpsc::channel::<notify::Result<Event>>(32);
85 tokio::spawn(async move {
86 trace!("Hot Deploy Watcher started");
87 while let Some(r) = rx.recv().await {
88 match r {
89 Ok(event) => {
90 if event.kind != Access(Close(Write)) {
91 continue;
92 }
93 trace!("Hot Deploy Watcher: Detected file system activity: {event:?}");
94 for path in event.paths.clone() {
95 let Some(stem) = get_stem(&path) else {
96 continue;
97 };
98 if let Some(archive_type) = is_archive(&path) {
99 match archive_type {
100 ArchiveType::TarGz => {
101 let _ = extract_tar_gz(&path);
102 }
103 ArchiveType::Zip => {
104 let _ = extract_zip(&path);
105 }
106 }
107 continue;
108 }
109 if !is_dll(&path) {
110 continue;
111 }
112 if plugin_container_manager.has(&stem) {
113 if let Some(id) = plugin_container_manager.get_id(&stem) {
115 match plugin_container_manager.redeploy(&id) {
116 Ok(_) => {
117 plugin_resolver.resolve_until_idle().await;
118 while plugin_container_manager.start_dependent_with_satisfied_dependencies(&id) {
120 plugin_resolver.resolve_until_idle().await;
122 }
123 plugin_resolver.transition_to_fallback_states().await;
124 }
125 Err(e) => {
126 error!("Failed to redeploy plugin {} {}: {:?}", &stem, &id, e);
127 }
128 }
129 }
130 } else {
131 if let Ok(install_path) = deploy_plugin(path) {
133 if let Some(id) = plugin_container_manager.create_and_register_plugin_container(stem, install_path) {
135 plugin_resolver.resolve_until_idle().await;
136 if plugin_container_manager.start(&id).is_ok() {
137 plugin_resolver.resolve_until_idle().await;
138 while plugin_container_manager.start_dependent_with_satisfied_dependencies(&id) {
140 plugin_resolver.resolve_until_idle().await;
142 }
143 }
144 plugin_resolver.transition_to_fallback_states().await;
145 }
146 }
147 }
148 }
149 for path in event.paths {
150 let Some(stem) = get_stem(&path) else {
151 continue;
152 };
153 if !is_dll(&path) {
154 continue;
155 }
156 let Some(id) = plugin_container_manager.get_id(&stem) else {
157 continue;
158 };
159 let name = plugin_container_manager.name(&id).unwrap_or_default().replace(PLUGIN_NAME_PREFIX, "");
160 let version = plugin_container_manager.version(&id).unwrap_or(String::from("?.?.?"));
161 if let Some(state) = plugin_container_manager.get_plugin_state(&id) {
163 if state == PluginState::Disabled {
164 info!("[DISABLED] {name} {version}");
165 }
166 }
167 for d in plugin_container_manager.get_unsatisfied_dependencies(&id) {
169 trace!("Plugin {} {} has unsatisfied dependency: {}:{}", id, &name, d.name.replace(PLUGIN_NAME_PREFIX, ""), d.version);
170 match plugin_container_manager.get_plugin_by_dependency(&d) {
171 Some(dependency_id) => {
172 let dependency_name = plugin_container_manager
173 .name(&dependency_id)
174 .unwrap_or_default()
175 .replace(PLUGIN_NAME_PREFIX, "");
176 let dependency_version = plugin_container_manager.version(&dependency_id).unwrap_or(String::from("?.?.?"));
177 let dependency_state = plugin_container_manager.get_plugin_state(&dependency_id).unwrap_or(PluginState::Uninstalled);
178 warn!(
179 "Plugin {} has unsatisfied dependency: {}:{} - which exists ({} {}) but has state {:?}",
180 &name,
181 d.name.replace(PLUGIN_NAME_PREFIX, ""),
182 d.version,
183 dependency_name,
184 dependency_version,
185 dependency_state
186 );
187 }
188 None => {
189 warn!(
190 "Plugin {} has unsatisfied dependency: {}:{} - which doesn't exist",
191 &name,
192 d.name.replace(PLUGIN_NAME_PREFIX, ""),
193 d.version
194 );
195 }
196 }
197 }
198 }
199 }
200 Err(e) => {
201 error!("Hot Deploy Watcher: Error: {e}");
202 }
203 }
204 }
205 trace!("Hot Deploy Watcher: Finished");
206 });
207 let watcher = notify::recommended_watcher(move |r: notify::Result<Event>| {
208 let tx = tx.clone();
209 futures::executor::block_on(async {
210 match tx.send(r).await {
211 Ok(_) => {}
212 Err(e) => {
213 trace!("SendError {e}");
214 }
215 }
216 });
217 })
218 .ok();
219 let mut writer = self.hot_deploy_watcher.write().unwrap();
220 *writer = watcher;
221 }
222
223 fn destroy_hot_deploy_watcher(&self) {
224 let mut writer = self.hot_deploy_watcher.write().unwrap();
225 *writer = None;
226 }
227}
228
229#[async_trait]
230#[component_alias]
231impl PluginRepositoryManager for PluginRepositoryManagerImpl {
232 fn scan_deploy_repository(&self) {
233 let plugins_config = self.config_manager.get_plugins_config();
234 let Some(hot_deploy_location) = plugins_config.get_hot_deploy_location() else {
235 return;
236 };
237 trace!("Scanning plugin hot deploy folder {hot_deploy_location:?}");
238 let Ok(dir) = fs::read_dir(hot_deploy_location) else {
239 return;
240 };
241 for entry in dir.flatten() {
242 if let Ok(file_type) = entry.file_type() {
243 if !file_type.is_file() {
244 continue;
245 }
246 let path = entry.path();
247 if let Some(archive_type) = is_archive(&path) {
248 match archive_type {
249 ArchiveType::TarGz => {
250 let _ = extract_tar_gz(&path);
251 }
252 ArchiveType::Zip => {
253 let _ = extract_zip(&path);
254 }
255 }
256 continue;
257 }
258 let _ = deploy_plugin(path);
259 }
260 }
261 }
262
263 fn remove_duplicates(&self) {
264 let plugins_config = self.config_manager.get_plugins_config();
265 let Some(install_location) = plugins_config.get_install_location() else {
266 return;
267 };
268 let mut installed_plugins: HashMap<String, (u64, PathBuf)> = HashMap::new();
269 let mut plugins_to_remove: Vec<PathBuf> = Vec::new();
270
271 for entry in WalkDir::new(install_location)
272 .into_iter()
273 .filter_map(Result::ok)
274 .filter(|e| !e.file_type().is_dir())
275 {
276 let p = entry.path();
277 if let Some((stem, timestamp)) = p
278 .file_stem()
279 .and_then(|stem| {
280 stem.to_string_lossy()
281 .rsplit_once('.')
282 .map(|(stem, timestamp)| (String::from(stem), String::from(timestamp)))
283 })
284 .and_then(|(stem, timestamp)| timestamp.parse::<u64>().ok().map(|timestamp| (stem, timestamp)))
285 {
286 match installed_plugins.get_mut(&stem) {
288 Some(entry) => {
289 if entry.0 < timestamp {
291 plugins_to_remove.push(entry.1.clone());
292 entry.0 = timestamp;
293 entry.1 = PathBuf::from(p);
294 } else {
296 plugins_to_remove.push(PathBuf::from(p));
297 }
298 }
299 None => {
300 installed_plugins.insert(stem, (timestamp, PathBuf::from(p)));
301 }
302 }
303 }
304 }
305 for plugin_to_remove in plugins_to_remove {
306 if fs::remove_file(&plugin_to_remove).is_ok() {
307 trace!("Removed duplicate plugin: {}", plugin_to_remove.display());
308 }
309 }
310 }
311
312 fn scan_plugin_repository(&self) {
313 let plugins_config = self.config_manager.get_plugins_config();
314 let Some(install_location) = plugins_config.get_install_location() else {
315 return;
316 };
317 trace!("Scanning plugin installation folder {install_location:?}");
318 let Ok(dir) = fs::read_dir(install_location) else {
319 return;
320 };
321 for entry in dir.flatten() {
322 if entry.file_type().map(|f| f.is_file()).unwrap_or(false) {
323 self.create_and_register_plugin_container(entry.path());
324 }
325 }
326 }
327
328 fn watch_hot_deploy(&self) {
329 let Some(hot_deploy_location) = self.config_manager.get_plugins_config().get_hot_deploy_location() else {
330 return;
331 };
332 let mut writer = self.hot_deploy_watcher.write().unwrap();
333 if let Some(recommended_watcher) = writer.as_mut() {
334 match recommended_watcher.watch(&hot_deploy_location, RecursiveMode::NonRecursive) {
337 Ok(_) => {
338 trace!("Watching hot deploy folder {hot_deploy_location:?}");
339 }
340 Err(e) => {
341 error!("Failed to watch hot deploy folder {hot_deploy_location:?}: {e}");
342 }
343 }
344 }
345 }
346
347 fn unwatch_hot_deploy(&self) {
348 let Some(hot_deploy_location) = self.config_manager.get_plugins_config().get_hot_deploy_location() else {
349 return;
350 };
351 let mut writer = self.hot_deploy_watcher.write().unwrap();
352 if let Some(recommended_watcher) = writer.as_mut() {
353 let _ = recommended_watcher.unwatch(&hot_deploy_location);
354 }
355 }
356}
357
358#[async_trait]
359impl Lifecycle for PluginRepositoryManagerImpl {
360 async fn init(&self) {
361 self.scan_deploy_repository();
365
366 self.remove_duplicates();
367
368 self.scan_plugin_repository();
371
372 self.create_hot_deploy_watcher().await;
374 }
375
376 async fn post_init(&self) {
377 self.watch_hot_deploy();
379 }
380
381 async fn pre_shutdown(&self) {
382 self.unwatch_hot_deploy();
383 }
384
385 async fn shutdown(&self) {
386 self.destroy_hot_deploy_watcher();
387 }
388}
389
/// Returns `true` if the extension of `path` is exactly the dynamic library
/// extension of the current platform (`DLL_EXTENSION`).
///
/// Paths without an extension, or with an extension that is not valid UTF-8,
/// yield `false`.
fn is_dll(path: &Path) -> bool {
    path.extension().and_then(OsStr::to_str) == Some(DLL_EXTENSION)
}
396
397fn is_archive(path: &Path) -> Option<ArchiveType> {
398 path.file_name().and_then(OsStr::to_str).and_then(|file_name| {
399 if file_name.ends_with(".tar.gz") {
400 Some(ArchiveType::TarGz)
401 } else if file_name.ends_with(".zip") {
402 Some(ArchiveType::Zip)
403 } else {
404 None
405 }
406 })
407}
408
409fn extract_tar_gz(archive_path: &Path) -> Result<(), HotDeployError> {
410 let deploy_folder = get_deploy_folder(archive_path).ok_or(HotDeployError::ArchiveError)?;
411 let archive_file = File::open(archive_path).map_err(|_| HotDeployError::ArchiveError)?;
412 let decoder = GzDecoder::new(archive_file);
413 let mut archive = Archive::new(decoder);
414 archive.unpack(deploy_folder).map_err(|_| HotDeployError::ArchiveError)?;
415 fs::remove_file(archive_path).map_err(|_| HotDeployError::ArchiveError)?;
416 Ok(())
417}
418
419fn extract_zip(archive_path: &Path) -> Result<(), HotDeployError> {
420 let deploy_folder = get_deploy_folder(archive_path).ok_or(HotDeployError::ArchiveError)?;
421 let archive_file = File::open(archive_path).map_err(|_| HotDeployError::ArchiveError)?;
422 let mut zip_archive = ZipArchive::new(archive_file).map_err(|_| HotDeployError::ArchiveError)?;
423 for i in 0..zip_archive.len() {
424 let mut zip_file = zip_archive.by_index(i).map_err(|_| HotDeployError::ArchiveError)?;
425 let plugin_path = match zip_file.enclosed_name() {
426 Some(path) => Path::new(&deploy_folder).join(path),
427 None => continue,
428 };
429 if (*zip_file.name()).ends_with('/') {
430 create_dir_all(&plugin_path).map_err(|_| HotDeployError::ArchiveError)?;
431 } else {
432 if let Some(plugin_path_parent) = plugin_path.parent() {
433 if !plugin_path_parent.exists() {
434 create_dir_all(plugin_path_parent).map_err(|_| HotDeployError::ArchiveError)?;
435 }
436 }
437 let mut outfile = File::create(&plugin_path).map_err(|_| HotDeployError::ArchiveError)?;
438 std::io::copy(&mut zip_file, &mut outfile).map_err(|_| HotDeployError::ArchiveError)?;
439 }
440 }
441 fs::remove_file(archive_path).map_err(|_| HotDeployError::ArchiveError)?;
442 Ok(())
443}
444
445fn deploy_plugin(deploy_path: PathBuf) -> Result<PathBuf, HotDeployError> {
446 debug!("Detected new plugin {}", deploy_path.display());
447 if !is_dll(&deploy_path) {
448 return Err(HotDeployError::NoDynamicLinkLibrary);
449 }
450 let Some(install_path) = get_install_path(&deploy_path) else {
451 return Err(HotDeployError::InvalidInstallPath);
452 };
453 let install_path = match fs::copy(&deploy_path, &install_path) {
454 Ok(_) => {
455 debug!("Copied plugin from {} to {}", deploy_path.display(), &install_path.display());
456 Ok(install_path)
457 }
458 Err(e) => {
459 error!(
460 "Failed to deploy plugin: Failed to copy plugin from {} to {}: {:?}",
461 deploy_path.display(),
462 install_path.display(),
463 e
464 );
465 Err(HotDeployError::MoveError)
466 }
467 }?;
468 match fs::remove_file(&deploy_path) {
469 Ok(_) => {
470 debug!("Removed plugin from {}", deploy_path.display());
471 Ok(install_path)
472 }
473 Err(e) => {
474 error!("Failed to deploy plugin: Failed to remove plugin from {}: {:?}", deploy_path.display(), e);
475 Err(HotDeployError::MoveError)
476 }
477 }
478}