Skip to content

Commit 0d72898

Browse files
committed
store: Fix ordering issues with different id types
1 parent 1ff44c6 commit 0d72898

File tree

3 files changed

+137
-22
lines changed

3 files changed

+137
-22
lines changed

store/postgres/src/relational.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -589,7 +589,7 @@ impl Layout {
589589
while lower_now.is_some() || upper_now.is_some() {
590590
let (ewt, block) = match (lower_now, upper_now) {
591591
(Some(lower), Some(upper)) => {
592-
match lower.cmp(&upper) {
592+
match lower.compare_with(&upper, &IdType::String)? {
593593
std::cmp::Ordering::Greater => {
594594
// we have upper bound at this block, but no lower bounds at the same block so it's deletion
595595
let (ewt, block) = transform(upper, EntityOperationKind::Delete)?;

store/postgres/src/relational_queries.rs

Lines changed: 54 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -558,45 +558,78 @@ impl EntityData {
558558
}
559559
}
560560

561-
#[derive(QueryableByName, Clone, Debug, Default, Eq)]
561+
#[derive(QueryableByName, Clone, Debug, Default)]
562562
pub struct EntityDataExt {
563563
#[diesel(sql_type = Text)]
564564
pub entity: String,
565565
#[diesel(sql_type = Jsonb)]
566566
pub data: serde_json::Value,
567567
#[diesel(sql_type = Integer)]
568568
pub block_number: i32,
569-
#[diesel(sql_type = Text)]
570-
pub id: String,
569+
#[diesel(sql_type = Binary)]
570+
pub id: Vec<u8>,
571571
#[diesel(sql_type = BigInt)]
572572
pub vid: i64,
573573
}
574574

575-
impl Ord for EntityDataExt {
576-
fn cmp(&self, other: &Self) -> Ordering {
577-
let ord = self.block_number.cmp(&other.block_number);
578-
if ord != Ordering::Equal {
579-
ord
580-
} else {
581-
let ord = self.entity.cmp(&other.entity);
582-
if ord != Ordering::Equal {
583-
ord
584-
} else {
585-
self.id.cmp(&other.id)
586-
}
575+
#[derive(Debug)]
576+
pub enum IdCompareError {
577+
InvalidUtf8(std::string::FromUtf8Error),
578+
InvalidInt8Length(usize),
579+
}
580+
581+
impl From<std::string::FromUtf8Error> for IdCompareError {
582+
fn from(e: std::string::FromUtf8Error) -> Self {
583+
IdCompareError::InvalidUtf8(e)
584+
}
585+
}
586+
587+
impl From<IdCompareError> for anyhow::Error {
588+
fn from(e: IdCompareError) -> Self {
589+
match e {
590+
IdCompareError::InvalidUtf8(e) => anyhow::anyhow!("Invalid UTF-8 in ID: {}", e),
591+
IdCompareError::InvalidInt8Length(len) => anyhow::anyhow!("Invalid Int8 ID length: expected 8 bytes, got {}", len),
587592
}
588593
}
589594
}
590595

591-
impl PartialOrd for EntityDataExt {
592-
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
593-
Some(self.cmp(other))
596+
impl From<IdCompareError> for StoreError {
597+
fn from(e: IdCompareError) -> Self {
598+
StoreError::Unknown(anyhow::Error::from(e))
594599
}
595600
}
596601

597-
impl PartialEq for EntityDataExt {
598-
fn eq(&self, other: &Self) -> bool {
599-
self.cmp(other) == Ordering::Equal
602+
impl EntityDataExt {
603+
pub fn compare_with(&self, other: &Self, idtype: &IdType) -> Result<Ordering, IdCompareError> {
604+
let ord = self.block_number.cmp(&other.block_number);
605+
if ord != Ordering::Equal {
606+
return Ok(ord);
607+
}
608+
let ord = self.entity.cmp(&other.entity);
609+
if ord != Ordering::Equal {
610+
return Ok(ord);
611+
}
612+
613+
match idtype {
614+
IdType::String => {
615+
let self_str = String::from_utf8(self.id.clone()).map_err(IdCompareError::InvalidUtf8)?;
616+
let other_str = String::from_utf8(other.id.clone()).map_err(IdCompareError::InvalidUtf8)?;
617+
Ok(self_str.cmp(&other_str))
618+
}
619+
IdType::Bytes => Ok(self.id.cmp(&other.id)),
620+
IdType::Int8 => {
621+
if self.id.len() < 8 {
622+
return Err(IdCompareError::InvalidInt8Length(self.id.len()));
623+
}
624+
if other.id.len() < 8 {
625+
return Err(IdCompareError::InvalidInt8Length(other.id.len()));
626+
}
627+
628+
let self_int = i64::from_be_bytes(self.id[..8].try_into().unwrap());
629+
let other_int = i64::from_be_bytes(other.id[..8].try_into().unwrap());
630+
Ok(self_int.cmp(&other_int))
631+
}
632+
}
600633
}
601634
}
602635

store/test-store/tests/postgres/writable.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,20 @@ const SCHEMA_GQL: &str = "
2929
id: ID!,
3030
count: Int!,
3131
}
32+
type PoolCreated @entity(immutable: true) {
33+
id: Bytes!,
34+
token0: Bytes!,
35+
token1: Bytes!,
36+
fee: Int!,
37+
tickSpacing: Int!,
38+
pool: Bytes!,
39+
blockNumber: BigInt!,
40+
blockTimestamp: BigInt!,
41+
transactionHash: Bytes!,
42+
transactionFrom: Bytes!,
43+
transactionGasPrice: BigInt!,
44+
logIndex: BigInt!
45+
}
3246
";
3347

3448
const COUNTER: &str = "Counter";
@@ -406,3 +420,71 @@ fn read_immutable_only_range_test() {
406420
assert_eq!(e.len(), 4);
407421
})
408422
}
423+
424+
#[test]
425+
fn read_range_pool_created_test() {
426+
run_test(|store, writable, sourceable, deployment| async move {
427+
let result_entities = vec![
428+
format!("(1, [EntitySourceOperation {{ entity_op: Create, entity_type: EntityType(PoolCreated), entity: Entity {{ blockNumber: BigInt(12369621), blockTimestamp: BigInt(1620243254), fee: Int(500), id: Bytes(0xff80818283848586), logIndex: BigInt(0), pool: Bytes(0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8), tickSpacing: Int(10), token0: Bytes(0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48), token1: Bytes(0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2), transactionFrom: Bytes(0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48), transactionGasPrice: BigInt(100000000000), transactionHash: Bytes(0x12340000000000000000000000000000000000000000000000000000000000000000000000000000) }}, vid: 1 }}])"),
429+
format!("(2, [EntitySourceOperation {{ entity_op: Create, entity_type: EntityType(PoolCreated), entity: Entity {{ blockNumber: BigInt(12369622), blockTimestamp: BigInt(1620243255), fee: Int(3000), id: Bytes(0xff90919293949596), logIndex: BigInt(1), pool: Bytes(0x4585fe77225b41b697c938b018e2ac67ac5a20c0), tickSpacing: Int(60), token0: Bytes(0x2260fac5e5542a773aa44fbcfedf7c193bc2c599), token1: Bytes(0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2), transactionFrom: Bytes(0x2260fac5e5542a773aa44fbcfedf7c193bc2c599), transactionGasPrice: BigInt(100000000000), transactionHash: Bytes(0x12340000000000000000000000000000000000000000000000000000000000000000000000000001) }}, vid: 2 }}])"),
430+
];
431+
432+
// Rest of the test remains the same
433+
let subgraph_store = store.subgraph_store();
434+
writable.deployment_synced().unwrap();
435+
436+
let pool_created_type = TEST_SUBGRAPH_SCHEMA.entity_type("PoolCreated").unwrap();
437+
let entity_types = vec![pool_created_type.clone()];
438+
439+
for count in (1..=2).map(|x| x as i64) {
440+
let id = if count == 1 {
441+
"0xff80818283848586"
442+
} else {
443+
"0xff90919293949596"
444+
};
445+
446+
let data = entity! { TEST_SUBGRAPH_SCHEMA =>
447+
id: id,
448+
token0: if count == 1 { "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48" } else { "0x2260fac5e5542a773aa44fbcfedf7c193bc2c599" },
449+
token1: "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
450+
fee: if count == 1 { 500 } else { 3000 },
451+
tickSpacing: if count == 1 { 10 } else { 60 },
452+
pool: if count == 1 { "0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8" } else { "0x4585fe77225b41b697c938b018e2ac67ac5a20c0" },
453+
blockNumber: 12369621 + count - 1,
454+
blockTimestamp: 1620243254 + count - 1,
455+
transactionHash: format!("0x1234{:0>76}", if count == 1 { "0" } else { "1" }),
456+
transactionFrom: if count == 1 { "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48" } else { "0x2260fac5e5542a773aa44fbcfedf7c193bc2c599" },
457+
transactionGasPrice: 100000000000i64,
458+
logIndex: count - 1
459+
};
460+
461+
let key = pool_created_type.parse_key(id).unwrap();
462+
let op = EntityOperation::Set {
463+
key: key.clone(),
464+
data: EntityV::new(data, count),
465+
};
466+
467+
transact_entity_operations(
468+
&subgraph_store,
469+
&deployment,
470+
block_pointer(count as u8),
471+
vec![op],
472+
)
473+
.await
474+
.unwrap();
475+
}
476+
writable.flush().await.unwrap();
477+
writable.deployment_synced().unwrap();
478+
479+
let br: Range<BlockNumber> = 0..18;
480+
let e: BTreeMap<i32, Vec<EntitySourceOperation>> = sourceable
481+
.get_range(entity_types.clone(), CausalityRegion::ONCHAIN, br.clone())
482+
.unwrap();
483+
assert_eq!(e.len(), 2);
484+
for en in &e {
485+
let index = *en.0 - 1;
486+
let a = result_entities[index as usize].clone();
487+
assert_eq!(a, format!("{:?}", en));
488+
}
489+
})
490+
}

0 commit comments

Comments
 (0)