Skip to content

Commit a52bba6

Browse files
incrypto32lutter
authored andcommitted
Revert "server/index-node: Remove private explorer API"
This reverts commit ab64dc6.
1 parent cc0bec4 commit a52bba6

File tree

3 files changed

+226
-0
lines changed

3 files changed

+226
-0
lines changed

server/index-node/src/explorer.rs

Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
//! Functionality to support the explorer in the hosted service. Everything
2+
//! in this file is private API and experimental and subject to change at
3+
//! any time
4+
use graph::components::server::query::{ServerResponse, ServerResult};
5+
use graph::http_body_util::Full;
6+
use graph::hyper::header::{
7+
ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN,
8+
CONTENT_TYPE,
9+
};
10+
use graph::hyper::{Response, StatusCode};
11+
use graph::prelude::r;
12+
use std::{sync::Arc, time::Instant};
13+
14+
use graph::{
15+
components::{
16+
server::{index_node::VersionInfo, query::ServerError},
17+
store::StatusStore,
18+
},
19+
data::subgraph::status,
20+
object,
21+
prelude::{serde_json, warn, Logger, ENV_VARS},
22+
util::timed_cache::TimedCache,
23+
};
24+
25+
// Do not implement `Clone` for this; the IndexNode service puts the `Explorer`
26+
// behind an `Arc` so we don't have to put each `Cache` into an `Arc`
27+
//
28+
// We cache responses for a fixed amount of time with the time given by
29+
// `GRAPH_EXPLORER_TTL`
30+
#[derive(Debug)]
31+
pub struct Explorer<S> {
32+
store: Arc<S>,
33+
versions: TimedCache<String, r::Value>,
34+
version_infos: TimedCache<String, VersionInfo>,
35+
entity_counts: TimedCache<String, r::Value>,
36+
}
37+
38+
impl<S> Explorer<S>
39+
where
40+
S: StatusStore,
41+
{
42+
pub fn new(store: Arc<S>) -> Self {
43+
Self {
44+
store,
45+
versions: TimedCache::new(ENV_VARS.explorer_ttl),
46+
version_infos: TimedCache::new(ENV_VARS.explorer_ttl),
47+
entity_counts: TimedCache::new(ENV_VARS.explorer_ttl),
48+
}
49+
}
50+
51+
pub fn handle(&self, logger: &Logger, req: &[&str]) -> ServerResult {
52+
match req {
53+
["subgraph-versions", subgraph_id] => self.handle_subgraph_versions(subgraph_id),
54+
["subgraph-version", version] => self.handle_subgraph_version(version),
55+
["subgraph-repo", version] => self.handle_subgraph_repo(version),
56+
["entity-count", deployment] => self.handle_entity_count(logger, deployment),
57+
["subgraphs-for-deployment", deployment_hash] => {
58+
self.handle_subgraphs_for_deployment(deployment_hash)
59+
}
60+
_ => handle_not_found(),
61+
}
62+
}
63+
64+
fn handle_subgraph_versions(&self, subgraph_id: &str) -> ServerResult {
65+
if let Some(value) = self.versions.get(subgraph_id) {
66+
return Ok(as_http_response(value.as_ref()));
67+
}
68+
69+
let (current, pending) = self.store.versions_for_subgraph_id(subgraph_id)?;
70+
71+
let value = object! {
72+
currentVersion: current,
73+
pendingVersion: pending
74+
};
75+
76+
let resp = as_http_response(&value);
77+
self.versions.set(subgraph_id.to_string(), Arc::new(value));
78+
Ok(resp)
79+
}
80+
81+
fn handle_subgraph_version(&self, version: &str) -> ServerResult {
82+
let vi = self.version_info(version)?;
83+
84+
let latest_ethereum_block_number = vi.latest_ethereum_block_number;
85+
let total_ethereum_blocks_count = vi.total_ethereum_blocks_count;
86+
let value = object! {
87+
createdAt: vi.created_at.as_str(),
88+
deploymentId: vi.deployment_id.as_str(),
89+
latestEthereumBlockNumber: latest_ethereum_block_number,
90+
totalEthereumBlocksCount: total_ethereum_blocks_count,
91+
synced: vi.synced,
92+
failed: vi.failed,
93+
description: vi.description.as_deref(),
94+
repository: vi.repository.as_deref(),
95+
schema: vi.schema.document_string(),
96+
network: vi.network.as_str()
97+
};
98+
Ok(as_http_response(&value))
99+
}
100+
101+
fn handle_subgraph_repo(&self, version: &str) -> ServerResult {
102+
let vi = self.version_info(version)?;
103+
104+
let value = object! {
105+
createdAt: vi.created_at.as_str(),
106+
deploymentId: vi.deployment_id.as_str(),
107+
repository: vi.repository.as_deref()
108+
};
109+
Ok(as_http_response(&value))
110+
}
111+
112+
fn handle_entity_count(&self, logger: &Logger, deployment: &str) -> ServerResult {
113+
let start = Instant::now();
114+
let count = self.entity_counts.get(deployment);
115+
if start.elapsed() > ENV_VARS.explorer_lock_threshold {
116+
let action = match count {
117+
Some(_) => "cache_hit",
118+
None => "cache_miss",
119+
};
120+
warn!(logger, "Getting entity_count takes too long";
121+
"action" => action,
122+
"deployment" => deployment,
123+
"time_ms" => start.elapsed().as_millis());
124+
}
125+
126+
if let Some(value) = count {
127+
return Ok(as_http_response(value.as_ref()));
128+
}
129+
130+
let start = Instant::now();
131+
let infos = self
132+
.store
133+
.status(status::Filter::Deployments(vec![deployment.to_string()]))?;
134+
if start.elapsed() > ENV_VARS.explorer_query_threshold {
135+
warn!(logger, "Getting entity_count takes too long";
136+
"action" => "query_status",
137+
"deployment" => deployment,
138+
"time_ms" => start.elapsed().as_millis());
139+
}
140+
let info = match infos.first() {
141+
Some(info) => info,
142+
None => {
143+
return handle_not_found();
144+
}
145+
};
146+
147+
let value = object! {
148+
entityCount: info.entity_count as i32
149+
};
150+
let start = Instant::now();
151+
let resp = as_http_response(&value);
152+
if start.elapsed() > ENV_VARS.explorer_lock_threshold {
153+
warn!(logger, "Getting entity_count takes too long";
154+
"action" => "as_http_response",
155+
"deployment" => deployment,
156+
"time_ms" => start.elapsed().as_millis());
157+
}
158+
let start = Instant::now();
159+
self.entity_counts
160+
.set(deployment.to_string(), Arc::new(value));
161+
if start.elapsed() > ENV_VARS.explorer_lock_threshold {
162+
warn!(logger, "Getting entity_count takes too long";
163+
"action" => "cache_set",
164+
"deployment" => deployment,
165+
"time_ms" => start.elapsed().as_millis());
166+
}
167+
Ok(resp)
168+
}
169+
170+
fn version_info(&self, version: &str) -> Result<Arc<VersionInfo>, ServerError> {
171+
match self.version_infos.get(version) {
172+
Some(vi) => Ok(vi),
173+
None => {
174+
let vi = Arc::new(self.store.version_info(version)?);
175+
self.version_infos.set(version.to_string(), vi.clone());
176+
Ok(vi)
177+
}
178+
}
179+
}
180+
181+
fn handle_subgraphs_for_deployment(&self, deployment_hash: &str) -> ServerResult {
182+
let name_version_pairs: Vec<r::Value> = self
183+
.store
184+
.subgraphs_for_deployment_hash(deployment_hash)?
185+
.into_iter()
186+
.map(|(name, version)| {
187+
object! {
188+
name: name,
189+
version: version
190+
}
191+
})
192+
.collect();
193+
let payload = r::Value::List(name_version_pairs);
194+
Ok(as_http_response(&payload))
195+
}
196+
}
197+
198+
fn handle_not_found() -> ServerResult {
199+
Ok(Response::builder()
200+
.status(StatusCode::NOT_FOUND)
201+
.header(CONTENT_TYPE, "text/plain")
202+
.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
203+
.body(Full::from("Not found\n"))
204+
.unwrap())
205+
}
206+
207+
fn as_http_response(value: &r::Value) -> ServerResponse {
208+
let status_code = StatusCode::OK;
209+
let json = serde_json::to_string(&value).expect("Failed to serialize response to JSON");
210+
Response::builder()
211+
.status(status_code)
212+
.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
213+
.header(ACCESS_CONTROL_ALLOW_HEADERS, "Content-Type, User-Agent")
214+
.header(ACCESS_CONTROL_ALLOW_METHODS, "GET, OPTIONS, POST")
215+
.header(CONTENT_TYPE, "application/json")
216+
.body(Full::from(json))
217+
.unwrap()
218+
}

