1
1
'use strict'
2
2
3
- const assert = require ( 'assert' )
4
- const mortice = require ( 'mortice' )
5
- const nanoid = require ( 'nanoid' )
6
3
const pull = require ( 'pull-stream/pull' )
7
4
const pullThrough = require ( 'pull-stream/throughs/through' )
8
5
const pullAsyncMap = require ( 'pull-stream/throughs/async-map' )
9
6
const EventEmitter = require ( 'events' )
7
+ const Mutex = require ( '../../../utils/mutex' )
10
8
const log = require ( 'debug' ) ( 'ipfs:gc:lock' )
11
9
12
10
class GCLock extends EventEmitter {
13
11
constructor ( repoOwner ) {
14
12
super ( )
15
13
16
- // Ensure that we get a different mutex for each instance of GCLock
17
- // (There should only be one GCLock instance per IPFS instance, but
18
- // there may be multiple IPFS instances, eg in unit tests)
19
- const randId = nanoid ( )
20
- this . mutex = mortice ( randId , {
21
- singleProcess : repoOwner
22
- } )
23
-
24
- this . lockId = 0
14
+ this . mutex = new Mutex ( repoOwner , { log } )
25
15
}
26
16
27
17
readLock ( lockedFn , cb ) {
28
- return this . lock ( 'readLock' , lockedFn , cb )
18
+ this . emit ( `readLock request` )
19
+ return this . mutex . readLock ( lockedFn , cb )
29
20
}
30
21
31
22
writeLock ( lockedFn , cb ) {
32
- return this . lock ( 'writeLock' , lockedFn , cb )
33
- }
34
-
35
- lock ( type , lockedFn , cb ) {
36
- assert ( typeof lockedFn === 'function' , `first argument to GCLock.${ type } must be a function` )
37
- assert ( typeof cb === 'function' , `second argument to GCLock.${ type } must be a callback function` )
38
-
39
- const lockId = this . lockId ++
40
- log ( `[${ lockId } ] ${ type } requested` )
41
- this . emit ( `${ type } request` , lockId )
42
- const locked = ( ) => new Promise ( ( resolve , reject ) => {
43
- this . emit ( `${ type } start` , lockId )
44
- log ( `[${ lockId } ] ${ type } started` )
45
- lockedFn ( ( err , res ) => {
46
- this . emit ( `${ type } release` , lockId )
47
- log ( `[${ lockId } ] ${ type } released` )
48
- err ? reject ( err ) : resolve ( res )
49
- } )
50
- } )
51
-
52
- const lock = this . mutex [ type ] ( locked )
53
- return lock . then ( res => cb ( null , res ) , cb )
23
+ this . emit ( `writeLock request` )
24
+ return this . mutex . writeLock ( lockedFn , cb )
54
25
}
55
26
56
27
pullReadLock ( lockedPullFn ) {
@@ -73,60 +44,36 @@ class GCLock extends EventEmitter {
73
44
}
74
45
75
46
class PullLocker {
76
- constructor ( emitter , mutex , type , lockId ) {
47
+ constructor ( emitter , mutex , type ) {
77
48
this . emitter = emitter
78
49
this . mutex = mutex
79
50
this . type = type
80
- this . lockId = lockId
81
51
82
- // This Promise resolves when the mutex gives us permission to start
83
- // running the locked piece of code
84
- this . lockReady = new Promise ( ( resolve ) => {
85
- this . lockReadyResolver = resolve
86
- } )
52
+ // The function to call to release the lock. It is set when the lock is taken
53
+ this . releaseLock = null
87
54
}
88
55
89
- // Returns a Promise that resolves when the locked piece of code completes
90
- _locked ( ) {
91
- return new Promise ( ( resolve , reject ) => {
92
- this . releaseLock = ( err ) => err ? reject ( err ) : resolve ( )
93
-
94
- log ( `[${ this . lockId } ] ${ this . type } (pull) started` )
95
- this . emitter . emit ( `${ this . type } start` , this . lockId )
96
-
97
- // The locked piece of code is ready to start, so resolve the
98
- // this.lockReady Promise (created in the constructor)
99
- this . lockReadyResolver ( )
100
- } )
101
- }
102
-
103
- // Requests a lock and then waits for the mutex to give us permission to run
104
- // the locked piece of code
105
56
take ( ) {
106
57
return pull (
107
58
pullAsyncMap ( ( i , cb ) => {
108
- if ( ! this . lock ) {
109
- log ( `[${ this . lockId } ] ${ this . type } (pull) requested` )
110
- this . emitter . emit ( `${ this . type } request` , this . lockId )
111
- // Request the lock
112
- this . lock = this . mutex [ this . type ] ( ( ) => this . _locked ( ) )
113
- // If there is an error, it gets passed through to the caller using
114
- // pull streams, so here we just catch the error and ignore it so
115
- // that there isn't an UnhandledPromiseRejectionWarning
116
- this . lock . catch ( ( ) => { } )
59
+ if ( this . lockRequested ) {
60
+ return cb ( null , i )
117
61
}
62
+ this . lockRequested = true
63
+
64
+ this . emitter . emit ( `${ this . type } request` )
118
65
119
- // Wait for the mutex to give us permission
120
- this . lockReady . then ( ( ) => cb ( null , i ) )
66
+ this . mutex [ this . type ] ( ( releaseLock ) => {
67
+ cb ( null , i )
68
+ this . releaseLock = releaseLock
69
+ } )
121
70
} )
122
71
)
123
72
}
124
73
125
74
// Releases the lock
126
75
release ( ) {
127
76
return pullThrough ( null , ( err ) => {
128
- log ( `[${ this . lockId } ] ${ this . type } (pull) released` )
129
- this . emitter . emit ( `${ this . type } release` , this . lockId )
130
77
this . releaseLock ( err )
131
78
} )
132
79
}
0 commit comments