Subgraph composition spec version #5782

Closed
20 changes: 18 additions & 2 deletions graph/src/components/store/entity_cache.rs
@@ -207,9 +207,25 @@ impl EntityCache {
};

// Always test the cache consistency in debug mode. The test only
// makes sense when we were actually asked to read from the store
// makes sense when we were actually asked to read from the store.
// We need to remove the VID as the one from the DB might come from
// a legacy subgraph that has VID autoincremented while this trait
// always creates it in a new style.
debug_assert!(match scope {
GetScope::Store => entity == self.store.get(key).unwrap().map(Arc::new),
GetScope::Store => {
// Release build will never call this function and hence it's OK
// when that implementation is not correct.
Collaborator

What do you mean by 'not correct'? It seems correct to me. It is a little slow because of the clone, but that's fine for debug builds.

Contributor Author

The release build is missing the remove() call below, which is guarded by the `#[cfg(debug_assertions)]` attribute, hence the function is a no-op there.

fn remove_vid(entity: Option<Arc<Entity>>) -> Option<Entity> {
entity.map(|e| {
#[allow(unused_mut)]
let mut entity = (*e).clone();
#[cfg(debug_assertions)]
entity.remove("vid");
entity
})
}
remove_vid(entity.clone()) == remove_vid(self.store.get(key).unwrap().map(Arc::new))
}
GetScope::InBlock => true,
});
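
The consistency check above compares the cached entity with the stored one after dropping `vid` from both sides. A minimal sketch of that idea follows. It assumes a simplified `Entity` that is just a map of strings (the real `Entity` in graph-node is richer), and it removes `vid` unconditionally instead of behind `#[cfg(debug_assertions)]`. The helper names `without_vid` and `entity` are hypothetical.

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

// Hypothetical stand-in for graph-node's `Entity`: field name -> value.
type Entity = BTreeMap<String, String>;

/// Returns an owned copy of the entity with the `vid` field removed.
fn without_vid(entity: Option<Arc<Entity>>) -> Option<Entity> {
    entity.map(|e| {
        let mut entity = (*e).clone();
        entity.remove("vid");
        entity
    })
}

/// Hypothetical helper that builds an entity from (field, value) pairs.
fn entity(fields: &[(&str, &str)]) -> Entity {
    fields
        .iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect()
}

fn main() {
    // Same entity, but the two copies carry differently generated vids.
    let cached = Some(Arc::new(entity(&[("id", "a1"), ("name", "alice"), ("vid", "4294967296")])));
    let stored = Some(Arc::new(entity(&[("id", "a1"), ("name", "alice"), ("vid", "17")])));

    // A direct comparison fails only because of `vid`.
    assert_ne!(cached, stored);
    // Ignoring `vid`, the two copies agree.
    assert_eq!(without_vid(cached), without_vid(stored));
    println!("entities match once vid is ignored");
}
```

The clone makes this comparison slower than a direct one, which matters little when it only runs inside `debug_assert!`.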

8 changes: 8 additions & 0 deletions graph/src/schema/entity_type.rs
@@ -150,6 +150,14 @@ impl EntityType {
pub fn is_object_type(&self) -> bool {
self.schema.is_object_type(self.atom)
}

// Changes the way the VID field is generated. It used to be autoincrement. Now it's
// based on block number and the order of the entities in a block. The latter
// represents the write order across all entity types in the subgraph.
pub fn strict_vid_order(&self) -> bool {
Collaborator

That's the method that I would rename to has_vid_seq; the main reason is that InputSchema.strict_vid_order and EntityType.strict_vid_order mean slightly different things. You could also change the comment to the following (notice the ///):

/// Whether the table for this entity type uses a sequence for the `vid` or whether
/// `graph-node` sets them explicitly. See also [`InputSchema.strict_vid_order()`]

Contributor Author

I addressed this and the next remark in a separate PR, as this one is already merged.

// Currently the aggregation entities don't have VIDs in insertion order
self.schema.strict_vid_order() && self.is_object_type()
}
}

impl fmt::Display for EntityType {
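
The review comment notes that the schema-level and the entity-type-level `strict_vid_order` mean slightly different things. The sketch below tries to make that difference concrete. All types here (`SpecVersion`, `Kind`, `Schema`, and this `EntityType`) are simplified, hypothetical stand-ins rather than the real graph-node types; only the two boolean conditions are taken from the diff.

```rust
// Hypothetical, simplified version triple; the derived ordering compares
// major, then minor, then patch.
#[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord)]
struct SpecVersion(u64, u64, u64);

const SPEC_VERSION_1_3_0: SpecVersion = SpecVersion(1, 3, 0);

#[derive(Clone, Copy, Debug, PartialEq)]
enum Kind {
    Object,
    Aggregation,
}

struct Schema {
    spec_version: SpecVersion,
}

impl Schema {
    /// Subgraph-wide: does this deployment use explicitly set vids at all?
    fn strict_vid_order(&self) -> bool {
        self.spec_version >= SPEC_VERSION_1_3_0
    }
}

struct EntityType<'a> {
    schema: &'a Schema,
    kind: Kind,
}

impl<'a> EntityType<'a> {
    /// Per table: aggregations keep the sequence even in newer subgraphs.
    fn strict_vid_order(&self) -> bool {
        self.schema.strict_vid_order() && self.kind == Kind::Object
    }
}

fn main() {
    let legacy = Schema { spec_version: SpecVersion(1, 0, 0) };
    let current = Schema { spec_version: SpecVersion(1, 3, 0) };

    let object = EntityType { schema: &current, kind: Kind::Object };
    let aggregation = EntityType { schema: &current, kind: Kind::Aggregation };
    let old_object = EntityType { schema: &legacy, kind: Kind::Object };

    println!("object on 1.3.0:      {}", object.strict_vid_order());
    println!("aggregation on 1.3.0: {}", aggregation.strict_vid_order());
    println!("object on 1.0.0:      {}", old_object.strict_vid_order());
}
```

In this sketch the schema-level answer can be `true` while an individual table still answers `false`, which is the case the suggested rename to `has_vid_seq` is meant to make obvious.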
7 changes: 7 additions & 0 deletions graph/src/schema/input/mod.rs
@@ -17,6 +17,7 @@ use crate::data::graphql::{DirectiveExt, DocumentExt, ObjectTypeExt, TypeExt, Va
use crate::data::store::{
self, EntityValidationError, IdType, IntoEntityIterator, TryIntoEntityIterator, ValueType, ID,
};
use crate::data::subgraph::SPEC_VERSION_1_3_0;
use crate::data::value::Word;
use crate::derive::CheapClone;
use crate::prelude::q::Value;
@@ -955,6 +956,7 @@ pub struct Inner {
pool: Arc<AtomPool>,
/// A list of all timeseries types by interval
agg_mappings: Box<[AggregationMapping]>,
spec_version: Version,
}

impl InputSchema {
@@ -1042,6 +1044,7 @@ impl InputSchema {
enum_map,
pool,
agg_mappings,
spec_version: spec_version.clone(),
}),
})
}
@@ -1585,6 +1588,10 @@ impl InputSchema {
}?;
Some(EntityType::new(self.cheap_clone(), obj_type.name))
}

pub fn strict_vid_order(&self) -> bool {
self.inner.spec_version >= SPEC_VERSION_1_3_0
}
Collaborator

This might be a good comment for this method:

/// How the values for the VID field are generated.
/// When this is `false`, this subgraph uses the old way of autoincrementing `vid` in the database.
/// When it is `true`, `graph-node` sets the `vid` explicitly to a number based on block number
/// and the order in which entities are written, and comparing by `vid` will order entities by that order.

}

/// Create a new pool that contains the names of all the types defined
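
The suggested doc comment says the explicit `vid` is based on the block number and the write order, so that comparing by `vid` reproduces that order. One way such a value could be built is sketched here. The bit layout (block number in the high 32 bits, a per-block write counter in the low 32 bits) and the name `make_vid` are assumptions made for illustration; the diff does not show the actual encoding.

```rust
/// Hypothetical encoding: block number in the high 32 bits, per-block write
/// counter (shared across all entity types) in the low 32 bits.
/// The real layout used by graph-node is not shown in this diff.
fn make_vid(block: u32, seq: u32) -> i64 {
    // Keep the result non-negative so integer comparison matches write order.
    assert!(block <= i32::MAX as u32, "block number out of range for this sketch");
    ((block as i64) << 32) | (seq as i64)
}

fn main() {
    // Writes arrive in this order: two entities in block 7, then one in block 8.
    let vids = [make_vid(7, 0), make_vid(7, 1), make_vid(8, 0)];

    // Sorting by vid would therefore reproduce the write order.
    assert!(vids.windows(2).all(|w| w[0] < w[1]));
    println!("{:?}", vids);
}
```

With a layout like this, ordering by `vid` needs no extra column, which an autoincrementing sequence cannot guarantee after tables are copied or rewritten.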
11 changes: 7 additions & 4 deletions store/postgres/src/deployment.rs
@@ -13,6 +13,7 @@ use diesel::{
sql_query,
sql_types::{Nullable, Text},
};
use graph::semver::Version;
use graph::{
blockchain::block_stream::FirehoseCursor,
data::subgraph::schema::SubgraphError,
@@ -305,11 +306,13 @@ pub fn debug_fork(

pub fn schema(conn: &mut PgConnection, site: &Site) -> Result<(InputSchema, bool), StoreError> {
use subgraph_manifest as sm;
let (s, use_bytea_prefix) = sm::table
.select((sm::schema, sm::use_bytea_prefix))
let (s, spec_ver, use_bytea_prefix) = sm::table
.select((sm::schema, sm::spec_version, sm::use_bytea_prefix))
.filter(sm::id.eq(site.id))
.first::<(String, bool)>(conn)?;
InputSchema::parse_latest(s.as_str(), site.deployment.clone())
.first::<(String, String, bool)>(conn)?;
let spec_version =
Version::parse(spec_ver.as_str()).map_err(|err| StoreError::Unknown(err.into()))?;
InputSchema::parse(&spec_version, s.as_str(), site.deployment.clone())
.map_err(StoreError::Unknown)
.map(|schema| (schema, use_bytea_prefix))
}
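
The new `schema()` reads the spec version as text and parses it before comparing, turning a parse failure into an error instead of panicking. The sketch below illustrates why parsing matters, using only the standard library. The `Version` struct and `parse_version` function are hypothetical stand-ins for `semver::Version` and `Version::parse`, and they accept only plain `MAJOR.MINOR.PATCH` strings.

```rust
// Hypothetical stand-in for `semver::Version`; the derived ordering compares
// major, then minor, then patch.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
struct Version {
    major: u64,
    minor: u64,
    patch: u64,
}

/// Parses a plain `MAJOR.MINOR.PATCH` string, reporting failures as `Err`.
fn parse_version(s: &str) -> Result<Version, String> {
    let parts: Vec<&str> = s.trim().split('.').collect();
    if parts.len() != 3 {
        return Err(format!("expected MAJOR.MINOR.PATCH, got {:?}", s));
    }
    let num = |p: &str| -> Result<u64, String> {
        p.parse::<u64>()
            .map_err(|e| format!("invalid component {:?} in {:?}: {}", p, s, e))
    };
    Ok(Version {
        major: num(parts[0])?,
        minor: num(parts[1])?,
        patch: num(parts[2])?,
    })
}

fn main() {
    let threshold = parse_version("1.3.0").unwrap();
    for raw in ["1.0.0", "1.3.0", "1.10.0", "not-a-version"].iter() {
        match parse_version(raw) {
            Ok(v) => println!("{:?}: strict vid order = {}", raw, v >= threshold),
            Err(e) => println!("{:?}: error: {}", raw, e),
        }
    }
}
```

Comparing the raw strings would sort `"1.10.0"` before `"1.3.0"`, so the version has to be parsed into numbers before the `>=` check.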
3 changes: 1 addition & 2 deletions store/postgres/src/relational/ddl.rs
@@ -116,8 +116,7 @@ impl Table {
Ok(cols)
}

// Currently the aggregation entities don't have VIDs in insertion order
let vid_type = if self.object.is_object_type() {
let vid_type = if self.object.strict_vid_order() {
"bigint"
} else {
"bigserial"
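
The choice between `bigint` and `bigserial` decides whether PostgreSQL attaches a sequence to the column: `bigserial` is a `bigint` whose default draws from an implicitly created sequence, while a plain `bigint` expects the writer to supply the value. A minimal sketch of that selection follows; the function `vid_column_ddl` and the exact column clause are hypothetical and not taken from graph-node's DDL generator.

```rust
/// Hypothetical helper: returns a column definition for `vid`.
/// With strict vid order the writer supplies the value, so a plain `bigint`
/// is enough; otherwise `bigserial` lets the database autoincrement it.
fn vid_column_ddl(strict_vid_order: bool) -> String {
    let vid_type = if strict_vid_order { "bigint" } else { "bigserial" };
    format!("vid {} primary key", vid_type)
}

fn main() {
    println!("{}", vid_column_ddl(true));
    println!("{}", vid_column_ddl(false));
}
```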
7 changes: 3 additions & 4 deletions store/postgres/src/relational/prune.rs
@@ -243,7 +243,6 @@ impl TablePair {

let vid_seq = format!("{}_{VID_COLUMN}_seq", self.src.name);

let old_vid_form = !self.src.object.is_object_type();
let mut query = String::new();

// What we are about to do would get blocked by autovacuum on our
@@ -253,9 +252,9 @@ impl TablePair {
"src" => src_nsp.as_str(), "error" => e.to_string());
}

// Make sure the vid sequence
// continues from where it was
if old_vid_form {
// Make sure the vid sequence continues from where it was when the
// table still relies on the database's autoincrementing `vid`
if !self.src.object.strict_vid_order() {
writeln!(
query,
"select setval('{dst_nsp}.{vid_seq}', nextval('{src_nsp}.{vid_seq}'));"
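
Only tables that still use a sequence need the `setval` fix-up when pruning swaps tables. The sketch below builds the same statement in isolation. The function name `vid_seq_fixup` and its parameters are hypothetical; the sequence naming and the SQL text mirror the lines in the diff.

```rust
use std::fmt::Write;

const VID_COLUMN: &str = "vid";

/// Hypothetical helper: returns the SQL that carries the sequence position
/// over from the source namespace to the destination, or an empty string
/// when the table has no sequence because vids are set explicitly.
fn vid_seq_fixup(src_nsp: &str, dst_nsp: &str, table: &str, strict_vid_order: bool) -> String {
    let mut query = String::new();
    if !strict_vid_order {
        let vid_seq = format!("{}_{}_seq", table, VID_COLUMN);
        writeln!(
            query,
            "select setval('{}.{}', nextval('{}.{}'));",
            dst_nsp, vid_seq, src_nsp, vid_seq
        )
        .expect("writing to a String cannot fail");
    }
    query
}

fn main() {
    print!("{}", vid_seq_fixup("sgd1", "sgd2", "token", false));
    assert!(vid_seq_fixup("sgd1", "sgd2", "token", true).is_empty());
}
```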
12 changes: 6 additions & 6 deletions store/postgres/src/relational_queries.rs
@@ -2377,7 +2377,7 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
let out = &mut out;
out.unsafe_to_cache_prepared();

let new_vid_form = self.table.object.is_object_type();
let strict_vid_order = self.table.object.strict_vid_order();

// Construct a query
// insert into schema.table(column, ...)
@@ -2404,7 +2404,7 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
out.push_sql(CAUSALITY_REGION_COLUMN);
};

if new_vid_form {
if strict_vid_order {
out.push_sql(", vid");
}
out.push_sql(") values\n");
@@ -2424,7 +2424,7 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
out.push_sql(", ");
out.push_bind_param::<Integer, _>(&row.causality_region)?;
};
if new_vid_form {
if strict_vid_order {
out.push_sql(", ");
out.push_bind_param::<BigInt, _>(&row.vid)?;
}
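
In `InsertQuery`, the `vid` column and its bind parameter are emitted only when the table has strict vid order; otherwise the column is left out so the database default fills it in. A simplified sketch of that conditional column list follows. It builds a plain SQL string instead of using Diesel's `AstPass`, and `insert_sql` and its parameters are hypothetical.

```rust
/// Hypothetical helper: builds a single-row insert statement, appending
/// `vid` to the column list only when the caller sets it explicitly.
fn insert_sql(table: &str, columns: &[&str], strict_vid_order: bool) -> String {
    let mut cols: Vec<&str> = columns.to_vec();
    if strict_vid_order {
        cols.push("vid");
    }
    // One numbered placeholder per column, e.g. $1, $2, $3.
    let placeholders: Vec<String> = (1..=cols.len()).map(|i| format!("${}", i)).collect();
    format!(
        "insert into {}({}) values ({})",
        table,
        cols.join(", "),
        placeholders.join(", ")
    )
}

fn main() {
    println!("{}", insert_sql("sgd1.token", &["id", "name"], true));
    println!("{}", insert_sql("sgd1.token", &["id", "name"], false));
}
```

The column list and the value list must be extended under the same condition, which is why the diff checks `strict_vid_order` in both places.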
@@ -4827,7 +4827,7 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
out.unsafe_to_cache_prepared();

let new_vid_form = self.src.object.is_object_type();
let strict_vid_order = self.src.object.strict_vid_order();

// Construct a query
// insert into {dst}({columns})
@@ -4849,7 +4849,7 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
out.push_sql(", ");
out.push_sql(CAUSALITY_REGION_COLUMN);
};
if new_vid_form {
if strict_vid_order {
out.push_sql(", vid");
}

@@ -4917,7 +4917,7 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
));
}
}
if new_vid_form {
if strict_vid_order {
out.push_sql(", vid");
}

4 changes: 2 additions & 2 deletions store/test-store/src/store.rs
@@ -163,7 +163,7 @@ pub async fn create_subgraph(

let manifest = SubgraphManifest::<graph::blockchain::mock::MockBlockchain> {
id: subgraph_id.clone(),
spec_version: Version::new(1, 0, 0),
spec_version: Version::new(1, 3, 0),
features: BTreeSet::new(),
description: Some(format!("manifest for {}", subgraph_id)),
repository: Some(format!("repo for {}", subgraph_id)),
@@ -227,7 +227,7 @@ pub async fn create_test_subgraph_with_features(

let manifest = SubgraphManifest::<graph::blockchain::mock::MockBlockchain> {
id: subgraph_id.clone(),
spec_version: Version::new(1, 0, 0),
spec_version: Version::new(1, 3, 0),
features,
description: Some(format!("manifest for {}", subgraph_id)),
repository: Some(format!("repo for {}", subgraph_id)),
2 changes: 1 addition & 1 deletion store/test-store/tests/graph/entity_cache.rs
@@ -448,7 +448,7 @@ where
async fn insert_test_data(store: Arc<DieselSubgraphStore>) -> DeploymentLocator {
let manifest = SubgraphManifest::<graph_chain_ethereum::Chain> {
id: LOAD_RELATED_ID.clone(),
spec_version: Version::new(1, 0, 0),
spec_version: Version::new(1, 3, 0),
features: Default::default(),
description: None,
repository: None,
2 changes: 1 addition & 1 deletion store/test-store/tests/postgres/graft.rs
@@ -136,7 +136,7 @@ where
async fn insert_test_data(store: Arc<DieselSubgraphStore>) -> DeploymentLocator {
let manifest = SubgraphManifest::<graph_chain_ethereum::Chain> {
id: TEST_SUBGRAPH_ID.clone(),
spec_version: Version::new(1, 0, 0),
spec_version: Version::new(1, 3, 0),
features: Default::default(),
description: None,
repository: None,
4 changes: 2 additions & 2 deletions store/test-store/tests/postgres/store.rs
@@ -165,7 +165,7 @@ where
async fn insert_test_data(store: Arc<DieselSubgraphStore>) -> DeploymentLocator {
let manifest = SubgraphManifest::<graph_chain_ethereum::Chain> {
id: TEST_SUBGRAPH_ID.clone(),
spec_version: Version::new(1, 0, 0),
spec_version: Version::new(1, 3, 0),
features: Default::default(),
description: None,
repository: None,
@@ -1270,7 +1270,7 @@ fn entity_changes_are_fired_and_forwarded_to_subscriptions() {
.expect("Failed to parse user schema");
let manifest = SubgraphManifest::<graph_chain_ethereum::Chain> {
id: subgraph_id.clone(),
spec_version: Version::new(1, 0, 0),
spec_version: Version::new(1, 3, 0),
features: Default::default(),
description: None,
repository: None,
4 changes: 2 additions & 2 deletions store/test-store/tests/postgres/subgraph.rs
@@ -170,7 +170,7 @@ fn create_subgraph() {

let manifest = SubgraphManifest::<graph_chain_ethereum::Chain> {
id,
spec_version: Version::new(1, 0, 0),
spec_version: Version::new(1, 3, 0),
features: Default::default(),
description: None,
repository: None,
@@ -547,7 +547,7 @@ fn subgraph_features() {
} = get_subgraph_features(id.to_string()).unwrap();

assert_eq!(NAME, subgraph_id.as_str());
assert_eq!("1.0.0", spec_version);
assert_eq!("1.3.0", spec_version);
assert_eq!("1.0.0", api_version.unwrap());
assert_eq!(NETWORK_NAME, network);
assert_eq!(
2 changes: 1 addition & 1 deletion store/test-store/tests/postgres/writable.rs
@@ -50,7 +50,7 @@ lazy_static! {
async fn insert_test_data(store: Arc<DieselSubgraphStore>) -> DeploymentLocator {
let manifest = SubgraphManifest::<graph_chain_ethereum::Chain> {
id: TEST_SUBGRAPH_ID.clone(),
spec_version: Version::new(1, 0, 0),
spec_version: Version::new(1, 3, 0),
features: Default::default(),
description: None,
repository: None,