Skip to content

Commit e29f1df

Browse files
committed
graph: Move IPFS caching functionality to an IpfsClient
1 parent bca8613 commit e29f1df

File tree

2 files changed

+192
-0
lines changed

2 files changed

+192
-0
lines changed

graph/src/ipfs/cache.rs

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
use std::{
2+
path::PathBuf,
3+
sync::{Arc, Mutex},
4+
time::Duration,
5+
};
6+
7+
use async_trait::async_trait;
8+
use bytes::Bytes;
9+
use graph_derive::CheapClone;
10+
use lru_time_cache::LruCache;
11+
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
12+
use slog::{warn, Logger};
13+
14+
use crate::prelude::CheapClone;
15+
16+
use super::{ContentPath, IpfsClient, IpfsRequest, IpfsResponse, IpfsResult, RetryPolicy};
17+
18+
#[derive(Clone, CheapClone)]
19+
enum Cache {
20+
Memory {
21+
cache: Arc<Mutex<LruCache<ContentPath, Bytes>>>,
22+
max_entry_size: usize,
23+
},
24+
Disk {
25+
store: Arc<dyn ObjectStore>,
26+
},
27+
}
28+
29+
fn log_err(logger: &Logger, e: &object_store::Error, log_not_found: bool) {
30+
if log_not_found || !matches!(e, object_store::Error::NotFound { .. }) {
31+
warn!(
32+
logger,
33+
"Failed to get IPFS object from disk cache; fetching from IPFS";
34+
"error" => e.to_string(),
35+
);
36+
}
37+
}
38+
39+
impl Cache {
40+
fn new(capacity: usize, max_entry_size: usize, path: Option<PathBuf>) -> Self {
41+
match path {
42+
Some(path) => {
43+
let fs = match LocalFileSystem::new_with_prefix(&path) {
44+
Err(e) => {
45+
panic!(
46+
"Failed to create IPFS file based cache at {}: {}",
47+
path.display(),
48+
e
49+
);
50+
}
51+
Ok(fs) => fs,
52+
};
53+
Cache::Disk {
54+
store: Arc::new(fs),
55+
}
56+
}
57+
None => Self::Memory {
58+
cache: Arc::new(Mutex::new(LruCache::with_capacity(capacity))),
59+
max_entry_size,
60+
},
61+
}
62+
}
63+
64+
async fn find(&self, logger: &Logger, path: &ContentPath) -> Option<Bytes> {
65+
match self {
66+
Cache::Memory {
67+
cache,
68+
max_entry_size: _,
69+
} => cache.lock().unwrap().get(path).cloned(),
70+
Cache::Disk { store } => {
71+
let log_err = |e: &object_store::Error| log_err(logger, e, false);
72+
73+
let path = Self::disk_path(path);
74+
let object = store.get(&path).await.inspect_err(log_err).ok()?;
75+
let data = object.bytes().await.inspect_err(log_err).ok()?;
76+
Some(data)
77+
}
78+
}
79+
}
80+
81+
async fn insert(&self, logger: &Logger, path: ContentPath, data: Bytes) {
82+
match self {
83+
Cache::Memory { max_entry_size, .. } if data.len() > *max_entry_size => {
84+
return;
85+
}
86+
Cache::Memory { cache, .. } => {
87+
let mut cache = cache.lock().unwrap();
88+
89+
if !cache.contains_key(&path) {
90+
cache.insert(path.clone(), data.clone());
91+
}
92+
}
93+
Cache::Disk { store } => {
94+
let log_err = |e: &object_store::Error| log_err(logger, e, true);
95+
let path = Self::disk_path(&path);
96+
store
97+
.put(&path, data.into())
98+
.await
99+
.inspect_err(log_err)
100+
.ok();
101+
}
102+
}
103+
}
104+
105+
/// The path where we cache content on disk
106+
fn disk_path(path: &ContentPath) -> Path {
107+
Path::from(path.to_string())
108+
}
109+
}
110+
111+
/// An IPFS client that caches the results of `cat` and `get_block` calls in
112+
/// memory or on disk, depending on settings in the environment.
113+
///
114+
/// The cache is used to avoid repeated calls to the IPFS API for the same
115+
/// content.
116+
pub struct CachingClient {
117+
client: Arc<dyn IpfsClient>,
118+
cache: Cache,
119+
}
120+
121+
impl CachingClient {
122+
#[allow(dead_code)]
123+
pub fn new(
124+
client: Arc<dyn IpfsClient>,
125+
max_entry_size: usize,
126+
cache_capacity: usize,
127+
cache_path: Option<PathBuf>,
128+
) -> Self {
129+
let cache = Cache::new(cache_capacity, max_entry_size, cache_path);
130+
CachingClient { client, cache }
131+
}
132+
133+
async fn with_cache<F>(&self, path: &ContentPath, f: F) -> IpfsResult<Bytes>
134+
where
135+
F: AsyncFnOnce() -> IpfsResult<Bytes>,
136+
{
137+
if let Some(data) = self.cache.find(self.logger(), path).await {
138+
return Ok(data);
139+
}
140+
141+
let data = f().await?;
142+
self.cache
143+
.insert(self.logger(), path.clone(), data.clone())
144+
.await;
145+
Ok(data)
146+
}
147+
}
148+
149+
#[async_trait]
150+
impl IpfsClient for CachingClient {
151+
fn logger(&self) -> &Logger {
152+
self.client.logger()
153+
}
154+
155+
async fn call(self: Arc<Self>, req: IpfsRequest) -> IpfsResult<IpfsResponse> {
156+
self.client.cheap_clone().call(req).await
157+
}
158+
159+
async fn cat(
160+
self: Arc<Self>,
161+
path: &ContentPath,
162+
max_size: usize,
163+
timeout: Option<Duration>,
164+
retry_policy: RetryPolicy,
165+
) -> IpfsResult<Bytes> {
166+
self.with_cache(path, async || {
167+
{
168+
self.client
169+
.cheap_clone()
170+
.cat(path, max_size, timeout, retry_policy)
171+
.await
172+
}
173+
})
174+
.await
175+
}
176+
177+
async fn get_block(
178+
self: Arc<Self>,
179+
path: &ContentPath,
180+
timeout: Option<Duration>,
181+
retry_policy: RetryPolicy,
182+
) -> IpfsResult<Bytes> {
183+
self.with_cache(path, async || {
184+
self.client
185+
.cheap_clone()
186+
.get_block(path, timeout, retry_policy)
187+
.await
188+
})
189+
.await
190+
}
191+
}

graph/src/ipfs/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use slog::Logger;
99

1010
use crate::util::security::SafeDisplay;
1111

12+
mod cache;
1213
mod client;
1314
mod content_path;
1415
mod error;

0 commit comments

Comments
 (0)