@@ -920,22 +920,39 @@ class NamespaceFS {
920920 async read_object_md ( params , object_sdk ) {
921921 const fs_context = this . prepare_fs_context ( object_sdk ) ;
922922 let file_path ;
923+ let stat ;
924+ let isDir ;
925+ let retries = ( this . _is_versioning_enabled ( ) || this . _is_versioning_suspended ( ) ) ? config . NSFS_RENAME_RETRIES : 0 ;
926+ const is_gpfs = native_fs_utils . _is_gpfs ( fs_context ) ;
923927 try {
924- file_path = await this . _find_version_path ( fs_context , params , true ) ;
925- await this . _check_path_in_bucket_boundaries ( fs_context , file_path ) ;
926- await this . _load_bucket ( params , fs_context ) ;
927- let stat = await nb_native ( ) . fs . stat ( fs_context , file_path ) ;
928-
929- const isDir = native_fs_utils . isDirectory ( stat ) ;
930- if ( isDir ) {
931- if ( ! stat . xattr ?. [ XATTR_DIR_CONTENT ] || ! params . key . endsWith ( '/' ) ) {
932- throw error_utils . new_error_code ( 'ENOENT' , 'NoSuchKey' ) ;
933- } else if ( stat . xattr ?. [ XATTR_DIR_CONTENT ] !== '0' ) {
934- // find dir object content file path and return its stat + xattr of its parent directory
935- const dir_content_path = await this . _find_version_path ( fs_context , params ) ;
936- const dir_content_path_stat = await nb_native ( ) . fs . stat ( fs_context , dir_content_path ) ;
937- const xattr = stat . xattr ;
938- stat = { ...dir_content_path_stat , xattr } ;
928+ for ( ; ; ) {
929+ try {
930+ file_path = await this . _find_version_path ( fs_context , params , true ) ;
931+ await this . _check_path_in_bucket_boundaries ( fs_context , file_path ) ;
932+ await this . _load_bucket ( params , fs_context ) ;
933+ stat = await nb_native ( ) . fs . stat ( fs_context , file_path ) ;
934+
935+ isDir = native_fs_utils . isDirectory ( stat ) ;
936+ if ( isDir ) {
937+ if ( ! stat . xattr ?. [ XATTR_DIR_CONTENT ] || ! params . key . endsWith ( '/' ) ) {
938+ throw error_utils . new_error_code ( 'ENOENT' , 'NoSuchKey' ) ;
939+ } else if ( stat . xattr ?. [ XATTR_DIR_CONTENT ] !== '0' ) {
940+ // find dir object content file path and return its stat + xattr of its parent directory
941+ const dir_content_path = await this . _find_version_path ( fs_context , params ) ;
942+ const dir_content_path_stat = await nb_native ( ) . fs . stat ( fs_context , dir_content_path ) ;
943+ const xattr = stat . xattr ;
944+ stat = { ...dir_content_path_stat , xattr } ;
945+ }
946+ }
947+ if ( this . _is_mismatch_version_id ( stat , params . version_id ) ) {
948+ dbg . warn ( 'NamespaceFS.read_object_md mismatch version_id' , file_path , params . version_id , this . _get_version_id_by_xattr ( stat ) ) ;
949+ throw error_utils . new_error_code ( 'MISMATCH_VERSION' , 'file version does not match the version we asked for' ) ;
950+ }
951+ break ;
952+ } catch ( err ) {
953+ dbg . warn ( `NamespaceFS.read_object_md: retrying retries=${ retries } file_path=${ file_path } ` , err ) ;
954+ retries -= 1 ;
955+ if ( retries <= 0 || ! native_fs_utils . should_retry_link_unlink ( is_gpfs , err ) ) throw err ;
939956 }
940957 }
941958 this . _throw_if_delete_marker ( stat , params ) ;
@@ -948,42 +965,70 @@ class NamespaceFS {
948965 }
949966 }
950967
968+ async _is_empty_directory_content ( file_path , fs_context , params ) {
969+ const is_dir_content = this . _is_directory_content ( file_path , params . key ) ;
970+ if ( is_dir_content ) {
971+ try {
972+ const md_path = this . _get_file_md_path ( params ) ;
973+ const dir_stat = await nb_native ( ) . fs . stat ( fs_context , md_path ) ;
974+ if ( dir_stat && dir_stat . xattr [ XATTR_DIR_CONTENT ] === '0' ) return true ;
975+ } catch ( err ) {
976+ //failed to get object
977+ new NoobaaEvent ( NoobaaEvent . OBJECT_GET_FAILED ) . create_event ( params . key ,
978+ { bucket_path : this . bucket_path , object_name : params . key } , err ) ;
979+ dbg . log0 ( 'NamespaceFS: read_object_stream couldnt find dir content xattr' , err ) ;
980+ }
981+ }
982+ return false ;
983+ }
984+
951985 // eslint-disable-next-line max-statements
952986 async read_object_stream ( params , object_sdk , res ) {
953- let file ;
954987 let buffer_pool_cleanup = null ;
955988 const fs_context = this . prepare_fs_context ( object_sdk ) ;
956989 let file_path ;
990+ let file ;
957991 try {
958992 await this . _load_bucket ( params , fs_context ) ;
959- file_path = await this . _find_version_path ( fs_context , params ) ;
960- await this . _check_path_in_bucket_boundaries ( fs_context , file_path ) ;
961-
962- // NOTE: don't move this code after the open
963- // this can lead to ENOENT failures due to file not exists when content size is 0
964- // if entry is a directory object and its content size = 0 - return empty response
965- const is_dir_content = this . _is_directory_content ( file_path , params . key ) ;
966- if ( is_dir_content ) {
993+ let retries = ( this . _is_versioning_enabled ( ) || this . _is_versioning_suspended ( ) ) ? config . NSFS_RENAME_RETRIES : 0 ;
994+ const is_gpfs = native_fs_utils . _is_gpfs ( fs_context ) ;
995+ let stat ;
996+ for ( ; ; ) {
967997 try {
968- const md_path = this . _get_file_md_path ( params ) ;
969- const dir_stat = await nb_native ( ) . fs . stat ( fs_context , md_path ) ;
970- if ( dir_stat && dir_stat . xattr [ XATTR_DIR_CONTENT ] === '0' ) return null ;
998+ file_path = await this . _find_version_path ( fs_context , params ) ;
999+ await this . _check_path_in_bucket_boundaries ( fs_context , file_path ) ;
1000+
1001+ // NOTE: don't move this code after the open
1002+ // this can lead to ENOENT failures due to file not exists when content size is 0
1003+ // if entry is a directory object and its content size = 0 - return empty response
1004+ if ( await this . _is_empty_directory_content ( file_path , fs_context , params ) ) return null ;
1005+
1006+ file = await nb_native ( ) . fs . open (
1007+ fs_context ,
1008+ file_path ,
1009+ config . NSFS_OPEN_READ_MODE ,
1010+ native_fs_utils . get_umasked_mode ( config . BASE_MODE_FILE ) ,
1011+ ) ;
1012+ stat = await file . stat ( fs_context ) ;
1013+ if ( this . _is_mismatch_version_id ( stat , params . version_id ) ) {
1014+ dbg . warn ( 'NamespaceFS.read_object_stream mismatch version_id' , params . version_id , this . _get_version_id_by_xattr ( stat ) ) ;
1015+ throw error_utils . new_error_code ( 'MISMATCH_VERSION' , 'file version does not match the version we asked for' ) ;
1016+ }
1017+ break ;
9711018 } catch ( err ) {
972- //failed to get object
973- new NoobaaEvent ( NoobaaEvent . OBJECT_GET_FAILED ) . create_event ( params . key ,
974- { bucket_path : this . bucket_path , object_name : params . key } , err ) ;
975- dbg . log0 ( 'NamespaceFS: read_object_stream couldnt find dir content xattr' , err ) ;
1019+ dbg . warn ( `NamespaceFS.read_object_stream: retrying retries=${ retries } file_path=${ file_path } ` , err ) ;
1020+ if ( file ) {
1021+ await file . close ( fs_context ) ;
1022+ file = null ;
1023+ }
1024+ retries -= 1 ;
1025+ if ( retries <= 0 || ! native_fs_utils . should_retry_link_unlink ( is_gpfs , err ) ) {
1026+ new NoobaaEvent ( NoobaaEvent . OBJECT_GET_FAILED ) . create_event ( params . key ,
1027+ { bucket_path : this . bucket_path , object_name : params . key } , err ) ;
1028+ throw err ;
1029+ }
9761030 }
9771031 }
978-
979- file = await nb_native ( ) . fs . open (
980- fs_context ,
981- file_path ,
982- config . NSFS_OPEN_READ_MODE ,
983- native_fs_utils . get_umasked_mode ( config . BASE_MODE_FILE ) ,
984- ) ;
985-
986- const stat = await file . stat ( fs_context ) ;
9871032 this . _throw_if_delete_marker ( stat , params ) ;
9881033 // await this._fail_if_archived_or_sparse_file(fs_context, file_path, stat);
9891034
@@ -1244,14 +1289,14 @@ class NamespaceFS {
12441289
12451290 /**
12461291 * _check_copy_storage_class returns true if a copy is needed to be forced.
1247- *
1292+ *
12481293 * This might be needed if we need to manage xattr separately on the source
12491294 * object and target object (eg. GLACIER objects).
1250- *
1295+ *
12511296 * NOTE: The function will throw S3 error if source object storage class is
12521297 * "GLACIER" but it is not in restored state (AWS behaviour).
1253- * @param {nb.NativeFSContext } fs_context
1254- * @param {Record<any, any> } params
1298+ * @param {nb.NativeFSContext } fs_context
1299+ * @param {Record<any, any> } params
12551300 * @returns {Promise<boolean> }
12561301 */
12571302 async _check_copy_storage_class ( fs_context , params ) {
@@ -1391,8 +1436,8 @@ class NamespaceFS {
13911436 }
13921437
13931438 // 1. get latest version_id
1394- // 2. if versioning is suspended -
1395- // 2.1 if version ID of the latest version is null -
1439+ // 2. if versioning is suspended -
1440+ // 2.1 if version ID of the latest version is null -
13961441 // 2.1.1 remove latest version
13971442 // 2.2 else (version ID of the latest version is unique or there is no latest version) -
13981443 // 2.2.1 remove a version (or delete marker) with null version ID from .versions/ (if exists)
@@ -1557,7 +1602,7 @@ class NamespaceFS {
15571602 const delimiter_idx = create_params_parsed . key . indexOf ( params . delimiter , start_idx ) ;
15581603 if ( delimiter_idx > 0 ) {
15591604 common_prefixes_set . add ( create_params_parsed . key . substring ( 0 , delimiter_idx + 1 ) ) ;
1560- // if key has common prefix it should not be returned as an upload object
1605+ // if key has common prefix it should not be returned as an upload object
15611606 return undefined ;
15621607 }
15631608 }
@@ -1606,7 +1651,7 @@ class NamespaceFS {
16061651 return path . join ( params . mpu_path , `part-${ params . num } ` ) ;
16071652 }
16081653
1609- // optimized version of upload_multipart -
1654+ // optimized version of upload_multipart -
16101655 // 1. if size is pre known -
16111656 // 1.1. calc offset
16121657 // 1.2. upload data to by_size file in offset position
@@ -1710,13 +1755,13 @@ class NamespaceFS {
17101755 }
17111756 }
17121757
1713- // iterate over multiparts array -
1758+ // iterate over multiparts array -
17141759 // 1. if num of unique sizes is 1
17151760 // 1.1. if this is the last part - link the size file and break the loop
17161761 // 1.2. else, continue the loop
17171762 // 2. if num of unique sizes is 2
17181763 // 2.1. if should_copy_file_prefix
1719- // 2.1.1. if the cur part is the last, link the previous part file to upload_path and copy the last part (tail) to upload_path
1764+ // 2.1.1. if the cur part is the last, link the previous part file to upload_path and copy the last part (tail) to upload_path
17201765 // 2.1.2. else - copy the prev part size file prefix to upload_path
17211766 // 3. copy bytes of the current's part size file
17221767 async complete_object_upload ( params , object_sdk ) {
@@ -2069,12 +2114,12 @@ class NamespaceFS {
20692114 /**
20702115 * restore_object simply sets the restore request xattr
20712116 * which should be picked by another mechanism.
2072- *
2117+ *
20732118 * restore_object internally relies on 2 xattrs:
20742119 * - XATTR_RESTORE_REQUEST
20752120 * - XATTR_RESTORE_EXPIRY
2076- * @param {* } params
2077- * @param {nb.ObjectSDK } object_sdk
2121+ * @param {* } params
2122+ * @param {nb.ObjectSDK } object_sdk
20782123 * @returns {Promise<boolean> }
20792124 */
20802125 async restore_object ( params , object_sdk ) {
@@ -2195,7 +2240,7 @@ class NamespaceFS {
21952240 }
21962241
21972242 /**
2198- *
2243+ *
21992244 * @param {* } fs_context - fs context object
22002245 * @param {string } file_path - path to file
22012246 * @param {* } set - the xattr object to be set
@@ -2533,7 +2578,7 @@ class NamespaceFS {
25332578 }
25342579 try {
25352580 // Returns the real path of the entry.
2536- // The entry path may point to regular file or directory, but can have symbolic links
2581+ // The entry path may point to regular file or directory, but can have symbolic links
25372582 const full_path = await nb_native ( ) . fs . realpath ( fs_context , entry_path ) ;
25382583 if ( ! full_path . startsWith ( this . bucket_path ) ) {
25392584 dbg . log0 ( 'check_bucket_boundaries: the path' , entry_path , 'is not in the bucket' , this . bucket_path , 'boundaries' ) ;
@@ -2725,6 +2770,10 @@ class NamespaceFS {
27252770 }
27262771 }
27272772
2773+ _is_mismatch_version_id ( stat , version_id ) {
2774+ return version_id && ! this . _is_versioning_disabled ( ) && this . _get_version_id_by_xattr ( stat ) !== version_id ;
2775+ }
2776+
27282777 /**
27292778 * @param {nb.NativeFSContext } fs_context
27302779 * @param {string } key
@@ -2771,9 +2820,9 @@ class NamespaceFS {
27712820 // there are a few concurrency scenarios that might happen we should retry for -
27722821 // 1. the version id is the latest, concurrent put will might move the version id from being the latest to .versions/ -
27732822 // will throw safe unlink failed on non matching fd (on GPFS) or inode/mtime (on POSIX).
2774- // 2. the version id is the second latest and stays under .versions/ - on concurrent delete of the latest,
2823+ // 2. the version id is the second latest and stays under .versions/ - on concurrent delete of the latest,
27752824 // the version id might move to be the latest and we will get ENOENT
2776- // 3. concurrent delete of this version - will get ENOENT, doing a retry will return successfully
2825+ // 3. concurrent delete of this version - will get ENOENT, doing a retry will return successfully
27772826 // after we will see that the version was already deleted
27782827 if ( retries <= 0 || ! native_fs_utils . should_retry_link_unlink ( is_gpfs , err ) ) throw err ;
27792828 } finally {
@@ -2930,8 +2979,8 @@ class NamespaceFS {
29302979 const bucket_tmp_dir_path = this . get_bucket_tmpdir_full_path ( ) ;
29312980 if ( this . _is_versioning_enabled ( ) || suspended_and_latest_is_not_null ) {
29322981 await native_fs_utils . _make_path_dirs ( versioned_path , fs_context ) ;
2933- await native_fs_utils . safe_move ( fs_context , latest_ver_path , versioned_path , latest_ver_info ,
2934- gpfs_options && gpfs_options . delete_version , bucket_tmp_dir_path ) ;
2982+ await native_fs_utils . safe_move_posix ( fs_context , latest_ver_path , versioned_path , latest_ver_info ,
2983+ bucket_tmp_dir_path ) ;
29352984 if ( suspended_and_latest_is_not_null ) {
29362985 // remove a version (or delete marker) with null version ID from .versions/ (if exists)
29372986 await this . _delete_null_version_from_versions_directory ( params . key , fs_context ) ;
@@ -2945,9 +2994,9 @@ class NamespaceFS {
29452994 }
29462995 break ;
29472996 } catch ( err ) {
2997+ dbg . warn ( `NamespaceFS._delete_latest_version: Retrying retries=${ retries } latest_ver_path=${ latest_ver_path } ` , err ) ;
29482998 retries -= 1 ;
29492999 if ( retries <= 0 || ! native_fs_utils . should_retry_link_unlink ( is_gpfs , err ) ) throw err ;
2950- dbg . warn ( `NamespaceFS._delete_latest_version: Retrying retries=${ retries } latest_ver_path=${ latest_ver_path } ` , err ) ;
29513000 } finally {
29523001 if ( gpfs_options ) await this . _close_files_gpfs ( fs_context , gpfs_options . delete_version , undefined , true ) ;
29533002 }
@@ -2962,7 +3011,7 @@ class NamespaceFS {
29623011
29633012 // We can have only one versioned object with null version ID per key.
29643013 // It can be latest version, old version in .version/ directory or delete marker
2965- // This function removes an object version or delete marker with a null version ID inside .version/ directory
3014+ // This function removes an object version or delete marker with a null version ID inside .version/ directory
29663015 async _delete_null_version_from_versions_directory ( key , fs_context ) {
29673016 const is_gpfs = native_fs_utils . _is_gpfs ( fs_context ) ;
29683017 let retries = config . NSFS_RENAME_RETRIES ;
@@ -3098,7 +3147,7 @@ class NamespaceFS {
30983147 dst_file = await native_fs_utils . open_file ( fs_context , this . bucket_path , dst_path , 'r' ) ;
30993148 }
31003149 return {
3101- move_to_versions : { src_file : dst_file , dir_file, dst_file : versioned_file } ,
3150+ move_to_versions : { src_file : dst_file , dir_file, should_override : false } ,
31023151 move_to_dst : { src_file, dst_file, dir_file, versioned_file }
31033152 } ;
31043153 } catch ( err ) {
0 commit comments