@@ -983,174 +983,192 @@ describe('Change Streams', function () {
983
983
} ) ;
984
984
985
985
describe ( '#asyncIterator' , function ( ) {
986
- it (
987
- 'can iterate through changes' ,
988
- { requires : { topology : '!single' , mongodb : '>=4.2' } } ,
989
- async function ( ) {
990
- changeStream = collection . watch ( [ ] ) ;
991
- await initIteratorMode ( changeStream ) ;
986
+ describe ( 'for-await iteration' , function ( ) {
987
+ it (
988
+ 'can iterate through changes' ,
989
+ { requires : { topology : '!single' , mongodb : '>=4.2' } } ,
990
+ async function ( ) {
991
+ changeStream = collection . watch ( [ ] ) ;
992
+ await initIteratorMode ( changeStream ) ;
992
993
993
- const docs = [ { city : 'New York City' } , { city : 'Seattle' } , { city : 'Boston' } ] ;
994
- await collection . insertMany ( docs ) ;
994
+ const docs = [ { city : 'New York City' } , { city : 'Seattle' } , { city : 'Boston' } ] ;
995
+ await collection . insertMany ( docs ) ;
995
996
996
- for await ( const change of changeStream ) {
997
- const { fullDocument } = change ;
998
- const expectedDoc = docs . shift ( ) ;
999
- expect ( fullDocument . city ) . to . equal ( expectedDoc . city ) ;
1000
- if ( docs . length === 0 ) {
1001
- break ;
997
+ for await ( const change of changeStream ) {
998
+ const { fullDocument } = change ;
999
+ const expectedDoc = docs . shift ( ) ;
1000
+ expect ( fullDocument . city ) . to . equal ( expectedDoc . city ) ;
1001
+ if ( docs . length === 0 ) {
1002
+ break ;
1003
+ }
1002
1004
}
1003
- }
1004
-
1005
- expect ( docs ) . to . have . length ( 0 , 'expected to find all docs before exiting loop' ) ;
1006
- }
1007
- ) ;
1008
1005
1009
- it (
1010
- 'should close the change stream when return is called' ,
1011
- { requires : { topology : '!single' } } ,
1012
- async function ( ) {
1013
- changeStream = collection . watch ( [ ] ) ;
1014
- await initIteratorMode ( changeStream ) ;
1015
- const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
1016
-
1017
- const docs = [ { city : 'New York City' } , { city : 'Seattle' } , { city : 'Boston' } ] ;
1018
- await collection . insertMany ( docs ) ;
1006
+ expect ( docs ) . to . have . length ( 0 , 'expected to find all docs before exiting loop' ) ;
1007
+ }
1008
+ ) ;
1019
1009
1020
- await changeStreamIterator . next ( ) ;
1021
- await changeStreamIterator . return ( ) ;
1022
- expect ( changeStream . closed ) . to . be . true ;
1023
- expect ( changeStream . cursor ) . property ( 'closed' , true ) ;
1024
- }
1025
- ) ;
1010
+ it (
1011
+ 'cannot be resumed from partial iteration' ,
1012
+ { requires : { topology : '!single' } } ,
1013
+ async function ( ) {
1014
+ changeStream = collection . watch ( [ ] ) ;
1015
+ await initIteratorMode ( changeStream ) ;
1026
1016
1027
- it (
1028
- 'should close the change stream when an error is thrown' ,
1029
- { requires : { topology : '!single' , mongodb : '>=4.2' } } ,
1030
- async function ( ) {
1031
- changeStream = collection . watch ( [ ] ) ;
1032
- await initIteratorMode ( changeStream ) ;
1033
- const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
1017
+ const docs = [ { city : 'New York City' } , { city : 'Seattle' } , { city : 'Boston' } ] ;
1018
+ await collection . insertMany ( docs ) ;
1034
1019
1035
- const unresumableErrorCode = 1000 ;
1036
- await client . db ( 'admin' ) . command ( {
1037
- configureFailPoint : is4_2Server ( this . configuration . version )
1038
- ? 'failCommand'
1039
- : 'failGetMoreAfterCursorCheckout' ,
1040
- mode : { times : 1 } ,
1041
- data : {
1042
- failCommands : [ 'getMore' ] ,
1043
- errorCode : unresumableErrorCode
1020
+ for await ( const change of changeStream ) {
1021
+ const { fullDocument } = change ;
1022
+ const expectedDoc = docs . shift ( ) ;
1023
+ expect ( fullDocument . city ) . to . equal ( expectedDoc . city ) ;
1024
+ break ;
1025
+ }
1026
+ // eslint-disable-next-line @typescript-eslint/no-unused-vars
1027
+ for await ( const change of changeStream ) {
1028
+ expect . fail ( 'Change stream was resumed after partial iteration' ) ;
1044
1029
}
1045
- } as FailPoint ) ;
1046
1030
1047
- await collection . insertOne ( { city : 'New York City' } ) ;
1048
- try {
1049
- await changeStreamIterator . next ( ) ;
1050
- expect . fail (
1051
- 'Change stream did not throw unresumable error and did not produce any events'
1031
+ expect ( docs ) . to . have . length (
1032
+ 2 ,
1033
+ 'expected to find remaining docs after partial iteration'
1052
1034
) ;
1053
- } catch {
1054
- expect ( changeStream . closed ) . to . be . true ;
1055
- expect ( changeStream . cursor ) . property ( 'closed' , true ) ;
1056
1035
}
1057
- }
1058
- ) ;
1036
+ ) ;
1059
1037
1060
- it (
1061
- 'should not produce events on closed stream ',
1062
- { requires : { topology : '!single' } } ,
1063
- async function ( ) {
1064
- changeStream = collection . watch ( [ ] ) ;
1065
- changeStream . close ( ) ;
1038
+ it (
1039
+ 'cannot be used with emitter-based iteration ',
1040
+ { requires : { topology : '!single' } } ,
1041
+ async function ( ) {
1042
+ changeStream = collection . watch ( [ ] ) ;
1043
+ changeStream . on ( 'change' , sinon . stub ( ) ) ;
1066
1044
1067
- const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
1068
- const change = await changeStreamIterator . next ( ) ;
1045
+ try {
1046
+ // eslint-disable-next-line @typescript-eslint/no-unused-vars
1047
+ for await ( const change of changeStream ) {
1048
+ expect . fail ( 'Async iterator was used with emitter-based iteration' ) ;
1049
+ }
1050
+ } catch ( error ) {
1051
+ expect ( error ) . to . be . instanceOf ( MongoAPIError ) ;
1052
+ }
1053
+ }
1054
+ ) ;
1069
1055
1070
- expect ( change . value ) . to . be . undefined ;
1071
- }
1072
- ) ;
1056
+ it (
1057
+ 'can be used with raw iterator API' ,
1058
+ { requires : { topology : '!single' } } ,
1059
+ async function ( ) {
1060
+ changeStream = collection . watch ( [ ] ) ;
1061
+ await initIteratorMode ( changeStream ) ;
1073
1062
1074
- it (
1075
- 'cannot be used with emitter-based iteration' ,
1076
- { requires : { topology : '!single' } } ,
1077
- async function ( ) {
1078
- changeStream = collection . watch ( [ ] ) ;
1079
- changeStream . on ( 'change' , sinon . stub ( ) ) ;
1080
- const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
1063
+ const docs = [ { city : 'Los Angeles' } , { city : 'Miami' } ] ;
1064
+ await collection . insertMany ( docs ) ;
1081
1065
1082
- const error = await changeStreamIterator . next ( ) . catch ( e => e ) ;
1083
- expect ( error ) . to . be . instanceOf ( MongoAPIError ) ;
1084
- }
1085
- ) ;
1066
+ await changeStream . next ( ) ;
1067
+ docs . shift ( ) ;
1086
1068
1087
- it (
1088
- 'can be used with raw iterator API' ,
1089
- { requires : { topology : '!single' } } ,
1090
- async function ( ) {
1091
- changeStream = collection . watch ( [ ] ) ;
1092
- await initIteratorMode ( changeStream ) ;
1093
- const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
1069
+ try {
1070
+ for await ( const change of changeStream ) {
1071
+ const { fullDocument } = change ;
1072
+ const expectedDoc = docs . shift ( ) ;
1073
+ expect ( fullDocument . city ) . to . equal ( expectedDoc . city ) ;
1094
1074
1095
- const docs = [ { city : 'Los Angeles' } , { city : 'Miami' } ] ;
1096
- await collection . insertMany ( docs ) ;
1075
+ if ( docs . length === 0 ) {
1076
+ break ;
1077
+ }
1078
+ }
1079
+ } catch {
1080
+ expect . fail ( 'Async could not be used with raw iterator API' ) ;
1081
+ }
1082
+ }
1083
+ ) ;
1084
+ } ) ;
1097
1085
1098
- await changeStream . next ( ) ;
1086
+ describe ( '#return' , function ( ) {
1087
+ it (
1088
+ 'should close the change stream when return is called' ,
1089
+ { requires : { topology : '!single' } } ,
1090
+ async function ( ) {
1091
+ changeStream = collection . watch ( [ ] ) ;
1092
+ await initIteratorMode ( changeStream ) ;
1093
+ const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
1099
1094
1100
- try {
1101
- const change = await changeStreamIterator . next ( ) ;
1102
- expect ( change . value ) . to . not . be . undefined ;
1095
+ const docs = [ { city : 'New York City' } , { city : 'Seattle' } , { city : 'Boston' } ] ;
1096
+ await collection . insertMany ( docs ) ;
1103
1097
1104
- const { fullDocument } = change . value ;
1105
- expect ( fullDocument . city ) . to . equal ( docs [ 1 ] . city ) ;
1106
- } catch {
1107
- expect . fail ( 'Async could not be used with raw iterator API' ) ;
1098
+ await changeStreamIterator . next ( ) ;
1099
+ await changeStreamIterator . return ( ) ;
1100
+ expect ( changeStream . closed ) . to . be . true ;
1101
+ expect ( changeStream . cursor ) . property ( 'closed' , true ) ;
1108
1102
}
1109
- }
1110
- ) ;
1103
+ ) ;
1111
1104
1112
- it (
1113
- 'ignores errors thrown from close' ,
1114
- { requires : { topology : '!single' } } ,
1115
- async function ( ) {
1116
- changeStream = collection . watch ( [ ] ) ;
1117
- await initIteratorMode ( changeStream ) ;
1118
- const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
1105
+ it (
1106
+ 'ignores errors thrown from close' ,
1107
+ { requires : { topology : '!single' } } ,
1108
+ async function ( ) {
1109
+ changeStream = collection . watch ( [ ] ) ;
1110
+ await initIteratorMode ( changeStream ) ;
1111
+ const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
1119
1112
1120
- sinon . stub ( changeStream . cursor , 'close' ) . throws ( new MongoAPIError ( 'testing' ) ) ;
1113
+ sinon . stub ( changeStream . cursor , 'close' ) . throws ( new MongoAPIError ( 'testing' ) ) ;
1121
1114
1122
- try {
1123
- await changeStreamIterator . return ( ) ;
1124
- } catch {
1125
- expect . fail ( 'Async iterator threw an error on close' ) ;
1115
+ try {
1116
+ await changeStreamIterator . return ( ) ;
1117
+ } catch {
1118
+ expect . fail ( 'Async iterator threw an error on close' ) ;
1119
+ }
1126
1120
}
1127
- }
1128
- ) ;
1121
+ ) ;
1122
+ } ) ;
1129
1123
1130
- it (
1131
- 'cannot be resumed from partial iteration' ,
1132
- { requires : { topology : '!single' } } ,
1133
- async function ( ) {
1134
- changeStream = collection . watch ( [ ] ) ;
1135
- await initIteratorMode ( changeStream ) ;
1124
+ describe ( '#next' , function ( ) {
1125
+ it (
1126
+ 'should close the change stream when an error is thrown' ,
1127
+ { requires : { topology : '!single' , mongodb : '>=4.2' } } ,
1128
+ async function ( ) {
1129
+ changeStream = collection . watch ( [ ] ) ;
1130
+ await initIteratorMode ( changeStream ) ;
1131
+ const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
1136
1132
1137
- const docs = [ { city : 'New York City' } , { city : 'Seattle' } , { city : 'Boston' } ] ;
1138
- await collection . insertMany ( docs ) ;
1133
+ const unresumableErrorCode = 1000 ;
1134
+ await client . db ( 'admin' ) . command ( {
1135
+ configureFailPoint : is4_2Server ( this . configuration . version )
1136
+ ? 'failCommand'
1137
+ : 'failGetMoreAfterCursorCheckout' ,
1138
+ mode : { times : 1 } ,
1139
+ data : {
1140
+ failCommands : [ 'getMore' ] ,
1141
+ errorCode : unresumableErrorCode
1142
+ }
1143
+ } as FailPoint ) ;
1139
1144
1140
- for await ( const change of changeStream ) {
1141
- const { fullDocument } = change ;
1142
- const expectedDoc = docs . shift ( ) ;
1143
- expect ( fullDocument . city ) . to . equal ( expectedDoc . city ) ;
1144
- break ;
1145
- }
1146
- // eslint-disable-next-line @typescript-eslint/no-unused-vars
1147
- for await ( const change of changeStream ) {
1148
- expect . fail ( 'Change stream was resumed after partial iteration' ) ;
1145
+ await collection . insertOne ( { city : 'New York City' } ) ;
1146
+ try {
1147
+ await changeStreamIterator . next ( ) ;
1148
+ expect . fail (
1149
+ 'Change stream did not throw unresumable error and did not produce any events'
1150
+ ) ;
1151
+ } catch {
1152
+ expect ( changeStream . closed ) . to . be . true ;
1153
+ expect ( changeStream . cursor ) . property ( 'closed' , true ) ;
1154
+ }
1149
1155
}
1156
+ ) ;
1150
1157
1151
- expect ( docs ) . to . have . length ( 2 , 'expected to find remaining docs after partial iteration' ) ;
1152
- }
1153
- ) ;
1158
+ it (
1159
+ 'should not produce events on closed stream' ,
1160
+ { requires : { topology : '!single' } } ,
1161
+ async function ( ) {
1162
+ changeStream = collection . watch ( [ ] ) ;
1163
+ changeStream . close ( ) ;
1164
+
1165
+ const changeStreamIterator = changeStream [ Symbol . asyncIterator ] ( ) ;
1166
+ const change = await changeStreamIterator . next ( ) ;
1167
+
1168
+ expect ( change . value ) . to . be . undefined ;
1169
+ }
1170
+ ) ;
1171
+ } ) ;
1154
1172
} ) ;
1155
1173
} ) ;
1156
1174
0 commit comments