Skip to content

Commit 061b9c7

Browse files
committed
Subgraph composition: Rework generation of the VID
1 parent cca4d6f commit 061b9c7

File tree

25 files changed

+537
-279
lines changed

25 files changed

+537
-279
lines changed

chain/substreams/src/trigger.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -240,6 +240,7 @@ where
240240
state.entity_cache.set(
241241
key,
242242
entity,
243+
block.number,
243244
Some(&mut state.write_capacity_remaining),
244245
)?;
245246
}

core/src/subgraph/runner.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1670,6 +1670,7 @@ async fn update_proof_of_indexing(
16701670
key: EntityKey,
16711671
digest: Bytes,
16721672
block_time: BlockTime,
1673+
block: BlockNumber,
16731674
) -> Result<(), Error> {
16741675
let digest_name = entity_cache.schema.poi_digest();
16751676
let mut data = vec![
@@ -1684,11 +1685,12 @@ async fn update_proof_of_indexing(
16841685
data.push((entity_cache.schema.poi_block_time(), block_time));
16851686
}
16861687
let poi = entity_cache.make_entity(data)?;
1687-
entity_cache.set(key, poi, None)
1688+
entity_cache.set(key, poi, block, None)
16881689
}
16891690

16901691
let _section_guard = stopwatch.start_section("update_proof_of_indexing");
16911692

1693+
let block_number = proof_of_indexing.get_block();
16921694
let mut proof_of_indexing = proof_of_indexing.take();
16931695

16941696
for (causality_region, stream) in proof_of_indexing.drain() {
@@ -1724,6 +1726,7 @@ async fn update_proof_of_indexing(
17241726
entity_key,
17251727
updated_proof_of_indexing,
17261728
block_time,
1729+
block_number,
17271730
)?;
17281731
}
17291732

graph/src/components/store/entity_cache.rs

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use anyhow::anyhow;
1+
use anyhow::{anyhow, bail};
22
use std::borrow::Borrow;
33
use std::collections::HashMap;
44
use std::fmt::{self, Debug};
@@ -17,6 +17,10 @@ use super::{BlockNumber, DerivedEntityQuery, LoadRelatedRequest, StoreError};
1717

1818
pub type EntityLfuCache = LfuCache<EntityKey, Option<Arc<Entity>>>;
1919

20+
// Number of VIDs that are reserved outside of the generated ones here.
21+
// Currently none is used, but lets reserve a few more.
22+
const RESERVED_VIDS: u32 = 100;
23+
2024
/// The scope in which the `EntityCache` should perform a `get` operation
2125
pub enum GetScope {
2226
/// Get from all previously stored entities in the store
@@ -105,6 +109,10 @@ pub struct EntityCache {
105109
/// generated IDs, the `EntityCache` needs to be newly instantiated for
106110
/// each block
107111
seq: u32,
112+
113+
// Sequence number of the next VID value for this block. The value written
114+
// in the database consist of a block number and this SEQ number.
115+
pub vid_seq: u32,
108116
}
109117

110118
impl Debug for EntityCache {
@@ -132,6 +140,7 @@ impl EntityCache {
132140
schema: store.input_schema(),
133141
store,
134142
seq: 0,
143+
vid_seq: RESERVED_VIDS,
135144
}
136145
}
137146

@@ -152,6 +161,7 @@ impl EntityCache {
152161
schema: store.input_schema(),
153162
store,
154163
seq: 0,
164+
vid_seq: RESERVED_VIDS,
155165
}
156166
}
157167

@@ -353,14 +363,14 @@ impl EntityCache {
353363
&mut self,
354364
key: EntityKey,
355365
entity: Entity,
366+
block: BlockNumber,
356367
write_capacity_remaining: Option<&mut usize>,
357368
) -> Result<(), anyhow::Error> {
358369
// check the validate for derived fields
359370
let is_valid = entity.validate(&key).is_ok();
360371

361372
if let Some(write_capacity_remaining) = write_capacity_remaining {
362373
let weight = entity.weight();
363-
364374
if !self.current.contains_key(&key) && weight > *write_capacity_remaining {
365375
return Err(anyhow!(
366376
"exceeded block write limit when writing entity `{}`",
@@ -371,6 +381,21 @@ impl EntityCache {
371381
*write_capacity_remaining -= weight;
372382
}
373383

384+
// The next VID is based on a block number and a sequence within the block
385+
let vid = ((block as i64) << 32) + self.vid_seq as i64;
386+
self.vid_seq += 1;
387+
let mut entity = entity;
388+
let old_vid = entity.set_vid(vid).expect("the vid should be set");
389+
// Make sure that there was no VID previously set for this entity.
390+
if let Some(ovid) = old_vid {
391+
bail!(
392+
"VID: {} of entity: {} with ID: {} was already present when set in EntityCache",
393+
ovid,
394+
key.entity_type,
395+
entity.id()
396+
);
397+
}
398+
374399
self.entity_op(key.clone(), EntityOp::Update(entity));
375400

376401
// The updates we were given are not valid by themselves; force a
@@ -507,7 +532,7 @@ impl EntityCache {
507532
// Entity was removed and then updated, so it will be overwritten
508533
(Some(current), EntityOp::Overwrite(data)) => {
509534
let data = Arc::new(data);
510-
self.current.insert(key.clone(), Some(data.clone()));
535+
self.current.insert(key.clone(), Some(data.cheap_clone()));
511536
if current != data {
512537
Some(Overwrite {
513538
key,

graph/src/components/subgraph/proof_of_indexing/online.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,10 @@ impl ProofOfIndexing {
242242
pub fn take(self) -> HashMap<Id, BlockEventStream> {
243243
self.per_causality_region
244244
}
245+
246+
pub fn get_block(&self) -> BlockNumber {
247+
self.block_number
248+
}
245249
}
246250

247251
pub struct ProofOfIndexingFinisher {

graph/src/data/store/mod.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ use crate::{
33
derive::CacheWeight,
44
prelude::{lazy_static, q, r, s, CacheWeight, QueryExecutionError},
55
runtime::gas::{Gas, GasSizeOf},
6-
schema::{EntityKey, EntityType},
6+
schema::{input::VID_FIELD, EntityKey, EntityType},
77
util::intern::{self, AtomPool},
88
util::intern::{Error as InternError, NullValue, Object},
99
};
@@ -910,6 +910,29 @@ impl Entity {
910910
Id::try_from(self.get("id").unwrap().clone()).expect("the id is set to a valid value")
911911
}
912912

913+
/// Return the VID of this entity and if its missing or of a type different than
914+
/// i64 it panics.
915+
pub fn vid(&self) -> i64 {
916+
self.get(VID_FIELD)
917+
.expect("the vid must be set")
918+
.as_int8()
919+
.expect("the vid must be set to a valid value")
920+
}
921+
922+
/// Sets the VID of the entity. The previous one is returned.
923+
pub fn set_vid(&mut self, value: i64) -> Result<Option<Value>, InternError> {
924+
self.0.insert(VID_FIELD, value.into())
925+
}
926+
927+
/// Sets the VID if it's not already set. Should be used only for tests.
928+
#[cfg(debug_assertions)]
929+
pub fn set_vid_if_empty(&mut self) {
930+
let vid = self.get(VID_FIELD);
931+
if vid.is_none() {
932+
let _ = self.set_vid(100).expect("the vid should be set");
933+
}
934+
}
935+
913936
/// Merges an entity update `update` into this entity.
914937
///
915938
/// If a key exists in both entities, the value from `update` is chosen.

graph/src/data/subgraph/api_version.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,14 @@ pub const SPEC_VERSION_1_1_0: Version = Version::new(1, 1, 0);
5454
// Enables eth call declarations and indexed arguments(topics) filtering in manifest
5555
pub const SPEC_VERSION_1_2_0: Version = Version::new(1, 2, 0);
5656

57-
// Enables subgraphs as datasource
57+
// Enables subgraphs as datasource.
58+
// Changes the way the VID field is generated. It used to be autoincrement. Now its
59+
// based on block number and the order of the entities in a block. The latter
60+
// represents the write order across all entity types in the subgraph.
5861
pub const SPEC_VERSION_1_3_0: Version = Version::new(1, 3, 0);
5962

6063
// The latest spec version available
61-
pub const LATEST_VERSION: &Version = &SPEC_VERSION_1_2_0;
64+
pub const LATEST_VERSION: &Version = &SPEC_VERSION_1_3_0;
6265

6366
pub const MIN_SPEC_VERSION: Version = Version::new(0, 0, 2);
6467

graph/src/schema/input/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ pub(crate) const POI_OBJECT: &str = "Poi$";
3535
const POI_DIGEST: &str = "digest";
3636
/// The name of the PoI attribute for storing the block time
3737
const POI_BLOCK_TIME: &str = "blockTime";
38+
pub(crate) const VID_FIELD: &str = "vid";
3839

3940
pub mod kw {
4041
pub const ENTITY: &str = "entity";
@@ -1597,6 +1598,8 @@ fn atom_pool(document: &s::Document) -> AtomPool {
15971598
pool.intern(POI_DIGEST);
15981599
pool.intern(POI_BLOCK_TIME);
15991600

1601+
pool.intern(VID_FIELD);
1602+
16001603
for definition in &document.definitions {
16011604
match definition {
16021605
s::Definition::TypeDefinition(typedef) => match typedef {

graph/src/schema/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ pub mod ast;
2121
mod entity_key;
2222
mod entity_type;
2323
mod fulltext;
24-
mod input;
24+
pub(crate) mod input;
2525

2626
pub use api::{is_introspection_field, APISchemaError, INTROSPECTION_QUERY_TYPE};
2727

runtime/test/src/test.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -477,13 +477,13 @@ async fn test_ipfs_block() {
477477
// The user_data value we use with calls to ipfs_map
478478
const USER_DATA: &str = "user_data";
479479

480-
fn make_thing(id: &str, value: &str) -> (String, EntityModification) {
480+
fn make_thing(id: &str, value: &str, vid: i64) -> (String, EntityModification) {
481481
const DOCUMENT: &str = " type Thing @entity { id: String!, value: String!, extra: String }";
482482
lazy_static! {
483483
static ref SCHEMA: InputSchema = InputSchema::raw(DOCUMENT, "doesntmatter");
484484
static ref THING_TYPE: EntityType = SCHEMA.entity_type("Thing").unwrap();
485485
}
486-
let data = entity! { SCHEMA => id: id, value: value, extra: USER_DATA };
486+
let data = entity! { SCHEMA => id: id, value: value, extra: USER_DATA, vid: vid };
487487
let key = THING_TYPE.parse_key(id).unwrap();
488488
(
489489
format!("{{ \"id\": \"{}\", \"value\": \"{}\"}}", id, value),
@@ -553,8 +553,8 @@ async fn test_ipfs_map(api_version: Version, json_error_msg: &str) {
553553
let subgraph_id = "ipfsMap";
554554

555555
// Try it with two valid objects
556-
let (str1, thing1) = make_thing("one", "eins");
557-
let (str2, thing2) = make_thing("two", "zwei");
556+
let (str1, thing1) = make_thing("one", "eins", 100);
557+
let (str2, thing2) = make_thing("two", "zwei", 100);
558558
let ops = run_ipfs_map(
559559
subgraph_id,
560560
format!("{}\n{}", str1, str2),
@@ -1001,8 +1001,8 @@ async fn test_entity_store(api_version: Version) {
10011001

10021002
let schema = store.input_schema(&deployment.hash).unwrap();
10031003

1004-
let alex = entity! { schema => id: "alex", name: "Alex" };
1005-
let steve = entity! { schema => id: "steve", name: "Steve" };
1004+
let alex = entity! { schema => id: "alex", name: "Alex", vid: 0i64 };
1005+
let steve = entity! { schema => id: "steve", name: "Steve", vid: 1i64 };
10061006
let user_type = schema.entity_type("User").unwrap();
10071007
test_store::insert_entities(
10081008
&deployment,

runtime/wasm/src/host_exports.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -350,9 +350,12 @@ impl HostExports {
350350

351351
state.metrics.track_entity_write(&entity_type, &entity);
352352

353-
state
354-
.entity_cache
355-
.set(key, entity, Some(&mut state.write_capacity_remaining))?;
353+
state.entity_cache.set(
354+
key,
355+
entity,
356+
block,
357+
Some(&mut state.write_capacity_remaining),
358+
)?;
356359

357360
Ok(())
358361
}

0 commit comments

Comments
 (0)