Skip to content

Commit d26cdf9

Browse files
committed
fix(perf): bulk-read get+read for massive speed
1 parent db18a85 commit d26cdf9

File tree

4 files changed

+88
-31
lines changed

4 files changed

+88
-31
lines changed

Diff for: get.js

+22-21
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
const Promise = require('bluebird')
44

55
const index = require('./lib/entry-index')
6-
const finished = Promise.promisify(require('mississippi').finished)
76
const memo = require('./lib/memoization')
87
const pipe = require('mississippi').pipe
98
const pipeline = require('mississippi').pipeline
@@ -25,33 +24,35 @@ function getData (byDigest, cache, key, opts) {
2524
: memo.get(cache, key)
2625
)
2726
if (memoized && opts.memoize !== false) {
28-
return Promise.resolve({
27+
return Promise.resolve(byDigest ? memoized : {
2928
metadata: memoized.entry.metadata,
3029
data: memoized.data,
3130
digest: memoized.entry.digest,
3231
hashAlgorithm: memoized.entry.hashAlgorithm
3332
})
3433
}
35-
const src = (byDigest ? getStreamDigest : getStream)(cache, key, opts)
36-
let acc = []
37-
let dataTotal = 0
38-
let metadata
39-
let digest
40-
let hashAlgorithm
41-
if (!byDigest) {
42-
src.on('digest', d => {
43-
digest = d
34+
return (
35+
byDigest ? Promise.resolve(null) : index.find(cache, key, opts)
36+
).then(entry => {
37+
if (!entry && !byDigest) {
38+
throw index.notFoundError(cache, key)
39+
}
40+
return read(cache, byDigest ? key : entry.digest, {
41+
hashAlgorithm: byDigest ? opts.hashAlgorithm : entry.hashAlgorithm,
42+
size: opts.size
43+
}).then(data => byDigest ? data : {
44+
metadata: entry.metadata,
45+
data: data,
46+
digest: entry.digest,
47+
hashAlgorithm: entry.hashAlgorithm
48+
}).then(res => {
49+
if (opts.memoize && byDigest) {
50+
memo.put.byDigest(cache, key, opts.hashAlgorithm, res)
51+
} else if (opts.memoize) {
52+
memo.put(cache, entry, res.data)
53+
}
54+
return res
4455
})
45-
src.on('hashAlgorithm', d => { hashAlgorithm = d })
46-
src.on('metadata', d => { metadata = d })
47-
}
48-
src.on('data', d => {
49-
acc.push(d)
50-
dataTotal += d.length
51-
})
52-
return finished(src).then(() => {
53-
const data = Buffer.concat(acc, dataTotal)
54-
return byDigest ? data : { metadata, data, digest, hashAlgorithm }
5556
})
5657
}
5758

Diff for: lib/content/read.js

+35
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,30 @@ const Promise = require('bluebird')
44

55
const checksumStream = require('checksum-stream')
66
const contentPath = require('./path')
7+
const crypto = require('crypto')
78
const fs = require('graceful-fs')
89
const pipeline = require('mississippi').pipeline
910

1011
Promise.promisifyAll(fs)
1112

13+
module.exports = read
14+
function read (cache, address, opts) {
15+
opts = opts || {}
16+
const algo = opts.hashAlgorithm || 'sha512'
17+
const cpath = contentPath(cache, address, algo)
18+
return fs.readFileAsync(cpath, null).then(data => {
19+
const digest = crypto.createHash(algo).update(data).digest('hex')
20+
if (typeof opts.size === 'number' && opts.size !== data.length) {
21+
throw sizeError(opts.size, data.length)
22+
} else if (digest !== address) {
23+
throw checksumError(address, digest)
24+
} else {
25+
return data
26+
}
27+
})
28+
}
29+
30+
module.exports.stream = readStream
1231
module.exports.readStream = readStream
1332
function readStream (cache, address, opts) {
1433
opts = opts || {}
@@ -37,3 +56,19 @@ function hasContent (cache, address, algorithm) {
3756
}
3857
})
3958
}
59+
60+
function sizeError (expected, found) {
61+
var err = new Error('stream data size mismatch')
62+
err.expected = expected
63+
err.found = found
64+
err.code = 'EBADSIZE'
65+
return err
66+
}
67+
68+
function checksumError (expected, found) {
69+
var err = new Error('checksum failed')
70+
err.code = 'EBADCHECKSUM'
71+
err.expected = expected
72+
err.found = found
73+
return err
74+
}

Diff for: test/content.read.js

+23-10
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
const Promise = require('bluebird')
44

5+
const bufferise = require('./util/bufferise')
56
const crypto = require('crypto')
67
const path = require('path')
78
const Tacks = require('tacks')
@@ -13,45 +14,57 @@ const CacheContent = require('./util/cache-content')
1314

1415
const read = require('../lib/content/read')
1516

16-
test('readStream: returns a stream with cache content data', function (t) {
17-
const CONTENT = 'foobarbaz'
17+
test('read: returns a Promise with cache content data', function (t) {
18+
const CONTENT = bufferise('foobarbaz')
1819
const DIGEST = crypto.createHash('sha512').update(CONTENT).digest('hex')
1920
const fixture = new Tacks(CacheContent({
2021
[DIGEST]: CONTENT
2122
}))
2223
fixture.create(CACHE)
23-
const stream = read.readStream(CACHE, DIGEST)
24+
return read(CACHE, DIGEST).then(data => {
25+
t.deepEqual(data, CONTENT, 'cache contents read correctly')
26+
})
27+
})
28+
29+
test('read.stream: returns a stream with cache content data', function (t) {
30+
const CONTENT = bufferise('foobarbaz')
31+
const DIGEST = crypto.createHash('sha512').update(CONTENT).digest('hex')
32+
const fixture = new Tacks(CacheContent({
33+
[DIGEST]: CONTENT
34+
}))
35+
fixture.create(CACHE)
36+
const stream = read.stream(CACHE, DIGEST)
2437
stream.on('error', function (e) { throw e })
2538
let buf = ''
2639
stream.on('data', function (data) { buf += data })
2740
stream.on('end', function () {
2841
t.ok(true, 'stream completed successfully')
29-
t.equal(CONTENT, buf, 'cache contents read correctly')
42+
t.deepEqual(bufferise(buf), CONTENT, 'cache contents read correctly')
3043
t.end()
3144
})
3245
})
3346

34-
test('readStream: allows hashAlgorithm configuration', function (t) {
47+
test('read.stream: allows hashAlgorithm configuration', function (t) {
3548
const CONTENT = 'foobarbaz'
3649
const HASH = 'whirlpool'
3750
const DIGEST = crypto.createHash(HASH).update(CONTENT).digest('hex')
3851
const fixture = new Tacks(CacheContent({
3952
[DIGEST]: CONTENT
4053
}, HASH))
4154
fixture.create(CACHE)
42-
const stream = read.readStream(CACHE, DIGEST, { hashAlgorithm: HASH })
55+
const stream = read.stream(CACHE, DIGEST, { hashAlgorithm: HASH })
4356
stream.on('error', function (e) { throw e })
4457
let buf = ''
4558
stream.on('data', function (data) { buf += data })
4659
stream.on('end', function () {
4760
t.ok(true, 'stream completed successfully, off a sha512')
48-
t.equal(CONTENT, buf, 'cache contents read correctly')
61+
t.deepEqual(buf, CONTENT, 'cache contents read correctly')
4962
t.end()
5063
})
5164
})
5265

53-
test('readStream: errors if content missing', function (t) {
54-
const stream = read.readStream(CACHE, 'whatnot')
66+
test('read.stream: errors if content missing', function (t) {
67+
const stream = read.stream(CACHE, 'whatnot')
5568
stream.on('error', function (e) {
5669
t.ok(e, 'got an error!')
5770
t.equal(e.code, 'ENOENT', 'error uses ENOENT error code')
@@ -65,7 +78,7 @@ test('readStream: errors if content missing', function (t) {
6578
})
6679
})
6780

68-
test('readStream: errors if content fails checksum', function (t) {
81+
test('read.stream: errors if content fails checksum', function (t) {
6982
const CONTENT = 'foobarbaz'
7083
const DIGEST = crypto.createHash('sha512').update(CONTENT).digest('hex')
7184
const fixture = new Tacks(CacheContent({

Diff for: test/util/bufferise.js

+8
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
'use strict'
2+
3+
module.exports = bufferise
4+
function bufferise (string) {
5+
return Buffer.from
6+
? Buffer.from(string, 'utf8')
7+
: new Buffer(string, 'utf8')
8+
}

0 commit comments

Comments
 (0)