Skip to content

Commit b44b7c9

Browse files
committed
graph: Speed up lookups in RowGroup by memoizing last entry for each id
1 parent ae392c9 commit b44b7c9

File tree

2 files changed

+78
-9
lines changed

2 files changed

+78
-9
lines changed

graph/src/components/store/write.rs

Lines changed: 70 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
//! Data structures and helpers for writing subgraph changes to the store
2-
use std::{collections::HashSet, sync::Arc};
2+
use std::{
3+
collections::{HashMap, HashSet},
4+
sync::Arc,
5+
};
36

47
use crate::{
58
blockchain::{block_stream::FirehoseCursor, BlockPtr, BlockTime},
@@ -8,6 +11,7 @@ use crate::{
811
data::{store::Id, subgraph::schema::SubgraphError},
912
data_source::CausalityRegion,
1013
derive::CacheWeight,
14+
env::ENV_VARS,
1115
internal_error,
1216
util::cache_weight::CacheWeight,
1317
};
@@ -154,9 +158,10 @@ impl EntityModification {
154158
}
155159

156160
pub fn creates_entity(&self) -> bool {
161+
use EntityModification::*;
157162
match self {
158-
EntityModification::Insert { .. } => true,
159-
EntityModification::Overwrite { .. } | EntityModification::Remove { .. } => false,
163+
Insert { .. } => true,
164+
Overwrite { .. } | Remove { .. } => false,
160165
}
161166
}
162167

@@ -310,6 +315,10 @@ pub struct RowGroup {
310315
rows: Vec<EntityModification>,
311316

312317
immutable: bool,
318+
319+
/// Map the `key.entity_id` of all entries in `rows` to the index of
320+
/// the most recent entry for that id to speed up lookups
321+
last_mod: HashMap<Id, usize>,
313322
}
314323

315324
impl RowGroup {
@@ -318,6 +327,7 @@ impl RowGroup {
318327
entity_type,
319328
rows: Vec::new(),
320329
immutable,
330+
last_mod: HashMap::new(),
321331
}
322332
}
323333

@@ -374,6 +384,21 @@ impl RowGroup {
374384
}
375385

376386
pub fn last_op(&self, key: &EntityKey, at: BlockNumber) -> Option<EntityOp<'_>> {
387+
if ENV_VARS.store.write_batch_memoize {
388+
let idx = *self.last_mod.get(&key.entity_id)?;
389+
if let Some(op) = self.rows.get(idx).and_then(|emod| {
390+
if emod.block() <= at {
391+
Some(emod.as_entity_op(at))
392+
} else {
393+
None
394+
}
395+
}) {
396+
return Some(op);
397+
}
398+
}
399+
// We are looking for the change at a block `at` that is before the
400+
// change we remember in `last_mod`, and therefore have to scan
401+
// through all changes
377402
self.rows
378403
.iter()
379404
// We are scanning backwards, i.e., in descending order of
@@ -383,7 +408,14 @@ impl RowGroup {
383408
.map(|emod| emod.as_entity_op(at))
384409
}
385410

411+
/// Return an iterator over all changes that are effective at `at`. That
412+
/// makes it possible to construct the state that the deployment will
413+
/// have once all changes for block `at` have been written.
386414
pub fn effective_ops(&self, at: BlockNumber) -> impl Iterator<Item = EntityOp<'_>> {
415+
// We don't use `self.last_mod` here, because we need to return
416+
// operations for all entities that have pending changes at block
417+
// `at`, and there is no guarantee that `self.last_mod` is visible
418+
// at `at` since the change in `self.last_mod` might come after `at`
387419
let mut seen = HashSet::new();
388420
self.rows
389421
.iter()
@@ -400,7 +432,12 @@ impl RowGroup {
400432

401433
/// Find the most recent entry for `id`
402434
fn prev_row_mut(&mut self, id: &Id) -> Option<&mut EntityModification> {
403-
self.rows.iter_mut().rfind(|emod| emod.id() == id)
435+
if ENV_VARS.store.write_batch_memoize {
436+
let idx = *self.last_mod.get(id)?;
437+
self.rows.get_mut(idx)
438+
} else {
439+
self.rows.iter_mut().rfind(|emod| emod.id() == id)
440+
}
404441
}
405442

406443
/// Append `row` to `self.rows` by combining it with a previously
@@ -433,6 +470,14 @@ impl RowGroup {
433470
));
434471
}
435472

