3
3
4
4
/* :: import type {Callback, Batch, Query, QueryResult, QueryEntry} from 'interface-datastore' */
5
5
6
- const pull = require ( 'pull-stream' )
7
6
const levelup = require ( 'levelup' )
8
-
9
- const asyncFilter = require ( 'interface-datastore' ) . utils . asyncFilter
10
- const asyncSort = require ( 'interface-datastore' ) . utils . asyncSort
11
- const Key = require ( 'interface-datastore' ) . Key
12
- const Errors = require ( 'interface-datastore' ) . Errors
7
+ const { Key, Errors, utils } = require ( 'interface-datastore' )
13
8
const encode = require ( 'encoding-down' )
14
9
10
+ const { filter, map, take, sortAll } = utils
11
+
15
12
/**
16
13
* A datastore backed by leveldb.
17
14
*/
@@ -50,59 +47,53 @@ class LevelDatastore {
50
47
)
51
48
}
52
49
53
- open ( callback /* : Callback<void> */ ) /* : void */ {
54
- this . db . open ( ( err ) => {
55
- if ( err ) {
56
- return callback ( Errors . dbOpenFailedError ( err ) )
57
- }
58
- callback ( )
59
- } )
50
+ async open ( ) /* : Promise */ {
51
+ try {
52
+ await this . db . open ( )
53
+ } catch ( err ) {
54
+ throw Errors . dbOpenFailedError ( err )
55
+ }
60
56
}
61
57
62
- put ( key /* : Key */ , value /* : Buffer */ , callback /* : Callback<void> */ ) /* : void */ {
63
- this . db . put ( key . toString ( ) , value , ( err ) => {
64
- if ( err ) {
65
- return callback ( Errors . dbWriteFailedError ( err ) )
66
- }
67
- callback ( )
68
- } )
58
+ async put ( key /* : Key */ , value /* : Buffer */ ) /* : Promise */ {
59
+ try {
60
+ await this . db . put ( key . toString ( ) , value )
61
+ } catch ( err ) {
62
+ throw Errors . dbWriteFailedError ( err )
63
+ }
69
64
}
70
65
71
- get ( key /* : Key */ , callback /* : Callback<Buffer> */ ) /* : void */ {
72
- this . db . get ( key . toString ( ) , ( err , data ) => {
73
- if ( err ) {
74
- return callback ( Errors . notFoundError ( err ) )
75
- }
76
- callback ( null , data )
77
- } )
66
+ async get ( key /* : Key */ ) /* : Promise */ {
67
+ let data
68
+ try {
69
+ data = await this . db . get ( key . toString ( ) )
70
+ } catch ( err ) {
71
+ if ( err . notFound ) throw Errors . notFoundError ( err )
72
+ throw Errors . dbWriteFailedError ( err )
73
+ }
74
+ return data
78
75
}
79
76
80
- has ( key /* : Key */ , callback /* : Callback<bool> */ ) /* : void */ {
81
- this . db . get ( key . toString ( ) , ( err , res ) => {
82
- if ( err ) {
83
- if ( err . notFound ) {
84
- callback ( null , false )
85
- return
86
- }
87
- callback ( err )
88
- return
89
- }
90
-
91
- callback ( null , true )
92
- } )
77
+ async has ( key /* : Key */ ) /* : Promise<Boolean> */ {
78
+ try {
79
+ await this . db . get ( key . toString ( ) )
80
+ } catch ( err ) {
81
+ if ( err . notFound ) return false
82
+ throw err
83
+ }
84
+ return true
93
85
}
94
86
95
- delete ( key /* : Key */ , callback /* : Callback<void> */ ) /* : void */ {
96
- this . db . del ( key . toString ( ) , ( err ) => {
97
- if ( err ) {
98
- return callback ( Errors . dbDeleteFailedError ( err ) )
99
- }
100
- callback ( )
101
- } )
87
+ async delete ( key /* : Key */ ) /* : Promise */ {
88
+ try {
89
+ await this . db . del ( key . toString ( ) )
90
+ } catch ( err ) {
91
+ throw Errors . dbDeleteFailedError ( err )
92
+ }
102
93
}
103
94
104
- close ( callback /* : Callback<void> */ ) /* : void */ {
105
- this . db . close ( callback )
95
+ close ( ) /* : Promise */ {
96
+ return this . db . close ( )
106
97
}
107
98
108
99
batch ( ) /* : Batch<Buffer> */ {
@@ -121,8 +112,8 @@ class LevelDatastore {
121
112
key : key . toString ( )
122
113
} )
123
114
} ,
124
- commit : ( callback /* : Callback<void> */ ) /* : void */ => {
125
- this . db . batch ( ops , callback )
115
+ commit : ( ) /* : Promise */ => {
116
+ return this . db . batch ( ops )
126
117
}
127
118
}
128
119
}
@@ -133,70 +124,65 @@ class LevelDatastore {
133
124
values = ! q . keysOnly
134
125
}
135
126
136
- const iter = this . db . db . iterator ( {
137
- keys : true ,
138
- values : values ,
139
- keyAsBuffer : true
140
- } )
141
-
142
- const rawStream = ( end , cb ) => {
143
- if ( end ) {
144
- return iter . end ( ( err ) => {
145
- cb ( err || end )
146
- } )
147
- }
148
-
149
- iter . next ( ( err , key , value ) => {
150
- if ( err ) {
151
- return cb ( err )
152
- }
153
-
154
- if ( err == null && key == null && value == null ) {
155
- return iter . end ( ( err ) => {
156
- cb ( err || true )
157
- } )
158
- }
159
-
160
- const res /* : QueryEntry<Buffer> */ = {
161
- key : new Key ( key , false )
162
- }
163
-
164
- if ( values ) {
165
- res . value = Buffer . from ( value )
166
- }
167
-
168
- cb ( null , res )
127
+ let it = levelIteratorToIterator (
128
+ this . db . db . iterator ( {
129
+ keys : true ,
130
+ values : values ,
131
+ keyAsBuffer : true
169
132
} )
170
- }
133
+ )
171
134
172
- let tasks = [ rawStream ]
173
- let filters = [ ]
135
+ it = map ( it , ( { key, value } ) => {
136
+ const res /* : QueryEntry<Buffer> */ = { key : new Key ( key , false ) }
137
+ if ( values ) {
138
+ res . value = Buffer . from ( value )
139
+ }
140
+ return res
141
+ } )
174
142
175
143
if ( q . prefix != null ) {
176
- const prefix = q . prefix
177
- filters . push ( ( e , cb ) => cb ( null , e . key . toString ( ) . startsWith ( prefix ) ) )
144
+ it = filter ( it , e => e . key . toString ( ) . startsWith ( q . prefix ) )
178
145
}
179
146
180
- if ( q . filters != null ) {
181
- filters = filters . concat ( q . filters )
147
+ if ( Array . isArray ( q . filters ) ) {
148
+ it = q . filters . reduce ( ( it , f ) => filter ( it , f ) , it )
182
149
}
183
150
184
- tasks = tasks . concat ( filters . map ( f => asyncFilter ( f ) ) )
185
-
186
- if ( q . orders != null ) {
187
- tasks = tasks . concat ( q . orders . map ( o => asyncSort ( o ) ) )
151
+ if ( Array . isArray ( q . orders ) ) {
152
+ it = q . orders . reduce ( ( it , f ) => sortAll ( it , f ) , it )
188
153
}
189
154
190
155
if ( q . offset != null ) {
191
156
let i = 0
192
- tasks . push ( pull . filter ( ( ) => i ++ >= q . offset ) )
157
+ it = filter ( it , ( ) => i ++ >= q . offset )
193
158
}
194
159
195
160
if ( q . limit != null ) {
196
- tasks . push ( pull . take ( q . limit ) )
161
+ it = take ( it , q . limit )
197
162
}
198
163
199
- return pull . apply ( null , tasks )
164
+ return it
165
+ }
166
+ }
167
+
168
+ function levelIteratorToIterator ( li ) {
169
+ return {
170
+ next : ( ) => new Promise ( ( resolve , reject ) => {
171
+ li . next ( ( err , key , value ) => {
172
+ if ( err ) return reject ( err )
173
+ if ( key == null ) return resolve ( { done : true } )
174
+ resolve ( { done : false , value : { key, value } } )
175
+ } )
176
+ } ) ,
177
+ return : ( ) => new Promise ( ( resolve , reject ) => {
178
+ li . end ( err => {
179
+ if ( err ) return reject ( err )
180
+ resolve ( { done : true } )
181
+ } )
182
+ } ) ,
183
+ [ Symbol . asyncIterator ] ( ) {
184
+ return this
185
+ }
200
186
}
201
187
}
202
188
0 commit comments