reactive_graph_remotes_impl/
remotes_manager_impl.rs

1use std::collections::HashSet;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use dashmap::DashSet;
7use gql_client::Client;
8use log::error;
9use log::info;
10use serde::Deserialize;
11
12use reactive_graph_config_api::ConfigManager;
13use reactive_graph_config_model::RemotesConfig;
14use reactive_graph_lifecycle::Lifecycle;
15use reactive_graph_remotes_api::FailedToAddInstance;
16use reactive_graph_remotes_api::FailedToFetchInstanceInfo;
17use reactive_graph_remotes_api::FailedToFetchRemoteInstances;
18use reactive_graph_remotes_api::FailedToUpdateInstance;
19use reactive_graph_remotes_api::RemotesManager;
20use reactive_graph_remotes_model::InstanceAddress;
21use reactive_graph_remotes_model::InstanceInfo;
22use springtime_di::Component;
23use springtime_di::component_alias;
24
/// Component that keeps track of remote instances and mirrors their
/// addresses into the remotes configuration held by the config manager.
#[derive(Component)]
pub struct RemotesManagerImpl {
    /// Provides read/write access to the remotes configuration, which is
    /// updated whenever an instance is added or removed.
    config_manager: Arc<dyn ConfigManager + Send + Sync>,

    /// The currently known remote instances; created empty via `DashSet::new`.
    #[component(default = "DashSet::new")]
    remote_instances: DashSet<InstanceInfo>,
}
32
33#[async_trait]
34#[component_alias]
35impl RemotesManager for RemotesManagerImpl {
36    // Returns a copy
37    fn get_all(&self) -> Vec<InstanceInfo> {
38        self.remote_instances.iter().map(|i| i.key().clone()).collect()
39        // self.remote_instances.to_vec()
40    }
41
42    fn get(&self, address: &InstanceAddress) -> Option<InstanceInfo> {
43        self.remote_instances.iter().find(|i| i.key() == address).map(|i| i.key().clone()) // .cloned()
44    }
45
46    fn has(&self, address: &InstanceAddress) -> bool {
47        self.remote_instances.iter().any(|i| i.key() == address)
48    }
49
50    fn get_all_addresses(&self) -> HashSet<InstanceAddress> {
51        self.remote_instances.iter().map(|i| i.address()).collect()
52    }
53
54    async fn add(&self, address: &InstanceAddress) -> Result<InstanceInfo, FailedToAddInstance> {
55        if self.has(address) {
56            return Err(FailedToAddInstance::InstanceAddressAlreadyExists);
57        }
58        match self.inspect_remote(address).await {
59            Ok(instance) => {
60                self.add_checked(instance);
61                self.get(address).ok_or(FailedToAddInstance::InstanceNotAdded)
62            }
63            Err(e) => Err(FailedToAddInstance::FailedToFetchInstanceInfo(e)),
64        }
65    }
66
67    fn remove(&self, address: &InstanceAddress) -> bool {
68        if !self.has(address) {
69            return false;
70        }
71        // let mut writer = self.remote_instances.0.write().unwrap();
72        // writer.retain(|i| i != address);
73        self.remote_instances.retain(|i| i != address);
74        let mut remotes = self.config_manager.get_remotes_config();
75        remotes.remove(address);
76        self.config_manager.set_remotes_config(remotes);
77        self.config_manager.write_remotes_config();
78        true
79    }
80
81    fn remove_all(&self) {
82        // let mut writer = self.remote_instances.0.write().unwrap();
83        // writer.clear();
84        self.remote_instances.clear();
85        self.config_manager.set_remotes_config(RemotesConfig::new(HashSet::new()));
86        self.config_manager.write_remotes_config();
87    }
88
89    async fn update(&self, address: &InstanceAddress) -> Result<InstanceInfo, FailedToUpdateInstance> {
90        if !self.has(address) {
91            return Err(FailedToUpdateInstance::InstanceAddressDoesNotExist);
92        }
93        match self.inspect_remote(address).await {
94            Ok(instance) => {
95                self.replace(instance);
96                self.get(address).ok_or(FailedToUpdateInstance::InstanceNotUpdated)
97            }
98            Err(e) => Err(FailedToUpdateInstance::FailedToFetchInstanceInfo(e)),
99        }
100    }
101
102    async fn update_all(&self) -> Vec<InstanceInfo> {
103        let mut updated_remotes = vec![];
104        for address in self.get_all_addresses().iter() {
105            if let Ok(instance) = self.update(address).await {
106                self.replace(instance.clone());
107                updated_remotes.push(instance);
108            }
109        }
110        updated_remotes
111    }
112
113    async fn fetch_and_add_remotes_from_remote(&self, address: &InstanceAddress) -> Result<Vec<InstanceInfo>, FailedToFetchRemoteInstances> {
114        let remote_instances = self.fetch_remotes_from_remote(address).await?;
115        let mut added_instances = Vec::new();
116        for remote_instance in remote_instances {
117            info!("{}", remote_instance.url_reactive_graph_runtime());
118            if let Ok(instance) = self.add(&remote_instance).await {
119                added_instances.push(instance);
120            }
121        }
122        Ok(added_instances)
123    }
124
125    async fn fetch_and_add_remotes_from_all_remotes(&self) -> Vec<InstanceInfo> {
126        let mut all_added_instances = Vec::new();
127        for address in self.get_all_addresses().iter() {
128            if let Ok(mut added_instances) = self.fetch_and_add_remotes_from_remote(address).await {
129                all_added_instances.append(&mut added_instances);
130            };
131        }
132        all_added_instances
133    }
134}
135
136impl RemotesManagerImpl {
137    async fn inspect_remote(&self, address: &InstanceAddress) -> Result<InstanceInfo, FailedToFetchInstanceInfo> {
138        let query = include_str!("get_instance_info.graphql");
139        let client = Client::new(address.url_reactive_graph_runtime());
140        let data = client.query::<InstanceInfoQuery>(query).await;
141        match data {
142            Ok(Some(query)) => Ok(query.instance_info),
143            Ok(None) => Err(FailedToFetchInstanceInfo::InvalidResponseData),
144            Err(e) => {
145                error!("{e}");
146                Err(FailedToFetchInstanceInfo::RequestError(e))
147            }
148        }
149    }
150
151    async fn fetch_remotes_from_remote(&self, address: &InstanceAddress) -> Result<Vec<InstanceAddress>, FailedToFetchRemoteInstances> {
152        let query = include_str!("get_all_remotes.graphql");
153        let client = Client::new(address.url_reactive_graph_runtime());
154        let data = client.query::<FetchRemotesFromRemoteQuery>(query).await;
155        match data {
156            Ok(Some(query)) => Ok(query.remotes),
157            Ok(None) => Err(FailedToFetchRemoteInstances::InvalidResponseData),
158            Err(e) => {
159                error!("{e}");
160                Err(FailedToFetchRemoteInstances::RequestError(e))
161            }
162        }
163    }
164
165    fn add_checked(&self, instance: InstanceInfo) {
166        if self.has(&instance.address()) {
167            return;
168        }
169        let mut instance = instance;
170        instance.last_seen = Utc::now();
171        let address = instance.address.clone();
172        // let mut writer = self.remote_instances.0.write().unwrap();
173        // writer.push(instance);
174        self.remote_instances.insert(instance);
175        let mut remotes = self.config_manager.get_remotes_config();
176        remotes.merge(address);
177        self.config_manager.set_remotes_config(remotes);
178        self.config_manager.write_remotes_config();
179    }
180
181    fn replace(&self, instance: InstanceInfo) {
182        self.remove(&instance.address());
183        let mut instance = instance;
184        instance.last_seen = Utc::now();
185        self.add_checked(instance);
186    }
187}
188
189#[async_trait]
190impl Lifecycle for RemotesManagerImpl {
191    async fn post_init(&self) {
192        for address in self.config_manager.get_remotes_config().into_iter() {
193            match self.add(&address).await {
194                Ok(instance_info) => {
195                    info!("Added remote instance {} from {}", instance_info.name, instance_info.address().url_reactive_graph_runtime());
196                }
197                Err(_) => {
198                    error!("Failed to add remote instance {}", address.url_reactive_graph_runtime())
199                }
200            }
201        }
202    }
203}
204
/// Deserialization target for the response data of the
/// `get_instance_info.graphql` query.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct InstanceInfoQuery {
    // system: InstanceInfoSystem,
    /// The instance info reported by the remote (key `instanceInfo` in the
    /// response because of the camelCase renaming).
    instance_info: InstanceInfo,
}
211
212// #[derive(Deserialize)]
213// #[serde(rename_all = "camelCase")]
214// struct InstanceInfoSystem {
215//     instance_info: InstanceInfo,
216// }
217
/// Deserialization target for the response data of the
/// `get_all_remotes.graphql` query.
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct FetchRemotesFromRemoteQuery {
    // system: FetchRemotesFromRemoteSystem,
    /// The addresses of the remotes listed in the response.
    remotes: Vec<InstanceAddress>,
}
224
225// #[derive(Deserialize)]
226// #[serde(rename_all = "camelCase")]
227// struct FetchRemotesFromRemoteSystem {
228//     remotes: Vec<InstanceAddress>,
229// }