Skip to content

feat: add aggregator use example #646

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
60 commits
Select commit Hold shift + click to select a range
c593008
Add initial operator implementation idea
maximopalopoli Apr 8, 2025
cba3bb0
Modify operator to do a reduced set of things
maximopalopoli Apr 9, 2025
f3413ad
Change ProcessTaskResponse interface func to receive 32 bytes
maximopalopoli Apr 9, 2025
6fedb7a
Remove unused or commented fields in structs
maximopalopoli Apr 9, 2025
7deb520
Receive logger and taskprocessor by parameter
maximopalopoli Apr 9, 2025
6331afb
fmt changes
maximopalopoli Apr 9, 2025
8a5d5ac
Add change to changelog file
maximopalopoli Apr 9, 2025
9f4c423
Update anvil state
maximopalopoli Apr 9, 2025
5293b61
Revert "Update anvil state"
maximopalopoli Apr 9, 2025
ff0e896
Modify config attribute's name
maximopalopoli Apr 9, 2025
34b0392
Remove TimesFailing config attribute
maximopalopoli Apr 9, 2025
0d86930
fmt changes
maximopalopoli Apr 9, 2025
ba65c25
Remove v2 entries from changelog
maximopalopoli Apr 9, 2025
0facd78
Add initial implementation of task generator
maximopalopoli Apr 9, 2025
71ac2a3
Add comment on attribute
maximopalopoli Apr 9, 2025
84fb308
Remove example related code
maximopalopoli Apr 9, 2025
3cb44ea
make fmt changes
maximopalopoli Apr 9, 2025
f1742f2
Add example contracts bindings
maximopalopoli Apr 10, 2025
5019234
Add use example for taskGenerator
maximopalopoli Apr 10, 2025
043f5f2
Capitalize interface TaskGen Logic method
maximopalopoli Apr 10, 2025
9584955
format changes
maximopalopoli Apr 10, 2025
b8f09c9
Change name of attribute that defines time between iterations
maximopalopoli Apr 10, 2025
821d38b
Change logs and task number variable name
maximopalopoli Apr 10, 2025
b5fe355
Change interval duration variable to int type
maximopalopoli Apr 10, 2025
d041a97
Remove logger from TaskGenLogic parameters
maximopalopoli Apr 10, 2025
f6adf34
Add config values to the AvsConfig struct
maximopalopoli Apr 10, 2025
085ebcb
Remove unncesary code
maximopalopoli Apr 10, 2025
b206c65
Move TxMgr generation to a separate function
maximopalopoli Apr 10, 2025
ce6fea8
Remove service Manager binding since it is unused
maximopalopoli Apr 10, 2025
2791482
Add inital challenger implementation
maximopalopoli Apr 10, 2025
1fd2e5f
Remove unused Client interface
maximopalopoli Apr 10, 2025
f7ffc2c
Reassign specific topic instead of all structure
maximopalopoli Apr 10, 2025
39a749a
format changes
maximopalopoli Apr 10, 2025
e42b538
Add example for challenger
maximopalopoli Apr 10, 2025
7e5bc94
Remove unnecesary interface
maximopalopoli Apr 10, 2025
7c4272e
format changes
maximopalopoli Apr 10, 2025
55f4925
Complete Challenger config initialization
maximopalopoli Apr 10, 2025
d24443d
Move challenge function to after process events functions
maximopalopoli Apr 10, 2025
a99ee7e
Add comment on unnecesary validation
maximopalopoli Apr 10, 2025
b79f5d6
more formatting changes
maximopalopoli Apr 10, 2025
4426d3a
Move registration to a separate file
maximopalopoli Apr 11, 2025
9751018
fmt changes
maximopalopoli Apr 11, 2025
561d607
Fix logging error
maximopalopoli Apr 14, 2025
49175f9
Change ProcessNewTaskCreatedLog intrface method input parameter
maximopalopoli Apr 14, 2025
a98e0d5
Modify interfaces with parameter of type any to receive a log instead
maximopalopoli Apr 14, 2025
e1fcd3f
Move task manager binding to an upper folder in /examples
maximopalopoli Apr 14, 2025
abcc807
Merge branch 'v2' into fix/improve-interfaces-parameters
maximopalopoli Apr 14, 2025
b160e21
Delete duplicated binding
maximopalopoli Apr 14, 2025
829c174
Remove task generator's example bindings subfolder
maximopalopoli Apr 14, 2025
6eae9ff
Change operator AggregatorRpcClient to use generic instead of empty i…
maximopalopoli Apr 15, 2025
76bde58
Avoid passing type as parameter, and restore inteface
maximopalopoli Apr 15, 2025
53aaa78
fmt changes
maximopalopoli Apr 15, 2025
e7cee63
Fix operator use example
maximopalopoli Apr 15, 2025
c56e191
Add Generic response type param to aggregator
maximopalopoli Apr 15, 2025
0ba82ee
Merge branch 'v2' into fix/improve-interfaces-parameters
MegaRedHand Apr 16, 2025
bf89a45
Add initial idea of the use example
maximopalopoli Apr 21, 2025
65d1c53
fmt changes
maximopalopoli Apr 21, 2025
70e9b35
Add utils and constants to the aggregator example
maximopalopoli Apr 22, 2025
7eef6f2
Merge branch 'v2-dev-0' into feat/add-aggregator-use-example
maximopalopoli Apr 28, 2025
5ab892c
Update example with the new changes
maximopalopoli Apr 28, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
350 changes: 350 additions & 0 deletions examples/aggregator/aggregator_use_example.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,350 @@
package aggregator_example