473+
if row.id() != prev_row.id() {
474+
return Err(internal_error!(
475+
"last_mod map is corrupted: got id {} looking up id {}",
476+
prev_row.id(),
477+
row.id()
478+
));
479+
}
480+
436481
// The heart of the matter: depending on what `row` is, clamp
437482
// `prev_row` and either ignore `row` since it is not needed, or
438483
// turn it into an `Insert`, which also does not require
@@ -460,25 +505,31 @@ impl RowGroup {
460505
Insert { .. },
461506
) => {
462507
// prev_row was deleted
463-
self.rows.push(row);
508+
self.push_row(row);
464509
}
465510
(
466511
Insert { end: None, .. } | Overwrite { end: None, .. },
467512
Overwrite { block, .. },
468513
) => {
469514
prev_row.clamp(*block)?;
470-
self.rows.push(row.as_insert(&self.entity_type)?);
515+
let row = row.as_insert(&self.entity_type)?;
516+
self.push_row(row);
471517
}
472518
(Insert { end: None, .. } | Overwrite { end: None, .. }, Remove { block, .. }) => {
473519
prev_row.clamp(*block)?;
474520
}
475521
}
476522
} else {
477-
self.rows.push(row);
523+
self.push_row(row);
478524
}
479525
Ok(())
480526
}
481527

528+
fn push_row(&mut self, row: EntityModification) {
529+
self.last_mod.insert(row.id().clone(), self.rows.len());
530+
self.rows.push(row);
531+
}
532+
482533
fn append(&mut self, group: RowGroup) -> Result<(), StoreError> {
483534
if self.entity_type != group.entity_type {
484535
return Err(internal_error!(
@@ -659,7 +710,7 @@ pub struct Batch {
659710
pub first_block: BlockNumber,
660711
/// The firehose cursor corresponding to `block_ptr`
661712
pub firehose_cursor: FirehoseCursor,
662-
mods: RowGroups,
713+
pub mods: RowGroups,
663714
/// New data sources
664715
pub data_sources: DataSources,
665716
pub deterministic_errors: Vec<SubgraphError>,
@@ -924,6 +975,7 @@ impl<'a> Iterator for WriteChunkIter<'a> {
924975

925976
#[cfg(test)]
926977
mod test {
978+
use std::collections::HashMap;
927979
use std::sync::Arc;
928980

929981
use crate::{
@@ -947,18 +999,27 @@ mod test {
947999

9481000
assert_eq!(values.len(), blocks.len());
9491001

950-
let rows = values
1002+
let rows: Vec<_> = values
9511003
.iter()
9521004
.zip(blocks.iter())
9531005
.map(|(value, block)| EntityModification::Remove {
9541006
key: ROW_GROUP_TYPE.key(Id::String(Word::from(value.to_string()))),
9551007
block: *block,
9561008
})
9571009
.collect();
1010+
let last_mod = rows
1011+
.iter()
1012+
.enumerate()
1013+
.fold(HashMap::new(), |mut map, (idx, emod)| {
1014+
map.insert(emod.id().clone(), idx);
1015+
map
1016+
});
1017+
9581018
let group = RowGroup {
9591019
entity_type: ENTRY_TYPE.clone(),
9601020
rows,
9611021
immutable: false,
1022+
last_mod,
9621023
};
9631024
let act = group
9641025
.clamps_by_block()

graph/src/env/store.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,11 @@ pub struct EnvVarsStore {
129129
/// is 10_000 which corresponds to 10MB. Setting this to 0 disables
130130
/// write batching.
131131
pub write_batch_size: usize,
132+
/// Whether to memoize the last operation for each entity in a write
133+
/// batch to speed up adding more entities. Set by
134+
/// `GRAPH_STORE_WRITE_BATCH_MEMOIZE`. The default is `true`.
135+
/// Remove after 2025-07-01 if there have been no issues with it.
136+
pub write_batch_memoize: bool,
132137
/// Whether to create GIN indexes for array attributes. Set by
133138
/// `GRAPH_STORE_CREATE_GIN_INDEXES`. The default is `false`
134139
pub create_gin_indexes: bool,
@@ -184,6 +189,7 @@ impl TryFrom<InnerStore> for EnvVarsStore {
184189
connection_min_idle: x.connection_min_idle,
185190
connection_idle_timeout: Duration::from_secs(x.connection_idle_timeout_in_secs),
186191
write_queue_size: x.write_queue_size,
192+
write_batch_memoize: x.write_batch_memoize,
187193
batch_target_duration: Duration::from_secs(x.batch_target_duration_in_secs),
188194
batch_timeout: x.batch_timeout_in_secs.map(Duration::from_secs),
189195
batch_workers: x.batch_workers,
@@ -277,6 +283,8 @@ pub struct InnerStore {
277283
write_batch_duration_in_secs: u64,
278284
#[envconfig(from = "GRAPH_STORE_WRITE_BATCH_SIZE", default = "10000")]
279285
write_batch_size: usize,
286+
#[envconfig(from = "GRAPH_STORE_WRITE_BATCH_MEMOIZE", default = "true")]
287+
write_batch_memoize: bool,
280288
#[envconfig(from = "GRAPH_STORE_CREATE_GIN_INDEXES", default = "false")]
281289
create_gin_indexes: bool,
282290
#[envconfig(from = "GRAPH_STORE_USE_BRIN_FOR_ALL_QUERY_TYPES", default = "false")]

0 commit comments

Comments
 (0)