Skip to content

Commit bf78bc4

Browse files
committed
graph: Remove caching from IpfsResolver
Since the IpsClient is now responsible for caching, we shouldn't also cache in IpfsResolver
1 parent f8f8b03 commit bf78bc4

File tree

1 file changed

+1
-115
lines changed

1 file changed

+1
-115
lines changed

graph/src/components/link_resolver/ipfs.rs

Lines changed: 1 addition & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
use std::path::PathBuf;
21
use std::sync::Arc;
3-
use std::sync::Mutex;
42
use std::time::Duration;
53

64
use anyhow::anyhow;
@@ -10,10 +8,6 @@ use derivative::Derivative;
108
use futures03::compat::Stream01CompatExt;
119
use futures03::stream::StreamExt;
1210
use futures03::stream::TryStreamExt;
13-
use lru_time_cache::LruCache;
14-
use object_store::local::LocalFileSystem;
15-
use object_store::path::Path;
16-
use object_store::ObjectStore;
1711
use serde_json::Value;
1812

1913
use crate::derive::CheapClone;
@@ -28,99 +22,15 @@ use crate::ipfs::IpfsClient;
2822
use crate::ipfs::RetryPolicy;
2923
use crate::prelude::{LinkResolver as LinkResolverTrait, *};
3024

31-
#[derive(Clone, CheapClone)]
32-
enum Cache {
33-
Memory {
34-
cache: Arc<Mutex<LruCache<ContentPath, Vec<u8>>>>,
35-
},
36-
Disk {
37-
store: Arc<dyn ObjectStore>,
38-
},
39-
}
40-
41-
fn log_err(logger: &Logger, e: &object_store::Error, log_not_found: bool) {
42-
if log_not_found || !matches!(e, object_store::Error::NotFound { .. }) {
43-
warn!(
44-
logger,
45-
"Failed to get IPFS object from disk cache; fetching from IPFS";
46-
"error" => e.to_string(),
47-
);
48-
}
49-
}
50-
51-
impl Cache {
52-
fn new(capacity: usize, path: Option<PathBuf>) -> Self {
53-
match path {
54-
Some(path) => {
55-
let fs = match LocalFileSystem::new_with_prefix(&path) {
56-
Err(e) => {
57-
panic!(
58-
"Failed to create IPFS file based cache at {}: {}",
59-
path.display(),
60-
e
61-
);
62-
}
63-
Ok(fs) => fs,
64-
};
65-
Cache::Disk {
66-
store: Arc::new(fs),
67-
}
68-
}
69-
None => Self::Memory {
70-
cache: Arc::new(Mutex::new(LruCache::with_capacity(capacity))),
71-
},
72-
}
73-
}
74-
75-
async fn find(&self, logger: &Logger, path: &ContentPath) -> Option<Vec<u8>> {
76-
match self {
77-
Cache::Memory { cache } => cache.lock().unwrap().get(path).cloned(),
78-
Cache::Disk { store } => {
79-
let log_err = |e: &object_store::Error| log_err(logger, e, false);
80-
81-
let path = Path::from(path.cid().to_string());
82-
let object = store.get(&path).await.inspect_err(log_err).ok()?;
83-
let data = object.bytes().await.inspect_err(log_err).ok()?;
84-
Some(data.to_vec())
85-
}
86-
}
87-
}
88-
89-
async fn insert(&self, logger: &Logger, path: ContentPath, data: Vec<u8>) {
90-
match self {
91-
Cache::Memory { cache } => {
92-
let mut cache = cache.lock().unwrap();
93-
94-
if !cache.contains_key(&path) {
95-
cache.insert(path.clone(), data.clone());
96-
}
97-
}
98-
Cache::Disk { store } => {
99-
let log_err = |e: &object_store::Error| log_err(logger, e, true);
100-
let path = Path::from(path.cid().to_string());
101-
store
102-
.put(&path, data.into())
103-
.await
104-
.inspect_err(log_err)
105-
.ok();
106-
}
107-
}
108-
}
109-
}
110-
11125
#[derive(Clone, CheapClone, Derivative)]
11226
#[derivative(Debug)]
11327
pub struct IpfsResolver {
11428
#[derivative(Debug = "ignore")]
11529
client: Arc<dyn IpfsClient>,
11630

117-
#[derivative(Debug = "ignore")]
118-
cache: Cache,
119-
12031
timeout: Duration,
12132
max_file_size: usize,
12233
max_map_file_size: usize,
123-
max_cache_file_size: usize,
12434

12535
/// When set to `true`, it means infinite retries, ignoring the timeout setting.
12636
retry: bool,
@@ -132,14 +42,9 @@ impl IpfsResolver {
13242

13343
Self {
13444
client,
135-
cache: Cache::new(
136-
env.max_ipfs_cache_size as usize,
137-
env.ipfs_cache_location.clone(),
138-
),
13945
timeout: env.ipfs_timeout,
14046
max_file_size: env.max_ipfs_file_bytes,
14147
max_map_file_size: env.max_ipfs_map_file_size,
142-
max_cache_file_size: env.max_ipfs_cache_file_size,
14348
retry: false,
14449
}
14550
}
@@ -159,18 +64,10 @@ impl LinkResolverTrait for IpfsResolver {
15964
Box::new(s)
16065
}
16166

162-
async fn cat(&self, logger: &Logger, link: &Link) -> Result<Vec<u8>, Error> {
67+
async fn cat(&self, _logger: &Logger, link: &Link) -> Result<Vec<u8>, Error> {
16368
let path = ContentPath::new(&link.link)?;
16469
let timeout = self.timeout;
16570
let max_file_size = self.max_file_size;
166-
let max_cache_file_size = self.max_cache_file_size;
167-
168-
if let Some(data) = self.cache.find(&logger, &path).await {
169-
trace!(logger, "IPFS cat cache hit"; "hash" => path.to_string());
170-
return Ok(data.to_owned());
171-
}
172-
173-
trace!(logger, "IPFS cat cache miss"; "hash" => path.to_string());
17471

17572
let (timeout, retry_policy) = if self.retry {
17673
(None, RetryPolicy::NonDeterministic)
@@ -185,17 +82,6 @@ impl LinkResolverTrait for IpfsResolver {
18582
.await?
18683
.to_vec();
18784

188-
if data.len() <= max_cache_file_size {
189-
self.cache.insert(&logger, path.clone(), data.clone()).await;
190-
} else {
191-
debug!(
192-
logger,
193-
"IPFS file too large for cache";
194-
"path" => path.to_string(),
195-
"size" => data.len(),
196-
);
197-
}
198-
19985
Ok(data)
20086
}
20187

0 commit comments

Comments
 (0)