import (
"context"
"math/big"

"github.com/Layr-Labs/eigensdk-go/aggregator"
"github.com/Layr-Labs/eigensdk-go/chainio/txmgr"
"github.com/Layr-Labs/eigensdk-go/challenger"
"github.com/Layr-Labs/eigensdk-go/crypto/bls"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/Layr-Labs/eigensdk-go/testutils"
"github.com/Layr-Labs/eigensdk-go/types"
"github.com/Layr-Labs/eigensdk-go/utils"
"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
"golang.org/x/crypto/sha3"

cstaskmanager "github.com/Layr-Labs/eigensdk-go/examples/bindings/taskManager"
taskgeneratorexample "github.com/Layr-Labs/eigensdk-go/examples/task-generator"
blsagg "github.com/Layr-Labs/eigensdk-go/services/bls_aggregation"
gethtypes "github.com/ethereum/go-ethereum/core/types"
)

type TaskResponseData struct {
TaskResponse cstaskmanager.IIncredibleSquaringTaskManagerTaskResponse
TaskResponseMetadata cstaskmanager.IIncredibleSquaringTaskManagerTaskResponseMetadata
NonSigningOperatorPubKeys []cstaskmanager.BN254G1Point
}

func main() {
logger, err := logging.NewZapLogger(logging.Production) // Change here if want to change logging level
if err != nil {
println("Failure creating logger")
return
}

ethHttpUrl := "http://localhost:8545"
ethHttpClient, err := ethclient.Dial(ethHttpUrl)
if err != nil {
return
}

txMgr, err := taskgeneratorexample.GetTxManager(logger, ethHttpClient, testutils.ANVIL_FIRST_PRIVATE_KEY)
if err != nil {
return
}

ecdsaPrivateKey, err := crypto.HexToECDSA(testutils.ANVIL_FIRST_PRIVATE_KEY)
if err != nil {
logger.Errorf("Cannot parse ecdsa private key", "err", err)
return
}

cfg := aggregator.AggregatorConfig{
RegistryCoordinatorAddress: common.HexToAddress("0x7bc06c482dead17c0e297afbc32f6e63d3846650"),
OperatorStateRetrieverAddress: common.HexToAddress("0x4c5859f0f772848b2d91f1d83e2fe57935348029"),
ServiceManagerAddress: common.HexToAddress("0x5f3f1dbd7b74c6b46e8c44f98792a1daf8d69154"),
EthHttpClient: ethHttpClient,
Logger: logger,
EthHttpUrl: ethHttpUrl,
EthWsUrl: "ws://localhost:8545",
EcdsaPrivateKey: ecdsaPrivateKey,
AggregatorServerIpPortAddr: "localhost:8090",
}

taskProcessor, err := NewTaskProcessor(&cfg, txMgr)
if err != nil {
logger.Fatalf(err.Error())
}

taskManagerAbi, err := cstaskmanager.ContractIncredibleSquaringTaskManagerMetaData.GetAbi()
if err != nil {
logger.Fatalf(err.Error())
}

// This is the same hash function used by the operator to hash the task response before signing it.
hashFunction := func(taskResponse types.TaskResponse) (types.TaskResponseDigest, error) {
// The order here has to match the field ordering of cstaskmanager.IIncredibleSquaringTaskManagerTaskResponse
taskResponseType, err := abi.NewType("tuple", "", []abi.ArgumentMarshaling{
{
Name: "referenceTaskIndex",
Type: "uint32",
},
{
Name: "numberSquared",
Type: "uint256",
},
})
if err != nil {
return types.TaskResponseDigest{}, utils.WrapError("Error creating taskResponseType", err)
}
arguments := abi.Arguments{
{
Type: taskResponseType,
},
}

encodeTaskResponseByte, err := arguments.Pack(taskResponse)
if err != nil {
return types.TaskResponseDigest{}, utils.WrapError("Error Packing taskResponse", err)
}

var taskResponseDigest [32]byte
hasher := sha3.NewLegacyKeccak256()
hasher.Write(encodeTaskResponseByte)
copy(taskResponseDigest[:], hasher.Sum(nil)[:32])

return taskResponseDigest, nil
}
cfg.TaskResponseHashFn = hashFunction

blockHash := taskManagerAbi.Events["NewTaskCreated"].ID
agg, err := aggregator.NewAggregator[*big.Int, *big.Int](cfg, taskProcessor, blockHash, taskManagerAbi)
if err != nil {
logger.Fatalf(err.Error())
}

err = agg.Start(context.Background())
if err != nil {
logger.Fatalf(err.Error())
}
}

