diff --git a/src/mappings/handlers.ts b/src/mappings/handlers.ts index 714cf1d..dd8d1b9 100644 --- a/src/mappings/handlers.ts +++ b/src/mappings/handlers.ts @@ -31,19 +31,10 @@ import { handleEventClaimUpdated, handleEventProofUpdated, handleEventProofValidityChecked, - handleEventSupplierSlashed, handleMsgCreateClaim, handleMsgSubmitProof, } from "./pocket/relays"; import { handleEventRelayMiningDifficultyUpdated, handleMsgAddService } from "./pocket/services"; -import { - handleEventSupplierServiceConfigActivated, - handleMsgClaimMorseSupplier, - handleSupplierStakeMsg, - handleSupplierUnbondingBeginEvent, - handleSupplierUnbondingEndEvent, - handleUnstakeSupplierMsg, -} from "./pocket/suppliers"; import { handleValidatorCommission, handleValidatorMsgCreate, @@ -68,7 +59,6 @@ export const MsgHandlers: Record) => Pro // this is currently being handle inside Authz handler "/pocket.migration.MsgRecoverMorseAccount": noOp, "/pocket.migration.MsgClaimMorseApplication": handleMsgClaimMorseApplication, - "/pocket.migration.MsgClaimMorseSupplier": handleMsgClaimMorseSupplier, // bank "/cosmos.bank.v1beta1.MsgSend": handleNativeTransfer, // validator @@ -83,9 +73,10 @@ export const MsgHandlers: Record) => Pro "/pocket.application.MsgTransferApplication": handleTransferApplicationMsg, // service "/pocket.service.MsgAddService": handleMsgAddService, - // supplier - "/pocket.supplier.MsgStakeSupplier": handleSupplierStakeMsg, - "/pocket.supplier.MsgUnstakeSupplier": handleUnstakeSupplierMsg, + // supplier - handled by batch processing in indexSupplier (called from indexStake) + "/pocket.supplier.MsgStakeSupplier": noOp,// - now handled in indexSupplier + "/pocket.supplier.MsgUnstakeSupplier": noOp,// - now handled in indexSupplier + "/pocket.migration.MsgClaimMorseSupplier": noOp,// - now handled in indexSupplier // gateway "/pocket.gateway.MsgStakeGateway": handleGatewayMsgStake, "/pocket.gateway.MsgUnstakeGateway": handleGatewayMsgUnstake, @@ -117,10 +108,10 @@ export const EventHandlers: Record) => Promi "pocket.application.EventTransferError": handleTransferApplicationErrorEvent, "pocket.application.EventApplicationUnbondingBegin": handleApplicationUnbondingBeginEvent, "pocket.application.EventApplicationUnbondingEnd": handleApplicationUnbondingEndEvent, - // supplier - "pocket.supplier.EventSupplierServiceConfigActivated": handleEventSupplierServiceConfigActivated, - "pocket.supplier.EventSupplierUnbondingBegin": handleSupplierUnbondingBeginEvent, - "pocket.supplier.EventSupplierUnbondingEnd": handleSupplierUnbondingEndEvent, + // supplier - handled by batch processing in indexSupplier (called from indexStake) + "pocket.supplier.EventSupplierServiceConfigActivated": noOp, // - now handled in indexSupplier + "pocket.supplier.EventSupplierUnbondingBegin": noOp, // - now handled in indexSupplier + "pocket.supplier.EventSupplierUnbondingEnd": noOp, // - now handled in indexSupplier // service "pocket.service.EventRelayMiningDifficultyUpdated": handleEventRelayMiningDifficultyUpdated, // gateway @@ -130,7 +121,7 @@ export const EventHandlers: Record) => Promi // relay "pocket.tokenomics.EventClaimSettled": handleEventClaimSettled, "pocket.tokenomics.EventClaimExpired": handleEventClaimExpired, - "pocket.tokenomics.EventSupplierSlashed": handleEventSupplierSlashed, + "pocket.tokenomics.EventSupplierSlashed": noOp, // - now handled in indexSupplier "pocket.tokenomics.EventApplicationOverserviced": handleEventApplicationOverserviced, "pocket.tokenomics.EventApplicationReimbursementRequest": handleEventApplicationReimbursementRequest, "pocket.proof.EventClaimUpdated": handleEventClaimUpdated, diff --git a/src/mappings/indexer.manager.ts b/src/mappings/indexer.manager.ts index 0164b52..fe045a4 100644 --- a/src/mappings/indexer.manager.ts +++ b/src/mappings/indexer.manager.ts @@ -25,6 +25,7 @@ import { MsgHandlers, } from "./handlers"; import { handleAddBlockReports } from "./pocket/reports"; +import { indexSupplier } from "./pocket/suppliers"; import { handleBlock, handleGenesis, @@ -501,80 +502,6 @@ async function indexStakeEntity(allData: Array, get ) } -// any supplier msg or event -async function indexSupplier(msgByType: MessageByType, eventByType: EventByType): Promise { - const msgTypes = [ - "/pocket.supplier.MsgUnstakeSupplier", - "/pocket.migration.MsgClaimMorseSupplier", - "/pocket.supplier.MsgStakeSupplier", - ]; - const eventTypes = [ - "pocket.supplier.EventSupplierUnbondingBegin", - "pocket.supplier.EventSupplierUnbondingEnd", - "pocket.supplier.EventSupplierServiceConfigActivated", - // this is here because it modifies the staked tokens of the supplier - "pocket.tokenomics.EventSupplierSlashed" - ]; - - - const eventGetId = (attributes: CosmosEvent["event"]["attributes"]) => { - for (const attribute of attributes) { - if (attribute.key === "supplier") { - return JSON.parse(attribute.value as string).operator_address - } - - if (attribute.key === "operator_address") { - return attribute.value as string - } - } - - return null - } - - await indexStakeEntity( - [ - ...msgTypes.map(type => msgByType[type]).flat(), - ...eventTypes.map(type => eventByType[type]).flat() - ], - { - "/pocket.supplier.MsgUnstakeSupplier": "operatorAddress", - "/pocket.supplier.MsgStakeSupplier": "operatorAddress", - "/pocket.migration.MsgClaimMorseSupplier": "shannonOperatorAddress", - "pocket.supplier.EventSupplierUnbondingBegin": eventGetId, - "pocket.supplier.EventSupplierUnbondingEnd": eventGetId, - "pocket.supplier.EventSupplierServiceConfigActivated": eventGetId, - "pocket.tokenomics.EventSupplierSlashed": (attributes) => { - /* - [ - {"key":"application_address","value":"\"pokt16wwc45wjc4ulne7wmaawxhju00vwf900lscfld\""}, - {"key":"claim_proof_status_int","value":"2"}, - {"key":"proof_missing_penalty","value":"\"1upokt\""}, - {"key":"service_id","value":"\"hey\""}, - {"key":"session_end_block_height","value":"\"363540\""}, - {"key":"supplier_operator_address","value":"\"pokt1wua234ulad3vkcsqmasu845mn4ugu9aa6jcv23\""}, - {"key":"mode","value":"EndBlock"} - ] - */ - for (const attribute of attributes) { - // in the previous version of this event this is the key to get the supplierId - if (attribute.key === "supplier_operator_addr" || attribute.key === "supplier_operator_address") { - return attribute.value as string - } - - if (attribute.key === "claim") { - return JSON.parse(attribute.value as string).supplier_operator_address - } - - if (attribute.key === "supplier_operator_address") { - return attribute.value as string - } - } - - return null - } - }) -} - // any message or event related to stake (supplier, gateway, application, service) async function indexStake(msgByType: MessageByType, eventByType: EventByType): Promise { await Promise.all([ diff --git a/src/mappings/pocket/relays.ts b/src/mappings/pocket/relays.ts index 5df7fd1..15f9be4 100644 --- a/src/mappings/pocket/relays.ts +++ b/src/mappings/pocket/relays.ts @@ -57,7 +57,7 @@ import { import { getDenomAndAmount } from "../utils/primitives"; // this can return undefined because older events do not have this attribute -function getClaimProofStatusFromSDK(item: typeof ClaimProofStatusSDKType | string | number): ClaimProofStatus | undefined { +export function getClaimProofStatusFromSDK(item: typeof ClaimProofStatusSDKType | string | number): ClaimProofStatus | undefined { if (!item) return undefined; switch (item) { @@ -151,7 +151,7 @@ function parseAttribute(attribute: unknown = ""): string { } // eslint-disable-next-line complexity -function getAttributes(attributes: CosmosEvent["event"]["attributes"]) { +export function getAttributes(attributes: CosmosEvent["event"]["attributes"]) { // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore @@ -890,121 +890,6 @@ function _handleEventApplicationReimbursementRequest(event: CosmosEvent): EventA } } -async function _handleOldEventSupplierSlashed(event: CosmosEvent) { - let slashingCoin: CoinSDKType | null = null, operatorAddress = ""; - - for (const attribute of event.event.attributes) { - if (attribute.key === "slashing_amount") { - slashingCoin = getDenomAndAmount(attribute.value as string); - if (!slashingCoin) { - throw new Error(`[handleEventSupplierSlashed] event attribute key=${attribute.key} value=${attribute.value} is not a valid coin`); - } - } - - if (attribute.key === "proof_missing_penalty") { - /* - [ - {"key":"application_address","value":"\"pokt16wwc45wjc4ulne7wmaawxhju00vwf900lscfld\""}, - {"key":"claim_proof_status_int","value":"2"}, - {"key":"proof_missing_penalty","value":"\"1upokt\""}, - {"key":"service_id","value":"\"hey\""}, - {"key":"session_end_block_height","value":"\"363540\""}, - {"key":"supplier_operator_address","value":"\"pokt1wua234ulad3vkcsqmasu845mn4ugu9aa6jcv23\""}, - {"key":"mode","value":"EndBlock"} - ] - */ - const coins = parseCoins(parseAttribute(attribute.value)); - if (!coins.length) { - throw new Error(`[handleEventSupplierSlashed] event attribute key=${attribute.key} value=${attribute.value} is not a valid coin`); - } - } - - if (attribute.key === "supplier_operator_addr" || attribute.key === "supplier_operator_address") { - operatorAddress = parseAttribute(attribute.value); - } - } - - if (!slashingCoin) { - throw new Error(`[handleEventSupplierSlashed] slashingCoin not found in event`); - } - - if (!operatorAddress) { - logger.error(`[handleEventSupplierSlashed] operatorAddress not found in event=${event.kind} attributes=${stringify(event.event.attributes)}`); - throw new Error(`[handleEventSupplierSlashed] operatorAddress not found in event`); - } - - const supplier = await Supplier.get(operatorAddress); - - if (!supplier) { - throw new Error(`[handleEventSupplierSlashed] supplier not found for operator address ${operatorAddress}`); - } - - supplier.stakeAmount -= BigInt(slashingCoin.amount); - - await Promise.all([ - supplier.save(), - EventSupplierSlashed.create({ - id: getEventId(event), - supplierId: operatorAddress, - blockId: getBlockId(event.block), - eventId: getEventId(event), - proofMissingPenalty: BigInt(slashingCoin.amount), - proofMissingPenaltyDenom: slashingCoin.denom, - previousStakeAmount: supplier.stakeAmount, - afterStakeAmount: supplier.stakeAmount, - // in alpha this event does not have the values below, so we are setting them to empty values for now - applicationId: "", - serviceId: "", - sessionId: "", - sessionStartHeight: BigInt(0), - sessionEndHeight: BigInt(0), - }).save(), - ]); -} - -async function _handleEventSupplierSlashed(event: CosmosEvent) { - const { - claim, - proofMissingPenalty, - } = getAttributes(event.event.attributes); - - if (!claim || !claim.session_header || Object.keys(claim).length === 0) { - logger.warn(`[handleEventSupplierSlashed] claim not found in event, trying to handle with previous version`); - await _handleOldEventSupplierSlashed(event); - return; - } - - const supplier = await Supplier.get(claim.supplier_operator_address); - - if (!supplier) { - throw new Error(`[handleEventSupplierSlashed] supplier not found for address: ${claim.supplier_operator_address}`); - } - - const previousStakeAmount = supplier.stakeAmount.valueOf(); - supplier.stakeAmount -= BigInt(proofMissingPenalty.amount); - - await Promise.all([ - supplier.save(), - EventSupplierSlashed.create({ - id: getEventId(event), - supplierId: claim.supplier_operator_address, - // TODO: Create entity for session header - applicationId: claim.session_header.application_address, - serviceId: claim.session_header.service_id, - sessionId: claim.session_header.session_id || "", - sessionEndHeight: BigInt(claim.session_header.session_end_block_height || "0"), - sessionStartHeight: BigInt(claim.session_header.session_start_block_height || "0"), - blockId: getBlockId(event.block), - eventId: getEventId(event), - proofMissingPenalty: BigInt(proofMissingPenalty.amount), - proofMissingPenaltyDenom: proofMissingPenalty.denom, - previousStakeAmount, - afterStakeAmount: supplier.stakeAmount, - proofValidationStatus: getClaimProofStatusFromSDK(claim.proof_validation_status), - }).save(), - ]); -} - // eslint-disable-next-line complexity function _handleEventProofValidityChecked(event: CosmosEvent): EventProofValidityCheckedProps { let supplierOperatorAddress = "", @@ -1158,10 +1043,6 @@ export async function handleEventApplicationReimbursementRequest(events: Array): Promise { - await Promise.all(events.map(_handleEventSupplierSlashed)); -} - export async function handleEventProofValidityChecked(events: Array): Promise { await optimizedBulkCreate("EventProofValidityChecked", events.map(_handleEventProofValidityChecked)); } diff --git a/src/mappings/pocket/suppliers.ts b/src/mappings/pocket/suppliers.ts index 5674b16..4816aa6 100644 --- a/src/mappings/pocket/suppliers.ts +++ b/src/mappings/pocket/suppliers.ts @@ -3,15 +3,11 @@ import { CosmosEvent, CosmosMessage, } from "@subql/types-cosmos"; +import { get, orderBy } from "lodash"; import { Coin } from "../../client/cosmos/base/v1beta1/coin"; +import { parseCoins } from "../../cosmjs/utils"; import { - EventSupplierServiceConfigActivated, - EventSupplierUnbondingBegin as EventSupplierUnbondingBeginEntity, - EventSupplierUnbondingEnd as EventSupplierUnbondingEndEntity, MorseSupplierClaimSignerType, - MsgClaimMorseSupplier as MsgClaimMorseSupplierEntity, - MsgStakeSupplier as MsgStakeSupplierEntity, - MsgUnstakeSupplier as MsgUnstakeSupplierEntity, StakeStatus, Supplier, SupplierEndpoint, @@ -19,13 +15,18 @@ import { SupplierServiceConfig, SupplierUnbondingReason, } from "../../types"; +import { EventSupplierServiceConfigActivatedProps } from "../../types/models/EventSupplierServiceConfigActivated"; +import { EventSupplierSlashedProps } from "../../types/models/EventSupplierSlashed"; +import { EventSupplierUnbondingBeginProps } from "../../types/models/EventSupplierUnbondingBegin"; +import { EventSupplierUnbondingEndProps } from "../../types/models/EventSupplierUnbondingEnd"; import { MsgClaimMorseSupplierProps } from "../../types/models/MsgClaimMorseSupplier"; -import { MsgStakeSupplierServiceProps } from "../../types/models/MsgStakeSupplierService"; +import { MsgStakeSupplierProps } from "../../types/models/MsgStakeSupplier"; +import { MsgUnstakeSupplierProps } from "../../types/models/MsgUnstakeSupplier"; +import { SupplierProps } from "../../types/models/Supplier"; import { SupplierServiceConfigProps } from "../../types/models/SupplierServiceConfig"; import { CoinSDKType } from "../../types/proto-interfaces/cosmos/base/v1beta1/coin"; import { MorseSupplierClaimSignerTypeSDKType } from "../../types/proto-interfaces/pocket/migration/morse_onchain"; import { MsgClaimMorseSupplier } from "../../types/proto-interfaces/pocket/migration/tx"; -import { SupplierServiceConfig as SupplierServiceConfigType } from "../../types/proto-interfaces/pocket/shared/service"; import { SupplierSDKType } from "../../types/proto-interfaces/pocket/shared/supplier"; import { supplierUnbondingReasonFromJSON, @@ -35,21 +36,30 @@ import { MsgStakeSupplier, MsgUnstakeSupplier, } from "../../types/proto-interfaces/pocket/supplier/tx"; -import { optimizedBulkCreate } from "../utils/db"; +import { + fetchPaginatedRecords, + getSequelize, + getStoreModel, + optimizedBulkCreate +} from "../utils/db"; import { getBlockId, getEventId, - getMsgStakeServiceId, getStakeServiceId, messageId, } from "../utils/ids"; -import { getDenomAndAmount } from "../utils/primitives"; +import { parseAttribute } from "../utils/json"; +import { + filterEventsByTxStatus, + filterMsgByTxStatus, + getDenomAndAmount, + isEventOfFinalizedBlockKind +} from "../utils/primitives"; import { Ed25519, pubKeyToAddress, } from "../utils/pub_key"; -import { updateMorseClaimableAccounts } from "./migration"; -import { fetchAllSupplierServiceConfigBySupplier } from "./pagination"; +import { getAttributes, getClaimProofStatusFromSDK } from "./relays"; function getMorseSupplierClaimSignerType(item: typeof MorseSupplierClaimSignerTypeSDKType | string | number): MorseSupplierClaimSignerType { switch (item) { @@ -97,135 +107,18 @@ function getSupplierUnbondingReasonFromSDK(item: typeof SupplierUnbondingReasonS } } -interface StakeSupplierProps { - operatorAddress: string; - ownerAddress: string; - stake: Coin; - services: Array; - msgId: string - msgServicesEntityName: string - serviceMsgIdKey: 'claimMsgId' | 'stakeMsgId' -} - -type ClaimOrStake = T extends 'claimMsgId' ? MsgClaimMorseSupplierProps : MsgStakeSupplierServiceProps - -async function _stakeSupplier({ - msgId, - msgServicesEntityName, - operatorAddress, - ownerAddress, - serviceMsgIdKey, - services, - stake, -}: StakeSupplierProps): Promise>> { - const supplier = Supplier.create({ - id: operatorAddress, - operatorId: operatorAddress, - ownerId: ownerAddress, - stakeAmount: BigInt(stake.amount), - stakeDenom: stake.denom, - stakeStatus: StakeStatus.Staked, - }); - - const servicesId: Array = []; - // used to create the services that came in the stake message - const supplierMsgStakeServices: Array> = []; - // used to have the services that are currently configured for the supplier - const newSupplierServices: Array = []; - - for (const { endpoints, revShare, serviceId } of services) { - servicesId.push(serviceId); - - const endpointsArr: Array = endpoints.map((endpoint) => ({ - url: endpoint.url, - rpcType: endpoint.rpcType, - configs: endpoint.configs, - })); - - const revShareArr: Array = revShare.map((revShare) => ({ - address: revShare.address, - revSharePercentage: revShare.revSharePercentage.toString(), - })); - - supplierMsgStakeServices.push({ - id: getMsgStakeServiceId(msgId, serviceId), - serviceId, - [serviceMsgIdKey]: msgId, - endpoints: endpointsArr, - revShare: revShareArr, - } as ClaimOrStake); - - newSupplierServices.push({ - id: getStakeServiceId(operatorAddress, serviceId), - serviceId, - supplierId: operatorAddress, - endpoints: endpointsArr, - revShare: revShareArr, - }); - } - - const currentSupplierServices = await SupplierServiceConfig.getBySupplierId(operatorAddress, { limit: 100 }); - - const servicesToRemove: Array = []; - - if (currentSupplierServices && currentSupplierServices.length > 0) { - for (const service of currentSupplierServices) { - if (!servicesId.includes(service.serviceId)) { - servicesToRemove.push(service.id); - } - } - } - - const promises: Array> = [ - supplier.save(), - optimizedBulkCreate(msgServicesEntityName, supplierMsgStakeServices), - store.bulkCreate("SupplierServiceConfig", newSupplierServices), - ]; - - if (servicesToRemove.length > 0) { - promises.push(store.bulkRemove("SupplierServiceConfig", servicesToRemove)); - } - - return promises -} - -async function _handleSupplierStakeMsg(msg: CosmosMessage) { - if (!msg.msg.decodedMsg.stake) { - return logger.error(`[handleSupplierStakeMsg] stake not provided in msg`); - } - - const msgId = messageId(msg); - - const promises = await _stakeSupplier({ - msgId, - msgServicesEntityName: "MsgStakeSupplierService", - operatorAddress: msg.msg.decodedMsg.operatorAddress, - ownerAddress: msg.msg.decodedMsg.ownerAddress, - stake: msg.msg.decodedMsg.stake, - services: msg.msg.decodedMsg.services, - serviceMsgIdKey: 'stakeMsgId', - }); - - promises.push( - MsgStakeSupplierEntity.create({ - id: msgId, - signerId: msg.msg.decodedMsg.signer, - supplierId: msg.msg.decodedMsg.operatorAddress, - ownerId: msg.msg.decodedMsg.ownerAddress, - stakeAmount: BigInt(msg.msg.decodedMsg.stake.amount), - stakeDenom: msg.msg.decodedMsg.stake.denom, - blockId: getBlockId(msg.block), - transactionId: msg.tx.hash, - messageId: msgId, - }).save() - ) - - await Promise.all(promises); -} - -async function _handleMsgClaimMorseSupplier(msg: CosmosMessage) { - const msgId = messageId(msg); - +function _handleClaimSupplier( + msg: CosmosMessage, + record: Record, + }> +): { + supplier: SupplierProps, + msgClaimSupplier: MsgClaimMorseSupplierProps, + services: Array, + servicesToRemove: Array, +} { let stakeCoin: Coin | null = null, balanceCoin: Coin | null = null, claimSignerType: string | null = null; for (const event of msg.tx.tx.events) { @@ -264,24 +157,31 @@ async function _handleMsgClaimMorseSupplier(msg: CosmosMessage, -) { - const supplier = await Supplier.get(msg.msg.decodedMsg.operatorAddress); - - if (!supplier) { - throw new Error(`[handleUnstakeSupplierMsg] supplier not found for operator address ${msg.msg.decodedMsg.operatorAddress}`); + }, + ...getServices( + rawServices, + operatorAddress, + Object.keys(record[operatorAddress]?.services || {}) + ) } - - const msgId = messageId(msg); - - const msgUnstakeSupplier = MsgUnstakeSupplierEntity.create({ - id: msgId, - signerId: msg.msg.decodedMsg.signer, - supplierId: msg.msg.decodedMsg.operatorAddress, - blockId: getBlockId(msg.block), - transactionId: msg.tx.hash, - messageId: msgId, - }); - - supplier.stakeStatus = StakeStatus.Unstaking; - supplier.unstakingBeginBlockId = getBlockId(msg.block); - - await Promise.all([ - supplier.save(), - msgUnstakeSupplier.save(), - ]); } -async function _handleEventSupplierServiceConfigActivated( +function _handleEventSupplierServiceConfigActivated( event: CosmosEvent, -) { + record: Record, + }> +): { + services: Array, + serviceConfigEvent: EventSupplierServiceConfigActivatedProps, +} { let activationHeight: bigint | null = null, operatorAddress: string | undefined, serviceId: string | undefined; for (const {key, value} of event.event.attributes) { @@ -367,12 +248,10 @@ async function _handleEventSupplierServiceConfigActivated( throw new Error(`[handleEventSupplierServiceConfigActivated] operatorAddress not found in event`); } - let services: Array = [] + let services: Array = [] if (serviceId) { - const service = await SupplierServiceConfig.get( - getStakeServiceId(operatorAddress, serviceId) - ) + const service = record[operatorAddress]?.services?.[getStakeServiceId(operatorAddress, serviceId)]; if (service) { services = [ @@ -382,63 +261,37 @@ async function _handleEventSupplierServiceConfigActivated( } if (services.length === 0) { - services = await fetchAllSupplierServiceConfigBySupplier(operatorAddress); + services = Object.values(record[operatorAddress]?.services || {}); } const eventId = getEventId(event); - await Promise.all([ - EventSupplierServiceConfigActivated.create({ + return { + services: services + .filter((service) => !service.activatedAtId) + .map((service) => { + service.activatedAtId = activationHeight; + service.activatedEventId = eventId; + return service; + }), + serviceConfigEvent: { id: eventId, eventId: eventId, blockId: getBlockId(event.block), - }).save(), - store.bulkUpdate( - "SupplierServiceConfig", - services - .filter((service) => !service.activatedAtId) - .map((service) => { - service.activatedAtId = activationHeight; - service.activatedEventId = eventId; - return service; - }) - ) - ]) + } + } } -async function _handleSupplierUnbondingBeginEvent( +function _handleSupplierUnbondingBeginEvent( event: CosmosEvent, -) { - /* - { - "type":"pocket.supplier.EventSupplierUnbondingBegin", - "attributes":[ - { - "key":"reason", - "value":"\"SUPPLIER_UNBONDING_REASON_BELOW_MIN_STAKE\"", - "index":true - }, - { - "key":"session_end_height", - "value":"\"55060\"", - "index":true - }, - { - "key":"supplier", - "value":"{\"owner_address\":\"pokt1kfjlev8j9nml32rzln7nw6r9pynez30c5lpgx5\",\"operator_address\":\"pokt1kfjlev8j9nml32rzln7nw6r9pynez30c5lpgx5\", - \"stake\":{\"denom\":\"upokt\",\"amount\":\"0\"},\"services\":[{\"service_id\":\"proto-anvil\",\ - "endpoints\":[{\"url\":\"https://beta-relayminer-4.us-nj.pocket.com:443\",\"rpc_type\":\"JSON_RPC\",\ - "configs\":[]}],\"rev_share\":[{\"address\":\"pokt1kfjlev8j9nml32rzln7nw6r9pynez30c5lpgx5\", - \"rev_share_percentage\":100}]},{\"service_id\":\"proto-static-ngx\", - \"endpoints\":[{\"url\":\"https://beta-relayminer-4.us-nj.pocket.com:443\",\"rpc_type\":\"JSON_RPC\",\"configs\":[]}], - \"rev_share\":[{\"address\":\"pokt1kfjlev8j9nml32rzln7nw6r9pynez30c5lpgx5\",\"rev_share_percentage\":100}]}],\"unstake_session_end_height\":\"55060\", - \"services_activation_heights_map\":{\"proto-anvil\":\"31851\",\"proto-static-ngx\":\"31851\"}}","index":true - }, - { - "key":"unbonding_end_height", - "value":"\"55070\"","index":true - },{"key":"mode","value":"EndBlock","index":true}]} - */ + record: Record, + }> +): { + supplier: SupplierProps, + unbondingBeginEvent: EventSupplierUnbondingBeginProps, +} { let unbondingHeight: bigint | null = null, sessionEndHeight: bigint | null = null, operatorAddress: string | undefined, reason: null | number = null; for (const attribute of event.event.attributes) { @@ -470,33 +323,25 @@ async function _handleSupplierUnbondingBeginEvent( throw new Error(`[handleSupplierUnbondingBeginEvent] operatorAddress not provided in event`); } - const supplier = await Supplier.get(operatorAddress); + const supplier = record [operatorAddress]?.supplier if (!supplier) { throw new Error(`[handleSupplierUnbondingBeginEvent] supplier not found for operator address ${operatorAddress}`); } - if (!unbondingHeight) { - // todo: we should do this -> throw new Error(`[handleSupplierUnbondingBeginEvent] unbonding_end_height not found`); - // but alpha has still events without this - logger.error(`[handleSupplierUnbondingBeginEvent] unbonding_end_height not found`); - } else { - supplier.unstakingEndHeight = unbondingHeight - } - - if (reason === null) { - // todo: we should do this -> throw new Error(`[handleSupplierUnbondingBeginEvent] reason not found in event`); - // but alpha has still events without this - logger.error(`[handleSupplierUnbondingBeginEvent] reason not found in event`); - } else { - supplier.unstakingReason = getSupplierUnbondingReasonFromSDK(reason) - } - const eventId = getEventId(event); - await Promise.all([ - supplier.save(), - EventSupplierUnbondingBeginEntity.create({ + return { + supplier: { + ...supplier, + ...(unbondingHeight && { + unstakingEndHeight: unbondingHeight, + }), + ...(reason && { + unstakingReason: getSupplierUnbondingReasonFromSDK(reason) + }) + }, + unbondingBeginEvent: { id: eventId, unbondingEndHeight: unbondingHeight || BigInt(0), sessionEndHeight: sessionEndHeight || BigInt(0), @@ -504,13 +349,21 @@ async function _handleSupplierUnbondingBeginEvent( blockId: getBlockId(event.block), reason: reason !== null ? getSupplierUnbondingReasonFromSDK(reason) : SupplierUnbondingReason.UNSPECIFIED, eventId, - }).save(), - ]); + } + } } -async function _handleSupplierUnbondingEndEvent( +function _handleSupplierUnbondingEndEvent( event: CosmosEvent, -) { + record: Record, + }> +): { + supplier: SupplierProps, + servicesToRemove: Array, + unbondingEndEvent: EventSupplierUnbondingEndProps, +} { let unbondingHeight: bigint | null = null, sessionEndHeight: bigint | null = null, operatorAddress: string | undefined, reason: null | number = null; for (const attribute of event.event.attributes) { @@ -542,36 +395,28 @@ async function _handleSupplierUnbondingEndEvent( throw new Error(`[handleSupplierUnbondingEndEvent] operatorAddress not provided in event`); } - const supplier = await Supplier.get(operatorAddress); + const supplierAndServices = record[operatorAddress] + const supplier = supplierAndServices?.supplier if (!supplier) { throw new Error(`[handleSupplierUnbondingEndEvent] supplier not found for operator address ${operatorAddress}`); } - if (!unbondingHeight) { - // todo: we should do this -> throw new Error(`[handleSupplierUnbondingEndEvent] unbonding_end_height not found`); - // but alpha has still events without this - logger.error(`[handleSupplierUnbondingEndEvent] unbonding_end_height not found`); - } else { - supplier.unstakingEndBlockId = unbondingHeight - } - - if (reason === null) { - // todo: we should do this -> throw new Error(`[handleSupplierUnbondingBeginEvent] reason not found in event`); - // but alpha has still events without this - logger.error(`[handleSupplierUnbondingEndEvent] reason not found in event`); - } else { - supplier.unstakingReason = getSupplierUnbondingReasonFromSDK(reason) - } - supplier.stakeStatus = StakeStatus.Unstaked; - const supplierServices = (await fetchAllSupplierServiceConfigBySupplier(operatorAddress) || []).map(item => item.id); - const eventId = getEventId(event); - await Promise.all([ - EventSupplierUnbondingEndEntity.create({ + return { + supplier: { + ...supplier, + ...(unbondingHeight && { + unstakingEndBlockId: unbondingHeight, + }), + ...(reason && { + unstakingReason: getSupplierUnbondingReasonFromSDK(reason) + }) + }, + unbondingEndEvent: { id: eventId, unbondingEndHeight: unbondingHeight || BigInt(0), sessionEndHeight: sessionEndHeight || BigInt(0), @@ -579,55 +424,826 @@ async function _handleSupplierUnbondingEndEvent( blockId: getBlockId(event.block), supplierId: operatorAddress, eventId, - }).save(), - supplier.save(), - // todo: change this for an atomic operation - store.bulkRemove("SupplierServiceConfig", supplierServices), - ]); + }, + servicesToRemove: Object.keys(supplierAndServices?.services || {}) + } } -export async function handleSupplierStakeMsg( - messages: Array>, -): Promise { - await Promise.all(messages.map(_handleSupplierStakeMsg)); +function getServices( + rawServices: MsgStakeSupplier['services'], + operatorAddress: string, + existingServicesId: Array +) { + // to compare with the current services and know which one to remove + const servicesId: Array = []; + // services to save + const services: Array = []; + + for (const { endpoints, revShare, serviceId } of rawServices) { + servicesId.push(serviceId); + + const endpointsArr: Array = endpoints.map((endpoint) => ({ + url: endpoint.url, + rpcType: endpoint.rpcType, + configs: endpoint.configs, + })); + + const revShareArr: Array = revShare.map((revShare) => ({ + address: revShare.address, + revSharePercentage: revShare.revSharePercentage.toString(), + })); + + services.push({ + id: getStakeServiceId(operatorAddress, serviceId), + serviceId, + supplierId: operatorAddress, + endpoints: endpointsArr, + revShare: revShareArr, + }); + } + + const servicesToRemove: Array = []; + + for (const serviceId of existingServicesId) { + if (!servicesId.includes(serviceId)) { + servicesToRemove.push(serviceId); + } + } + + return { + servicesToRemove, + services, + } } -export async function handleMsgClaimMorseSupplier( - messages: Array>, -): Promise { - await Promise.all([ - ...messages.map(_handleMsgClaimMorseSupplier), - updateMorseClaimableAccounts( - messages.map((msg) => ({ - morseAddress: msg.msg.decodedMsg.morseNodeAddress, - destinationAddress: msg.msg.decodedMsg.shannonOperatorAddress, - claimedMsgId: messageId(msg), - transactionHash: msg.tx.hash, - })) +function _handleSupplierStakeMsg( + msg: CosmosMessage, + record: Record, + }> +): { + supplier: SupplierProps, + msgStakeSupplier: MsgStakeSupplierProps, + services: Array, + servicesToRemove: Array, +} { + // the MsgStakeSupplier can come without the stake field, so we need to get the previous stake + let stake = msg.msg.decodedMsg.stake; + + if (!stake) { + const previousSupplier = record[msg.msg.decodedMsg.operatorAddress]?.supplier; + + if (!previousSupplier) { + throw new Error(`[handleSupplierStakeMsg] previous supplier not found for operator address ${msg.msg.decodedMsg.operatorAddress}`); + } + + stake = { + amount: previousSupplier.stakeAmount.toString(), + denom: previousSupplier.stakeDenom, + } + } + + if (!stake) { + throw new Error(`[handleSupplierStakeMsg] stake not provided in msg`); + } + + const {operatorAddress, ownerAddress, services: rawServices, signer} = msg.msg.decodedMsg; + + const msgId = messageId(msg); + + return { + supplier: { + id: operatorAddress, + operatorId: operatorAddress, + ownerId: ownerAddress, + stakeAmount: BigInt(stake.amount), + stakeDenom: stake.denom, + stakeStatus: StakeStatus.Staked, + unstakingEndHeight: undefined, + unstakingEndBlockId: undefined, + unstakingBeginBlockId: undefined, + unstakingReason: undefined, + }, + msgStakeSupplier: { + id: msgId, + signerId: signer, + supplierId: operatorAddress, + ownerId: ownerAddress, + stakeAmount: BigInt(stake.amount), + stakeDenom: stake.denom, + blockId: getBlockId(msg.block), + transactionId: msg.tx.hash, + messageId: msgId, + }, + ...getServices( + rawServices, + operatorAddress, + Object.keys(record[operatorAddress]?.services || {}) ) + } +} + +function _handleUnstakeSupplierMsg( + msg: CosmosMessage, + record: Record, + }> +): { + supplier: SupplierProps, + unstakedMsg: MsgUnstakeSupplierProps +} { + const {operatorAddress, signer} = msg.msg.decodedMsg; + + const supplier = record[operatorAddress]?.supplier; + + if (!supplier) { + throw new Error(`[handleUnstakeSupplierMsg] supplier not found for operator address ${msg.msg.decodedMsg.operatorAddress}`); + } + + const msgId = messageId(msg); + + supplier.stakeStatus = StakeStatus.Unstaking; + supplier.unstakingBeginBlockId = getBlockId(msg.block); + + return { + supplier: { + ...supplier, + stakeStatus: StakeStatus.Unstaking, + unstakingBeginBlockId: getBlockId(msg.block) + }, + unstakedMsg: { + id: msgId, + signerId: signer, + supplierId: operatorAddress, + blockId: getBlockId(msg.block), + transactionId: msg.tx.hash, + messageId: msgId, + } + } +} + +// V2 handler for EventSupplierSlashed (batch processing - used in indexSupplier) +function _getValuesOldEventSupplierSlashed(event: CosmosEvent) { + let slashingCoin: CoinSDKType | null = null, operatorAddress = ""; + + for (const attribute of event.event.attributes) { + if (attribute.key === "slashing_amount") { + slashingCoin = getDenomAndAmount(attribute.value as string); + } + + if (attribute.key === "proof_missing_penalty") { + const coins = parseCoins(parseAttribute(attribute.value)); + if (!coins.length) { + throw new Error(`[handleEventSupplierSlashed] event attribute key=${attribute.key} value=${attribute.value} is not a valid coin`); + } + } + + if (attribute.key === "supplier_operator_addr" || attribute.key === "supplier_operator_address") { + operatorAddress = parseAttribute(attribute.value); + } + } + + return { + proofMissingPenalty: slashingCoin, + operatorAddress, + proofValidationStatus: undefined, + application: "", + service: "", + session: "", + sessionStartHeight: BigInt(0), + sessionEndHeight: BigInt(0), + } +} + +function _getValuesEventSupplierSlashed(event: CosmosEvent) { + const { + claim, + proofMissingPenalty, + } = getAttributes(event.event.attributes); + + if (!claim || !claim.session_header || Object.keys(claim).length === 0) { + logger.warn(`[handleEventSupplierSlashed] claim not found in event, trying to handle with previous version`); + return _getValuesOldEventSupplierSlashed(event); + } + + return { + operatorAddress: claim.supplier_operator_address, + application: claim.session_header.application_address, + service: claim.session_header.service_id, + session: claim.session_header.session_id || "", + sessionEndHeight: BigInt(claim.session_header.session_end_block_height || "0"), + sessionStartHeight: BigInt(claim.session_header.session_start_block_height || "0"), + proofMissingPenalty, + proofValidationStatus: getClaimProofStatusFromSDK(claim.proof_validation_status), + } +} + +export function _handleEventSupplierSlashed( + event: CosmosEvent, + record: Record, + }> +): { + supplier: SupplierProps, + slashingEvent: EventSupplierSlashedProps, +} { + const { + application, + operatorAddress, + proofMissingPenalty, + proofValidationStatus, + service, + session, + sessionEndHeight, + sessionStartHeight, + } = _getValuesEventSupplierSlashed(event); + + if (!operatorAddress) { + throw new Error(`[handleEventSupplierSlashed] operatorAddress not found in event`); + } + + if (!proofMissingPenalty) { + throw new Error(`[handleEventSupplierSlashed] proofMissingPenalty not found in event`); + } + + const currentSupplier = record[operatorAddress]?.supplier; + + if (!currentSupplier) { + throw new Error(`[handleEventSupplierSlashed] supplier not found for address: ${operatorAddress}`); + } + + const previousStakeAmount = currentSupplier.stakeAmount.valueOf(); + const afterStakeAmount = currentSupplier.stakeAmount - BigInt(proofMissingPenalty.amount); + + return { + supplier: { + ...currentSupplier, + stakeAmount: afterStakeAmount, + }, + slashingEvent: { + id: getEventId(event), + supplierId: operatorAddress, + applicationId: application, + serviceId: service, + sessionId: session || "", + sessionEndHeight, + sessionStartHeight, + blockId: getBlockId(event.block), + eventId: getEventId(event), + proofMissingPenalty: BigInt(proofMissingPenalty.amount), + proofMissingPenaltyDenom: proofMissingPenalty.denom, + previousStakeAmount, + afterStakeAmount, + proofValidationStatus: proofValidationStatus, + } + } +} + +// Type definitions for indexSupplier +type GetIdFromEventAttribute = (attributes: CosmosEvent["event"]["attributes"]) => string | Array | null; +type RecordGetId = Record; + +interface MessageByType { + [key: string]: Array +} + +interface EventByType { + [key: string]: Array +} + +interface SupplierRecord { + supplier?: SupplierProps; + services?: Record; +} + +// Helper: Get record ID getters configuration +function getSupplierRecordIdGetters(): RecordGetId { + const eventGetId = (attributes: CosmosEvent["event"]["attributes"]) => { + for (const attribute of attributes) { + if (attribute.key === "supplier") { + return JSON.parse(attribute.value as string).operator_address; + } + + if (attribute.key === "operator_address") { + return (attribute.value as string).replaceAll('"', ''); + } + } + + return null; + }; + + const slashingGetId = (attributes: CosmosEvent["event"]["attributes"]) => { + for (const attribute of attributes) { + if (attribute.key === "supplier_operator_addr" || attribute.key === "supplier_operator_address") { + return (attribute.value as string).replaceAll('"', ''); + } + + if (attribute.key === "claim") { + return JSON.parse(attribute.value as string).supplier_operator_address; + } + + if (attribute.key === "supplier_operator_address") { + return (attribute.value as string).replaceAll('"', ''); + } + } + + return null; + }; + + return { + "/pocket.supplier.MsgUnstakeSupplier": "operatorAddress", + "/pocket.supplier.MsgStakeSupplier": "operatorAddress", + "/pocket.migration.MsgClaimMorseSupplier": "shannonOperatorAddress", + "pocket.supplier.EventSupplierUnbondingBegin": eventGetId, + "pocket.supplier.EventSupplierUnbondingEnd": eventGetId, + "pocket.supplier.EventSupplierServiceConfigActivated": eventGetId, + "pocket.tokenomics.EventSupplierSlashed": slashingGetId, + }; +} + +// Helper: Collect supplier IDs from events and messages +function collectSupplierIds( + eventsAndMessages: Array, + recordId: RecordGetId +): { + suppliers: Array; + suppliersToFetchServices: Array; +} { + const suppliers: Array = []; + const suppliersToFetchServices: Array = []; + + for (const eventOrMsg of eventsAndMessages) { + if ('event' in eventOrMsg) { + const getEntityId = recordId[eventOrMsg.event.type] as GetIdFromEventAttribute; + const ids = getEntityId(eventOrMsg.event.attributes); + + if ([ + "pocket.tokenomics.EventSupplierSlashed", + "pocket.supplier.EventSupplierUnbondingBegin", + "pocket.supplier.EventSupplierUnbondingEnd", + ].includes(eventOrMsg.event.type)) { + if (typeof ids === "string") { + suppliers.push(ids); + } else if (ids) { + suppliers.push(...ids); + } + } + + if (eventOrMsg.event.type === "pocket.supplier.EventSupplierServiceConfigActivated") { + if (typeof ids === "string") { + suppliersToFetchServices.push(ids); + } else if (ids) { + suppliersToFetchServices.push(...ids); + } + } + } else { + const entityIdPath = recordId[eventOrMsg.msg.typeUrl] as string; + + if ([ + "/pocket.supplier.MsgUnstakeSupplier", + "/pocket.supplier.MsgStakeSupplier", + "/pocket.migration.MsgClaimMorseSupplier", + ].includes(eventOrMsg.msg.typeUrl)) { + const id = get(eventOrMsg.msg.decodedMsg, entityIdPath); + suppliers.push(id); + + if (eventOrMsg.msg.typeUrl !== "/pocket.supplier.MsgUnstakeSupplier") { + suppliersToFetchServices.push(id); + } + } + } + } + + return { suppliers, suppliersToFetchServices }; +} + +// Helper: Fetch and prepare supplier records +async function fetchSupplierData( + suppliers: Array, + suppliersToFetchServices: Array +): Promise> { + const [fetchedSuppliers, fetchedServices] = await Promise.all([ + fetchPaginatedRecords({ + fetchFn: (options) => Supplier.getByFields( + [['id', 'in', Array.from(new Set(suppliers))]], + options + ) + }), + fetchPaginatedRecords({ + fetchFn: (options) => SupplierServiceConfig.getByFields( + [['supplierId', 'in', Array.from(new Set(suppliersToFetchServices))]], + options + ) + }) ]); + + const record: Record = {}; + + for (const supplier of fetchedSuppliers) { + record[supplier.id] = { + supplier: supplier, + services: {} + }; + } + + for (const service of fetchedServices) { + if (!record[service.supplierId]) { + record[service.supplierId] = { services: {} }; + } + + if (!record[service.supplierId].services) { + record[service.supplierId].services = {}; + } + + record[service.supplierId].services![service.id] = service; + } + + return record; } -export async function handleUnstakeSupplierMsg( - messages: Array>, -): Promise { - await Promise.all(messages.map(_handleUnstakeSupplierMsg)); +// Helper: Process all events and messages +function processSupplierEventsAndMessages( + eventsAndMessages: Array, + record: Record, + recordId: RecordGetId +): { + suppliersToClose: Array; + servicesToClose: Array; + stakeMsgs: Array; + claimMsgs: Array; + unstakeMsgs: Array; + serviceConfigActivatedEvents: Array; + slashingEvents: Array; + unbondingBeginEvents: Array; + unbondingEndEvents: Array; +} { + const suppliersToClose: Array = Object.keys(record).filter(id => record[id].supplier); + const servicesToClose: Array = []; + const stakeMsgs: Array = []; + const claimMsgs: Array = []; + const unstakeMsgs: Array = []; + const serviceConfigActivatedEvents: Array = []; + const slashingEvents: Array = []; + const unbondingBeginEvents: Array = []; + const unbondingEndEvents: Array = []; + + for (const eventOrMsg of eventsAndMessages) { + if ('event' in eventOrMsg) { + if (eventOrMsg.event.type === "pocket.supplier.EventSupplierServiceConfigActivated") { + const { serviceConfigEvent, services } = _handleEventSupplierServiceConfigActivated(eventOrMsg, record); + const getId = recordId[eventOrMsg.event.type] as GetIdFromEventAttribute; + const operator = getId(eventOrMsg.event.attributes) as string; + + for (const service of services) { + record[operator].services![service.id] = service; + servicesToClose.push(service.id); + } + + serviceConfigActivatedEvents.push(serviceConfigEvent); + } + + if (eventOrMsg.event.type === "pocket.tokenomics.EventSupplierSlashed") { + const { slashingEvent, supplier } = _handleEventSupplierSlashed(eventOrMsg, record); + slashingEvents.push(slashingEvent); + record[supplier.id].supplier = supplier; + } + + if (eventOrMsg.event.type === "pocket.supplier.EventSupplierUnbondingBegin") { + const { supplier, unbondingBeginEvent } = _handleSupplierUnbondingBeginEvent(eventOrMsg, record); + record[supplier.id].supplier = supplier; + unbondingBeginEvents.push(unbondingBeginEvent); + } + + if (eventOrMsg.event.type === "pocket.supplier.EventSupplierUnbondingEnd") { + const { servicesToRemove, supplier, unbondingEndEvent } = _handleSupplierUnbondingEndEvent(eventOrMsg, record); + + for (const serviceId of servicesToRemove) { + delete record[supplier.id].services![serviceId]; + servicesToClose.push(serviceId); + } + + record[supplier.id].supplier = supplier; + unbondingEndEvents.push(unbondingEndEvent); + } + } else { + if (eventOrMsg.msg.typeUrl === "/pocket.supplier.MsgStakeSupplier") { + const { msgStakeSupplier, services, servicesToRemove, supplier } = _handleSupplierStakeMsg( + eventOrMsg as CosmosMessage, + record + ); + + stakeMsgs.push(msgStakeSupplier); + + if (!record[supplier.id]) { + record[supplier.id] = { services: {} }; + } + + record[supplier.id].supplier = supplier; + + for (const serviceId of servicesToRemove) { + delete record[supplier.id].services![serviceId]; + servicesToClose.push(serviceId); + } + + for (const service of services) { + record[supplier.id].services![service.id] = service; + servicesToClose.push(service.id); + } + } + + if (eventOrMsg.msg.typeUrl === "/pocket.migration.MsgClaimMorseSupplier") { + const { msgClaimSupplier, services, servicesToRemove, supplier } = _handleClaimSupplier(eventOrMsg, record); + + claimMsgs.push(msgClaimSupplier); + + if (!record[supplier.id]) { + record[supplier.id] = { services: {} }; + } + + record[supplier.id].supplier = supplier; + + for (const serviceId of servicesToRemove) { + delete record[supplier.id].services![serviceId]; + servicesToClose.push(serviceId); + } + + for (const service of services) { + record[supplier.id].services![service.id] = service; + servicesToClose.push(service.id); + } + } + + if (eventOrMsg.msg.typeUrl === "/pocket.supplier.MsgUnstakeSupplier") { + const { supplier, unstakedMsg } = _handleUnstakeSupplierMsg(eventOrMsg, record); + record[supplier.id].supplier = supplier; + unstakeMsgs.push(unstakedMsg); + } + } + } + + return { + suppliersToClose, + servicesToClose, + stakeMsgs, + claimMsgs, + unstakeMsgs, + serviceConfigActivatedEvents, + slashingEvents, + unbondingBeginEvents, + unbondingEndEvents + }; } -export async function handleEventSupplierServiceConfigActivated( - events: Array, -): Promise { - await Promise.all(events.map(_handleEventSupplierServiceConfigActivated)); +// Helper: Build lists of items to save +function buildSupplierSaveLists(record: Record): { + suppliersToSave: Array; + servicesToSave: Array; +} { + const suppliersToSave: Array = []; + const servicesToSave: Array = []; + + for (const { services, supplier } of Object.values(record)) { + if (supplier) { + suppliersToSave.push(supplier); + } + + if (services) { + servicesToSave.push(...Object.values(services)); + } + } + + return { suppliersToSave, servicesToSave }; } -export async function handleSupplierUnbondingBeginEvent( - events: Array, -): Promise { - await Promise.all(events.map(_handleSupplierUnbondingBeginEvent)); +// Main function: Index supplier data +export async function indexSupplier(msgByType: MessageByType, eventByType: EventByType): Promise { + const msgTypes = [ + "/pocket.supplier.MsgUnstakeSupplier", + "/pocket.migration.MsgClaimMorseSupplier", + "/pocket.supplier.MsgStakeSupplier", + ]; + + const eventTypes = [ + "pocket.supplier.EventSupplierUnbondingBegin", + "pocket.supplier.EventSupplierUnbondingEnd", + "pocket.supplier.EventSupplierServiceConfigActivated", + "pocket.tokenomics.EventSupplierSlashed" + ]; + + const recordId = getSupplierRecordIdGetters(); + + const eventsAndMessages = sortEventsAndMsgs([ + ...msgTypes.map(type => msgByType[type]).flat(), + ...eventTypes.map(type => eventByType[type]).flat() + ]); + + const { suppliers, suppliersToFetchServices } = collectSupplierIds(eventsAndMessages, recordId); + const record = await fetchSupplierData(suppliers, suppliersToFetchServices); + + const { + claimMsgs, + serviceConfigActivatedEvents, + servicesToClose, + slashingEvents, + stakeMsgs, + suppliersToClose, + unbondingBeginEvents, + unbondingEndEvents, + unstakeMsgs + } = processSupplierEventsAndMessages(eventsAndMessages, record, recordId); + + const { servicesToSave, suppliersToSave } = buildSupplierSaveLists(record); + + await performSupplierDatabaseOperations({ + suppliersToSave, + servicesToSave, + suppliersToClose, + servicesToClose, + stakeMsgs, + claimMsgs, + unstakeMsgs, + serviceConfigActivatedEvents, + slashingEvents, + unbondingBeginEvents, + unbondingEndEvents + }); } -export async function handleSupplierUnbondingEndEvent( - events: Array, -): Promise { - await Promise.all(events.map(_handleSupplierUnbondingEndEvent)); +// Helper: Perform database operations (delete, close, save) +// eslint-disable-next-line complexity +async function performSupplierDatabaseOperations(data: { + suppliersToSave: Array; + servicesToSave: Array; + suppliersToClose: Array; + servicesToClose: Array; + stakeMsgs: Array; + claimMsgs: Array; + unstakeMsgs: Array; + serviceConfigActivatedEvents: Array; + slashingEvents: Array; + unbondingBeginEvents: Array; + unbondingEndEvents: Array; +}): Promise { + const block = store.context.getHistoricalUnit(); + + const removeRecords = (model: string) => { + const sequelize = getSequelize(model); + return getStoreModel(model).model.destroy({ + where: sequelize.where( + sequelize.fn("lower", sequelize.col("_block_range")), + block + ), + transaction: store.context.transaction, + }); + }; + + const SupplierModel = getStoreModel("Supplier"); + const SupplierServiceConfigModel = getStoreModel("SupplierServiceConfig"); + + // Delete records created at this block + const deletePromises: Array> = []; + + if (data.suppliersToSave.length > 0) deletePromises.push(removeRecords("Supplier")); + if (data.servicesToSave.length > 0) deletePromises.push(removeRecords("SupplierServiceConfig")); + if (data.stakeMsgs.length > 0) deletePromises.push(removeRecords("MsgStakeSupplier")); + if (data.claimMsgs.length > 0) deletePromises.push(removeRecords("MsgClaimMorseSupplier")); + if (data.unstakeMsgs.length > 0) deletePromises.push(removeRecords("MsgUnstakeSupplier")); + if (data.serviceConfigActivatedEvents.length > 0) deletePromises.push(removeRecords("EventSupplierServiceConfigActivated")); + if (data.slashingEvents.length > 0) deletePromises.push(removeRecords("EventSupplierSlashed")); + if (data.unbondingBeginEvents.length > 0) deletePromises.push(removeRecords("EventSupplierUnbondingBegin")); + if (data.unbondingEndEvents.length > 0) deletePromises.push(removeRecords("EventSupplierUnbondingEnd")); + + if (deletePromises.length > 0) { + await Promise.all(deletePromises); + } + + // Close block ranges + const closePromises: Array> = []; + + if (data.suppliersToClose.length > 0) { + const supplierSequelize = getSequelize("Supplier"); + closePromises.push( + SupplierModel.model.update( + { + __block_range: supplierSequelize.fn( + "int8range", + supplierSequelize.fn("lower", supplierSequelize.col("_block_range")), + BigInt(block), + '[)' + ), + }, + { + where: { + id: { [Symbol.for("in")]: data.suppliersToClose }, + __block_range: { [Symbol.for("contains")]: BigInt(block) }, + }, + hooks: false, + transaction: store.context.transaction, + } + ) + ); + } + + if (data.servicesToClose.length > 0) { + const servicesSequelize = getSequelize("SupplierServiceConfig"); + closePromises.push( + SupplierServiceConfigModel.model.update( + { + __block_range: servicesSequelize.fn( + "int8range", + servicesSequelize.fn("lower", servicesSequelize.col("_block_range")), + BigInt(block), + '[)' + ), + }, + { + where: { + id: { [Symbol.for("in")]: data.servicesToClose }, + __block_range: { [Symbol.for("contains")]: BigInt(block) }, + }, + hooks: false, + transaction: store.context.transaction, + } + ) + ); + } + + if (closePromises.length > 0) { + await Promise.all(closePromises); + } + + // Save new records + const assignBlockRange = (doc: object) => ({ ...doc, __block_range: [block, null] }); + const savePromises: Array> = []; + + if (data.suppliersToSave.length > 0) { + savePromises.push(optimizedBulkCreate("Supplier", data.suppliersToSave, assignBlockRange)); + } + if (data.servicesToSave.length > 0) { + savePromises.push(optimizedBulkCreate("SupplierServiceConfig", data.servicesToSave, assignBlockRange)); + } + if (data.stakeMsgs.length > 0) { + savePromises.push(optimizedBulkCreate("MsgStakeSupplier", data.stakeMsgs, assignBlockRange)); + } + if (data.claimMsgs.length > 0) { + savePromises.push(optimizedBulkCreate("MsgClaimMorseSupplier", data.claimMsgs, assignBlockRange)); + } + if (data.unstakeMsgs.length > 0) { + savePromises.push(optimizedBulkCreate("MsgUnstakeSupplier", data.unstakeMsgs, assignBlockRange)); + } + if (data.serviceConfigActivatedEvents.length > 0) { + savePromises.push(optimizedBulkCreate("EventSupplierServiceConfigActivated", data.serviceConfigActivatedEvents, assignBlockRange)); + } + if (data.unbondingBeginEvents.length > 0) { + savePromises.push(optimizedBulkCreate("EventSupplierUnbondingBegin", data.unbondingBeginEvents, assignBlockRange)); + } + if (data.unbondingEndEvents.length > 0) { + savePromises.push(optimizedBulkCreate("EventSupplierUnbondingEnd", data.unbondingEndEvents, assignBlockRange)); + } + if (data.slashingEvents.length > 0) { + savePromises.push(optimizedBulkCreate("EventSupplierSlashed", data.slashingEvents, assignBlockRange)); + } + + if (savePromises.length > 0) { + await Promise.all(savePromises); + } +} + +// Helper: Sort events and messages by transaction order +function sortEventsAndMsgs(allData: Array): Array { + const allEvents: Array = []; + const allMsgs: Array = []; + + for (const datum of allData) { + if ('event' in datum) { + allEvents.push(datum); + } else { + allMsgs.push(datum); + } + } + + const { success: successfulEvents } = filterEventsByTxStatus(allEvents); + const { success: successfulMsgs } = filterMsgByTxStatus(allMsgs); + + const finalizedEvents: Array = []; + const nonFinalizedData: Array<(CosmosEvent | CosmosMessage) & { rank: 0 | 1 }> = []; + + for (const datum of [...successfulEvents, ...successfulMsgs]) { + if ('event' in datum && isEventOfFinalizedBlockKind(datum)) { + finalizedEvents.push(datum); + } else { + nonFinalizedData.push({ + ...datum, + rank: 'event' in datum ? 1 : 0 + }); + } + } + + return [ + ...orderBy(nonFinalizedData, ['tx.idx', 'rank', 'idx'], ['asc', 'asc', 'asc']), + ...orderBy(finalizedEvents, ['idx'], ['asc']) + ]; } diff --git a/src/mappings/utils/json.ts b/src/mappings/utils/json.ts index b66aa77..948ec94 100644 --- a/src/mappings/utils/json.ts +++ b/src/mappings/utils/json.ts @@ -18,3 +18,7 @@ export function sanitize(value: unknown): string { // otherwise return it as a stringifies object return stringify(value); } + +export function parseAttribute(attribute: unknown = ""): string { + return (attribute as string).replaceAll("\"", ""); +}