Skip to content

Commit 06edc2b

Browse files
committed
store: Fix ordering issues with different id types
1 parent 640e7d1 commit 06edc2b

File tree

3 files changed

+105
-33
lines changed

3 files changed

+105
-33
lines changed

store/postgres/src/relational.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -576,6 +576,13 @@ impl Layout {
576576
Ok((ewt, block))
577577
};
578578

579+
fn compare_entity_data_ext(a: &EntityDataExt, b: &EntityDataExt) -> std::cmp::Ordering {
580+
a.block_number
581+
.cmp(&b.block_number)
582+
.then_with(|| a.entity.cmp(&b.entity))
583+
.then_with(|| a.id.cmp(&b.id))
584+
}
585+
579586
// The algorithm is a similar to merge sort algorithm and it relays on the fact that both vectors
580587
// are ordered by (block_number, entity_type, entity_id). It advances simultaneously entities from
581588
// both lower_vec and upper_vec and tries to match entities that have entries in both vectors for
@@ -589,7 +596,7 @@ impl Layout {
589596
while lower_now.is_some() || upper_now.is_some() {
590597
let (ewt, block) = match (lower_now, upper_now) {
591598
(Some(lower), Some(upper)) => {
592-
match lower.cmp(&upper) {
599+
match compare_entity_data_ext(lower, upper) {
593600
std::cmp::Ordering::Greater => {
594601
// we have upper bound at this block, but no lower bounds at the same block so it's deletion
595602
let (ewt, block) = transform(upper, EntityOperationKind::Delete)?;

store/postgres/src/relational_queries.rs

Lines changed: 3 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ use graph::schema::{EntityKey, EntityType, FulltextAlgorithm, FulltextConfig, In
2626
use graph::{components::store::AttributeNames, data::store::scalar};
2727
use inflector::Inflector;
2828
use itertools::Itertools;
29-
use std::cmp::Ordering;
3029
use std::collections::{BTreeMap, BTreeSet, HashSet};
3130
use std::convert::TryFrom;
3231
use std::fmt::{self, Display};
@@ -565,48 +564,20 @@ impl EntityData {
565564
}
566565
}
567566

568-
#[derive(QueryableByName, Clone, Debug, Default, Eq)]
567+
#[derive(QueryableByName, Clone, Debug, Default)]
569568
pub struct EntityDataExt {
570569
#[diesel(sql_type = Text)]
571570
pub entity: String,
572571
#[diesel(sql_type = Jsonb)]
573572
pub data: serde_json::Value,
574573
#[diesel(sql_type = Integer)]
575574
pub block_number: i32,
576-
#[diesel(sql_type = Text)]
577-
pub id: String,
575+
#[diesel(sql_type = Binary)]
576+
pub id: Vec<u8>,
578577
#[diesel(sql_type = BigInt)]
579578
pub vid: i64,
580579
}
581580

582-
impl Ord for EntityDataExt {
583-
fn cmp(&self, other: &Self) -> Ordering {
584-
let ord = self.block_number.cmp(&other.block_number);
585-
if ord != Ordering::Equal {
586-
ord
587-
} else {
588-
let ord = self.entity.cmp(&other.entity);
589-
if ord != Ordering::Equal {
590-
ord
591-
} else {
592-
self.id.cmp(&other.id)
593-
}
594-
}
595-
}
596-
}
597-
598-
impl PartialOrd for EntityDataExt {
599-
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
600-
Some(self.cmp(other))
601-
}
602-
}
603-
604-
impl PartialEq for EntityDataExt {
605-
fn eq(&self, other: &Self) -> bool {
606-
self.cmp(other) == Ordering::Equal
607-
}
608-
}
609-
610581
/// The equivalent of `graph::data::store::Value` but in a form that does
611582
/// not require further transformation during `walk_ast`. This form takes
612583
/// the idiosyncrasies of how we serialize values into account (e.g., that

store/test-store/tests/postgres/writable.rs

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,32 @@ const SCHEMA_GQL: &str = "
2828
id: ID!,
2929
count: Int!,
3030
}
31+
type BytesId @entity {
32+
id: Bytes!,
33+
value: String!
34+
}
35+
type Int8Id @entity {
36+
id: Int8!,
37+
value: String!
38+
}
39+
type StringId @entity {
40+
id: String!,
41+
value: String!
42+
}
43+
type PoolCreated @entity(immutable: true) {
44+
id: Bytes!,
45+
token0: Bytes!,
46+
token1: Bytes!,
47+
fee: Int!,
48+
tickSpacing: Int!,
49+
pool: Bytes!,
50+
blockNumber: BigInt!,
51+
blockTimestamp: BigInt!,
52+
transactionHash: Bytes!,
53+
transactionFrom: Bytes!,
54+
transactionGasPrice: BigInt!,
55+
logIndex: BigInt!
56+
}
3157
";
3258

3359
const COUNTER: &str = "Counter";
@@ -407,3 +433,71 @@ fn read_immutable_only_range_test() {
407433
assert_eq!(e.len(), 4);
408434
})
409435
}
436+
437+
#[test]
438+
fn read_range_pool_created_test() {
439+
run_test(|store, writable, sourceable, deployment| async move {
440+
let result_entities = vec![
441+
format!("(1, [EntitySourceOperation {{ entity_op: Create, entity_type: EntityType(PoolCreated), entity: Entity {{ blockNumber: BigInt(12369621), blockTimestamp: BigInt(1620243254), fee: Int(500), id: Bytes(0xff80818283848586), logIndex: BigInt(0), pool: Bytes(0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8), tickSpacing: Int(10), token0: Bytes(0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48), token1: Bytes(0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2), transactionFrom: Bytes(0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48), transactionGasPrice: BigInt(100000000000), transactionHash: Bytes(0x12340000000000000000000000000000000000000000000000000000000000000000000000000000) }}, vid: 1 }}])"),
442+
format!("(2, [EntitySourceOperation {{ entity_op: Create, entity_type: EntityType(PoolCreated), entity: Entity {{ blockNumber: BigInt(12369622), blockTimestamp: BigInt(1620243255), fee: Int(3000), id: Bytes(0xff90919293949596), logIndex: BigInt(1), pool: Bytes(0x4585fe77225b41b697c938b018e2ac67ac5a20c0), tickSpacing: Int(60), token0: Bytes(0x2260fac5e5542a773aa44fbcfedf7c193bc2c599), token1: Bytes(0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2), transactionFrom: Bytes(0x2260fac5e5542a773aa44fbcfedf7c193bc2c599), transactionGasPrice: BigInt(100000000000), transactionHash: Bytes(0x12340000000000000000000000000000000000000000000000000000000000000000000000000001) }}, vid: 2 }}])"),
443+
];
444+
445+
// Rest of the test remains the same
446+
let subgraph_store = store.subgraph_store();
447+
writable.deployment_synced().unwrap();
448+
449+
let pool_created_type = TEST_SUBGRAPH_SCHEMA.entity_type("PoolCreated").unwrap();
450+
let entity_types = vec![pool_created_type.clone()];
451+
452+
for count in (1..=2).map(|x| x as i64) {
453+
let id = if count == 1 {
454+
"0xff80818283848586"
455+
} else {
456+
"0xff90919293949596"
457+
};
458+
459+
let data = entity! { TEST_SUBGRAPH_SCHEMA =>
460+
id: id,
461+
token0: if count == 1 { "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48" } else { "0x2260fac5e5542a773aa44fbcfedf7c193bc2c599" },
462+
token1: "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2",
463+
fee: if count == 1 { 500 } else { 3000 },
464+
tickSpacing: if count == 1 { 10 } else { 60 },
465+
pool: if count == 1 { "0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8" } else { "0x4585fe77225b41b697c938b018e2ac67ac5a20c0" },
466+
blockNumber: 12369621 + count - 1,
467+
blockTimestamp: 1620243254 + count - 1,
468+
transactionHash: format!("0x1234{:0>76}", if count == 1 { "0" } else { "1" }),
469+
transactionFrom: if count == 1 { "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48" } else { "0x2260fac5e5542a773aa44fbcfedf7c193bc2c599" },
470+
transactionGasPrice: 100000000000i64,
471+
logIndex: count - 1
472+
};
473+
474+
let key = pool_created_type.parse_key(id).unwrap();
475+
let op = EntityOperation::Set {
476+
key: key.clone(),
477+
data: EntityV::new(data, count),
478+
};
479+
480+
transact_entity_operations(
481+
&subgraph_store,
482+
&deployment,
483+
block_pointer(count as u8),
484+
vec![op],
485+
)
486+
.await
487+
.unwrap();
488+
}
489+
writable.flush().await.unwrap();
490+
writable.deployment_synced().unwrap();
491+
492+
let br: Range<BlockNumber> = 0..18;
493+
let e: BTreeMap<i32, Vec<EntitySourceOperation>> = sourceable
494+
.get_range(entity_types.clone(), CausalityRegion::ONCHAIN, br.clone())
495+
.unwrap();
496+
assert_eq!(e.len(), 2);
497+
for en in &e {
498+
let index = *en.0 - 1;
499+
let a = result_entities[index as usize].clone();
500+
assert_eq!(a, format!("{:?}", en));
501+
}
502+
})
503+
}

0 commit comments

Comments
 (0)