Commit 4abf62d
use exact read size to acquire from io semaphore

- In read_object_stream, we used the requested_size passed to the _read function as the value to acquire from the io semaphore. By default, this is 32 MB (the stream's highWaterMark).
- For datasets with mostly small objects, this limits the number of concurrent reads more than necessary.
- Changed io_sem_size to reflect the actual size requested by the current read.
- Also, avoid entering the code under the semaphore if there is nothing more to read.
- Changed the debug level of some read/upload messages to log1 instead of log0.

Signed-off-by: Danny Zaken <[email protected]>
1 parent 149c7f5
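
The core of the change is how many units each read acquires from the shared I/O buffers semaphore before doing any work. Below is a minimal, self-contained sketch of the idea, assuming a counting semaphore with a surround_count(units, fn) helper in the shape ObjectIO uses (this._io_buffers_sem). The Semaphore class, the demo numbers, and the inlined sizing (which skips _get_io_semaphore_size, whose body is not shown in this diff) are illustrative, not NooBaa's implementation.

'use strict';

// Illustrative sketch only: a minimal counting semaphore with a
// surround_count(units, fn) helper, mirroring the shape of
// this._io_buffers_sem in ObjectIO. Not the NooBaa implementation.
class Semaphore {
    constructor(initial) {
        this._value = initial;
        this._waiters = [];
    }

    // Acquire `units`, run `fn`, and always release the units afterwards.
    async surround_count(units, fn) {
        await this._acquire(units);
        try {
            return await fn();
        } finally {
            this._release(units);
        }
    }

    _acquire(units) {
        // FIFO: take units immediately only if no one is already waiting.
        if (!this._waiters.length && this._value >= units) {
            this._value -= units;
            return Promise.resolve();
        }
        return new Promise(resolve => this._waiters.push({ units, resolve }));
    }

    _release(units) {
        this._value += units;
        // Grant waiters in order, reserving their units before waking them.
        while (this._waiters.length && this._value >= this._waiters[0].units) {
            const waiter = this._waiters.shift();
            this._value -= waiter.units;
            waiter.resolve();
        }
    }
}

// With a hypothetical 64 MB budget, acquiring requested_size (the stream's
// 32 MB highWaterMark) per read allows only 2 reads in flight, even for
// tiny objects. Acquiring only the bytes this read will actually cover
// lets many small reads run concurrently.
async function demo() {
    const MB = 1024 * 1024;
    const io_buffers_sem = new Semaphore(64 * MB);
    const requested_size = 32 * MB;   // stream highWaterMark
    const reader = { pos: 0 };
    const params = { end: 4096 };     // a 4 KB object

    const requested_end = Math.min(params.end, reader.pos + requested_size);
    if (requested_end <= reader.pos) return; // nothing left: skip the semaphore entirely
    const io_sem_size = requested_end - reader.pos; // 4 KB, not 32 MB
    await io_buffers_sem.surround_count(io_sem_size, async () => {
        // ...perform the actual read here...
    });
}

demo();

The same reasoning explains the early return added in the diff: when requested_end has already been reached there is no reason to queue on the semaphore at all, so the reader just pushes null and finishes.
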

File tree

1 file changed

+16
-10
lines changed

1 file changed

+16
-10
lines changed

src/sdk/object_io.js

Lines changed: 16 additions & 10 deletions
@@ -173,7 +173,7 @@ class ObjectIO {
         params.bucket_master_key_id = obj_upload.bucket_master_key_id;
 
         try {
-            dbg.log0('upload_object_range: start upload stream', upload_params);
+            dbg.log1('upload_object_range: start upload stream', upload_params);
             return this._upload_stream(params, complete_params);
         } catch (err) {
             dbg.error('upload_object_range: object part upload failed', upload_params, err);
@@ -213,7 +213,7 @@ class ObjectIO {
             'last_modified_time',
         );
         try {
-            dbg.log0('upload_object: start upload', create_params);
+            dbg.log1('upload_object: start upload', create_params);
             const create_reply = await params.client.object.create_object_upload(create_params);
             params.obj_id = create_reply.obj_id;
             params.tier_id = create_reply.tier_id;
@@ -228,7 +228,7 @@ class ObjectIO {
             await this._upload_stream(params, complete_params);
         }
 
-        dbg.log0('upload_object: complete upload', complete_params);
+        dbg.log1('upload_object: complete upload', complete_params);
 
         if (params.async_get_last_modified_time) {
             complete_params.last_modified_time = await params.async_get_last_modified_time();
@@ -275,7 +275,7 @@ class ObjectIO {
             'num',
         );
         try {
-            dbg.log0('upload_multipart: start upload', complete_params);
+            dbg.log1('upload_multipart: start upload', complete_params);
             const multipart_reply = await params.client.object.create_multipart(create_params);
             params.tier_id = multipart_reply.tier_id;
             params.bucket_id = multipart_reply.bucket_id;
@@ -289,7 +289,7 @@ class ObjectIO {
         } else {
             await this._upload_stream(params, complete_params);
         }
-        dbg.log0('upload_multipart: complete upload', complete_params);
+        dbg.log1('upload_multipart: complete upload', complete_params);
         const multipart_params = await params.client.object.complete_multipart(complete_params);
         multipart_params.multipart_id = complete_params.multipart_id;
         return multipart_params;
@@ -377,7 +377,7 @@ class ObjectIO {
     async _upload_stream_internal(params, complete_params) {
 
         params.desc = _.pick(params, 'obj_id', 'num', 'bucket', 'key');
-        dbg.log0('UPLOAD:', params.desc, 'streaming to', params.bucket, params.key);
+        dbg.log1('UPLOAD:', params.desc, 'streaming to', params.bucket, params.key);
 
         // start and seq are set to zero even for multiparts and will be fixed
         // when multiparts are combined to object in complete_object_upload
@@ -490,7 +490,7 @@ class ObjectIO {
             params.range.end = params.start;
             complete_params.size += chunk.size;
             complete_params.num_parts += 1;
-            dbg.log0('UPLOAD: part', { ...params.desc, start: part.start, end: part.end, seq: part.seq });
+            dbg.log1('UPLOAD: part', { ...params.desc, start: part.start, end: part.end, seq: part.seq });
 
             if (chunk.size > config.MAX_OBJECT_PART_SIZE) {
                 throw new Error(`Chunk size=${chunk.size} exceeds ` +
@@ -500,7 +500,7 @@ class ObjectIO {
             return chunk;
         });
 
-        /**
+        /**
         * passing partial object info we have in this context which will be sent to block_stores
         * as block_md.mapping_info so it can be used for recovery in case the db is not available.
         * @type {Partial<nb.ObjectInfo>}
@@ -592,13 +592,19 @@ class ObjectIO {
             reader.push(reader.pending.shift());
             return;
         }
-        const io_sem_size = _get_io_semaphore_size(requested_size);
 
         // TODO we dont want to use requested_size as end, because we read entire chunks
         // and we are better off return the data to the stream buffer
         // instead of getting multiple calls from the stream with small slices to return.
 
         const requested_end = Math.min(params.end, reader.pos + requested_size);
+        if (requested_end <= reader.pos) {
+            dbg.log1(`READ reader finished. requested end is less than reader pos. requested_end=${requested_end} reader.pos=${reader.pos}`);
+            reader.push(null);
+            return;
+        }
+
+        const io_sem_size = _get_io_semaphore_size(requested_end - reader.pos);
         this._io_buffers_sem.surround_count(io_sem_size, async () => {
             try {
                 const buffers = await this.read_object({
@@ -627,7 +633,7 @@ class ObjectIO {
                     reader.pending.push(missing_buf);
                 }
             }
-            dbg.log0('READ reader pos', reader.pos);
+            dbg.log1('READ reader pos', reader.pos);
             reader.push(reader.pending.shift());
         } else {
             reader.push(null);
