Skip to content

Commit

Permalink
fix: only filter currently under maintenance nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
zeeshanakram3 committed Mar 27, 2024
1 parent 971aaae commit 0537ee7
Show file tree
Hide file tree
Showing 7 changed files with 149 additions and 213 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ export class NetworkingService {

async checkActiveStorageNodeEndpoints(): Promise<void> {
try {
const activeStorageOperators = await this.queryNodeApi.getActiveStorageBucketOperatorsData()
const activeStorageOperators = await this.queryNodeApi.getOperationallyActiveStorageBucketOperatorsData()
const endpoints = this.filterStorageNodeEndpoints(
activeStorageOperators.map(({ id, operatorMetadata }) => ({
bucketId: id,
Expand Down
23 changes: 21 additions & 2 deletions distributor-node/src/services/networking/query-node/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -232,11 +232,30 @@ export class QueryNodeApi {
return this.getDataObjectsByBagIds(bagIds)
}

public getActiveStorageBucketOperatorsData(): Promise<StorageBucketOperatorFieldsFragment[]> {
return this.multipleEntitiesQuery<
/**
 * Fetches the storage buckets returned by the `GetActiveStorageBucketOperatorsData`
 * query and keeps only those whose node is not under maintenance at the time of the call.
 *
 * A bucket is kept when any of the following holds:
 *  - it has no operator metadata or no operational status set,
 *  - its status is `NodeOperationalStatusNormal`,
 *  - its status is `NodeOperationalStatusNoServiceFrom` and `from` is still in the future,
 *  - its status is `NodeOperationalStatusNoServiceUntil` and the current time lies outside
 *    the `[from, until]` window (maintenance has not started yet, or has already ended).
 *
 * Every other status (e.g. `NodeOperationalStatusNoService`) causes the bucket to be dropped.
 *
 * @returns Buckets whose operators are currently able to serve requests.
 */
public async getOperationallyActiveStorageBucketOperatorsData(): Promise<StorageBucketOperatorFieldsFragment[]> {
  const buckets = await this.multipleEntitiesQuery<
    GetActiveStorageBucketOperatorsDataQuery,
    GetActiveStorageBucketOperatorsDataQueryVariables
  >(GetActiveStorageBucketOperatorsData, {}, 'storageBuckets')

  // One reference instant for the whole pass, so all buckets are judged against the same "now"
  const now = new Date()

  // Filter out nodes/operators that are under maintenance right now
  return buckets.filter(({ operatorMetadata }) => {
    // Undefined when either the metadata or the status itself is missing
    const status = operatorMetadata?.nodeOperationalStatus

    return (
      !status ||
      status.__typename === 'NodeOperationalStatusNormal' ||
      // Open-ended maintenance that has not started yet
      (status.__typename === 'NodeOperationalStatusNoServiceFrom' && new Date(status.from) > now) ||
      // Bounded maintenance window: the node is usable before it starts OR after it ends.
      // (Requiring both at once would never be true for a valid window.)
      (status.__typename === 'NodeOperationalStatusNoServiceUntil' &&
        (new Date(status.from) > now || new Date(status.until) < now))
    )
  })
}

public async getPackageVersion(): Promise<string> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,21 +103,31 @@ fragment StorageBucketOperatorFields on StorageBucket {
id
operatorMetadata {
nodeEndpoint
nodeOperationalStatus {
... on NodeOperationalStatusNormal {
__typename
}
... on NodeOperationalStatusNoService {
__typename
forced
}
... on NodeOperationalStatusNoServiceFrom {
__typename
forced
from
}
... on NodeOperationalStatusNoServiceUntil {
__typename
forced
from
until
}
}
}
}

query getActiveStorageBucketOperatorsData {
storageBuckets(
where: {
operatorStatus: { isTypeOf_eq: "StorageBucketOperatorStatusActive" }
operatorMetadata: {
OR: [
{ nodeOperationalStatus_isNull: true }
{ nodeOperationalStatus: { isTypeOf_eq: "NodeOperationalStatusNormal" } }
]
}
}
) {
storageBuckets(where: { operatorStatus: { isTypeOf_eq: "StorageBucketOperatorStatusActive" } }) {
...StorageBucketOperatorFields
}
}
Expand Down
112 changes: 24 additions & 88 deletions distributor-node/src/services/networking/query-node/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -432,10 +432,10 @@ enum DistributionBucketOperatorMetadataOrderByInput {
nodeOperationalStatus_from_DESC
nodeOperationalStatus_from_ASC_NULLS_FIRST
nodeOperationalStatus_from_DESC_NULLS_LAST
nodeOperationalStatus_to_ASC
nodeOperationalStatus_to_DESC
nodeOperationalStatus_to_ASC_NULLS_FIRST
nodeOperationalStatus_to_DESC_NULLS_LAST
nodeOperationalStatus_until_ASC
nodeOperationalStatus_until_DESC
nodeOperationalStatus_until_ASC_NULLS_FIRST
nodeOperationalStatus_until_DESC_NULLS_LAST
nodeOperationalStatus_isTypeOf_ASC
nodeOperationalStatus_isTypeOf_DESC
nodeOperationalStatus_isTypeOf_ASC_NULLS_FIRST
Expand Down Expand Up @@ -678,7 +678,7 @@ input DistributionBucketWhereInput {

type DistributionNodeOperationalStatusSetEvent {
"""Distribution bucket operator"""
bucketOperator: DistributionBucketOperator
bucketOperator: DistributionBucketOperator!

"""Operational status that was set"""
operationalStatus: NodeOperationalStatus!
Expand Down Expand Up @@ -1014,7 +1014,7 @@ input NodeLocationMetadataWhereInput {
coordinates: GeoCoordinatesWhereInput
}

union NodeOperationalStatus = NodeOperationalStatusNormal | NodeOperationalStatusNoService | NodeOperationalStatusNoServiceFrom | NodeOperationalStatusNoServiceDuring
union NodeOperationalStatus = NodeOperationalStatusNormal | NodeOperationalStatusNoService | NodeOperationalStatusNoServiceFrom | NodeOperationalStatusNoServiceUntil

type NodeOperationalStatusNormal {
"""Reason why node was set to this state"""
Expand All @@ -1032,7 +1032,7 @@ type NodeOperationalStatusNoService {
rationale: String
}

type NodeOperationalStatusNoServiceDuring {
type NodeOperationalStatusNoServiceFrom {
"""
Whether the state was set by lead (true) or by the operator (false), it is
meant to prevent worker from unilaterally reversing.
Expand All @@ -1042,14 +1042,11 @@ type NodeOperationalStatusNoServiceDuring {
"""The time from which the bucket would have to no service"""
from: DateTime!

"""The time until which the bucket would have to no service"""
to: DateTime!

"""Reason why node was set to this state"""
rationale: String
}

type NodeOperationalStatusNoServiceFrom {
type NodeOperationalStatusNoServiceUntil {
"""
Whether the state was set by lead (true) or by the operator (false), it is
meant to prevent worker from unilaterally reversing.
Expand All @@ -1059,6 +1056,9 @@ type NodeOperationalStatusNoServiceFrom {
"""The time from which the bucket would have to no service"""
from: DateTime!

"""The time until which the bucket would have to no service"""
until: DateTime!

"""Reason why node was set to this state"""
rationale: String
}
Expand Down Expand Up @@ -1093,15 +1093,15 @@ input NodeOperationalStatusWhereInput {
from_lte: DateTime
from_in: [DateTime!]
from_not_in: [DateTime!]
to_isNull: Boolean
to_eq: DateTime
to_not_eq: DateTime
to_gt: DateTime
to_gte: DateTime
to_lt: DateTime
to_lte: DateTime
to_in: [DateTime!]
to_not_in: [DateTime!]
until_isNull: Boolean
until_eq: DateTime
until_not_eq: DateTime
until_gt: DateTime
until_gte: DateTime
until_lt: DateTime
until_lte: DateTime
until_in: [DateTime!]
until_not_in: [DateTime!]
isTypeOf_isNull: Boolean
isTypeOf_eq: String
isTypeOf_not_eq: String
Expand Down Expand Up @@ -1181,10 +1181,6 @@ type Query {
distributionBucketFamilyById(id: String!): DistributionBucketFamily
distributionBucketFamilyByUniqueInput(where: WhereIdInput!): DistributionBucketFamily @deprecated(reason: "Use distributionBucketFamilyById")
distributionBucketFamiliesConnection(orderBy: [DistributionBucketFamilyOrderByInput!]!, after: String, first: Int, where: DistributionBucketFamilyWhereInput): DistributionBucketFamiliesConnection!
workers(where: WorkerWhereInput, orderBy: [WorkerOrderByInput!], offset: Int, limit: Int): [Worker!]!
workerById(id: String!): Worker
workerByUniqueInput(where: WhereIdInput!): Worker @deprecated(reason: "Use workerById")
workersConnection(orderBy: [WorkerOrderByInput!]!, after: String, first: Int, where: WorkerWhereInput): WorkersConnection!
squidStatus: SquidStatus
squidVersion: SquidVersion!
}
Expand Down Expand Up @@ -1597,10 +1593,10 @@ enum StorageBucketOperatorMetadataOrderByInput {
nodeOperationalStatus_from_DESC
nodeOperationalStatus_from_ASC_NULLS_FIRST
nodeOperationalStatus_from_DESC_NULLS_LAST
nodeOperationalStatus_to_ASC
nodeOperationalStatus_to_DESC
nodeOperationalStatus_to_ASC_NULLS_FIRST
nodeOperationalStatus_to_DESC_NULLS_LAST
nodeOperationalStatus_until_ASC
nodeOperationalStatus_until_DESC
nodeOperationalStatus_until_ASC_NULLS_FIRST
nodeOperationalStatus_until_DESC_NULLS_LAST
nodeOperationalStatus_isTypeOf_ASC
nodeOperationalStatus_isTypeOf_DESC
nodeOperationalStatus_isTypeOf_ASC_NULLS_FIRST
Expand Down Expand Up @@ -2149,63 +2145,3 @@ input WhereIdInput {
id: String!
}

type Worker {
"""Worker id ({workingGroupName}-{workerId})"""
id: String!

"""WorkerId in specific working group module"""
runtimeId: BigInt!
}

type WorkerEdge {
node: Worker!
cursor: String!
}

enum WorkerOrderByInput {
id_ASC
id_DESC
id_ASC_NULLS_FIRST
id_DESC_NULLS_LAST
runtimeId_ASC
runtimeId_DESC
runtimeId_ASC_NULLS_FIRST
runtimeId_DESC_NULLS_LAST
}

type WorkersConnection {
edges: [WorkerEdge!]!
pageInfo: PageInfo!
totalCount: Int!
}

input WorkerWhereInput {
id_isNull: Boolean
id_eq: String
id_not_eq: String
id_gt: String
id_gte: String
id_lt: String
id_lte: String
id_in: [String!]
id_not_in: [String!]
id_contains: String
id_not_contains: String
id_containsInsensitive: String
id_not_containsInsensitive: String
id_startsWith: String
id_not_startsWith: String
id_endsWith: String
id_not_endsWith: String
runtimeId_isNull: Boolean
runtimeId_eq: BigInt
runtimeId_not_eq: BigInt
runtimeId_gt: BigInt
runtimeId_gte: BigInt
runtimeId_lt: BigInt
runtimeId_lte: BigInt
runtimeId_in: [BigInt!]
runtimeId_not_in: [BigInt!]
AND: [WorkerWhereInput!]
OR: [WorkerWhereInput!]
}
27 changes: 25 additions & 2 deletions storage-node/src/services/queryNode/api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,29 @@ export class QueryNodeApi {
return result.data[resultKey]
}

/**
 * Returns the subset of the given buckets whose node is not under maintenance
 * at the time of the call.
 *
 * A bucket is kept when any of the following holds:
 *  - it has no operator metadata or no operational status set,
 *  - its status is `NodeOperationalStatusNormal`,
 *  - its status is `NodeOperationalStatusNoServiceFrom` and `from` is still in the future,
 *  - its status is `NodeOperationalStatusNoServiceUntil` and the current time lies outside
 *    the `[from, until]` window (maintenance has not started yet, or has already ended).
 *
 * Every other status (e.g. `NodeOperationalStatusNoService`) causes the bucket to be dropped.
 *
 * @param buckets - Storage buckets including their operator's operational status.
 * @returns A new array with the buckets that are currently operational; the input is not modified.
 */
private getOperationallyActiveStorageBuckets(
  buckets: Array<StorageBucketIdsFragment>
): Array<StorageBucketIdsFragment> {
  // One reference instant for the whole pass, so all buckets are judged against the same "now"
  const now = new Date()

  // Filter out nodes/operators that are under maintenance right now
  return buckets.filter(({ operatorMetadata }) => {
    // Undefined when either the metadata or the status itself is missing
    const status = operatorMetadata?.nodeOperationalStatus

    return (
      !status ||
      status.__typename === 'NodeOperationalStatusNormal' ||
      // Open-ended maintenance that has not started yet
      (status.__typename === 'NodeOperationalStatusNoServiceFrom' && new Date(status.from) > now) ||
      // Bounded maintenance window: the node is usable before it starts OR after it ends.
      // (Requiring both at once would never be true for a valid window.)
      (status.__typename === 'NodeOperationalStatusNoServiceUntil' &&
        (new Date(status.from) > now || new Date(status.until) < now))
    )
  })
}

/**
* Returns storage bucket IDs filtered by worker ID.
*
Expand All @@ -197,7 +220,7 @@ export class QueryNodeApi {
return []
}

return result
return this.getOperationallyActiveStorageBuckets(result)
}

/**
Expand Down Expand Up @@ -301,7 +324,7 @@ export class QueryNodeApi {
return []
}

return result
return this.getOperationallyActiveStorageBuckets(result)
}

/**
Expand Down
54 changes: 33 additions & 21 deletions storage-node/src/services/queryNode/queries/queries.graphql
Original file line number Diff line number Diff line change
@@ -1,34 +1,43 @@
# Selects the concrete variant of the NodeOperationalStatus union together with
# the fields each variant exposes. `__typename` is requested so that consumers
# can tell the variants apart.
fragment NodeOperationalStatusFragment on NodeOperationalStatus {
__typename
# Normal operation: no extra fields selected
... on NodeOperationalStatusNormal {
__typename
}
# No service, without any time bounds selected
... on NodeOperationalStatusNoService {
__typename
forced
}
# No service starting at `from`
... on NodeOperationalStatusNoServiceFrom {
__typename
forced
from
}
# No service between `from` and `until`
... on NodeOperationalStatusNoServiceUntil {
__typename
forced
from
until
}
}

# Storage bucket id plus the operational status of its operator's node.
fragment StorageBucketIds on StorageBucket {
id
operatorMetadata {
# Variant-specific status fields come from NodeOperationalStatusFragment
nodeOperationalStatus {
...NodeOperationalStatusFragment
}
}
}

query getStorageBuckets {
storageBuckets(
where: {
operatorStatus: { isTypeOf_eq: "StorageBucketOperatorStatusActive" }
operatorMetadata: {
OR: [
{ nodeOperationalStatus_isNull: true }
{ nodeOperationalStatus: { isTypeOf_eq: "NodeOperationalStatusNormal" } }
]
}
}
) {
storageBuckets(where: { operatorStatus: { isTypeOf_eq: "StorageBucketOperatorStatusActive" } }) {
...StorageBucketIds
}
}

query getStorageBucketsByWorkerId($workerId: BigInt!) {
storageBuckets(
where: {
operatorStatus: { isTypeOf_eq: "StorageBucketOperatorStatusActive", workerId_eq: $workerId }
operatorMetadata: {
OR: [
{ nodeOperationalStatus_isNull: true }
{ nodeOperationalStatus: { isTypeOf_eq: "NodeOperationalStatusNormal" } }
]
}
}
where: { operatorStatus: { isTypeOf_eq: "StorageBucketOperatorStatusActive", workerId_eq: $workerId } }
) {
...StorageBucketIds
}
Expand All @@ -49,10 +58,13 @@ query getStorageBucketsOperationalStatus($ids: [String!]) {
... on NodeOperationalStatusNoServiceFrom {
__typename
forced
from
}
... on NodeOperationalStatusNoServiceDuring {
... on NodeOperationalStatusNoServiceUntil {
__typename
forced
from
until
}
}
}
Expand Down
Loading

0 comments on commit 0537ee7

Please sign in to comment.