Skip to content

refactor: TaskGenerator rewrite #635

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 30 additions & 98 deletions examples/task-generator/task_generator_use_example.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package taskgeneratorexample

import (
"context"
"iter"
"math/big"
"time"

Expand All @@ -11,51 +12,17 @@ import (
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/Layr-Labs/eigensdk-go/signerv2"
taskgenerator "github.com/Layr-Labs/eigensdk-go/task-generator"
"github.com/Layr-Labs/eigensdk-go/utils"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient"
)

// Task generator logic code
type TaskGenLogic struct {
avsWriter *AvsWriter
thresholdNumerator uint8
quorumNumbers []uint8
logger logging.Logger
}

func NewTaskGenLogic(c *AvsConfig, thresholdNumerator uint8, quorumNumbers []uint8) (*TaskGenLogic, error) {
avsWriter, err := BuildAvsWriterFromConfig(c)
if err != nil {
c.Logger.Errorf("Cannot create avsWriter", "err", err)
return nil, err
}

return &TaskGenLogic{
avsWriter, thresholdNumerator, quorumNumbers, c.Logger}, nil
}

func (tgl *TaskGenLogic) SendNewTask(taskNumber int64) error {
err := tgl.avsWriter.SendNewTaskNumberToSquare(context.Background(), big.NewInt(taskNumber),
tgl.thresholdNumerator, tgl.quorumNumbers)
if err != nil {
tgl.logger.Error("TaskGenerator failed to send number to square", "err", err)
return err
}

return nil
}

func main() {
logger, err := logging.NewZapLogger(logging.Development)
if err != nil {
return
}

thresholdNumerator := uint8(100)
quorumNumbers := []uint8{0}

// This pk should be related to the address passed to TaskManager as task_generator_addr when initialized
taskgeneratorPk := "0x4bbbf85ce3377467afe5d46f804f221813b2bb87f24d81f60f1fcdbf7cbf4356"

Expand All @@ -70,31 +37,38 @@ func main() {
return
}

// The values from this config are extracted from an incredible squaring config file and also the deployment output files
avsConfig := AvsConfig{
Logger: logger,
IncredibleSquaringTaskManager: common.HexToAddress("0x2bdcc0de6be1f7d2ee689a0342d76f52e8efaba3"),
TxMgr: txMgr,
EthHttpClient: ethHttpClient,
}
// This value is extracted from the deployment output files
taskManagerAddress := common.HexToAddress("0x2bdcc0de6be1f7d2ee689a0342d76f52e8efaba3")

logic, err := NewTaskGenLogic(&avsConfig, thresholdNumerator, quorumNumbers)
contractTaskManager, err := cstaskmanager.NewContractIncredibleSquaringTaskManager(
taskManagerAddress,
ethHttpClient,
)
if err != nil {
return
}

secondsInterval := 10 // This means TaskGenerator will send tasks every 10 seconds
taskGen, err := taskgenerator.BuildTaskGenerator(logger, logic, secondsInterval)
taskGeneratorConfig := taskgenerator.Config{
Logger: logger,
TimeBetweenTasks: 10 * time.Second,

QuorumThresholdPercentage: 100,
QuorumNumbers: []uint8{0},
}
taskGen, err := taskgenerator.NewTaskGenerator(contractTaskManager, txMgr, taskGeneratorConfig)
if err != nil {
return
}

err = taskGen.Start(context.Background())
seq := NewNumberToSquareSequence()

err = taskGen.Start(context.Background(), seq)
if err != nil {
return
}
}

// TODO: this should be in the SDK
func GetTxManager(logger logging.Logger, ethHttpClient *ethclient.Client, taskgeneratorPk string) (*txmgr.SimpleTxManager, error) {
ecdsaPrivateKey, err := crypto.HexToECDSA(taskgeneratorPk)
if err != nil {
Expand Down Expand Up @@ -123,59 +97,17 @@ func GetTxManager(logger logging.Logger, ethHttpClient *ethclient.Client, taskge
return txMgr, nil
}

// Avs Writer code
type AvsWriter struct {
logger logging.Logger
TxMgr txmgr.TxManager
taskManagerContract *cstaskmanager.ContractIncredibleSquaringTaskManager
}

type AvsConfig struct {
Logger logging.Logger
IncredibleSquaringTaskManager common.Address
TxMgr txmgr.TxManager
EthHttpClient *ethclient.Client
}

func BuildAvsWriterFromConfig(c *AvsConfig) (*AvsWriter, error) {
contractTaskManager, err := cstaskmanager.NewContractIncredibleSquaringTaskManager(
c.IncredibleSquaringTaskManager,
c.EthHttpClient,
)
if err != nil {
return nil, utils.WrapError("Failed to fetch IServiceManager contract", err)
}

return &AvsWriter{
logger: c.Logger,
TxMgr: c.TxMgr,
taskManagerContract: contractTaskManager,
}, nil
}

func (w *AvsWriter) SendNewTaskNumberToSquare(
ctx context.Context,
numToSquare *big.Int,
quorumThresholdPercentage uint8,
quorumNumbers []uint8,
) error {
txOpts, err := w.TxMgr.GetNoSendTxOpts()
if err != nil {
return utils.WrapError("Error getting tx opts", err)
// Returns an iterator for the sequence 1, 2, 3, ...
func NewNumberToSquareSequence() iter.Seq[*big.Int] {
acc := big.NewInt(1)
delta := big.NewInt(1)
return func(yield func(*big.Int) bool) {
for {
if !yield(acc) {
break
}
acc.Add(acc, delta)
}
}

tx, err := w.taskManagerContract.CreateNewTask(
txOpts,
numToSquare,
uint32(quorumThresholdPercentage),
quorumNumbers,
)
if err != nil {
return utils.WrapError("Error assembling CreateNewTask tx", err)
}
_, err = w.TxMgr.Send(ctx, tx, true)
if err != nil {
return utils.WrapError("Error submitting CreateNewTask tx", err)
}
return nil
}
19 changes: 19 additions & 0 deletions task-generator/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package taskgenerator

import (
"time"

"github.com/Layr-Labs/eigensdk-go/logging"
)

// Task generator configuration struct.
//
// Contains optional parameters for the task generator.
// TODO: have default values
type Config struct {
Logger logging.Logger
TimeBetweenTasks time.Duration

QuorumThresholdPercentage uint32
QuorumNumbers []uint8
}
101 changes: 69 additions & 32 deletions task-generator/task_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,60 +2,97 @@ package taskgenerator

import (
"context"
"iter"
"time"

"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/Layr-Labs/eigensdk-go/chainio/txmgr"
"github.com/Layr-Labs/eigensdk-go/utils"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
gethtypes "github.com/ethereum/go-ethereum/core/types"
)

type TaskGenerator struct {
logger logging.Logger
logic TaskGeneratorLogic
secondsInterval int
// Type is generic over the task input type
type TaskManager[Input any] interface {
CreateNewTask(opts *bind.TransactOpts, input Input, quorumThresholdPercentage uint32, quorumNumbers []byte) (*gethtypes.Transaction, error)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't like that this requires the implementor to return the Transaction because it limits the kind of things they could do inside of this. However, one could hack around this restriction by simply returning an empty transaction and nil error, and using a custom TxManager that ignores all transactions.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a good catch. However, I think we can leave this change for a future change.

}

func BuildTaskGenerator(logger logging.Logger, logic TaskGeneratorLogic, secondsInterval int) (*TaskGenerator, error) {
return &TaskGenerator{
logger,
logic,
secondsInterval,
type TaskGenerator[Input any] struct {
taskManager TaskManager[Input]
txMgr txmgr.TxManager
config Config
}

func NewTaskGenerator[Input any](taskManager TaskManager[Input], txMgr txmgr.TxManager, config Config) (*TaskGenerator[Input], error) {
// TODO: validate config
return &TaskGenerator[Input]{
taskManager,
txMgr,
config,
}, nil
}

func (taskGen *TaskGenerator) Start(ctx context.Context) error {
time.Sleep(time.Duration(2 * time.Second))
func (taskGen *TaskGenerator[Input]) Start(ctx context.Context, inputGen iter.Seq[Input]) error {
logger := taskGen.config.Logger

taskGen.logger.Info("Starting Task Generator.")
logger.Info("Starting Task Generator.")

ticker := time.NewTicker(time.Duration(taskGen.secondsInterval) * time.Second)
ticker := time.NewTicker(taskGen.config.TimeBetweenTasks)
defer ticker.Stop()
taskGen.logger.Infof("Task Generator set to send new task every %v seconds...", taskGen.secondsInterval)
logger.Infof("Task Generator set to send new task every %v seconds...", taskGen.config.TimeBetweenTasks)

taskNumber := int64(0)
taskIndex := int64(0)

// Send a task before looping
err := taskGen.logic.SendNewTask(taskNumber)
if err != nil {
taskGen.logger.Error("Task Generator failed to send new task", "err", err)
return err
}
taskNumber++
nextInput, stop := iter.Pull(inputGen)
defer stop()

for {
// Submit new task
taskIndex++
logger.Infof("Task Generator sending new task, task index: %v", taskIndex)
value, ok := nextInput()
if !ok {
logger.Info("Task Generator finished sending tasks")
return nil
}
err := taskGen.CreateNewTask(ctx, value)
if err != nil {
logger.Error("Task Generator failed to send new task", "err", err)
return err
}

select {
case <-ctx.Done():
return nil
case <-ticker.C:
taskGen.logger.Infof("Task Generator sending new task, task number: %v", taskNumber)
err := taskGen.logic.SendNewTask(taskNumber)
if err != nil {
taskGen.logger.Error("Task Generator failed to send new task", "err", err)
return err
}
taskNumber++
continue
}
}
}

type TaskGeneratorLogic interface {
SendNewTask(taskNumber int64) error
func (taskGen *TaskGenerator[Input]) CreateNewTask(
ctx context.Context,
input Input,
) error {
txOpts, err := taskGen.txMgr.GetNoSendTxOpts()
if err != nil {
return utils.WrapError("Error getting tx opts", err)
}

tx, err := taskGen.taskManager.CreateNewTask(
txOpts,
input,
taskGen.config.QuorumThresholdPercentage,
taskGen.config.QuorumNumbers,
)
if err != nil {
return utils.WrapError("Error assembling CreateNewTask tx", err)
}
receipt, err := taskGen.txMgr.Send(ctx, tx, true)
if err != nil {
return utils.WrapError("Error submitting CreateNewTask tx", err)
}
if receipt.Status != gethtypes.ReceiptStatusSuccessful {
return utils.WrapError("CreateNewTask tx failed", nil)
}
return nil
}
Loading