Skip to content

Commit 748ff48

Browse files
committed
store: Fix ordering issues with different id types
1 parent 62d1e2b commit 748ff48

File tree

3 files changed

+105
-33
lines changed

3 files changed

+105
-33
lines changed

store/postgres/src/relational.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,13 @@ impl Layout {
576576
Ok((ewt, block))
577577
};
578578

579+
fn compare_entity_data_ext(a: &EntityDataExt, b: &EntityDataExt) -> std::cmp::Ordering {
580+
a.block_number
581+
.cmp(&b.block_number)
582+
.then_with(|| a.entity.cmp(&b.entity))
583+
.then_with(|| a.id.cmp(&b.id))
584+
}
585+
579586
// The algorithm is a similar to merge sort algorithm and it relays on the fact that both vectors
580587
// are ordered by (block_number, entity_type, entity_id). It advances simultaneously entities from
581588
// both lower_vec and upper_vec and tries to match entities that have entries in both vectors for
@@ -589,7 +596,7 @@ impl Layout {
589596
while lower_now.is_some() || upper_now.is_some() {
590597
let (ewt, block) = match (lower_now, upper_now) {
591598
(Some(lower), Some(upper)) => {
592-
match lower.cmp(&upper) {
599+
match compare_entity_data_ext(lower, upper) {
593600
std::cmp::Ordering::Greater => {
594601
// we have upper bound at this block, but no lower bounds at the same block so it's deletion
595602
let (ewt, block) = transform(upper, EntityOperationKind::Delete)?;

store/postgres/src/relational_queries.rs

Lines changed: 3 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ use graph::schema::{EntityKey, EntityType, FulltextAlgorithm, FulltextConfig, In
2626
use graph::{components::store::AttributeNames, data::store::scalar};
2727
use inflector::Inflector;
2828
use itertools::Itertools;
29-
use std::cmp::Ordering;
3029
use std::collections::{BTreeMap, BTreeSet, HashSet};
3130
use std::convert::TryFrom;
3231
use std::fmt::{self, Display};
@@ -558,48 +557,20 @@ impl EntityData {
558557
}
559558
}
560559

561-
#[derive(QueryableByName, Clone, Debug, Default, Eq)]
560+
#[derive(QueryableByName, Clone, Debug, Default)]
562561
pub struct EntityDataExt {
563562
#[diesel(sql_type = Text)]
564563
pub entity: String,
565564
#[diesel(sql_type = Jsonb)]
566565
pub data: serde_json::Value,
567566
#[diesel(sql_type = Integer)]
568567
pub block_number: i32,
569-
#[diesel(sql_type = Text)]
570-
pub id: String,
568+
#[diesel(sql_type = Binary)]
569+
pub id: Vec<u8>,
571570
#[diesel(sql_type = BigInt)]
572571
pub vid: i64,
573572
}
574573

575-
impl Ord for EntityDataExt {
576-
fn cmp(&self, other: &Self) -> Ordering {
577-
let ord = self.block_number.cmp(&other.block_number);
578-
if ord != Ordering::Equal {
579-
ord
580-
} else {
581-
let ord = self.entity.cmp(&other.entity);
582-
if ord != Ordering::Equal {
583-
ord
584-
} else {
585-
self.id.cmp(&other.id)
586-
}
587-
}
588-
}
589-
}
590-
591-
impl PartialOrd for EntityDataExt {
592-
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
593-
Some(self.cmp(other))
594-
}
595-
}
596-
597-
impl PartialEq for EntityDataExt {
598-
fn eq(&self, other: &Self) -> bool {
599-
self.cmp(other) == Ordering::Equal
600-
}
601-
}
602-
603574
/// The equivalent of `graph::data::store::Value` but in a form that does
604575
/// not require further transformation during `walk_ast`. This form takes
605576
/// the idiosyncrasies of how we serialize values into account (e.g., that

store/test-store/tests/postgres/writable.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,32 @@ const SCHEMA_GQL: &str = "
2929
id: ID!,
3030
count: Int!,
3131
}
32+
type BytesId @entity {
33+
id: Bytes!,
34+
value: String!
35+
}
36+
type Int8Id @entity {
37+
id: Int8!,
38+
value: String!
39+
}
40+
type StringId @entity {
41+
id: String!,
42+
value: String!
43+
}
44+
type PoolCreated @entity(immutable: true) {
45+
id: Bytes!,
46+
token0: Bytes!,
47+
token1: Bytes!,
48+
fee: Int!,
49+
tickSpacing: Int!,
50+
pool: Bytes!,
51+
blockNumber: BigInt!,
52+
blockTimestamp: BigInt!,
53+
transactionHash: Bytes!,
54+
transactionFrom: Bytes!,
55+
transactionGasPrice: BigInt!,
56+
logIndex: BigInt!
57+
}
3258
";
3359

3460
const COUNTER: &str = "Counter";
@@ -406,3 +432,71 @@ fn read_immutable_only_range_test() {
406432
assert_eq!(e.len(), 4);
407433
})
408434
}
435+
436+
#[test]
437+
fn read_range_pool_created_test() {
438+
run_test(|store, writable, sourceable, deployment| async move {
439+
let result_entities = vec![
440+
format!("(1, [EntitySourceOperation {{ entity_op: Create, entity_type: EntityType(PoolCreated), entity: Entity {{ blockNumber: BigInt(12369621), blockTimestamp: BigInt(1620243254), fee: Int(500), id: Bytes(0xff80818283848586), logIndex: BigInt(0), pool: Bytes(0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8), tickSpacing: Int(10), token0: Bytes(0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48), token1: Bytes(0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2), transactionFrom: Bytes(0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48), transactionGasPrice: BigInt(100000000000), transactionHash: Bytes(0x12340000000000000000000000000000000000000000000000000000000000000000000000000000) }}, vid: 1 }}])"),
441+
format!("(2, [EntitySourceOperation {{ entity_op: Create, entity_type: EntityType(PoolCreated), entity: Entity {{ blockNumber: BigInt(12369622), blockTimestamp: BigInt(1620243255), fee: Int(3000), id: Bytes(0xff90919293949596), logIndex: BigInt(1), pool: Bytes(0x4585fe77225b41b697c938b018e2ac67ac5a20c0), tickSpacing: Int(60), token0: Bytes(0x2260fac5e5542a773aa44fbcfedf7c193bc2c599), token1: Bytes(0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2), transactionFrom: Bytes(0x2260fac5e5542a773aa44fbcfedf7c193bc2c599), transactionGasPrice: BigInt(100000000000), transactionHash: Bytes(0x12340000000000000000000000000000000000000000000000000000000000000000000000000001) }}, vid: 2 }}])"),
442+
];
443+
444+
// Rest of the test remains the same
445+
let subgraph_store = store.subgraph_store();
446+
writable.deployment_synced().unwrap();
447+
448+
let pool_created_type = TEST_SUBGRAPH_SCHEMA.entity_type("PoolCreated").unwrap();
449+
let entity_types = vec![pool_created_type.clone()];
450+
451+
for count in (1..=2).map(|x| x as i64) {
452+
let id = if count == 1 {
453+
"0xff80818283848586"
454+
} else {
455+
"0xff90919293949596"
456+
};
457+
458+
let data = entity! { TEST_SUBGRAPH_SCHEMA =>
459+
id: id,
460+
token0: if count == 1 { "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48" } else { "0x2260fac5e5542a773aa44fbcfedf7c193bc2c599" },
461+
token1: "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
462+
fee: if count == 1 { 500 } else { 3000 },
463+
tickSpacing: if count == 1 { 10 } else { 60 },
464+
pool: if count == 1 { "0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8" } else { "0x4585fe77225b41b697c938b018e2ac67ac5a20c0" },
465+
blockNumber: 12369621 + count - 1,
466+
blockTimestamp: 1620243254 + count - 1,
467+
transactionHash: format!("0x1234{:0>76}", if count == 1 { "0" } else { "1" }),
468+
transactionFrom: if count == 1 { "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48" } else { "0x2260fac5e5542a773aa44fbcfedf7c193bc2c599" },
469+
transactionGasPrice: 100000000000i64,
470+
logIndex: count - 1
471+
};
472+
473+
let key = pool_created_type.parse_key(id).unwrap();
474+
let op = EntityOperation::Set {
475+
key: key.clone(),
476+
data: EntityV::new(data, count),
477+
};
478+
479+
transact_entity_operations(
480+
&subgraph_store,
481+
&deployment,
482+
block_pointer(count as u8),
483+
vec![op],
484+
)
485+
.await
486+
.unwrap();
487+
}
488+
writable.flush().await.unwrap();
489+
writable.deployment_synced().unwrap();
490+
491+
let br: Range<BlockNumber> = 0..18;
492+
let e: BTreeMap<i32, Vec<EntitySourceOperation>> = sourceable
493+
.get_range(entity_types.clone(), CausalityRegion::ONCHAIN, br.clone())
494+
.unwrap();
495+
assert_eq!(e.len(), 2);
496+
for en in &e {
497+
let index = *en.0 - 1;
498+
let a = result_entities[index as usize].clone();
499+
assert_eq!(a, format!("{:?}", en));
500+
}
501+
})
502+
}

0 commit comments

Comments
 (0)