reactive_graph_plugin_service_impl/
plugin_repository_manager_impl.rs1use std::collections::HashMap;
2use std::env::consts::DLL_EXTENSION;
3use std::fs;
4use std::path::Path;
5use std::path::PathBuf;
6use std::sync::Arc;
7use std::sync::RwLock;
8
9use async_trait::async_trait;
10use log::debug;
11use log::error;
12use log::info;
13use log::trace;
14use log::warn;
15use notify::Event;
16use notify::EventKind::Access;
17use notify::RecommendedWatcher;
18use notify::RecursiveMode;
19use notify::Watcher;
20use notify::event::AccessKind::Close;
21use notify::event::AccessMode::Write;
22use springtime_di::Component;
23use springtime_di::component_alias;
24use tokio::sync::mpsc;
25use uuid::Uuid;
26use walkdir::WalkDir;
27
28use reactive_graph_config_api::ConfigManager;
29use reactive_graph_lifecycle::Lifecycle;
30use reactive_graph_plugin_api::HotDeployError;
31use reactive_graph_plugin_api::PLUGIN_NAME_PREFIX;
32use reactive_graph_plugin_api::PluginState;
33use reactive_graph_plugin_service_api::PluginContainerManager;
34use reactive_graph_plugin_service_api::PluginRepositoryManager;
35use reactive_graph_plugin_service_api::PluginResolver;
36
37use crate::plugin_paths::get_install_path;
38use crate::plugin_paths::get_stem;
39
40pub type HotDeployWatcher = RwLock<Option<RecommendedWatcher>>;
41
42fn create_hot_deploy_watcher() -> HotDeployWatcher {
43 RwLock::new(None)
44}
45
46#[derive(Component)]
47pub struct PluginRepositoryManagerImpl {
48 plugin_container_manager: Arc<dyn PluginContainerManager + Send + Sync>,
49
50 plugin_resolver: Arc<dyn PluginResolver + Send + Sync>,
51
52 config_manager: Arc<dyn ConfigManager + Send + Sync>,
53
54 #[component(default = "create_hot_deploy_watcher")]
55 hot_deploy_watcher: HotDeployWatcher,
56}
57
58impl PluginRepositoryManagerImpl {
59 fn create_and_register_plugin_container(&self, path: PathBuf) -> Option<Uuid> {
60 if !is_dll(&path) {
61 return None;
62 }
63 if let Some(stem) = get_stem(&path) {
64 return self.plugin_container_manager.create_and_register_plugin_container(stem, path);
65 }
66 None
67 }
68
69 async fn create_hot_deploy_watcher(&self) {
70 let plugin_container_manager = self.plugin_container_manager.clone();
71 let plugin_resolver = self.plugin_resolver.clone();
72 let (tx, mut rx) = mpsc::channel::<notify::Result<Event>>(32);
73 tokio::spawn(async move {
74 trace!("Hot Deploy Watcher started");
75 while let Some(r) = rx.recv().await {
76 match r {
77 Ok(event) => {
78 if event.kind != Access(Close(Write)) {
79 continue;
80 }
81 trace!("Hot Deploy Watcher: Detected file system activity: {event:?}");
82 for path in event.paths.clone() {
83 let Some(stem) = get_stem(&path) else {
84 continue;
85 };
86 if !is_dll(&path) {
87 continue;
88 }
89 if plugin_container_manager.has(&stem) {
90 if let Some(id) = plugin_container_manager.get_id(&stem) {
92 match plugin_container_manager.redeploy(&id) {
93 Ok(_) => {
94 plugin_resolver.resolve_until_idle().await;
95 while plugin_container_manager.start_dependent_with_satisfied_dependencies(&id) {
97 plugin_resolver.resolve_until_idle().await;
99 }
100 plugin_resolver.transition_to_fallback_states().await;
101 }
102 Err(e) => {
103 error!("Failed to redeploy plugin {} {}: {:?}", &stem, &id, e);
104 }
105 }
106 }
107 } else {
108 if let Ok(install_path) = deploy_plugin(path) {
110 if let Some(id) = plugin_container_manager.create_and_register_plugin_container(stem, install_path) {
112 plugin_resolver.resolve_until_idle().await;
113 if plugin_container_manager.start(&id).is_ok() {
114 plugin_resolver.resolve_until_idle().await;
115 while plugin_container_manager.start_dependent_with_satisfied_dependencies(&id) {
117 plugin_resolver.resolve_until_idle().await;
119 }
120 }
121 plugin_resolver.transition_to_fallback_states().await;
122 }
123 }
124 }
125 }
126 for path in event.paths {
127 let Some(stem) = get_stem(&path) else {
128 continue;
129 };
130 if !is_dll(&path) {
131 continue;
132 }
133 let Some(id) = plugin_container_manager.get_id(&stem) else {
134 continue;
135 };
136 let name = plugin_container_manager.name(&id).unwrap_or_default().replace(PLUGIN_NAME_PREFIX, "");
137 let version = plugin_container_manager.version(&id).unwrap_or(String::from("?.?.?"));
138 if let Some(state) = plugin_container_manager.get_plugin_state(&id) {
140 if state == PluginState::Disabled {
141 info!("[DISABLED] {name} {version}");
142 }
143 }
144 for d in plugin_container_manager.get_unsatisfied_dependencies(&id) {
146 trace!("Plugin {} {} has unsatisfied dependency: {}:{}", id, &name, d.name.replace(PLUGIN_NAME_PREFIX, ""), d.version);
147 match plugin_container_manager.get_plugin_by_dependency(&d) {
148 Some(dependency_id) => {
149 let dependency_name = plugin_container_manager
150 .name(&dependency_id)
151 .unwrap_or_default()
152 .replace(PLUGIN_NAME_PREFIX, "");
153 let dependency_version = plugin_container_manager.version(&dependency_id).unwrap_or(String::from("?.?.?"));
154 let dependency_state = plugin_container_manager.get_plugin_state(&dependency_id).unwrap_or(PluginState::Uninstalled);
155 warn!(
156 "Plugin {} has unsatisfied dependency: {}:{} - which exists ({} {}) but has state {:?}",
157 &name,
158 d.name.replace(PLUGIN_NAME_PREFIX, ""),
159 d.version,
160 dependency_name,
161 dependency_version,
162 dependency_state
163 );
164 }
165 None => {
166 warn!(
167 "Plugin {} has unsatisfied dependency: {}:{} - which doesn't exist",
168 &name,
169 d.name.replace(PLUGIN_NAME_PREFIX, ""),
170 d.version
171 );
172 }
173 }
174 }
175 }
176 }
177 Err(e) => {
178 error!("Hot Deploy Watcher: Error: {e}");
179 }
180 }
181 }
182 trace!("Hot Deploy Watcher: Finished");
183 });
184 let watcher = notify::recommended_watcher(move |r: notify::Result<Event>| {
185 let tx = tx.clone();
186 futures::executor::block_on(async {
187 match tx.send(r).await {
188 Ok(_) => {}
189 Err(e) => {
190 trace!("SendError {e}");
191 }
192 }
193 });
194 })
195 .ok();
196 let mut writer = self.hot_deploy_watcher.write().unwrap();
197 *writer = watcher;
198 }
199
200 fn destroy_hot_deploy_watcher(&self) {
201 let mut writer = self.hot_deploy_watcher.write().unwrap();
202 *writer = None;
203 }
204}
205
206#[async_trait]
207#[component_alias]
208impl PluginRepositoryManager for PluginRepositoryManagerImpl {
209 fn scan_deploy_repository(&self) {
210 let plugins_config = self.config_manager.get_plugins_config();
211 let Some(hot_deploy_location) = plugins_config.get_hot_deploy_location() else {
212 return;
213 };
214 trace!("Scanning plugin hot deploy folder {hot_deploy_location:?}");
215 let Ok(dir) = fs::read_dir(hot_deploy_location) else {
216 return;
217 };
218 for entry in dir.flatten() {
219 if let Ok(file_type) = entry.file_type() {
220 if !file_type.is_file() {
221 continue;
222 }
223 let _ = deploy_plugin(entry.path());
224 }
225 }
226 }
227
228 fn remove_duplicates(&self) {
229 let plugins_config = self.config_manager.get_plugins_config();
230 let Some(install_location) = plugins_config.get_install_location() else {
231 return;
232 };
233 let mut installed_plugins: HashMap<String, (u64, PathBuf)> = HashMap::new();
234 let mut plugins_to_remove: Vec<PathBuf> = Vec::new();
235
236 for entry in WalkDir::new(install_location)
237 .into_iter()
238 .filter_map(Result::ok)
239 .filter(|e| !e.file_type().is_dir())
240 {
241 let p = entry.path();
242 if let Some((stem, timestamp)) = p
243 .file_stem()
244 .and_then(|stem| {
245 stem.to_string_lossy()
246 .rsplit_once('.')
247 .map(|(stem, timestamp)| (String::from(stem), String::from(timestamp)))
248 })
249 .and_then(|(stem, timestamp)| timestamp.parse::<u64>().ok().map(|timestamp| (stem, timestamp)))
250 {
251 match installed_plugins.get_mut(&stem) {
253 Some(entry) => {
254 if entry.0 < timestamp {
256 plugins_to_remove.push(entry.1.clone());
257 entry.0 = timestamp;
258 entry.1 = PathBuf::from(p);
259 } else {
261 plugins_to_remove.push(PathBuf::from(p));
262 }
263 }
264 None => {
265 installed_plugins.insert(stem, (timestamp, PathBuf::from(p)));
266 }
267 }
268 }
269 }
270 for plugin_to_remove in plugins_to_remove {
271 if fs::remove_file(&plugin_to_remove).is_ok() {
272 trace!("Removed duplicate plugin: {}", plugin_to_remove.display());
273 }
274 }
275 }
276
277 fn scan_plugin_repository(&self) {
278 let plugins_config = self.config_manager.get_plugins_config();
279 let Some(install_location) = plugins_config.get_install_location() else {
280 return;
281 };
282 trace!("Scanning plugin installation folder {install_location:?}");
283 let Ok(dir) = fs::read_dir(install_location) else {
284 return;
285 };
286 for entry in dir.flatten() {
287 if entry.file_type().map(|f| f.is_file()).unwrap_or(false) {
288 self.create_and_register_plugin_container(entry.path());
289 }
290 }
291 }
292
293 fn watch_hot_deploy(&self) {
294 let Some(hot_deploy_location) = self.config_manager.get_plugins_config().get_hot_deploy_location() else {
295 return;
296 };
297 let mut writer = self.hot_deploy_watcher.write().unwrap();
298 if let Some(recommended_watcher) = writer.as_mut() {
299 match recommended_watcher.watch(&hot_deploy_location, RecursiveMode::NonRecursive) {
302 Ok(_) => {
303 trace!("Watching hot deploy folder {hot_deploy_location:?}");
304 }
305 Err(e) => {
306 error!("Failed to watch hot deploy folder {hot_deploy_location:?}: {e}");
307 }
308 }
309 }
310 }
311
312 fn unwatch_hot_deploy(&self) {
313 let Some(hot_deploy_location) = self.config_manager.get_plugins_config().get_hot_deploy_location() else {
314 return;
315 };
316 let mut writer = self.hot_deploy_watcher.write().unwrap();
317 if let Some(recommended_watcher) = writer.as_mut() {
318 let _ = recommended_watcher.unwatch(&hot_deploy_location);
319 }
320 }
321}
322
323#[async_trait]
324impl Lifecycle for PluginRepositoryManagerImpl {
325 async fn init(&self) {
326 self.scan_deploy_repository();
330
331 self.remove_duplicates();
332
333 self.scan_plugin_repository();
336
337 self.create_hot_deploy_watcher().await;
339 }
340
341 async fn post_init(&self) {
342 self.watch_hot_deploy();
344 }
345
346 async fn pre_shutdown(&self) {
347 self.unwatch_hot_deploy();
348 }
349
350 async fn shutdown(&self) {
351 self.destroy_hot_deploy_watcher();
352 }
353}
354
355fn is_dll(path: &Path) -> bool {
356 if let Some(extension) = path.extension().and_then(|e| e.to_str()) {
357 return extension == DLL_EXTENSION;
358 }
359 false
360}
361
362fn deploy_plugin(deploy_path: PathBuf) -> Result<PathBuf, HotDeployError> {
363 debug!("Detected new plugin {}", deploy_path.display());
364 if !is_dll(&deploy_path) {
365 return Err(HotDeployError::NoDynamicLinkLibrary);
366 }
367 let Some(install_path) = get_install_path(&deploy_path) else {
368 return Err(HotDeployError::InvalidInstallPath);
369 };
370 match fs::rename(&deploy_path, &install_path) {
371 Ok(_) => {
372 debug!("Moved plugin from {} to {}", deploy_path.display(), install_path.display());
373 Ok(install_path)
374 }
375 Err(_) => {
376 error!("Failed to moved plugin from {} to {}", deploy_path.display(), install_path.display());
377 Err(HotDeployError::MoveError)
378 }
379 }
380}