Skip to content

Commit 11daed0

Browse files
committed
all: Reduce the ChainStore interface for basic blockchains
1 parent 7a50938 commit 11daed0

File tree

12 files changed

+199
-162
lines changed

12 files changed

+199
-162
lines changed

chain/ethereum/src/chain.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -585,7 +585,7 @@ impl Blockchain for Chain {
585585
let ingestor: Box<dyn BlockIngestor> = match self.chain_client().as_ref() {
586586
ChainClient::Firehose(_) => {
587587
let ingestor = FirehoseBlockIngestor::<HeaderOnlyBlock, Self>::new(
588-
self.chain_store.cheap_clone(),
588+
self.chain_store.cheap_clone().as_head_store(),
589589
self.chain_client(),
590590
self.logger_factory
591591
.component_logger("EthereumFirehoseBlockIngestor", None),

chain/near/src/chain.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use graph::blockchain::{
88
};
99
use graph::cheap_clone::CheapClone;
1010
use graph::components::network_provider::ChainName;
11-
use graph::components::store::{DeploymentCursorTracker, SourceableStore};
11+
use graph::components::store::{ChainHeadStore, DeploymentCursorTracker, SourceableStore};
1212
use graph::data::subgraph::UnifiedMappingApiVersion;
1313
use graph::env::EnvVars;
1414
use graph::firehose::FirehoseEndpoint;
@@ -29,7 +29,7 @@ use graph::{
2929
},
3030
components::store::DeploymentLocator,
3131
firehose::{self as firehose, ForkStep},
32-
prelude::{async_trait, o, BlockNumber, ChainStore, Error, Logger, LoggerFactory},
32+
prelude::{async_trait, o, BlockNumber, Error, Logger, LoggerFactory},
3333
};
3434
use prost::Message;
3535
use std::collections::BTreeSet;
@@ -165,7 +165,7 @@ pub struct Chain {
165165
logger_factory: LoggerFactory,
166166
name: ChainName,
167167
client: Arc<ChainClient<Self>>,
168-
chain_store: Arc<dyn ChainStore>,
168+
chain_head_store: Arc<dyn ChainHeadStore>,
169169
metrics_registry: Arc<MetricsRegistry>,
170170
block_stream_builder: Arc<dyn BlockStreamBuilder<Self>>,
171171
prefer_substreams: bool,
@@ -183,7 +183,7 @@ impl BlockchainBuilder<Chain> for BasicBlockchainBuilder {
183183
Chain {
184184
logger_factory: self.logger_factory,
185185
name: self.name,
186-
chain_store: self.chain_store,
186+
chain_head_store: self.chain_head_store,
187187
client: Arc::new(ChainClient::new_firehose(self.firehose_endpoints)),
188188
metrics_registry: self.metrics_registry,
189189
block_stream_builder: Arc::new(NearStreamBuilder {}),
@@ -276,7 +276,7 @@ impl Blockchain for Chain {
276276
}
277277

278278
async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
279-
self.chain_store.cheap_clone().chain_head_ptr().await
279+
self.chain_head_store.cheap_clone().chain_head_ptr().await
280280
}
281281

282282
async fn block_pointer_from_number(
@@ -302,7 +302,7 @@ impl Blockchain for Chain {
302302

303303
async fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
304304
let ingestor = FirehoseBlockIngestor::<crate::HeaderOnlyBlock, Self>::new(
305-
self.chain_store.cheap_clone(),
305+
self.chain_head_store.cheap_clone(),
306306
self.chain_client(),
307307
self.logger_factory
308308
.component_logger("NearFirehoseBlockIngestor", None),

chain/substreams/src/block_ingestor.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,14 @@ use graph::blockchain::{
88
client::ChainClient, substreams_block_stream::SubstreamsBlockStream, BlockIngestor,
99
};
1010
use graph::components::network_provider::ChainName;
11+
use graph::components::store::ChainHeadStore;
1112
use graph::prelude::MetricsRegistry;
1213
use graph::slog::trace;
1314
use graph::substreams::Package;
1415
use graph::tokio_stream::StreamExt;
1516
use graph::{
1617
blockchain::block_stream::BlockStreamEvent,
1718
cheap_clone::CheapClone,
18-
components::store::ChainStore,
1919
prelude::{async_trait, error, info, DeploymentHash, Logger},
2020
util::backoff::ExponentialBackoff,
2121
};
@@ -26,7 +26,7 @@ const SUBSTREAMS_HEAD_TRACKER_BYTES: &[u8; 89935] = include_bytes!(
2626
);
2727

2828
pub struct SubstreamsBlockIngestor {
29-
chain_store: Arc<dyn ChainStore>,
29+
chain_store: Arc<dyn ChainHeadStore>,
3030
client: Arc<ChainClient<super::Chain>>,
3131
logger: Logger,
3232
chain_name: ChainName,
@@ -35,7 +35,7 @@ pub struct SubstreamsBlockIngestor {
3535

3636
impl SubstreamsBlockIngestor {
3737
pub fn new(
38-
chain_store: Arc<dyn ChainStore>,
38+
chain_store: Arc<dyn ChainHeadStore>,
3939
client: Arc<ChainClient<super::Chain>>,
4040
logger: Logger,
4141
chain_name: ChainName,

chain/substreams/src/chain.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use graph::blockchain::{
77
NoopRuntimeAdapter, TriggerFilterWrapper,
88
};
99
use graph::components::network_provider::ChainName;
10-
use graph::components::store::{DeploymentCursorTracker, SourceableStore};
10+
use graph::components::store::{ChainHeadStore, DeploymentCursorTracker, SourceableStore};
1111
use graph::env::EnvVars;
1212
use graph::prelude::{BlockHash, CheapClone, Entity, LoggerFactory, MetricsRegistry};
1313
use graph::schema::EntityKey;
@@ -19,7 +19,7 @@ use graph::{
1919
},
2020
components::store::DeploymentLocator,
2121
data::subgraph::UnifiedMappingApiVersion,
22-
prelude::{async_trait, BlockNumber, ChainStore},
22+
prelude::{async_trait, BlockNumber},
2323
slog::Logger,
2424
};
2525

@@ -65,7 +65,7 @@ impl blockchain::Block for Block {
6565
}
6666

6767
pub struct Chain {
68-
chain_store: Arc<dyn ChainStore>,
68+
chain_head_store: Arc<dyn ChainHeadStore>,
6969
block_stream_builder: Arc<dyn BlockStreamBuilder<Self>>,
7070
chain_id: ChainName,
7171

@@ -79,15 +79,15 @@ impl Chain {
7979
logger_factory: LoggerFactory,
8080
chain_client: Arc<ChainClient<Self>>,
8181
metrics_registry: Arc<MetricsRegistry>,
82-
chain_store: Arc<dyn ChainStore>,
82+
chain_store: Arc<dyn ChainHeadStore>,
8383
block_stream_builder: Arc<dyn BlockStreamBuilder<Self>>,
8484
chain_id: ChainName,
8585
) -> Self {
8686
Self {
8787
logger_factory,
8888
client: chain_client,
8989
metrics_registry,
90-
chain_store,
90+
chain_head_store: chain_store,
9191
block_stream_builder,
9292
chain_id,
9393
}
@@ -168,7 +168,7 @@ impl Blockchain for Chain {
168168
}
169169

170170
async fn chain_head_ptr(&self) -> Result<Option<BlockPtr>, Error> {
171-
self.chain_store.cheap_clone().chain_head_ptr().await
171+
self.chain_head_store.cheap_clone().chain_head_ptr().await
172172
}
173173

174174
async fn block_pointer_from_number(
@@ -195,7 +195,7 @@ impl Blockchain for Chain {
195195

196196
async fn block_ingestor(&self) -> anyhow::Result<Box<dyn BlockIngestor>> {
197197
Ok(Box::new(SubstreamsBlockIngestor::new(
198-
self.chain_store.cheap_clone(),
198+
self.chain_head_store.cheap_clone(),
199199
self.client.cheap_clone(),
200200
self.logger_factory
201201
.component_logger("SubstreamsBlockIngestor", None),
@@ -211,13 +211,13 @@ impl blockchain::BlockchainBuilder<super::Chain> for BasicBlockchainBuilder {
211211
let BasicBlockchainBuilder {
212212
logger_factory,
213213
name,
214-
chain_store,
214+
chain_head_store,
215215
firehose_endpoints,
216216
metrics_registry,
217217
} = self;
218218

219219
Chain {
220-
chain_store,
220+
chain_head_store,
221221
block_stream_builder: Arc::new(crate::BlockStreamBuilder::new()),
222222
logger_factory,
223223
client: Arc::new(ChainClient::new_firehose(firehose_endpoints)),

graph/src/blockchain/builder.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,11 @@ use tonic::async_trait;
22

33
use super::Blockchain;
44
use crate::{
5-
components::store::ChainStore, data::value::Word, env::EnvVars, firehose::FirehoseEndpoints,
6-
prelude::LoggerFactory, prelude::MetricsRegistry,
5+
components::store::ChainHeadStore,
6+
data::value::Word,
7+
env::EnvVars,
8+
firehose::FirehoseEndpoints,
9+
prelude::{LoggerFactory, MetricsRegistry},
710
};
811
use std::sync::Arc;
912

@@ -12,7 +15,7 @@ use std::sync::Arc;
1215
pub struct BasicBlockchainBuilder {
1316
pub logger_factory: LoggerFactory,
1417
pub name: Word,
15-
pub chain_store: Arc<dyn ChainStore>,
18+
pub chain_head_store: Arc<dyn ChainHeadStore>,
1619
pub firehose_endpoints: FirehoseEndpoints,
1720
pub metrics_registry: Arc<MetricsRegistry>,
1821
}

graph/src/blockchain/firehose_block_ingestor.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use std::{marker::PhantomData, sync::Arc, time::Duration};
22

33
use crate::{
44
blockchain::Block as BlockchainBlock,
5-
components::store::ChainStore,
5+
components::store::ChainHeadStore,
66
firehose::{self, decode_firehose_block, HeaderOnly},
77
prelude::{error, info, Logger},
88
util::backoff::ExponentialBackoff,
@@ -40,7 +40,7 @@ pub struct FirehoseBlockIngestor<M, C: Blockchain>
4040
where
4141
M: prost::Message + BlockchainBlock + Default + 'static,
4242
{
43-
chain_store: Arc<dyn ChainStore>,
43+
chain_head_store: Arc<dyn ChainHeadStore>,
4444
client: Arc<ChainClient<C>>,
4545
logger: Logger,
4646
default_transforms: Vec<Transforms>,
@@ -54,13 +54,13 @@ where
5454
M: prost::Message + BlockchainBlock + Default + 'static,
5555
{
5656
pub fn new(
57-
chain_store: Arc<dyn ChainStore>,
57+
chain_head_store: Arc<dyn ChainHeadStore>,
5858
client: Arc<ChainClient<C>>,
5959
logger: Logger,
6060
chain_name: ChainName,
6161
) -> FirehoseBlockIngestor<M, C> {
6262
FirehoseBlockIngestor {
63-
chain_store,
63+
chain_head_store,
6464
client,
6565
logger,
6666
phantom: PhantomData {},
@@ -78,7 +78,7 @@ where
7878
let mut backoff =
7979
ExponentialBackoff::new(Duration::from_millis(250), Duration::from_secs(30));
8080
loop {
81-
match self.chain_store.clone().chain_head_cursor() {
81+
match self.chain_head_store.clone().chain_head_cursor() {
8282
Ok(cursor) => return cursor.unwrap_or_default(),
8383
Err(e) => {
8484
error!(self.logger, "Fetching chain head cursor failed: {:#}", e);
@@ -149,7 +149,7 @@ where
149149

150150
trace!(self.logger, "Received new block to ingest {}", block.ptr());
151151

152-
self.chain_store
152+
self.chain_head_store
153153
.clone()
154154
.set_chain_head(block, response.cursor.clone())
155155
.await

graph/src/blockchain/mock.rs

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ use crate::{
22
bail,
33
components::{
44
link_resolver::LinkResolver,
5-
store::{BlockNumber, DeploymentCursorTracker, DeploymentLocator, SourceableStore},
5+
store::{
6+
BlockNumber, ChainHeadStore, DeploymentCursorTracker, DeploymentLocator,
7+
SourceableStore,
8+
},
69
subgraph::InstanceDSTemplateInfo,
710
},
811
data::subgraph::UnifiedMappingApiVersion,
@@ -471,6 +474,23 @@ pub struct MockChainStore {
471474
pub blocks: BTreeMap<BlockNumber, Vec<ExtendedBlockPtr>>,
472475
}
473476

477+
#[async_trait]
478+
impl ChainHeadStore for MockChainStore {
479+
async fn chain_head_ptr(self: Arc<Self>) -> Result<Option<BlockPtr>, Error> {
480+
unimplemented!()
481+
}
482+
fn chain_head_cursor(&self) -> Result<Option<String>, Error> {
483+
unimplemented!()
484+
}
485+
async fn set_chain_head(
486+
self: Arc<Self>,
487+
_block: Arc<dyn Block>,
488+
_cursor: String,
489+
) -> Result<(), Error> {
490+
unimplemented!()
491+
}
492+
}
493+
474494
#[async_trait]
475495
impl ChainStore for MockChainStore {
476496
async fn block_ptrs_by_numbers(
@@ -502,19 +522,6 @@ impl ChainStore for MockChainStore {
502522
) -> Result<Option<H256>, Error> {
503523
unimplemented!()
504524
}
505-
async fn chain_head_ptr(self: Arc<Self>) -> Result<Option<BlockPtr>, Error> {
506-
unimplemented!()
507-
}
508-
fn chain_head_cursor(&self) -> Result<Option<String>, Error> {
509-
unimplemented!()
510-
}
511-
async fn set_chain_head(
512-
self: Arc<Self>,
513-
_block: Arc<dyn Block>,
514-
_cursor: String,
515-
) -> Result<(), Error> {
516-
unimplemented!()
517-
}
518525
async fn blocks(self: Arc<Self>, _hashes: Vec<BlockHash>) -> Result<Vec<Value>, Error> {
519526
unimplemented!()
520527
}
@@ -565,4 +572,7 @@ impl ChainStore for MockChainStore {
565572
fn set_chain_identifier(&self, _ident: &ChainIdentifier) -> Result<(), Error> {
566573
unimplemented!()
567574
}
575+
fn as_head_store(self: Arc<Self>) -> Arc<dyn ChainHeadStore> {
576+
self.clone()
577+
}
568578
}

graph/src/components/store/traits.rs

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -454,9 +454,36 @@ pub trait BlockStore: Send + Sync + 'static {
454454
fn chain_store(&self, network: &str) -> Option<Arc<Self::ChainStore>>;
455455
}
456456

457+
/// An interface for tracking the chain head in the store used by most chain
458+
/// implementations
459+
#[async_trait]
460+
pub trait ChainHeadStore: Send + Sync {
461+
/// Get the current head block pointer for this chain.
462+
/// Any changes to the head block pointer will be to a block with a larger block number, never
463+
/// to a block with a smaller or equal block number.
464+
///
465+
/// The head block pointer will be None on initial set up.
466+
async fn chain_head_ptr(self: Arc<Self>) -> Result<Option<BlockPtr>, Error>;
467+
468+
/// Get the current head block cursor for this chain.
469+
///
470+
/// The head block cursor will be None on initial set up.
471+
fn chain_head_cursor(&self) -> Result<Option<String>, Error>;
472+
473+
/// This method does actually three operations:
474+
/// - Upserts received block into blocks table
475+
/// - Update chain head block into networks table
476+
/// - Update chain head cursor into networks table
477+
async fn set_chain_head(
478+
self: Arc<Self>,
479+
block: Arc<dyn Block>,
480+
cursor: String,
481+
) -> Result<(), Error>;
482+
}
483+
457484
/// Common trait for blockchain store implementations.
458485
#[async_trait]
459-
pub trait ChainStore: Send + Sync + 'static {
486+
pub trait ChainStore: ChainHeadStore {
460487
/// Get a pointer to this blockchain's genesis block.
461488
fn genesis_block_ptr(&self) -> Result<BlockPtr, Error>;
462489

@@ -486,28 +513,6 @@ pub trait ChainStore: Send + Sync + 'static {
486513
ancestor_count: BlockNumber,
487514
) -> Result<Option<H256>, Error>;
488515

489-
/// Get the current head block pointer for this chain.
490-
/// Any changes to the head block pointer will be to a block with a larger block number, never
491-
/// to a block with a smaller or equal block number.
492-
///
493-
/// The head block pointer will be None on initial set up.
494-
async fn chain_head_ptr(self: Arc<Self>) -> Result<Option<BlockPtr>, Error>;
495-
496-
/// Get the current head block cursor for this chain.
497-
///
498-
/// The head block cursor will be None on initial set up.
499-
fn chain_head_cursor(&self) -> Result<Option<String>, Error>;
500-
501-
/// This method does actually three operations:
502-
/// - Upserts received block into blocks table
503-
/// - Update chain head block into networks table
504-
/// - Update chain head cursor into networks table
505-
async fn set_chain_head(
506-
self: Arc<Self>,
507-
block: Arc<dyn Block>,
508-
cursor: String,
509-
) -> Result<(), Error>;
510-
511516
/// Returns the blocks present in the store.
512517
async fn blocks(
513518
self: Arc<Self>,
@@ -587,6 +592,10 @@ pub trait ChainStore: Send + Sync + 'static {
587592

588593
/// Update the chain identifier for this store.
589594
fn set_chain_identifier(&self, ident: &ChainIdentifier) -> Result<(), Error>;
595+
596+
/// Workaround for Rust issue #65991 that keeps us from using an
597+
/// `Arc<dyn ChainStore>` as an `Arc<dyn ChainHeadStore>`
598+
fn as_head_store(self: Arc<Self>) -> Arc<dyn ChainHeadStore>;
590599
}
591600

592601
pub trait EthereumCallCache: Send + Sync + 'static {

0 commit comments

Comments
 (0)