type IncredibleTaskProcessor struct {
logger logging.Logger
avsWriter AvsWriter
}

var _ aggregator.TaskProcessor[*big.Int] = (*IncredibleTaskProcessor)(nil)

func NewTaskProcessor(c *aggregator.AggregatorConfig, txMgr txmgr.TxManager) (*IncredibleTaskProcessor, error) {
avsConfig := AvsConfig{
Logger: c.Logger,
//IncredibleSquaringTaskManager: taskMana,
TxMgr: txMgr,
EthHttpClient: c.EthHttpClient,
}
avsWriter, err := BuildAvsWriterFromConfig(&avsConfig)
if err != nil {
c.Logger.Errorf("Cannot create avsWriter", "err", err)
return nil, err
}

return &IncredibleTaskProcessor{
logger: c.Logger,
avsWriter: *avsWriter,
}, nil
}

// This method sends the aggregated response to the on-chain Task Manager contract
func (tp *IncredibleTaskProcessor) ProcessAggregatedResponse(
ctx context.Context,
response blsagg.BlsAggregationServiceResponse,
task challenger.GenericInputTask[*big.Int],
) error {
if response.Err != nil {
return utils.WrapError("BlsAggregationServiceResponse contains an error", response.Err)
}
nonSignerPubkeys := []cstaskmanager.BN254G1Point{}
for _, nonSignerPubkey := range response.NonSignersPubkeysG1 {
nonSignerPubkeys = append(nonSignerPubkeys, ConvertToBN254G1Point(nonSignerPubkey))
}
quorumApks := []cstaskmanager.BN254G1Point{}
for _, quorumApk := range response.QuorumApksG1 {
quorumApks = append(quorumApks, ConvertToBN254G1Point(quorumApk))
}
nonSignerStakesAndSignature := cstaskmanager.IBLSSignatureCheckerTypesNonSignerStakesAndSignature{
NonSignerPubkeys: nonSignerPubkeys,
QuorumApks: quorumApks,
ApkG2: ConvertToBN254G2Point(response.SignersApkG2),
Sigma: ConvertToBN254G1Point(response.SignersAggSigG1.G1Point),
NonSignerQuorumBitmapIndices: response.NonSignerQuorumBitmapIndices,
QuorumApkIndices: response.QuorumApkIndices,
TotalStakeIndices: response.TotalStakeIndices,
NonSignerStakeIndices: response.NonSignerStakeIndices,
}

tp.logger.Info("Threshold reached. Sending aggregated response onchain.", "taskIndex", response.TaskIndex)

taskResponseAgg, ok := response.TaskResponse.(challenger.GenericOutputTaskResponse[*big.Int])
if !ok {
tp.logger.Error("task Response could not be converted to sdk aggregator's Task Response type")
}

taskResponse := cstaskmanager.IIncredibleSquaringTaskManagerTaskResponse{
ReferenceTaskIndex: taskResponseAgg.ReferenceTaskIndex,
NumberSquared: taskResponseAgg.OutputValue,
}

incredibleSquaringTask := cstaskmanager.IIncredibleSquaringTaskManagerTask{
NumberToBeSquared: task.InputValue,
TaskCreatedBlock: task.TaskCreatedBlock,
QuorumNumbers: task.QuorumNumbers,
QuorumThresholdPercentage: task.QuorumThresholdPercentage,
}

_, err := tp.avsWriter.SendAggregatedResponse(
context.Background(),
incredibleSquaringTask,
taskResponse,
nonSignerStakesAndSignature,
)
if err != nil {
return utils.WrapError("Aggregator failed to respond to task", err)
}
return nil
}

type IncredibleSquaringTaskResponse struct {
ReferenceTaskIndex uint32
NumberSquared *big.Int
}

func (tr IncredibleSquaringTaskResponse) TaskIndex() types.TaskIndex {
return tr.ReferenceTaskIndex
}

func (tr IncredibleSquaringTaskResponse) Digest() [32]byte {
tmresponse := cstaskmanager.IIncredibleSquaringTaskManagerTaskResponse(tr)
taskResponseHash, err := GetTaskResponseDigest(&tmresponse)
if err != nil {
return [32]byte{}
}
return taskResponseHash
}

