Skip to content

Commit 7a50938

Browse files
committed
all: Narrow how much of the ChainStore can be accessed through Chain
1 parent 12a7f50 commit 7a50938

File tree

7 files changed

+23
-16
lines changed

7 files changed

+23
-16
lines changed

chain/ethereum/src/chain.rs

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use graph::firehose::{FirehoseEndpoint, ForkStep};
1313
use graph::futures03::TryStreamExt;
1414
use graph::prelude::{
1515
retry, BlockHash, ComponentLoggerConfig, ElasticComponentLoggerConfig, EthereumBlock,
16-
EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, MetricsRegistry,
16+
EthereumCallCache, LightEthereumBlock, LightEthereumBlockExt, MetricsRegistry, StoreError,
1717
};
1818
use graph::schema::InputSchema;
1919
use graph::slog::{debug, error, trace, warn};
@@ -175,7 +175,7 @@ impl BlockStreamBuilder<Chain> for EthereumStreamBuilder {
175175
.logger_factory
176176
.subgraph_logger(&deployment)
177177
.new(o!("component" => "BlockStream"));
178-
let chain_store = chain.chain_store();
178+
let chain_store = chain.chain_store.cheap_clone();
179179
let chain_head_update_stream = chain
180180
.chain_head_update_listener
181181
.subscribe(chain.name.to_string(), logger.clone());
@@ -403,6 +403,13 @@ impl Chain {
403403
self.call_cache.clone()
404404
}
405405

406+
pub async fn block_number(
407+
&self,
408+
hash: &BlockHash,
409+
) -> Result<Option<(String, BlockNumber, Option<u64>, Option<BlockHash>)>, StoreError> {
410+
self.chain_store.block_number(hash).await
411+
}
412+
406413
// TODO: This is only used to build the block stream which could prolly
407414
// be moved to the chain itself and return a block stream future that the
408415
// caller can spawn.
@@ -507,8 +514,8 @@ impl Blockchain for Chain {
507514
}
508515
}
509516

510-
fn chain_store(&self) -> Arc<dyn ChainStore> {
511-
self.chain_store.clone()
517+
async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
518+
self.chain_store.cheap_clone().chain_head_ptr().await
512519
}
513520

514521
async fn block_pointer_from_number(
@@ -615,7 +622,7 @@ impl Blockchain for Chain {
615622
logger,
616623
graph::env::ENV_VARS.reorg_threshold(),
617624
self.chain_client(),
618-
self.chain_store().cheap_clone(),
625+
self.chain_store.cheap_clone(),
619626
self.polling_ingestor_interval,
620627
self.name.clone(),
621628
)?)

chain/near/src/chain.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -275,8 +275,8 @@ impl Blockchain for Chain {
275275
unimplemented!("This chain does not support Dynamic Data Sources. is_refetch_block_required always returns false, this shouldn't be called.")
276276
}
277277

278-
fn chain_store(&self) -> Arc<dyn ChainStore> {
279-
self.chain_store.clone()
278+
async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
279+
self.chain_store.cheap_clone().chain_head_ptr().await
280280
}
281281

282282
async fn block_pointer_from_number(

chain/substreams/src/chain.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -167,8 +167,8 @@ impl Blockchain for Chain {
167167
unimplemented!("This chain does not support Dynamic Data Sources. is_refetch_block_required always returns false, this shouldn't be called.")
168168
}
169169

170-
fn chain_store(&self) -> Arc<dyn ChainStore> {
171-
self.chain_store.clone()
170+
async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
171+
self.chain_store.cheap_clone().chain_head_ptr().await
172172
}
173173

174174
async fn block_pointer_from_number(

core/src/subgraph/runner.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1141,7 +1141,7 @@ where
11411141
if cached_head_ptr.is_none()
11421142
|| close_to_chain_head(&block_ptr, &cached_head_ptr, CAUGHT_UP_DISTANCE)
11431143
{
1144-
self.state.cached_head_ptr = self.inputs.chain.chain_store().chain_head_ptr().await?;
1144+
self.state.cached_head_ptr = self.inputs.chain.chain_head_ptr().await?;
11451145
}
11461146
let is_caught_up =
11471147
close_to_chain_head(&block_ptr, &self.state.cached_head_ptr, CAUGHT_UP_DISTANCE);
@@ -1463,7 +1463,7 @@ where
14631463
&& !self.inputs.store.is_deployment_synced()
14641464
&& !close_to_chain_head(
14651465
&block_ptr,
1466-
&self.inputs.chain.chain_store().chain_head_ptr().await?,
1466+
&self.inputs.chain.chain_head_ptr().await?,
14671467
// The "skip ptr updates timer" is ignored when a subgraph is at most 1000 blocks
14681468
// behind the chain head.
14691469
1000,

graph/src/blockchain/mock.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ impl Blockchain for MockBlockchain {
438438
todo!()
439439
}
440440

441-
fn chain_store(&self) -> std::sync::Arc<dyn crate::components::store::ChainStore> {
441+
async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
442442
todo!()
443443
}
444444

graph/src/blockchain/mod.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use crate::{
3131
runtime::{gas::GasCounter, AscHeap, HostExportError},
3232
};
3333
use crate::{
34-
components::store::{BlockNumber, ChainStore},
34+
components::store::BlockNumber,
3535
prelude::{thiserror::Error, LinkResolver},
3636
};
3737
use anyhow::{anyhow, Context, Error};
@@ -196,7 +196,8 @@ pub trait Blockchain: Debug + Sized + Send + Sync + Unpin + 'static {
196196
unified_api_version: UnifiedMappingApiVersion,
197197
) -> Result<Box<dyn BlockStream<Self>>, Error>;
198198

199-
fn chain_store(&self) -> Arc<dyn ChainStore>;
199+
/// Return the pointer for the latest block that we are aware of
200+
async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error>;
200201

201202
async fn block_pointer_from_number(
202203
&self,

server/index-node/src/resolver.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -279,10 +279,9 @@ impl<S: Store> IndexNodeResolver<S> {
279279
);
280280
return Ok(r::Value::Null);
281281
};
282-
let chain_store = chain.chain_store();
283282
let call_cache = chain.call_cache();
284283

285-
let (block_number, timestamp) = match chain_store.block_number(&block_hash).await {
284+
let (block_number, timestamp) = match chain.block_number(&block_hash).await {
286285
Ok(Some((_, n, timestamp, _))) => (n, timestamp),
287286
Ok(None) => {
288287
error!(

0 commit comments

Comments
 (0)