server/index-node/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
mod auth;
2+
mod explorer;
23
mod resolver;
34
mod schema;
45
mod server;

server/index-node/src/service.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use graph_graphql::prelude::{execute_query, Query as PreparedQuery, QueryExecuti
2323

2424
use crate::auth::bearer_token;
2525

26+
use crate::explorer::Explorer;
2627
use crate::resolver::IndexNodeResolver;
2728
use crate::schema::SCHEMA;
2829

@@ -42,6 +43,7 @@ pub struct IndexNodeService<S> {
4243
logger: Logger,
4344
blockchain_map: Arc<BlockchainMap>,
4445
store: Arc<S>,
46+
explorer: Arc<Explorer<S>>,
4547
link_resolver: Arc<dyn LinkResolver>,
4648
}
4749

@@ -55,10 +57,13 @@ where
5557
store: Arc<S>,
5658
link_resolver: Arc<dyn LinkResolver>,
5759
) -> Self {
60+
let explorer = Arc::new(Explorer::new(store.clone()));
61+
5862
IndexNodeService {
5963
logger,
6064
blockchain_map,
6165
store,
66+
explorer,
6267
link_resolver,
6368
}
6469
}
@@ -224,6 +229,8 @@ where
224229
}
225230
(Method::OPTIONS, ["graphql"]) => Ok(Self::handle_graphql_options(req)),
226231

232+
(Method::GET, ["explorer", rest @ ..]) => self.explorer.handle(&self.logger, rest),
233+
227234
_ => Ok(Self::handle_not_found()),
228235
}
229236
}

0 commit comments

Comments
 (0)