reactive_graph_remotes_impl/
remotes_manager_impl.rs1use std::collections::HashSet;
2use std::sync::Arc;
3
4use async_trait::async_trait;
5use chrono::Utc;
6use dashmap::DashSet;
7use gql_client::Client;
8use log::error;
9use log::info;
10use serde::Deserialize;
11
12use reactive_graph_config_api::ConfigManager;
13use reactive_graph_config_model::RemotesConfig;
14use reactive_graph_lifecycle::Lifecycle;
15use reactive_graph_remotes_api::FailedToAddInstance;
16use reactive_graph_remotes_api::FailedToFetchInstanceInfo;
17use reactive_graph_remotes_api::FailedToFetchRemoteInstances;
18use reactive_graph_remotes_api::FailedToUpdateInstance;
19use reactive_graph_remotes_api::RemotesManager;
20use reactive_graph_remotes_model::InstanceAddress;
21use reactive_graph_remotes_model::InstanceInfo;
22use springtime_di::Component;
23use springtime_di::component_alias;
24
25#[derive(Component)]
26pub struct RemotesManagerImpl {
27 config_manager: Arc<dyn ConfigManager + Send + Sync>,
28
29 #[component(default = "DashSet::new")]
30 remote_instances: DashSet<InstanceInfo>,
31}
32
33#[async_trait]
34#[component_alias]
35impl RemotesManager for RemotesManagerImpl {
36 fn get_all(&self) -> Vec<InstanceInfo> {
38 self.remote_instances.iter().map(|i| i.key().clone()).collect()
39 }
41
42 fn get(&self, address: &InstanceAddress) -> Option<InstanceInfo> {
43 self.remote_instances.iter().find(|i| i.key() == address).map(|i| i.key().clone()) }
45
46 fn has(&self, address: &InstanceAddress) -> bool {
47 self.remote_instances.iter().any(|i| i.key() == address)
48 }
49
50 fn get_all_addresses(&self) -> HashSet<InstanceAddress> {
51 self.remote_instances.iter().map(|i| i.address()).collect()
52 }
53
54 async fn add(&self, address: &InstanceAddress) -> Result<InstanceInfo, FailedToAddInstance> {
55 if self.has(address) {
56 return Err(FailedToAddInstance::InstanceAddressAlreadyExists);
57 }
58 match self.inspect_remote(address).await {
59 Ok(instance) => {
60 self.add_checked(instance);
61 self.get(address).ok_or(FailedToAddInstance::InstanceNotAdded)
62 }
63 Err(e) => Err(FailedToAddInstance::FailedToFetchInstanceInfo(e)),
64 }
65 }
66
67 fn remove(&self, address: &InstanceAddress) -> bool {
68 if !self.has(address) {
69 return false;
70 }
71 self.remote_instances.retain(|i| i != address);
74 let mut remotes = self.config_manager.get_remotes_config();
75 remotes.remove(address);
76 self.config_manager.set_remotes_config(remotes);
77 self.config_manager.write_remotes_config();
78 true
79 }
80
81 fn remove_all(&self) {
82 self.remote_instances.clear();
85 self.config_manager.set_remotes_config(RemotesConfig::new(HashSet::new()));
86 self.config_manager.write_remotes_config();
87 }
88
89 async fn update(&self, address: &InstanceAddress) -> Result<InstanceInfo, FailedToUpdateInstance> {
90 if !self.has(address) {
91 return Err(FailedToUpdateInstance::InstanceAddressDoesNotExist);
92 }
93 match self.inspect_remote(address).await {
94 Ok(instance) => {
95 self.replace(instance);
96 self.get(address).ok_or(FailedToUpdateInstance::InstanceNotUpdated)
97 }
98 Err(e) => Err(FailedToUpdateInstance::FailedToFetchInstanceInfo(e)),
99 }
100 }
101
102 async fn update_all(&self) -> Vec<InstanceInfo> {
103 let mut updated_remotes = vec![];
104 for address in self.get_all_addresses().iter() {
105 if let Ok(instance) = self.update(address).await {
106 self.replace(instance.clone());
107 updated_remotes.push(instance);
108 }
109 }
110 updated_remotes
111 }
112
113 async fn fetch_and_add_remotes_from_remote(&self, address: &InstanceAddress) -> Result<Vec<InstanceInfo>, FailedToFetchRemoteInstances> {
114 let remote_instances = self.fetch_remotes_from_remote(address).await?;
115 let mut added_instances = Vec::new();
116 for remote_instance in remote_instances {
117 info!("{}", remote_instance.url_reactive_graph_runtime());
118 if let Ok(instance) = self.add(&remote_instance).await {
119 added_instances.push(instance);
120 }
121 }
122 Ok(added_instances)
123 }
124
125 async fn fetch_and_add_remotes_from_all_remotes(&self) -> Vec<InstanceInfo> {
126 let mut all_added_instances = Vec::new();
127 for address in self.get_all_addresses().iter() {
128 if let Ok(mut added_instances) = self.fetch_and_add_remotes_from_remote(address).await {
129 all_added_instances.append(&mut added_instances);
130 };
131 }
132 all_added_instances
133 }
134}
135
136impl RemotesManagerImpl {
137 async fn inspect_remote(&self, address: &InstanceAddress) -> Result<InstanceInfo, FailedToFetchInstanceInfo> {
138 let query = include_str!("get_instance_info.graphql");
139 let client = Client::new(address.url_reactive_graph_runtime());
140 let data = client.query::<InstanceInfoQuery>(query).await;
141 match data {
142 Ok(Some(query)) => Ok(query.instance_info),
143 Ok(None) => Err(FailedToFetchInstanceInfo::InvalidResponseData),
144 Err(e) => {
145 error!("{e}");
146 Err(FailedToFetchInstanceInfo::RequestError(e))
147 }
148 }
149 }
150
151 async fn fetch_remotes_from_remote(&self, address: &InstanceAddress) -> Result<Vec<InstanceAddress>, FailedToFetchRemoteInstances> {
152 let query = include_str!("get_all_remotes.graphql");
153 let client = Client::new(address.url_reactive_graph_runtime());
154 let data = client.query::<FetchRemotesFromRemoteQuery>(query).await;
155 match data {
156 Ok(Some(query)) => Ok(query.remotes),
157 Ok(None) => Err(FailedToFetchRemoteInstances::InvalidResponseData),
158 Err(e) => {
159 error!("{e}");
160 Err(FailedToFetchRemoteInstances::RequestError(e))
161 }
162 }
163 }
164
165 fn add_checked(&self, instance: InstanceInfo) {
166 if self.has(&instance.address()) {
167 return;
168 }
169 let mut instance = instance;
170 instance.last_seen = Utc::now();
171 let address = instance.address.clone();
172 self.remote_instances.insert(instance);
175 let mut remotes = self.config_manager.get_remotes_config();
176 remotes.merge(address);
177 self.config_manager.set_remotes_config(remotes);
178 self.config_manager.write_remotes_config();
179 }
180
181 fn replace(&self, instance: InstanceInfo) {
182 self.remove(&instance.address());
183 let mut instance = instance;
184 instance.last_seen = Utc::now();
185 self.add_checked(instance);
186 }
187}
188
189#[async_trait]
190impl Lifecycle for RemotesManagerImpl {
191 async fn post_init(&self) {
192 for address in self.config_manager.get_remotes_config().into_iter() {
193 match self.add(&address).await {
194 Ok(instance_info) => {
195 info!("Added remote instance {} from {}", instance_info.name, instance_info.address().url_reactive_graph_runtime());
196 }
197 Err(_) => {
198 error!("Failed to add remote instance {}", address.url_reactive_graph_runtime())
199 }
200 }
201 }
202 }
203}
204
205#[derive(Deserialize)]
206#[serde(rename_all = "camelCase")]
207struct InstanceInfoQuery {
208 instance_info: InstanceInfo,
210}
211
212#[derive(Deserialize)]
219#[serde(rename_all = "camelCase")]
220struct FetchRemotesFromRemoteQuery {
221 remotes: Vec<InstanceAddress>,
223}
224
225