1- use graph:: blockchain:: block_stream:: FirehoseCursor ;
1+ use graph:: blockchain:: block_stream:: { EntityWithType , FirehoseCursor } ;
22use graph:: data:: subgraph:: schema:: DeploymentCreate ;
33use graph:: data:: value:: Word ;
44use graph:: data_source:: CausalityRegion ;
55use graph:: schema:: { EntityKey , EntityType , InputSchema } ;
66use lazy_static:: lazy_static;
7- use std:: collections:: BTreeSet ;
7+ use std:: collections:: { BTreeMap , BTreeSet } ;
88use std:: marker:: PhantomData ;
99use std:: ops:: Range ;
1010use test_store:: * ;
@@ -123,49 +123,40 @@ async fn insert_count(
123123 deployment : & DeploymentLocator ,
124124 block : u8 ,
125125 count : u8 ,
126- counter_type : & EntityType ,
127- id : & str ,
126+ immutable : bool ,
128127) {
129- let count_key_local = |id : & str | counter_type. parse_key ( id) . unwrap ( ) ;
128+ let count_key_local = |counter_type : & EntityType , id : & str | counter_type. parse_key ( id) . unwrap ( ) ;
130129 let data = entity ! { TEST_SUBGRAPH_SCHEMA =>
131- id: id ,
132- count : count as i32 ,
130+ id: "1" ,
131+ count: count as i32
133132 } ;
134- let entity_op = EntityOperation :: Set {
135- key : count_key_local ( & data. get ( "id" ) . unwrap ( ) . to_string ( ) ) ,
136- data,
133+ let entity_op = if ( block != 3 && block != 5 && block != 7 ) || !immutable {
134+ EntityOperation :: Set {
135+ key : count_key_local ( & COUNTER_TYPE , & data. get ( "id" ) . unwrap ( ) . to_string ( ) ) ,
136+ data,
137+ }
138+ } else {
139+ EntityOperation :: Remove {
140+ key : count_key_local ( & COUNTER_TYPE , & data. get ( "id" ) . unwrap ( ) . to_string ( ) ) ,
141+ }
137142 } ;
138- transact_entity_operations ( store, deployment, block_pointer ( block) , vec ! [ entity_op] )
143+ let mut ops = vec ! [ entity_op] ;
144+ if immutable && block < 6 {
145+ let data = entity ! { TEST_SUBGRAPH_SCHEMA =>
146+ id: & block. to_string( ) ,
147+ count : count as i32 ,
148+ } ;
149+ let entity_op = EntityOperation :: Set {
150+ key : count_key_local ( & COUNTER2_TYPE , & data. get ( "id" ) . unwrap ( ) . to_string ( ) ) ,
151+ data,
152+ } ;
153+ ops. push ( entity_op) ;
154+ }
155+ transact_entity_operations ( store, deployment, block_pointer ( block) , ops)
139156 . await
140157 . unwrap ( ) ;
141158}
142159
143- async fn insert_count_mutable (
144- store : & Arc < DieselSubgraphStore > ,
145- deployment : & DeploymentLocator ,
146- block : u8 ,
147- count : u8 ,
148- ) {
149- insert_count ( store, deployment, block, count, & COUNTER_TYPE , "1" ) . await ;
150- }
151-
152- async fn insert_count_immutable (
153- store : & Arc < DieselSubgraphStore > ,
154- deployment : & DeploymentLocator ,
155- block : u8 ,
156- count : u8 ,
157- ) {
158- insert_count (
159- store,
160- deployment,
161- block,
162- count,
163- & COUNTER2_TYPE ,
164- & ( block / 2 ) . to_string ( ) ,
165- )
166- . await ;
167- }
168-
169160async fn pause_writer ( deployment : & DeploymentLocator ) {
170161 flush ( deployment) . await . unwrap ( ) ;
171162 writable:: allow_steps ( deployment, 0 ) . await ;
@@ -191,13 +182,13 @@ where
191182 }
192183
193184 for count in 1 ..4 {
194- insert_count_mutable ( & subgraph_store, & deployment, count, count) . await ;
185+ insert_count ( & subgraph_store, & deployment, count, count, false ) . await ;
195186 }
196187
197188 // Test reading back with pending writes to the same entity
198189 pause_writer ( & deployment) . await ;
199190 for count in 4 ..7 {
200- insert_count_mutable ( & subgraph_store, & deployment, count, count) . await ;
191+ insert_count ( & subgraph_store, & deployment, count, count, false ) . await ;
201192 }
202193 assert_eq ! ( 6 , read_count( ) ) ;
203194
@@ -206,7 +197,7 @@ where
206197
207198 // Test reading back with pending writes and a pending revert
208199 for count in 7 ..10 {
209- insert_count_mutable ( & subgraph_store, & deployment, count, count) . await ;
200+ insert_count ( & subgraph_store, & deployment, count, count, false ) . await ;
210201 }
211202 writable
212203 . revert_block_operations ( block_pointer ( 2 ) , FirehoseCursor :: None )
@@ -331,19 +322,46 @@ fn restart() {
331322#[ test]
332323fn read_range_test ( ) {
333324 run_test ( |store, writable, deployment| async move {
325+ let result_entities = vec ! [
326+ r#"(1, [EntityWithType { entity_op: Create, entity_type: EntityType(Counter), entity: Entity { count: Int(2), id: String("1") }, vid: 1 }, EntityWithType { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(2), id: String("1") }, vid: 1 }])"# ,
327+ r#"(2, [EntityWithType { entity_op: Modify, entity_type: EntityType(Counter), entity: Entity { count: Int(4), id: String("1") }, vid: 2 }, EntityWithType { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(4), id: String("2") }, vid: 2 }])"# ,
328+ r#"(3, [EntityWithType { entity_op: Delete, entity_type: EntityType(Counter), entity: Entity { count: Int(4), id: String("1") }, vid: 2 }, EntityWithType { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(6), id: String("3") }, vid: 3 }])"# ,
329+ r#"(4, [EntityWithType { entity_op: Create, entity_type: EntityType(Counter), entity: Entity { count: Int(8), id: String("1") }, vid: 3 }, EntityWithType { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(8), id: String("4") }, vid: 4 }])"# ,
330+ r#"(5, [EntityWithType { entity_op: Delete, entity_type: EntityType(Counter), entity: Entity { count: Int(8), id: String("1") }, vid: 3 }, EntityWithType { entity_op: Create, entity_type: EntityType(Counter2), entity: Entity { count: Int(10), id: String("5") }, vid: 5 }])"# ,
331+ r#"(6, [EntityWithType { entity_op: Create, entity_type: EntityType(Counter), entity: Entity { count: Int(12), id: String("1") }, vid: 4 }])"# ,
332+ r#"(7, [EntityWithType { entity_op: Delete, entity_type: EntityType(Counter), entity: Entity { count: Int(12), id: String("1") }, vid: 4 }])"# ,
333+ ] ;
334334 let subgraph_store = store. subgraph_store ( ) ;
335335 writable. deployment_synced ( ) . unwrap ( ) ;
336336
337- for count in 1 ..=7 {
338- insert_count_mutable ( & subgraph_store, & deployment, 2 * count, 4 * count) . await ;
339- insert_count_immutable ( & subgraph_store, & deployment, 2 * count + 1 , 4 * count) . await ;
337+ for count in 1 ..=5 {
338+ insert_count ( & subgraph_store, & deployment, count, 2 * count, true ) . await ;
340339 }
341340 writable. flush ( ) . await . unwrap ( ) ;
342-
343- let br: Range < BlockNumber > = 4 ..8 ;
344341 writable. deployment_synced ( ) . unwrap ( ) ;
342+
343+ let br: Range < BlockNumber > = 0 ..18 ;
345344 let entity_types = vec ! [ COUNTER_TYPE . clone( ) , COUNTER2_TYPE . clone( ) ] ;
346- let e = writable. get_range ( entity_types, br) . unwrap ( ) ;
347- assert_eq ! ( e. len( ) , 5 ) // TODO: fix it - it should be 4 as the range is open
345+ let e: BTreeMap < i32 , Vec < EntityWithType > > = writable
346+ . get_range ( entity_types. clone ( ) , br. clone ( ) )
347+ . unwrap ( ) ;
348+ assert_eq ! ( e. len( ) , 5 ) ;
349+ for en in & e {
350+ let index = * en. 0 - 1 ;
351+ let a = result_entities[ index as usize ] ;
352+ assert_eq ! ( a, format!( "{:?}" , en) ) ;
353+ }
354+ for count in 6 ..=7 {
355+ insert_count ( & subgraph_store, & deployment, count, 2 * count, true ) . await ;
356+ }
357+ writable. flush ( ) . await . unwrap ( ) ;
358+ writable. deployment_synced ( ) . unwrap ( ) ;
359+ let e: BTreeMap < i32 , Vec < EntityWithType > > = writable. get_range ( entity_types, br) . unwrap ( ) ;
360+ assert_eq ! ( e. len( ) , 7 ) ;
361+ for en in & e {
362+ let index = * en. 0 - 1 ;
363+ let a = result_entities[ index as usize ] ;
364+ assert_eq ! ( a, format!( "{:?}" , en) ) ;
365+ }
348366 } )
349367}
0 commit comments