reactive_graph_plugin_service_impl/
container.rs1use std::fs;
2use std::path::PathBuf;
3use std::sync::Arc;
4use std::sync::RwLock;
5
6use dashmap::DashSet;
7use libloading::Library;
8use log::debug;
9use log::error;
10use log::info;
11use log::trace;
12use springtime_di::instance_provider::ComponentInstanceProviderError;
13use uuid::Uuid;
14
15use crate::plugin_paths::get_deploy_path;
16use crate::plugin_paths::get_install_path;
17use reactive_graph_plugin_api::PLUGIN_API_VERSION;
18use reactive_graph_plugin_api::PLUGIN_NAME_PREFIX;
19use reactive_graph_plugin_api::Plugin;
20use reactive_graph_plugin_api::PluginContext;
21use reactive_graph_plugin_api::PluginDeclaration;
22use reactive_graph_plugin_api::PluginDependency;
23use reactive_graph_plugin_api::PluginDeployError;
24use reactive_graph_plugin_api::PluginDisableError;
25use reactive_graph_plugin_api::PluginLoadingError;
26use reactive_graph_plugin_api::PluginRefreshingState;
27use reactive_graph_plugin_api::PluginResolveState;
28use reactive_graph_plugin_api::PluginStartError;
29use reactive_graph_plugin_api::PluginStartingState;
30use reactive_graph_plugin_api::PluginState;
31use reactive_graph_plugin_api::PluginStopError;
32use reactive_graph_plugin_api::PluginStoppingState;
33use reactive_graph_plugin_api::PluginUninstallError;
34use reactive_graph_plugin_api::PluginUninstallingState;
35use reactive_graph_plugin_api::RUSTC_VERSION;
36use reactive_graph_plugin_service_api::PluginTransitionResult;
37use reactive_graph_plugin_service_api::PluginTransitionResult::Changed;
38use reactive_graph_plugin_service_api::PluginTransitionResult::NoChange;
39
40use crate::PluginProxy;
41use crate::PluginRegistrar;
42
43pub struct PluginContainer {
45 pub id: Uuid,
47
48 pub stem: String,
51
52 pub path: PathBuf,
55
56 pub state: PluginState,
58
59 pub plugin_declaration: RwLock<Option<PluginDeclaration>>,
61
62 pub proxy: Arc<RwLock<Option<Arc<PluginProxy>>>>,
64
65 pub library: RwLock<Option<Arc<Library>>>,
67
68 pub dependencies: DashSet<PluginDependency>,
70}
71
72impl PluginContainer {
73 pub fn new(stem: String, path: PathBuf) -> Self {
74 PluginContainer {
75 id: Uuid::new_v4(),
76 stem,
77 path,
78 state: PluginState::Installed,
79 plugin_declaration: RwLock::new(None),
80 proxy: Arc::new(RwLock::new(None)),
81 library: RwLock::new(None),
82 dependencies: DashSet::new(),
83 }
84 }
85
86 pub fn deploy_dll(&mut self) -> PluginTransitionResult {
88 if self.state != PluginState::Refreshing(PluginRefreshingState::Deploying) {
89 return NoChange;
90 }
91 let Some(deploy_path) = get_deploy_path(&self.path) else {
92 return NoChange;
93 };
94 if !deploy_path.exists() {
95 error!("Failed to deploy dynamic link library: {} does not exist", deploy_path.display());
96 self.state = PluginState::Uninstalled;
97 return Changed;
98 }
99 let Some(install_path) = get_install_path(&self.path) else {
100 return NoChange;
101 };
102 match fs::rename(&deploy_path, &install_path) {
103 Ok(_) => {
104 self.path = install_path.clone();
105 self.state = PluginState::Refreshing(PluginRefreshingState::Installed);
106 debug!("Plugin {} successfully deployed from {} to {}", self.id, deploy_path.display(), install_path.display());
107 Changed
108 }
109 Err(e) => {
110 error!("Failed to deploy plugin {} from {} to {}: {}", self.id, deploy_path.display(), install_path.display(), e);
111 self.state = PluginState::Uninstalled;
112 Changed
113 }
114 }
115 }
116
117 pub fn load_dll(&mut self) -> PluginTransitionResult {
119 if self.state != PluginState::Installed && self.state != PluginState::Refreshing(PluginRefreshingState::Installed) {
120 return NoChange;
121 }
122 let refreshing = self.state == PluginState::Refreshing(PluginRefreshingState::Installed);
123 unsafe {
124 match Library::new(self.path.as_os_str()) {
125 Ok(library) => {
126 let mut writer = self.library.write().unwrap();
127 *writer = Some(Arc::new(library));
128 if refreshing {
129 self.state = PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::Loaded));
130 } else {
131 self.state = PluginState::Resolving(PluginResolveState::Loaded);
132 }
133 debug!("Plugin {} successfully loaded dynamic link library located at {}", self.id, self.path.display());
134 Changed
135 }
136 Err(e) => {
137 error!("Plugin {} failed to load dynamic link library located at {}: {:?}", self.id, self.path.display(), e);
138 self.state = PluginState::Uninstalling(PluginUninstallingState::UnloadDll);
139 Changed
140 }
141 }
142 }
143 }
144
145 pub fn unload_dll(&mut self) -> PluginTransitionResult {
147 if self.state != PluginState::Uninstalling(PluginUninstallingState::UnloadDll)
148 && self.state != PluginState::Refreshing(PluginRefreshingState::Uninstalling(PluginUninstallingState::UnloadDll))
149 {
150 return NoChange;
151 }
152 let refreshing = self.state == PluginState::Refreshing(PluginRefreshingState::Uninstalling(PluginUninstallingState::UnloadDll));
153 {
154 let mut writer = self.plugin_declaration.write().unwrap();
155 *writer = None;
157 }
158 {
159 let mut writer = self.library.write().unwrap();
160 *writer = None;
162 debug!("Plugin {} unloaded dynamic linked library located at {}", self.id, self.path.display());
163 }
164 self.dependencies = DashSet::new();
165 if refreshing {
166 self.state = PluginState::Refreshing(PluginRefreshingState::Uninstalling(PluginUninstallingState::UninstallDll));
167 } else {
168 self.state = PluginState::Uninstalling(PluginUninstallingState::UninstallDll);
169 }
170 Changed
171 }
172
173 pub fn uninstall_dll(&mut self) -> PluginTransitionResult {
175 if self.state != PluginState::Uninstalling(PluginUninstallingState::UninstallDll)
176 && self.state != PluginState::Refreshing(PluginRefreshingState::Uninstalling(PluginUninstallingState::UninstallDll))
177 {
178 return NoChange;
179 }
180 let refreshing = self.state == PluginState::Refreshing(PluginRefreshingState::Uninstalling(PluginUninstallingState::UninstallDll));
181 if !self.path.exists() {
182 if refreshing {
183 self.state = PluginState::Refreshing(PluginRefreshingState::Deploying);
184 } else {
185 self.state = PluginState::Uninstalled;
186 }
187 return NoChange;
188 }
189 match fs::remove_file(self.path.clone()) {
190 Ok(_) => {
191 debug!("Plugin {} deleted dynamic linked library located at {}", self.id, self.path.display());
192 if refreshing {
193 self.state = PluginState::Refreshing(PluginRefreshingState::Deploying);
194 } else {
195 self.state = PluginState::Uninstalled;
196 }
197 NoChange
199 }
200 Err(e) => {
201 error!("Failed to delete dynamic linked library of plugin {} located at {}: {}", self.id, self.path.display(), e);
202 NoChange
203 }
204 }
205 }
206
207 pub fn load_plugin_declaration(&mut self) -> PluginTransitionResult {
210 if self.state != PluginState::Resolving(PluginResolveState::Loaded)
211 && self.state != PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::Loaded))
212 {
213 return NoChange;
214 }
215 let refreshing = self.state == PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::Loaded));
216 let reader = self.library.read().unwrap();
217 if let Some(library) = reader.as_ref() {
218 let library = library.clone();
219 unsafe {
220 trace!("Plugin {} is reading dynamic linked library symbol plugin_declaration", self.id);
221 match library.get::<*mut PluginDeclaration>(b"plugin_declaration\0") {
222 Ok(plugin_declaration) => {
223 {
224 let mut writer = self.plugin_declaration.write().unwrap();
225 *writer = Some(plugin_declaration.read());
226 }
227 debug!("Plugin {} successfully loaded plugin declaration", self.id);
228 if refreshing {
229 self.state = PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::PluginDeclarationLoaded));
230 } else {
231 self.state = PluginState::Resolving(PluginResolveState::PluginDeclarationLoaded);
232 }
233 return Changed;
234 }
235 Err(e) => {
236 error!("Plugin {} failed to get symbol plugin_declaration: {}", self.id, e);
237 }
238 }
239 }
240 }
241 NoChange
242 }
243
244 pub fn check_compatibility(&mut self) -> PluginTransitionResult {
246 if self.state != PluginState::Resolving(PluginResolveState::PluginDeclarationLoaded)
247 && self.state != PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::PluginDeclarationLoaded))
248 {
249 return NoChange;
250 }
251 let refreshing = self.state == PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::PluginDeclarationLoaded));
252 let reader = self.plugin_declaration.read().unwrap();
253 match *reader {
254 Some(plugin_declaration) => {
255 if plugin_declaration.rustc_version != RUSTC_VERSION {
256 error!(
257 "Plugin {} is not compatible: Expected rustc_version {} - Actual {}",
258 self.id, RUSTC_VERSION, plugin_declaration.rustc_version
259 );
260 if refreshing {
261 self.state = PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::CompilerVersionMismatch));
262 } else {
263 self.state = PluginState::Resolving(PluginResolveState::CompilerVersionMismatch);
264 }
265 return Changed;
266 }
267 if plugin_declaration.plugin_api_version != PLUGIN_API_VERSION {
268 error!(
269 "Plugin {} is not compatible: Expected plugin_api_version {} - Actual {}",
270 self.id, PLUGIN_API_VERSION, plugin_declaration.plugin_api_version
271 );
272 if refreshing {
273 self.state = PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::PluginApiVersionMismatch));
274 } else {
275 self.state = PluginState::Resolving(PluginResolveState::PluginApiVersionMismatch);
276 }
277 return Changed;
278 }
279 debug!("Plugin {} is compatible with the rustc_version and the plugin_api_version)", self.id);
280 if refreshing {
281 self.state = PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::PluginCompatible));
282 } else {
283 self.state = PluginState::Resolving(PluginResolveState::PluginCompatible);
284 }
285 Changed
286 }
287 None => {
288 if refreshing {
289 self.state = PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::Loaded));
290 } else {
291 self.state = PluginState::Resolving(PluginResolveState::Loaded);
292 }
293 Changed
294 }
295 }
296 }
297
298 #[allow(clippy::redundant_clone)]
299 pub fn load_plugin_dependencies(&mut self) -> PluginTransitionResult {
300 if self.state != PluginState::Resolving(PluginResolveState::PluginCompatible)
301 && self.state != PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::PluginCompatible))
302 {
303 return NoChange;
304 }
305 let refreshing = self.state == PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::PluginCompatible));
306 let reader = self.plugin_declaration.read().unwrap();
307 if let Some(plugin_declaration) = *reader {
308 trace!("Plugin {} is loading the list of dependencies", self.id);
309 let dependencies = unsafe { (plugin_declaration.get_dependencies)() };
310 for dependency in dependencies.clone() {
311 trace!("Plugin {} depends on {}:{}", self.id, &dependency.name, &dependency.version);
312 self.dependencies.insert(dependency);
313 }
314 if refreshing {
315 self.state = PluginState::Refreshing(PluginRefreshingState::Resolving(PluginResolveState::DependenciesNotActive));
316 } else {
317 self.state = PluginState::Resolving(PluginResolveState::DependenciesNotActive);
318 }
319 return Changed;
320 }
321 NoChange
322 }
323
324 #[allow(clippy::collapsible_match)]
326 pub fn construct_proxy(&mut self, plugin_context: Arc<dyn PluginContext + Send + Sync>) -> PluginTransitionResult {
327 if self.state != PluginState::Starting(PluginStartingState::ConstructingProxy)
328 && self.state != PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::ConstructingProxy))
329 {
330 return NoChange;
331 }
332 let refreshing = self.state == PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::ConstructingProxy));
333
334 let reader = self.plugin_declaration.read().unwrap();
335 if let Some(plugin_declaration) = *reader {
336 trace!("Plugin {} is constructing proxy", self.id);
337 let mut registrar = PluginRegistrar::new(plugin_context);
338 unsafe {
339 if let Err(e) = (plugin_declaration.register)(&mut registrar) {
340 error!("Dependency injection error in plugin {}:\n\n {e}\n", self.id);
341 if let PluginLoadingError::ComponentInstanceProviderError(ComponentInstanceProviderError::NoPrimaryInstance { type_name, .. }) = e {
342 if let Some(type_name) = type_name {
343 let type_name_stripped = type_name.replace("dyn ", "").replace(" + core::marker::Send + core::marker::Sync", "");
344 let notice = if type_name_stripped == "reactive_graph_plugin_api::plugin::Plugin" {
345 "\n Notice: Every plugin must provide a component that implements reactive_graph_plugin_api::Plugin!"
346 } else {
347 ""
348 };
349 error!("Missing component\n\n Plugin: {}\n Component: {type_name_stripped}{notice}\n", plugin_declaration.name);
350 }
351 }
352 self.state = PluginState::Resolved;
353 return NoChange;
354 }
355 self.state = PluginState::Starting(PluginStartingState::Registering);
356 }
357 let mut writer = self.proxy.write().unwrap();
358 *writer = registrar.plugin;
359 debug!("Plugin {} successfully constructed proxy", self.id);
360 if refreshing {
361 self.state = PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::Registering));
362 } else {
363 self.state = PluginState::Starting(PluginStartingState::Registering);
364 }
365 return Changed;
366 }
367 NoChange
368 }
369
370 pub async fn activate(&mut self) -> PluginTransitionResult {
372 if self.state != PluginState::Starting(PluginStartingState::Activating)
374 && self.state != PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::Activating))
375 {
376 return NoChange;
377 }
378 let refreshing = self.state == PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::Activating));
379
380 let proxy = {
381 let reader = self.proxy.read().unwrap();
382 let Some(proxy) = reader.as_ref().cloned() else {
383 return NoChange;
384 };
385 proxy.clone()
386 };
387 trace!("Plugin {} is being activated", self.id);
388 match proxy.activate().await {
389 Ok(_) => {
390 debug!("Plugin {} has been activated successfully", self.id);
391 self.state = PluginState::Active;
392 info!(
393 "[ACTIVE] {} {}",
394 self.name().unwrap_or_default().replace(PLUGIN_NAME_PREFIX, ""),
395 self.version().unwrap_or_default()
396 );
397 }
398 Err(e) => {
399 error!(
400 "[FAILED] {} {}: {}",
401 self.name().unwrap_or_default().replace(PLUGIN_NAME_PREFIX, ""),
402 self.version().unwrap_or_default(),
403 e
404 );
405 if refreshing {
406 self.state = PluginState::Refreshing(PluginRefreshingState::Starting(PluginStartingState::ActivationFailed));
407 } else {
408 self.state = PluginState::Starting(PluginStartingState::ActivationFailed);
409 }
410 }
411 }
412 Changed
413 }
414
415 pub async fn deactivate(&mut self) -> PluginTransitionResult {
417 if self.state != PluginState::Stopping(PluginStoppingState::Deactivating)
418 && self.state != PluginState::Refreshing(PluginRefreshingState::Stopping(PluginStoppingState::Deactivating))
419 {
420 return NoChange;
421 }
422 let refreshing = self.state == PluginState::Refreshing(PluginRefreshingState::Stopping(PluginStoppingState::Deactivating));
423
424 let proxy = {
425 let reader = self.proxy.read().unwrap();
426 let Some(proxy) = reader.as_ref().cloned() else {
427 return NoChange;
428 };
429 proxy.clone()
430 };
431 trace!("Plugin {} is being deactivated", self.id);
432 match proxy.deactivate().await {
433 Ok(_) => {
434 debug!("Plugin {} has been deactivated successfully", self.id);
435 if refreshing {
436 self.state = PluginState::Refreshing(PluginRefreshingState::Stopping(PluginStoppingState::Unregistering));
437 } else {
438 self.state = PluginState::Stopping(PluginStoppingState::Unregistering);
439 }
440 return Changed;
441 }
442 Err(e) => {
443 error!("Plugin {} failed to deactivate: {:?}", self.id, e);
444 }
445 }
446 NoChange
447 }
448
449 pub fn remove_proxy(&mut self) -> PluginTransitionResult {
451 if self.state != PluginState::Stopping(PluginStoppingState::RemoveProxy)
452 && self.state != PluginState::Refreshing(PluginRefreshingState::Stopping(PluginStoppingState::RemoveProxy))
453 {
454 return NoChange;
455 }
456 let refreshing = self.state == PluginState::Refreshing(PluginRefreshingState::Stopping(PluginStoppingState::RemoveProxy));
457 let mut writer = self.proxy.write().unwrap();
458 *writer = None;
460 if refreshing {
461 self.state = PluginState::Refreshing(PluginRefreshingState::Uninstalling(PluginUninstallingState::UnloadDll));
462 } else {
463 self.state = PluginState::Resolved;
464 }
465 Changed
466 }
467
468 pub fn start(&mut self) -> Result<(), PluginStartError> {
472 match self.state {
473 PluginState::Active => Err(PluginStartError::AlreadyActive),
474 PluginState::Starting(_) | PluginState::Stopping(_) | PluginState::Refreshing(_) | PluginState::Uninstalling(_) => {
475 Err(PluginStartError::InTransition)
476 }
477 PluginState::Uninstalled | PluginState::Disabled | PluginState::Installed | PluginState::Resolving(_) => {
478 Err(PluginStartError::NotResolved(self.state))
479 }
480 PluginState::Resolved => {
481 trace!("Starting plugin {}", self.id);
482 self.state = PluginState::Starting(PluginStartingState::ConstructingProxy);
483 Ok(())
484 }
485 }
486 }
487
488 pub fn stop(&mut self) -> Result<(), PluginStopError> {
489 match self.state {
490 PluginState::Stopping(_) | PluginState::Starting(_) | PluginState::Refreshing(_) | PluginState::Uninstalling(_) => {
491 Err(PluginStopError::InTransition)
492 }
493 PluginState::Uninstalled | PluginState::Disabled | PluginState::Installed | PluginState::Resolving(_) | PluginState::Resolved => {
494 Err(PluginStopError::NotActive)
495 }
496 PluginState::Active => {
497 trace!("Stopping plugin {}", self.id);
498 self.state = PluginState::Stopping(PluginStoppingState::Deactivating);
499 Ok(())
500 }
501 }
502 }
503
504 pub fn uninstall(&mut self) -> Result<(), PluginUninstallError> {
505 match self.state {
506 PluginState::Stopping(_) | PluginState::Starting(_) | PluginState::Refreshing(_) | PluginState::Uninstalling(_) => {
507 Err(PluginUninstallError::InTransition)
508 }
509 PluginState::Uninstalled => Err(PluginUninstallError::AlreadyUninstalled),
510 PluginState::Disabled => Err(PluginUninstallError::Disabled),
511 PluginState::Active => Err(PluginUninstallError::NotStopped),
512 PluginState::Installed | PluginState::Resolving(_) | PluginState::Resolved => {
513 trace!("Uninstalling plugin {}", self.id);
514 self.state = PluginState::Uninstalling(PluginUninstallingState::UnloadDll);
515 Ok(())
516 }
517 }
518 }
519
520 pub fn redeploy(&mut self) -> Result<(), PluginDeployError> {
521 match self.state {
522 PluginState::Stopping(_) | PluginState::Starting(_) | PluginState::Uninstalling(_) => Err(PluginDeployError::InTransition),
523 PluginState::Refreshing(PluginRefreshingState::Resolving(_)) => {
524 trace!("Redeploying resolved plugin {}", self.id);
526 self.state = PluginState::Refreshing(PluginRefreshingState::Uninstalling(PluginUninstallingState::UnloadDll));
527 Ok(())
528 }
529 PluginState::Refreshing(_) => Err(PluginDeployError::InTransition),
530 PluginState::Uninstalled => Err(PluginDeployError::Uninstalled),
531 PluginState::Active => {
532 trace!("Redeploying active plugin {}", self.id);
533 self.state = PluginState::Refreshing(PluginRefreshingState::Stopping(PluginStoppingState::Deactivating));
534 Ok(())
535 }
536 PluginState::Resolved => {
537 trace!("Redeploying resolved plugin {}", self.id);
538 self.state = PluginState::Refreshing(PluginRefreshingState::Uninstalling(PluginUninstallingState::UnloadDll));
539 Ok(())
540 }
541 PluginState::Installed | PluginState::Resolving(_) | PluginState::Disabled => {
542 trace!("Redeploying installed plugin {}", self.id);
543 self.state = PluginState::Refreshing(PluginRefreshingState::Uninstalling(PluginUninstallingState::UnloadDll));
544 Ok(())
545 }
546 }
547 }
548
549 pub fn disable(&mut self) -> Result<(), PluginDisableError> {
550 trace!("Disable plugin {}", self.id);
551 self.state = PluginState::Disabled;
552 Ok(())
553 }
554
555 pub fn name(&self) -> Option<String> {
558 let reader = self.plugin_declaration.read().unwrap();
559 (*reader).map(|plugin_declaration| plugin_declaration.name.to_string())
560 }
561
562 pub fn name_canonicalized(&self) -> Option<String> {
563 let reader = self.plugin_declaration.read().unwrap();
564 (*reader).map(|plugin_declaration| plugin_declaration.name.replace(PLUGIN_NAME_PREFIX, ""))
565 }
566
567 pub fn name_version(&self) -> Option<String> {
568 let reader = self.plugin_declaration.read().unwrap();
569 (*reader).map(|plugin_declaration| format!("{}:{}", plugin_declaration.name.replace(PLUGIN_NAME_PREFIX, ""), plugin_declaration.version))
570 }
571
572 pub fn description(&self) -> Option<String> {
573 let reader = self.plugin_declaration.read().unwrap();
574 (*reader).map(|plugin_declaration| plugin_declaration.description.to_string())
575 }
576
577 pub fn version(&self) -> Option<String> {
578 let reader = self.plugin_declaration.read().unwrap();
579 (*reader).map(|plugin_declaration| plugin_declaration.version.to_string())
580 }
581
582 pub fn rustc_version(&self) -> Option<String> {
583 let reader = self.plugin_declaration.read().unwrap();
584 (*reader).map(|plugin_declaration| plugin_declaration.rustc_version.to_string())
585 }
586
587 pub fn plugin_api_version(&self) -> Option<String> {
588 let reader = self.plugin_declaration.read().unwrap();
589 (*reader).map(|plugin_declaration| plugin_declaration.plugin_api_version.to_string())
590 }
591}