Skip to content
This repository was archived by the owner on Feb 12, 2024. It is now read-only.

Commit 6066c97

Browse files
author
Alan Shaw
authored
fix: race condition causing Database is not open error (#1834)
[`cleanup`](https://github.com/ipfs/js-ipfs/blob/7d9b006d0b0542651cbaa540d5f22a0112ae09bd/src/cli/bin.js#L109) closes the DB but `yargs-promise` does not wait for async stuff in the command handler to finish, and executes that promise chain immediately after the handler is executed. So it's a race condition. In windows, _sometimes_, the database is closed before the async stuff from the handler completes. This PR changes the CLI command handlers to always pass a promise to `resolve` function that `yargs-promise` adds to the context (`argv`). This makes `yargs-promise` wait for it to be resolved before continuing the promise chain and closing the DB. Since I had to edit all of the commands to make them use the `resolve` function and introduce promises, I decided to take the opportunity to refactor the CLI commands to use async/await. It should help towards #1670. License: MIT Signed-off-by: Alan Shaw <[email protected]>
1 parent 00cb494 commit 6066c97

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+563
-742
lines changed

src/cli/bin.js

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,11 @@ if (args[0] === 'daemon' || args[0] === 'init') {
5959
.completion()
6060
.command(require('./commands/daemon'))
6161
.command(require('./commands/init'))
62-
.parse(args)
62+
63+
new YargsPromise(cli).parse(args)
64+
.then(({ data }) => {
65+
if (data) print(data)
66+
})
6367
} else {
6468
// here we have to make a separate yargs instance with
6569
// only the `api` option because we need this before doing

src/cli/commands/add.js

Lines changed: 66 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -2,52 +2,56 @@
22

33
const sortBy = require('lodash/sortBy')
44
const pull = require('pull-stream')
5-
const getFolderSize = require('get-folder-size')
5+
const promisify = require('promisify-es6')
6+
const getFolderSize = promisify(require('get-folder-size'))
67
const byteman = require('byteman')
7-
const reduce = require('async/reduce')
88
const mh = require('multihashes')
99
const multibase = require('multibase')
1010
const toPull = require('stream-to-pull-stream')
1111
const { print, isDaemonOn, createProgressBar } = require('../utils')
1212
const { cidToString } = require('../../utils/cid')
1313
const globSource = require('../../utils/files/glob-source')
1414

15-
function getTotalBytes (paths, cb) {
16-
reduce(paths, 0, (total, path, cb) => {
17-
getFolderSize(path, (err, size) => {
18-
if (err) return cb(err)
19-
cb(null, total + size)
20-
})
21-
}, cb)
15+
async function getTotalBytes (paths, cb) {
16+
const sizes = await Promise.all(paths.map(p => getFolderSize(p)))
17+
return sizes.reduce((total, size) => total + size, 0)
2218
}
2319

2420
function addPipeline (source, addStream, options) {
25-
pull(
26-
source,
27-
addStream,
28-
pull.collect((err, added) => {
29-
if (err) {
30-
// Tweak the error message and add more relevant infor for the CLI
31-
if (err.code === 'ERR_DIR_NON_RECURSIVE') {
32-
err.message = `'${err.path}' is a directory, use the '-r' flag to specify directories`
21+
return new Promise((resolve, reject) => {
22+
pull(
23+
source,
24+
addStream,
25+
pull.collect((err, added) => {
26+
if (err) {
27+
// Tweak the error message and add more relevant infor for the CLI
28+
if (err.code === 'ERR_DIR_NON_RECURSIVE') {
29+
err.message = `'${err.path}' is a directory, use the '-r' flag to specify directories`
30+
}
31+
return reject(err)
32+
}
33+
34+
if (options.silent) return resolve()
35+
36+
if (options.quieter) {
37+
print(added.pop().hash)
38+
return resolve()
3339
}
34-
throw err
35-
}
3640

37-
if (options.silent) return
38-
if (options.quieter) return print(added.pop().hash)
39-
40-
sortBy(added, 'path')
41-
.reverse()
42-
.map((file) => {
43-
const log = options.quiet ? [] : ['added']
44-
log.push(cidToString(file.hash, { base: options.cidBase }))
45-
if (!options.quiet && file.path.length > 0) log.push(file.path)
46-
return log.join(' ')
47-
})
48-
.forEach((msg) => print(msg))
49-
})
50-
)
41+
sortBy(added, 'path')
42+
.reverse()
43+
.map((file) => {
44+
const log = options.quiet ? [] : ['added']
45+
log.push(cidToString(file.hash, { base: options.cidBase }))
46+
if (!options.quiet && file.path.length > 0) log.push(file.path)
47+
return log.join(' ')
48+
})
49+
.forEach((msg) => print(msg))
50+
51+
resolve()
52+
})
53+
)
54+
})
5155
}
5256

5357
module.exports = {
@@ -140,46 +144,45 @@ module.exports = {
140144
},
141145

142146
handler (argv) {
143-
const { ipfs } = argv
144-
const options = {
145-
strategy: argv.trickle ? 'trickle' : 'balanced',
146-
shardSplitThreshold: argv.enableShardingExperiment
147-
? argv.shardSplitThreshold
148-
: Infinity,
149-
cidVersion: argv.cidVersion,
150-
rawLeaves: argv.rawLeaves,
151-
onlyHash: argv.onlyHash,
152-
hashAlg: argv.hash,
153-
wrapWithDirectory: argv.wrapWithDirectory,
154-
pin: argv.pin,
155-
chunker: argv.chunker
156-
}
157-
158-
if (options.enableShardingExperiment && isDaemonOn()) {
159-
throw new Error('Error: Enabling the sharding experiment should be done on the daemon')
160-
}
147+
argv.resolve((async () => {
148+
const { ipfs } = argv
149+
const options = {
150+
strategy: argv.trickle ? 'trickle' : 'balanced',
151+
shardSplitThreshold: argv.enableShardingExperiment
152+
? argv.shardSplitThreshold
153+
: Infinity,
154+
cidVersion: argv.cidVersion,
155+
rawLeaves: argv.rawLeaves,
156+
onlyHash: argv.onlyHash,
157+
hashAlg: argv.hash,
158+
wrapWithDirectory: argv.wrapWithDirectory,
159+
pin: argv.pin,
160+
chunker: argv.chunker
161+
}
161162

162-
const source = argv.file
163-
? globSource(...argv.file, { recursive: argv.recursive })
164-
: toPull.source(process.stdin) // Pipe directly to ipfs.add
163+
if (options.enableShardingExperiment && isDaemonOn()) {
164+
throw new Error('Error: Enabling the sharding experiment should be done on the daemon')
165+
}
165166

166-
const adder = ipfs.addPullStream(options)
167+
const source = argv.file
168+
? globSource(...argv.file, { recursive: argv.recursive })
169+
: toPull.source(process.stdin) // Pipe directly to ipfs.add
167170

168-
// No progress or piping directly to ipfs.add: no need to getTotalBytes
169-
if (!argv.progress || !argv.file) {
170-
return addPipeline(source, adder, argv)
171-
}
171+
const adder = ipfs.addPullStream(options)
172172

173-
getTotalBytes(argv.file, (err, totalBytes) => {
174-
if (err) throw err
173+
// No progress or piping directly to ipfs.add: no need to getTotalBytes
174+
if (!argv.progress || !argv.file) {
175+
return addPipeline(source, adder, argv)
176+
}
175177

178+
const totalBytes = await getTotalBytes(argv.file)
176179
const bar = createProgressBar(totalBytes)
177180

178181
options.progress = byteLength => {
179182
bar.update(byteLength / totalBytes, { progress: byteman(byteLength, 2, 'MB') })
180183
}
181184

182-
addPipeline(source, adder, argv)
183-
})
185+
return addPipeline(source, adder, argv)
186+
})())
184187
}
185188
}

src/cli/commands/bitswap/stat.js

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,9 @@ module.exports = {
1717
}
1818
},
1919

20-
handler ({ ipfs, cidBase }) {
21-
ipfs.bitswap.stat((err, stats) => {
22-
if (err) {
23-
throw err
24-
}
25-
20+
handler ({ ipfs, cidBase, resolve }) {
21+
resolve((async () => {
22+
const stats = await ipfs.bitswap.stat()
2623
stats.wantlist = stats.wantlist.map(k => cidToString(k['/'], { base: cidBase, upgrade: false }))
2724
stats.peers = stats.peers || []
2825

@@ -34,6 +31,6 @@ module.exports = {
3431
${stats.wantlist.join('\n ')}
3532
partners [${stats.peers.length}]
3633
${stats.peers.join('\n ')}`)
37-
})
34+
})())
3835
}
3936
}

src/cli/commands/bitswap/unwant.js

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,10 @@ module.exports = {
2121
choices: multibase.names
2222
}
2323
},
24-
handler ({ ipfs, key, cidBase }) {
25-
ipfs.bitswap.unwant(key, (err) => {
26-
if (err) {
27-
throw err
28-
}
24+
handler ({ ipfs, key, cidBase, resolve }) {
25+
resolve((async () => {
26+
await ipfs.bitswap.unwant(key)
2927
print(`Key ${cidToString(key, { base: cidBase, upgrade: false })} removed from wantlist`)
30-
})
28+
})())
3129
}
3230
}

src/cli/commands/bitswap/wantlist.js

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,10 @@ module.exports = {
2222
}
2323
},
2424

25-
handler ({ ipfs, peer, cidBase }) {
26-
ipfs.bitswap.wantlist(peer, (err, list) => {
27-
if (err) {
28-
throw err
29-
}
25+
handler ({ ipfs, peer, cidBase, resolve }) {
26+
resolve((async () => {
27+
const list = await ipfs.bitswap.wantlist(peer)
3028
list.Keys.forEach(k => print(cidToString(k['/'], { base: cidBase, upgrade: false })))
31-
})
29+
})())
3230
}
3331
}

src/cli/commands/block/get.js

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,14 @@ module.exports = {
99

1010
builder: {},
1111

12-
handler ({ ipfs, key }) {
13-
ipfs.block.get(key, (err, block) => {
14-
if (err) {
15-
throw err
16-
}
17-
12+
handler ({ ipfs, key, resolve }) {
13+
resolve((async () => {
14+
const block = await ipfs.block.get(key)
1815
if (block) {
1916
print(block.data, false)
2017
} else {
2118
print('Block was unwanted before it could be remotely retrieved')
2219
}
23-
})
20+
})())
2421
}
2522
}

src/cli/commands/block/put.js

Lines changed: 16 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,10 @@
33
const bl = require('bl')
44
const fs = require('fs')
55
const multibase = require('multibase')
6+
const promisify = require('promisify-es6')
67
const { print } = require('../../utils')
78
const { cidToString } = require('../../../utils/cid')
89

9-
function addBlock (data, opts) {
10-
const ipfs = opts.ipfs
11-
12-
ipfs.block.put(data, opts, (err, block) => {
13-
if (err) {
14-
throw err
15-
}
16-
print(cidToString(block.cid, { base: opts.cidBase }))
17-
})
18-
}
19-
2010
module.exports = {
2111
command: 'put [block]',
2212

@@ -48,17 +38,22 @@ module.exports = {
4838
},
4939

5040
handler (argv) {
51-
if (argv.block) {
52-
const buf = fs.readFileSync(argv.block)
53-
return addBlock(buf, argv)
54-
}
55-
56-
process.stdin.pipe(bl((err, input) => {
57-
if (err) {
58-
throw err
41+
argv.resolve((async () => {
42+
let data
43+
44+
if (argv.block) {
45+
data = await promisify(fs.readFile)(argv.block)
46+
} else {
47+
data = await new Promise((resolve, reject) => {
48+
process.stdin.pipe(bl((err, input) => {
49+
if (err) return reject(err)
50+
resolve(input)
51+
}))
52+
})
5953
}
6054

61-
addBlock(input, argv)
62-
}))
55+
const { cid } = await argv.ipfs.block.put(data, argv)
56+
print(cidToString(cid, { base: argv.cidBase }))
57+
})())
6358
}
6459
}

src/cli/commands/block/rm.js

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,18 +9,15 @@ module.exports = {
99

1010
builder: {},
1111

12-
handler ({ ipfs, key }) {
13-
if (isDaemonOn()) {
14-
// TODO implement this once `js-ipfs-http-client` supports it
15-
throw new Error('rm block with daemon running is not yet implemented')
16-
}
17-
18-
ipfs.block.rm(key, (err) => {
19-
if (err) {
20-
throw err
12+
handler ({ ipfs, key, resolve }) {
13+
resolve((async () => {
14+
if (isDaemonOn()) {
15+
// TODO implement this once `js-ipfs-http-client` supports it
16+
throw new Error('rm block with daemon running is not yet implemented')
2117
}
2218

19+
await ipfs.block.rm(key)
2320
print('removed ' + key)
24-
})
21+
})())
2522
}
2623
}

src/cli/commands/block/stat.js

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,11 @@ module.exports = {
1717
}
1818
},
1919

20-
handler ({ ipfs, key, cidBase }) {
21-
ipfs.block.stat(key, (err, stats) => {
22-
if (err) {
23-
throw err
24-
}
25-
20+
handler ({ ipfs, key, cidBase, resolve }) {
21+
resolve((async () => {
22+
const stats = await ipfs.block.stat(key)
2623
print('Key: ' + cidToString(stats.key, { base: cidBase }))
2724
print('Size: ' + stats.size)
28-
})
25+
})())
2926
}
3027
}

src/cli/commands/bootstrap/add.js

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,9 @@ module.exports = {
1616
},
1717

1818
handler (argv) {
19-
argv.ipfs.bootstrap.add(argv.peer, {
20-
default: argv.default
21-
}, (err, list) => {
22-
if (err) {
23-
throw err
24-
}
25-
19+
argv.resolve((async () => {
20+
const list = await argv.ipfs.bootstrap.add(argv.peer, { default: argv.default })
2621
list.Peers.forEach((peer) => print(peer))
27-
})
22+
})())
2823
}
2924
}

src/cli/commands/bootstrap/list.js

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,9 @@ module.exports = {
1010
builder: {},
1111

1212
handler (argv) {
13-
argv.ipfs.bootstrap.list((err, list) => {
14-
if (err) {
15-
throw err
16-
}
17-
13+
argv.resolve((async () => {
14+
const list = await argv.ipfs.bootstrap.list()
1815
list.Peers.forEach((node) => print(node))
19-
})
16+
})())
2017
}
2118
}

0 commit comments

Comments
 (0)