// Avs Writer
type AvsConfig struct {
Logger logging.Logger
IncredibleSquaringTaskManager common.Address
TxMgr txmgr.TxManager
EthHttpClient *ethclient.Client
}

type AvsWriter struct {
logger logging.Logger
TxMgr txmgr.TxManager
taskManagerContract *cstaskmanager.ContractIncredibleSquaringTaskManager
}

func BuildAvsWriterFromConfig(c *AvsConfig) (*AvsWriter, error) {
contractTaskManager, err := cstaskmanager.NewContractIncredibleSquaringTaskManager(
c.IncredibleSquaringTaskManager,
c.EthHttpClient,
)
if err != nil {
return nil, utils.WrapError("Failed to fetch task manager contract", err)
}

return &AvsWriter{
logger: c.Logger,
TxMgr: c.TxMgr,
taskManagerContract: contractTaskManager,
}, nil
}

func (w *AvsWriter) SendAggregatedResponse(
ctx context.Context, task cstaskmanager.IIncredibleSquaringTaskManagerTask,
taskResponse cstaskmanager.IIncredibleSquaringTaskManagerTaskResponse,
nonSignerStakesAndSignature cstaskmanager.IBLSSignatureCheckerTypesNonSignerStakesAndSignature,
) (*gethtypes.Receipt, error) {
txOpts, err := w.TxMgr.GetNoSendTxOpts()
if err != nil {
w.logger.Errorf("Error getting tx opts")
return nil, err
}
tx, err := w.taskManagerContract.RespondToTask(txOpts, task, taskResponse, nonSignerStakesAndSignature)
if err != nil {
w.logger.Error("Error submitting SubmitTaskResponse tx while calling respondToTask", "err", err)
return nil, err
}
receipt, err := w.TxMgr.Send(ctx, tx, true)
if err != nil {
w.logger.Errorf("Error submitting respondToTask tx")
return nil, err
}
w.logger.Info("tx hash :respond to task")
return receipt, nil
}

// Incredible Squaring Utils
// this hardcodes abi.encode() for cstaskmanager.IIncredibleSquaringTaskManagerTaskResponse
// unclear why abigen doesn't provide this out of the box...
func AbiEncodeTaskResponse(h *cstaskmanager.IIncredibleSquaringTaskManagerTaskResponse) ([]byte, error) {

// The order here has to match the field ordering of cstaskmanager.IIncredibleSquaringTaskManagerTaskResponse
taskResponseType, err := abi.NewType("tuple", "", []abi.ArgumentMarshaling{
{
Name: "referenceTaskIndex",
Type: "uint32",
},
{
Name: "numberSquared",
Type: "uint256",
},
})
if err != nil {
return nil, err
}
arguments := abi.Arguments{
{
Type: taskResponseType,
},
}

bytes, err := arguments.Pack(h)
if err != nil {
return nil, err
}

return bytes, nil
}

// GetTaskResponseDigest returns the hash of the TaskResponse, which is what operators sign over
func GetTaskResponseDigest(h *cstaskmanager.IIncredibleSquaringTaskManagerTaskResponse) ([32]byte, error) {

encodeTaskResponseByte, err := AbiEncodeTaskResponse(h)
if err != nil {
return [32]byte{}, err
}

var taskResponseDigest [32]byte
hasher := sha3.NewLegacyKeccak256()
hasher.Write(encodeTaskResponseByte)
copy(taskResponseDigest[:], hasher.Sum(nil)[:32])

return taskResponseDigest, nil
}

// BN254.sol is a library, so bindings for G1 Points and G2 Points are only generated
// in every contract that imports that library. Thus the output here will need to be
// type casted if G1Point is needed to interface with another contract (eg: BLSPublicKeyCompendium.sol)
func ConvertToBN254G1Point(input *bls.G1Point) cstaskmanager.BN254G1Point {
output := cstaskmanager.BN254G1Point{
X: input.X.BigInt(big.NewInt(0)),
Y: input.Y.BigInt(big.NewInt(0)),
}
return output
}

func ConvertToBN254G2Point(input *bls.G2Point) cstaskmanager.BN254G2Point {
output := cstaskmanager.BN254G2Point{
X: [2]*big.Int{input.X.A1.BigInt(big.NewInt(0)), input.X.A0.BigInt(big.NewInt(0))},
Y: [2]*big.Int{input.Y.A1.BigInt(big.NewInt(0)), input.Y.A0.BigInt(big.NewInt(0))},
}
return output
}
1 change: 0 additions & 1 deletion operator/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ type AggregatorRpcClient[Output any] struct {
func NewAggregatorRpcClient[Output any](
aggregatorIpPortAddr string,
logger logging.Logger,
// metrics metrics.Metrics,
) (*AggregatorRpcClient[Output], error) {
return &AggregatorRpcClient[Output]{
// set to nil so that we can create an rpc client even if the aggregator is not running
Expand Down
Loading