reactive_graph_plugin_service_impl/
container.rs1use std::fs;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::sync::RwLock;
5
6use dashmap::DashSet;
7use libloading::Library;
8use log::debug;
9use log::error;
10use log::info;
11use log::trace;
12use springtime_di::instance_provider::ComponentInstanceProviderError;
13use uuid::Uuid;
14
15use crate::plugin_paths::get_deploy_path;
16use crate::plugin_paths::get_install_path;
17use reactive_graph_plugin_api::PLUGIN_API_VERSION;
18use reactive_graph_plugin_api::PLUGIN_NAME_PREFIX;
19use reactive_graph_plugin_api::Plugin;
20use reactive_graph_plugin_api::PluginContext;
21use reactive_graph_plugin_api::PluginDeclaration;
22use reactive_graph_plugin_api::PluginDependency;
23use reactive_graph_plugin_api::PluginDeployError;
24use reactive_graph_plugin_api::PluginDisableError;
25use reactive_graph_plugin_api::PluginLoadingError;
26use reactive_graph_plugin_api::PluginRefreshingState;
27use reactive_graph_plugin_api::PluginResolveState;
28use reactive_graph_plugin_api::PluginStartError;
29use reactive_graph_plugin_api::PluginStartingState;
30use reactive_graph_plugin_api::PluginState;
31use reactive_graph_plugin_api::PluginStopError;
32use reactive_graph_plugin_api::PluginStoppingState;
33use reactive_graph_plugin_api::PluginUninstallError;
34use reactive_graph_plugin_api::PluginUninstallingState;
35use reactive_graph_plugin_api::RUSTC_VERSION;
36use reactive_graph_plugin_service_api::PluginTransitionResult;
37use reactive_graph_plugin_service_api::PluginTransitionResult::Changed;
38use reactive_graph_plugin_service_api::PluginTransitionResult::NoChange;
39
40use crate::PluginProxy;
41use crate::PluginRegistrar;
42
43pub struct PluginContainer {
45 pub id: Uuid,
47
48 pub stem: String,
51
52 pub path: PathBuf,
55
56 pub state: PluginState,
58
59 pub plugin_declaration: RwLock<Option<PluginDeclaration>>,
61
62 pub proxy: Arc<RwLock<Option<Arc<PluginProxy>>>>,
64
65 pub library: RwLock<Option<Arc<Library>>>,
67
68 pub dependencies: DashSet<PluginDependency>,
70}
71
72impl PluginContainer {
73 pub fn new(stem: String, path: PathBuf) -> Self {
74 PluginContainer {
75 id: Uuid::new_v4(),
76 stem,
77 path,
78 state: PluginState::Installed,
79 plugin_declaration: RwLock::new(None),
80 proxy: Arc::new(RwLock::new(None)),
81 library: RwLock::new(None),
82 dependencies: DashSet::new(),
83 }
84 }
85
86 pub fn deploy_dll(&mut self) -> PluginTransitionResult {
88 if self.state != PluginState::Refreshing(PluginRefreshingState::Deploying) {
89 return NoChange;
90 }
91 let Some(deploy_path) = get_deploy_path(&self.path) else {
92 return NoChange;
93 };
94 if !deploy_path.exists() {
95 error!("Failed to deploy dynamic link library: {} does not exist", deploy_path.display());
96 self.state = PluginState::Uninstalled;
97 return Changed;
98 }
99 let Some(install_path) = get_install_path(&self.path) else {
100 return NoChange;
101 };
102 match fs::copy(&deploy_path, &install_path) {
103 Ok(_) => {
104 trace!("Copied plugin from {} to {}", deploy_path.display(), &install_path.display());
105 match fs::remove_file(&deploy_path) {
106 Ok(_) => {
107 trace!("Removed plugin from {}", deploy_path.display());
108 self.path = install_path.clone();
109 self.state = PluginState::Refreshing(PluginRefreshingState::Installed);
110 debug!("Plugin {} successfully deployed from {} to {}", self.id, deploy_path.display(), install_path.display());
111 Changed
112 }
113 Err(e) => {
114 error!("Failed to deploy plugin {}: Failed to remove plugin from {}: {:?}", self.id, deploy_path.display(), e);
115 self.state = PluginState::Uninstalled;
116 Changed
117 }
118 }
119 }
120 Err(e) => {
121 error!(
122 "Failed to deploy plugin {}: Failed to copy plugin from {} to {}: {:?}",
123 self.id,
124 deploy_path.display(),
125 install_path.display(),
126 e
127 );
128 self.state = PluginState::Uninstalled;
129 Changed
130 }
131 }
132 }
133
134 pub fn load_dll(&mut self) -> PluginTransitionResult {
136 if self.state != PluginState::Installed && self.state != PluginState::Refreshing(PluginRefreshingState::Installed) {
137 return NoChange;
138 }
139 let refreshing = self.state == PluginState::Refreshing(PluginRefreshingState::Installed);
140 unsafe {
141 match Library::new(self.path.as_os_str()) {
142 Ok(library) => {
143 let mut writer = self.library.write().unwrap();
144 *writer = Some(Arc::new(library));
145 if refreshing {
146 self.state = PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::Loaded));
147 } else {
148 self.state = PluginState::Resolving(PluginResolveState::Loaded);
149 }
150 debug!("Plugin {} successfully loaded dynamic link library located at {}", self.id, self.path.display());
151 Changed
152 }
153 Err(e) => {
154 error!("Plugin {} failed to load dynamic link library located at {}: {:?}", self.id, self.path.display(), e);
155 self.state = PluginState::Uninstalling(PluginUninstallingState::UnloadDll);
156 Changed
157 }
158 }
159 }
160 }
161
162 pub fn unload_dll(&mut self) -> PluginTransitionResult {
164 if self.state != PluginState::Uninstalling(PluginUninstallingState::UnloadDll)
165 && self.state != PluginState::Refreshing(PluginRefreshingState::Uninstalling(PluginUninstallingState::UnloadDll))
166 {
167 return NoChange;
168 }
169 let refreshing = self.state == PluginState::Refreshing(PluginRefreshingState::Uninstalling(PluginUninstallingState::UnloadDll));
170 {
171 let mut writer = self.plugin_declaration.write().unwrap();
172 *writer = None;
174 }
175 {
176 let mut writer = self.library.write().unwrap();
177 *writer = None;
179 debug!("Plugin {} unloaded dynamic linked library located at {}", self.id, self.path.display());
180 }
181 self.dependencies = DashSet::new();
182 if refreshing {
183 self.state = PluginState::Refreshing(PluginRefreshingState::Uninstalling(PluginUninstallingState::UninstallDll));
184 } else {
185 self.state = PluginState::Uninstalling(PluginUninstallingState::UninstallDll);
186 }
187 Changed
188 }
189
190 pub fn uninstall_dll(&mut self) -> PluginTransitionResult {
192 if self.state != PluginState::Uninstalling(PluginUninstallingState::UninstallDll)
193 && self.state != PluginState::Refreshing(PluginRefreshingState::Uninstalling(PluginUninstallingState::UninstallDll))
194 {
195 return NoChange;
196 }
197 let refreshing = self.state == PluginState::Refreshing(PluginRefreshingState::Uninstalling(PluginUninstallingState::UninstallDll));
198 if !self.path.exists() {
199 if refreshing {
200 self.state = PluginState::Refreshing(PluginRefreshingState::Deploying);
201 } else {
202 self.state = PluginState::Uninstalled;
203 }
204 return NoChange;
205 }
206 match fs::remove_file(self.path.clone()) {
207 Ok(_) => {
208 debug!("Plugin {} deleted dynamic linked library located at {}", self.id, self.path.display());
209 if refreshing {
210 self.state = PluginState::Refreshing(PluginRefreshingState::Deploying);
211 } else {
212 self.state = PluginState::Uninstalled;
213 }
214 NoChange
216 }
217 Err(e) => {
218 error!("Failed to delete dynamic linked library of plugin {} located at {}: {}", self.id, self.path.display(), e);
219 NoChange
220 }
221 }
222 }
223
224 pub fn load_plugin_declaration(&mut self) -> PluginTransitionResult {
227 if self.state != PluginState::Resolving(PluginResolveState::Loaded)
228 && self.state != PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::Loaded))
229 {
230 return NoChange;
231 }
232 let refreshing = self.state == PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::Loaded));
233 let reader = self.library.read().unwrap();
234 if let Some(library) = reader.as_ref() {
235 let library = library.clone();
236 unsafe {
237 trace!("Plugin {} is reading dynamic linked library symbol plugin_declaration", self.id);
238 match library.get::<*mut PluginDeclaration>(b"plugin_declaration\0") {
239 Ok(plugin_declaration) => {
240 {
241 let mut writer = self.plugin_declaration.write().unwrap();
242 *writer = Some(plugin_declaration.read());
243 }
244 debug!("Plugin {} successfully loaded plugin declaration", self.id);
245 if refreshing {
246 self.state = PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::PluginDeclarationLoaded));
247 } else {
248 self.state = PluginState::Resolving(PluginResolveState::PluginDeclarationLoaded);
249 }
250 return Changed;
251 }
252 Err(e) => {
253 error!("Plugin {} failed to get symbol plugin_declaration: {}", self.id, e);
254 }
255 }
256 }
257 }
258 NoChange
259 }
260
261 pub fn check_compatibility(&mut self) -> PluginTransitionResult {
263 if self.state != PluginState::Resolving(PluginResolveState::PluginDeclarationLoaded)
264 && self.state != PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::PluginDeclarationLoaded))
265 {
266 return NoChange;
267 }
268 let refreshing = self.state == PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::PluginDeclarationLoaded));
269 let reader = self.plugin_declaration.read().unwrap();
270 match *reader {
271 Some(plugin_declaration) => {
272 if plugin_declaration.rustc_version != RUSTC_VERSION {
273 error!(
274 "Plugin {} is not compatible: Expected rustc_version {} - Actual {}",
275 self.id, RUSTC_VERSION, plugin_declaration.rustc_version
276 );
277 if refreshing {
278 self.state = PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::CompilerVersionMismatch));
279 } else {
280 self.state = PluginState::Resolving(PluginResolveState::CompilerVersionMismatch);
281 }
282 return Changed;
283 }
284 if plugin_declaration.plugin_api_version != PLUGIN_API_VERSION {
285 error!(
286 "Plugin {} is not compatible: Expected plugin_api_version {} - Actual {}",
287 self.id, PLUGIN_API_VERSION, plugin_declaration.plugin_api_version
288 );
289 if refreshing {
290 self.state = PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::PluginApiVersionMismatch));
291 } else {
292 self.state = PluginState::Resolving(PluginResolveState::PluginApiVersionMismatch);
293 }
294 return Changed;
295 }
296 debug!("Plugin {} is compatible with the rustc_version and the plugin_api_version)", self.id);
297 if refreshing {
298 self.state = PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::PluginCompatible));
299 } else {
300 self.state = PluginState::Resolving(PluginResolveState::PluginCompatible);
301 }
302 Changed
303 }
304 None => {
305 if refreshing {
306 self.state = PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::Loaded));
307 } else {
308 self.state = PluginState::Resolving(PluginResolveState::Loaded);
309 }
310 Changed
311 }
312 }
313 }
314
315 #[allow(clippy::redundant_clone)]
316 pub fn load_plugin_dependencies(&mut self) -> PluginTransitionResult {
317 if self.state != PluginState::Resolving(PluginResolveState::PluginCompatible)
318 && self.state != PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::PluginCompatible))
319 {
320 return NoChange;
321 }
322 let refreshing = self.state == PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::PluginCompatible));
323 let reader = self.plugin_declaration.read().unwrap();
324 if let Some(plugin_declaration) = *reader {
325 trace!("Plugin {} is loading the list of dependencies", self.id);
326 let dependencies = unsafe { (plugin_declaration.get_dependencies)() };
327 for dependency in dependencies.clone() {
328 trace!("Plugin {} depends on {}:{}", self.id, &dependency.name, &dependency.version);
329 self.dependencies.insert(dependency);
330 }
331 if refreshing {
332 self.state = PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::DependenciesNotActive));
333 } else {
334 self.state = PluginState::Resolving(PluginResolveState::DependenciesNotActive);
335 }
336 return Changed;
337 }
338 NoChange
339 }
340
341 #[allow(clippy::collapsible_match)]
343 pub fn construct_proxy(&mut self, plugin_context: Arc<dyn PluginContext + Send + Sync>) -> PluginTransitionResult {
344 if self.state != PluginState::Starting(PluginStartingState::ConstructingProxy)
345 && self.state != PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::ConstructingProxy))
346 {
347 return NoChange;
348 }
349 let refreshing = self.state == PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::ConstructingProxy));
350
351 let reader = self.plugin_declaration.read().unwrap();
352 if let Some(plugin_declaration) = *reader {
353 trace!("Plugin {} is constructing proxy", self.id);
354 let mut registrar = PluginRegistrar::new(plugin_context);
355 unsafe {
356 if let Err(e) = (plugin_declaration.register)(&mut registrar) {
357 error!("Dependency injection error in plugin {}:\n\n {e}\n", self.id);
358 if let PluginLoadingError::ComponentInstanceProviderError(ComponentInstanceProviderError::NoPrimaryInstance { type_name, .. }) = e {
359 if let Some(type_name) = type_name {
360 let type_name_stripped = type_name.replace("dyn ", "").replace(" + core::marker::Send + core::marker::Sync", "");
361 let notice = if type_name_stripped == "reactive_graph_plugin_api::plugin::Plugin" {
362 "\n Notice: Every plugin must provide a component that implements reactive_graph_plugin_api::Plugin!"
363 } else {
364 ""
365 };
366 error!("Missing component\n\n Plugin: {}\n Component: {type_name_stripped}{notice}\n", plugin_declaration.name);
367 }
368 }
369 self.state = PluginState::Resolved;
370 return NoChange;
371 }
372 self.state = PluginState::Starting(PluginStartingState::Registering);
373 }
374 let mut writer = self.proxy.write().unwrap();
375 *writer = registrar.plugin;
376 debug!("Plugin {} successfully constructed proxy", self.id);
377 if refreshing {
378 self.state = PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::Registering));
379 } else {
380 self.state = PluginState::Starting(PluginStartingState::Registering);
381 }
382 return Changed;
383 }
384 NoChange
385 }
386
387 pub async fn activate(&mut self) -> PluginTransitionResult {
389 if self.state != PluginState::Starting(PluginStartingState::Activating)
391 && self.state != PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::Activating))
392 {
393 return NoChange;
394 }
395 let refreshing = self.state == PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::Activating));
396
397 let proxy = {
398 let reader = self.proxy.read().unwrap();
399 let Some(proxy) = reader.as_ref().cloned() else {
400 return NoChange;
401 };
402 proxy.clone()
403 };
404 trace!("Plugin {} is being activated", self.id);
405 match proxy.activate().await {
406 Ok(_) => {
407 debug!("Plugin {} has been activated successfully", self.id);
408 self.state = PluginState::Active;
409 info!(
410 "[ACTIVE] {} {}",
411 self.name().unwrap_or_default().replace(PLUGIN_NAME_PREFIX, ""),
412 self.version().unwrap_or_default()
413 );
414 }
415 Err(e) => {
416 error!(
417 "[FAILED] {} {}: {}",
418 self.name().unwrap_or_default().replace(PLUGIN_NAME_PREFIX, ""),
419 self.version().unwrap_or_default(),
420 e
421 );
422 if refreshing {
423 self.state = PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::ActivationFailed));
424 } else {
425 self.state = PluginState::Starting(PluginStartingState::ActivationFailed);
426 }
427 }
428 }
429 Changed
430 }
431
432 pub async fn deactivate(&mut self) -> PluginTransitionResult {
434 if self.state != PluginState::Stopping(PluginStoppingState::Deactivating)
435 && self.state != PluginState::Refreshing(PluginRefreshingState::Stopping(PluginStoppingState::Deactivating))
436 {
437 return NoChange;
438 }
439 let refreshing = self.state == PluginState::Refreshing(PluginRefreshingState::Stopping(PluginStoppingState::Deactivating));
440
441 let proxy = {
442 let reader = self.proxy.read().unwrap();
443 let Some(proxy) = reader.as_ref().cloned() else {
444 return NoChange;
445 };
446 proxy.clone()
447 };
448 trace!("Plugin {} is being deactivated", self.id);
449 match proxy.deactivate().await {
450 Ok(_) => {
451 debug!("Plugin {} has been deactivated successfully", self.id);
452 if refreshing {
453 self.state = PluginState::Refreshing(PluginRefreshingState::Stopping(PluginStoppingState::Unregistering));
454 } else {
455 self.state = PluginState::Stopping(PluginStoppingState::Unregistering);
456 }
457 return Changed;
458 }
459 Err(e) => {
460 error!("Plugin {} failed to deactivate: {:?}", self.id, e);
461 }
462 }
463 NoChange
464 }
465
466 pub fn remove_proxy(&mut self) -> PluginTransitionResult {
468 if self.state != PluginState::Stopping(PluginStoppingState::RemoveProxy)
469 && self.state != PluginState::Refreshing(PluginRefreshingState::Stopping(PluginStoppingState::RemoveProxy))
470 {
471 return NoChange;
472 }
473 let refreshing = self.state == PluginState::Refreshing(PluginRefreshingState::Stopping(PluginStoppingState::RemoveProxy));
474 let mut writer = self.proxy.write().unwrap();
475 *writer = None;
477 if refreshing {
478 self.state = PluginState::Refreshing(PluginRefreshingState::Uninstalling(PluginUninstallingState::UnloadDll));
479 } else {
480 self.state = PluginState::Resolved;
481 }
482 Changed
483 }
484
485 pub fn start(&mut self) -> Result<(), PluginStartError> {
489 match self.state {
490 PluginState::Active => Err(PluginStartError::AlreadyActive),
491 PluginState::Starting(_) | PluginState::Stopping(_) | PluginState::Refreshing(_) | PluginState::Uninstalling(_) => {
492 Err(PluginStartError::InTransition)
493 }
494 PluginState::Uninstalled | PluginState::Disabled | PluginState::Installed | PluginState::Resolving(_) => {
495 Err(PluginStartError::NotResolved(self.state))
496 }
497 PluginState::Resolved => {
498 trace!("Starting plugin {}", self.id);
499 self.state = PluginState::Starting(PluginStartingState::ConstructingProxy);
500 Ok(())
501 }
502 }
503 }
504
505 pub fn stop(&mut self) -> Result<(), PluginStopError> {
506 match self.state {
507 PluginState::Stopping(_) | PluginState::Starting(_) | PluginState::Refreshing(_) | PluginState::Uninstalling(_) => {
508 Err(PluginStopError::InTransition)
509 }
510 PluginState::Uninstalled | PluginState::Disabled | PluginState::Installed | PluginState::Resolving(_) | PluginState::Resolved => {
511 Err(PluginStopError::NotActive)
512 }
513 PluginState::Active => {
514 trace!("Stopping plugin {}", self.id);
515 self.state = PluginState::Stopping(PluginStoppingState::Deactivating);
516 Ok(())
517 }
518 }
519 }
520
521 pub fn uninstall(&mut self) -> Result<(), PluginUninstallError> {
522 match self.state {
523 PluginState::Stopping(_) | PluginState::Starting(_) | PluginState::Refreshing(_) | PluginState::Uninstalling(_) => {
524 Err(PluginUninstallError::InTransition)
525 }
526 PluginState::Uninstalled => Err(PluginUninstallError::AlreadyUninstalled),
527 PluginState::Disabled => Err(PluginUninstallError::Disabled),
528 PluginState::Active => Err(PluginUninstallError::NotStopped),
529 PluginState::Installed | PluginState::Resolving(_) | PluginState::Resolved => {
530 trace!("Uninstalling plugin {}", self.id);
531 self.state = PluginState::Uninstalling(PluginUninstallingState::UnloadDll);
532 Ok(())
533 }
534 }
535 }
536
537 pub fn redeploy(&mut self) -> Result<(), PluginDeployError> {
538 match self.state {
539 PluginState::Stopping(_) | PluginState::Starting(_) | PluginState::Uninstalling(_) => Err(PluginDeployError::InTransition),
540 PluginState::Refreshing(PluginRefreshingState::Resolving(_)) => {
541 trace!("Redeploying resolved plugin {}", self.id);
543 self.state = PluginState::Refreshing(PluginRefreshingState::Uninstalling(PluginUninstallingState::UnloadDll));
544 Ok(())
545 }
546 PluginState::Refreshing(_) => Err(PluginDeployError::InTransition),
547 PluginState::Uninstalled => Err(PluginDeployError::Uninstalled),
548 PluginState::Active => {
549 trace!("Redeploying active plugin {}", self.id);
550 self.state = PluginState::Refreshing(PluginRefreshingState::Stopping(PluginStoppingState::Deactivating));
551 Ok(())
552 }
553 PluginState::Resolved => {
554 trace!("Redeploying resolved plugin {}", self.id);
555 self.state = PluginState::Refreshing(PluginRefreshingState::Uninstalling(PluginUninstallingState::UnloadDll));
556 Ok(())
557 }
558 PluginState::Installed | PluginState::Resolving(_) | PluginState::Disabled => {
559 trace!("Redeploying installed plugin {}", self.id);
560 self.state = PluginState::Refreshing(PluginRefreshingState::Uninstalling(PluginUninstallingState::UnloadDll));
561 Ok(())
562 }
563 }
564 }
565
566 pub fn disable(&mut self) -> Result<(), PluginDisableError> {
567 trace!("Disable plugin {}", self.id);
568 self.state = PluginState::Disabled;
569 Ok(())
570 }
571
572 pub fn name(&self) -> Option<String> {
575 let reader = self.plugin_declaration.read().unwrap();
576 (*reader).map(|plugin_declaration| plugin_declaration.name.to_string())
577 }
578
579 pub fn name_canonicalized(&self) -> Option<String> {
580 let reader = self.plugin_declaration.read().unwrap();
581 (*reader).map(|plugin_declaration| plugin_declaration.name.replace(PLUGIN_NAME_PREFIX, ""))
582 }
583
584 pub fn name_version(&self) -> Option<String> {
585 let reader = self.plugin_declaration.read().unwrap();
586 (*reader).map(|plugin_declaration| format!("{}:{}", plugin_declaration.name.replace(PLUGIN_NAME_PREFIX, ""), plugin_declaration.version))
587 }
588
589 pub fn description(&self) -> Option<String> {
590 let reader = self.plugin_declaration.read().unwrap();
591 (*reader).map(|plugin_declaration| plugin_declaration.description.to_string())
592 }
593
594 pub fn version(&self) -> Option<String> {
595 let reader = self.plugin_declaration.read().unwrap();
596 (*reader).map(|plugin_declaration| plugin_declaration.version.to_string())
597 }
598
599 pub fn rustc_version(&self) -> Option<String> {
600 let reader = self.plugin_declaration.read().unwrap();
601 (*reader).map(|plugin_declaration| plugin_declaration.rustc_version.to_string())
602 }
603
604 pub fn plugin_api_version(&self) -> Option<String> {
605 let reader = self.plugin_declaration.read().unwrap();
606 (*reader).map(|plugin_declaration| plugin_declaration.plugin_api_version.to_string())
607 }
608}