Merge pull request #1802 from lavanet/add-prometheus-metrics-for-refactor-optimizer

refactor: Optimizer Refactor Part 2: Add tier to optimizer metrics
ranlavanet authored Jan 30, 2025
2 parents c69025a + 3110fff commit 4e0430c
Showing 9 changed files with 332 additions and 83 deletions.
6 changes: 6 additions & 0 deletions protocol/metrics/consumer_optimizer_qos_client.go
@@ -42,6 +42,8 @@ type OptimizerQoSReport struct {
LatencyScore float64
GenericScore float64
EntryIndex int
Tier int
TierChances string
}

type OptimizerQoSReportToSend struct {
@@ -58,6 +60,8 @@ type OptimizerQoSReportToSend struct {
Epoch uint64 `json:"epoch"`
ProviderStake int64 `json:"provider_stake"`
EntryIndex int `json:"entry_index"`
Tier int `json:"tier"`
TierChances string `json:"tier_chances"`
GeoLocation uint64 `json:"geo_location"`
}

@@ -148,6 +152,8 @@ func (coqc *ConsumerOptimizerQoSClient) appendOptimizerQoSReport(report *Optimiz
Epoch: epoch,
NodeErrorRate: coqc.calculateNodeErrorRate(chainId, report.ProviderAddress),
ProviderStake: coqc.getProviderChainStake(chainId, report.ProviderAddress, epoch),
Tier: report.Tier,
TierChances: report.TierChances,
GeoLocation: coqc.geoLocation,
}
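For context, a minimal sketch (not part of this PR) of how the new fields would look once a report is serialized. The trimmed mirror struct and the field values are invented for illustration and keep only the tags shown in the hunk above; the real OptimizerQoSReportToSend has more fields:

package main

import (
	"encoding/json"
	"fmt"
)

// reportSketch mirrors only the OptimizerQoSReportToSend fields shown in this diff.
type reportSketch struct {
	Epoch         uint64 `json:"epoch"`
	ProviderStake int64  `json:"provider_stake"`
	EntryIndex    int    `json:"entry_index"`
	Tier          int    `json:"tier"`
	TierChances   string `json:"tier_chances"`
	GeoLocation   uint64 `json:"geo_location"`
}

func main() {
	b, _ := json.Marshal(reportSketch{
		Epoch:         1000,
		ProviderStake: 50000,
		EntryIndex:    0,
		Tier:          0,
		TierChances:   "0: 0.750000, 1: 0.250000, ",
		GeoLocation:   1,
	})
	fmt.Println(string(b))
	// {"epoch":1000,"provider_stake":50000,"entry_index":0,"tier":0,"tier_chances":"0: 0.750000, 1: 0.250000, ","geo_location":1}
}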

105 changes: 73 additions & 32 deletions protocol/provideroptimizer/provider_optimizer.go
@@ -239,6 +239,46 @@ func (po *ProviderOptimizer) AppendProbeRelayData(providerAddress string, latenc
)
}

func (po *ProviderOptimizer) CalculateQoSScoresForMetrics(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) []*metrics.OptimizerQoSReport {
selectionTier, _, providersScores := po.CalculateSelectionTiers(allAddresses, ignoredProviders, cu, requestedBlock)
reports := []*metrics.OptimizerQoSReport{}

rawScores := selectionTier.GetRawScores()
tierChances := selectionTier.ShiftTierChance(po.OptimizerNumTiers, map[int]float64{0: ATierChance})
for idx, entry := range rawScores {
qosReport := providersScores[entry.Address]
qosReport.EntryIndex = idx
qosReport.TierChances = PrintTierChances(tierChances)
qosReport.Tier = po.GetProviderTier(entry.Address, selectionTier)
reports = append(reports, qosReport)
}

return reports
}

func PrintTierChances(tierChances map[int]float64) string {
var tierChancesString string
for tier, chance := range tierChances {
tierChancesString += fmt.Sprintf("%d: %f, ", tier, chance)
}
return tierChancesString
}
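A quick illustrative sketch (not from the diff) of the string PrintTierChances builds; note that Go map iteration order is randomized, so the tier order in the output may vary between calls, and the trailing separator is kept by the code above:

// assuming this runs inside the provideroptimizer package
chances := map[int]float64{0: 0.75, 1: 0.25}
fmt.Println(PrintTierChances(chances))
// possible output: 0: 0.750000, 1: 0.250000,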

func (po *ProviderOptimizer) GetProviderTier(providerAddress string, selectionTier SelectionTier) int {
selectionTierScoresCount := selectionTier.ScoresCount()
numTiersWanted := po.GetNumTiersWanted(selectionTier, selectionTierScoresCount)
minTierEntries := po.GetMinTierEntries(selectionTier, selectionTierScoresCount)
for tier := 0; tier < numTiersWanted; tier++ {
tierProviders := selectionTier.GetTier(tier, numTiersWanted, minTierEntries)
for _, provider := range tierProviders {
if provider.Address == providerAddress {
return tier
}
}
}
return -1
}
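A hedged usage sketch (variable names are illustrative, not taken from this PR) of how a consumer-side caller could turn the optimizer state into per-provider reports, which the ConsumerOptimizerQoSClient's appendOptimizerQoSReport earlier in this diff presumably then enriches with stake, error rate, tier and tier chances:

// illustrative only; the real wiring lives in the consumer code paths, not in this diff
reports := providerOptimizer.CalculateQoSScoresForMetrics(allAddresses, nil, cu, requestedBlock)
for _, report := range reports {
	// report.Tier is -1 when GetProviderTier could not place the provider in any tier
	fmt.Printf("provider=%s tier=%d chances=%q entry=%d\n",
		report.ProviderAddress, report.Tier, report.TierChances, report.EntryIndex)
}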

func (po *ProviderOptimizer) CalculateSelectionTiers(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (SelectionTier, Exploration, map[string]*metrics.OptimizerQoSReport) {
explorationCandidate := Exploration{address: "", time: time.Now().Add(time.Hour)}
selectionTier := NewSelectionTier()
@@ -317,31 +357,15 @@ func (po *ProviderOptimizer) CalculateSelectionTiers(allAddresses []string, igno
func (po *ProviderOptimizer) ChooseProvider(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) (addresses []string, tier int) {
selectionTier, explorationCandidate, _ := po.CalculateSelectionTiers(allAddresses, ignoredProviders, cu, requestedBlock)
selectionTierScoresCount := selectionTier.ScoresCount()

localMinimumEntries := po.OptimizerMinTierEntries
if AutoAdjustTiers {
adjustedProvidersPerTier := int(stdMath.Ceil(float64(selectionTierScoresCount) / float64(po.OptimizerNumTiers)))
if localMinimumEntries > adjustedProvidersPerTier {
utils.LavaFormatTrace("optimizer AutoAdjustTiers activated",
utils.LogAttr("set_to_adjustedProvidersPerTier", adjustedProvidersPerTier),
utils.LogAttr("was_MinimumEntries", po.OptimizerMinTierEntries),
utils.LogAttr("tiers_count_po.OptimizerNumTiers", po.OptimizerNumTiers),
utils.LogAttr("selectionTierScoresCount", selectionTierScoresCount))
localMinimumEntries = adjustedProvidersPerTier
}
}
localMinimumEntries := po.GetMinTierEntries(selectionTier, selectionTierScoresCount)

if selectionTierScoresCount == 0 {
// no providers to choose from
return []string{}, -1
}
initialChances := map[int]float64{0: ATierChance}

// check if we have enough providers to create the tiers, if not set the number of tiers to the number of providers we currently have
numberOfTiersWanted := po.OptimizerNumTiers
if selectionTierScoresCount < po.OptimizerNumTiers {
numberOfTiersWanted = selectionTierScoresCount
}
numberOfTiersWanted := po.GetNumTiersWanted(selectionTier, selectionTierScoresCount)
if selectionTierScoresCount >= localMinimumEntries*2 {
// if we have more than 2*localMinimumEntries we set the LastTierChance configured
initialChances[(numberOfTiersWanted - 1)] = LastTierChance
@@ -365,6 +389,37 @@ func (po *ProviderOptimizer) ChooseProvid
return returnedProviders, tier
}

// GetMinTierEntries returns the minimum number of entries a tier needs in order to be considered for selection.
// If the AutoAdjustTiers global is true, the providers are divided equally between the tiers, and the
// configured minimum is lowered to that per-tier count when it is smaller.
func (po *ProviderOptimizer) GetMinTierEntries(selectionTier SelectionTier, selectionTierScoresCount int) int {
localMinimumEntries := po.OptimizerMinTierEntries
if AutoAdjustTiers {
adjustedProvidersPerTier := int(stdMath.Ceil(float64(selectionTierScoresCount) / float64(po.OptimizerNumTiers)))
if localMinimumEntries > adjustedProvidersPerTier {
utils.LavaFormatTrace("optimizer AutoAdjustTiers activated",
utils.LogAttr("set_to_adjustedProvidersPerTier", adjustedProvidersPerTier),
utils.LogAttr("was_MinimumEntries", po.OptimizerMinTierEntries),
utils.LogAttr("tiers_count_po.OptimizerNumTiers", po.OptimizerNumTiers),
utils.LogAttr("selectionTierScoresCount", selectionTierScoresCount))
localMinimumEntries = adjustedProvidersPerTier
}
}
return localMinimumEntries
}

// GetNumTiersWanted returns the number of tiers to use.
// If we have enough providers to fill the configured tiers, it returns the configured number of tiers;
// otherwise it returns the number of providers we currently have.
func (po *ProviderOptimizer) GetNumTiersWanted(selectionTier SelectionTier, selectionTierScoresCount int) int {
numberOfTiersWanted := po.OptimizerNumTiers
if selectionTierScoresCount < po.OptimizerNumTiers {
numberOfTiersWanted = selectionTierScoresCount
}

return numberOfTiersWanted
}
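A worked example of the two helpers above, with invented numbers: with OptimizerNumTiers = 4 and OptimizerMinTierEntries = 5, a run that scores 10 providers keeps numberOfTiersWanted = 4, and AutoAdjustTiers lowers the minimum entries to ceil(10/4) = 3 since 5 > 3; with only 3 scored providers, numberOfTiersWanted drops to 3 and the adjusted minimum becomes ceil(3/4) = 1.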

// CalculateProbabilityOfBlockError calculates the probability that a provider doesn't have a specific requested
// block when the consumer asks the optimizer to fetch a provider with that specific block
func (po *ProviderOptimizer) CalculateProbabilityOfBlockError(requestedBlock int64, providerData ProviderData) sdk.Dec {
@@ -661,17 +716,3 @@ func (po *ProviderOptimizer) GetReputationReportForProvider(providerAddress stri

return report, providerData.Latency.GetLastUpdateTime()
}

func (po *ProviderOptimizer) CalculateQoSScoresForMetrics(allAddresses []string, ignoredProviders map[string]struct{}, cu uint64, requestedBlock int64) []*metrics.OptimizerQoSReport {
selectionTier, _, providersScores := po.CalculateSelectionTiers(allAddresses, ignoredProviders, cu, requestedBlock)
reports := []*metrics.OptimizerQoSReport{}

rawScores := selectionTier.GetRawScores()
for idx, entry := range rawScores {
qosReport := providersScores[entry.Address]
qosReport.EntryIndex = idx
reports = append(reports, qosReport)
}

return reports
}
97 changes: 67 additions & 30 deletions protocol/provideroptimizer/provider_optimizer_test.go
@@ -950,13 +950,13 @@ func TestProviderOptimizerRetriesWithReducedProvidersSet(t *testing.T) {
}
}

// TestProviderOptimizerChoiceSimulation checks that the overall choice mechanism acts as expected,
// TestProviderOptimizerChoiceSimulationBasedOnLatency checks that the overall choice mechanism acts as expected,
// For each of the following metrics: latency, sync, availability and stake we do the following:
// 0. Assume 2 providers
// 0. Assume 3 providers
// 1. Append relay data for both providers with random samples. The "better" provider will have a randomized
// sample with a better range (for example, the better one gets latency of 10-30ms and the bad one gets 25-40ms)
// 2. Choose between them and verify the better one is chosen more.
func TestProviderOptimizerChoiceSimulation(t *testing.T) {
func TestProviderOptimizerChoiceSimulationBasedOnLatency(t *testing.T) {
rand.InitRandomSeed()
providerOptimizer := setupProviderOptimizer(1)
providersCount := 3
@@ -983,47 +983,84 @@ func TestProviderOptimizerChoiceSimulation(t *testing.T) {
// address, after it the 2nd and so on
for i := 0; i < 1000; i++ {
// randomize latency, provider 0 gets a better latency than provider 1
p1Latency += float64(rand.Int63n(21)+10) * float64(time.Millisecond) // Random number between 10-30
p2Latency += float64(rand.Int63n(11)+30) * float64(time.Millisecond) // Random number between 30-40
p3Latency += float64(rand.Int63n(12)+30) * float64(time.Millisecond) // Random number between 30-40
p1Latency += 10 * float64(time.Millisecond)
p2Latency += 20 * float64(time.Millisecond)
p3Latency += 30 * float64(time.Millisecond)

// randomize sync, provider 0 gets a better sync than provider 1
if rand.Float64() < 0.1 { // 10% chance to increment both
p1SyncBlock++
p2SyncBlock++
p3SyncBlock++
}
if rand.Float64() < 0.05 { // 5% chance to increment only p1
p1SyncBlock++
}

// randomize availability, provider 0 gets a better availability than provider 1
if rand.Float64() < 0.1 { // 10% chance to false for p2
p2Availability = false
}
if rand.Float64() < 0.05 { // 5% chance to false for p3
p3Availability = false
}
if rand.Float64() < 0.05 { // 5% chance to false for both
p1Availability = false
p2Availability = false
p3Availability = false
}
p1SyncBlock++
p2SyncBlock++
p3SyncBlock++

time.Sleep(1 * time.Millisecond)
providerOptimizer.appendRelayData(providersGen.providersAddresses[0], time.Duration(p1Latency), p1Availability, cu, p1SyncBlock, time.Now())
providerOptimizer.appendRelayData(providersGen.providersAddresses[1], time.Duration(p2Latency), p2Availability, cu, p2SyncBlock, time.Now())
providerOptimizer.appendRelayData(providersGen.providersAddresses[2], time.Duration(p3Latency), p3Availability, cu, p3SyncBlock, time.Now())
time.Sleep(10 * time.Millisecond)
}

// choose many times and check the better provider is chosen more often (provider 0)
iterations := 1000
res, _ := runChooseManyTimesAndReturnResults(t, providerOptimizer, providersGen.providersAddresses, nil, iterations, cu, requestBlock)
utils.LavaFormatInfo("res", utils.LogAttr("res", res))
res, tierResults := runChooseManyTimesAndReturnResults(t, providerOptimizer, providersGen.providersAddresses, nil, iterations, cu, requestBlock)
utils.LavaFormatInfo("res", utils.LogAttr("res", res), utils.LogAttr("tierResults", tierResults))
require.Greater(t, res[providersGen.providersAddresses[0]], res[providersGen.providersAddresses[1]])
require.Greater(t, res[providersGen.providersAddresses[0]], res[providersGen.providersAddresses[2]])
require.Greater(t, res[providersGen.providersAddresses[1]], res[providersGen.providersAddresses[2]])
}

// func TestProviderOptimizerChoiceSimulationBasedOnSync(t *testing.T) {
// rand.InitRandomSeed()
// providerOptimizer := setupProviderOptimizer(1)
// providersCount := 3
// providersGen := (&providersGenerator{}).setupProvidersForTest(providersCount)
// cu := uint64(10)
// requestBlock := int64(1000)
// syncBlock := uint64(1000)
// baseLatency := TEST_BASE_WORLD_LATENCY.Seconds()
// providerOptimizer.OptimizerNumTiers = 4
// providerOptimizer.OptimizerMinTierEntries = 1

// // initial values
// p1Latency := baseLatency * float64(time.Millisecond)
// p2Latency := baseLatency * float64(time.Millisecond)
// p3Latency := baseLatency * float64(time.Millisecond)
// p1SyncBlock := syncBlock
// p2SyncBlock := syncBlock
// p3SyncBlock := syncBlock
// p1Availability := true
// p2Availability := true
// p3Availability := true
// // append relay data for each provider depending on its index in the providers array
// // the latency gets worse for increasing index so we assume the best provider is the 1st
// // address, after it the 2nd and so on
// for i := 0; i < 1000; i++ {
// // randomize latency, provider 0 gets a better latency than provider 1
// p1Latency += 10 * float64(time.Millisecond)
// p2Latency += 10 * float64(time.Millisecond)
// p3Latency += 10 * float64(time.Millisecond)

// // randomize sync, provider 0 gets a better sync than provider 1
// p1SyncBlock++
// if i%100 == 1 {
// utils.LavaFormatInfo("p1SyncBlock", utils.LogAttr("p1SyncBlock", p1SyncBlock))
// p1SyncBlock++
// }
// p2SyncBlock++
// p3SyncBlock++

// time.Sleep(1 * time.Millisecond)
// providerOptimizer.appendRelayData(providersGen.providersAddresses[0], time.Duration(p1Latency), p1Availability, cu, p1SyncBlock, time.Now())
// providerOptimizer.appendRelayData(providersGen.providersAddresses[1], time.Duration(p2Latency), p2Availability, cu, p2SyncBlock, time.Now())
// providerOptimizer.appendRelayData(providersGen.providersAddresses[2], time.Duration(p3Latency), p3Availability, cu, p3SyncBlock, time.Now())
// }
// // choose many times and check the better provider is chosen more often (provider 0)
// iterations := 1000
// res, tierResults := runChooseManyTimesAndReturnResults(t, providerOptimizer, providersGen.providersAddresses, nil, iterations, cu, requestBlock)
// utils.LavaFormatInfo("res", utils.LogAttr("res", res), utils.LogAttr("tierResults", tierResults))
// require.Greater(t, res[providersGen.providersAddresses[0]], res[providersGen.providersAddresses[1]])
// require.Greater(t, res[providersGen.providersAddresses[0]], res[providersGen.providersAddresses[2]])
// require.Greater(t, res[providersGen.providersAddresses[1]], res[providersGen.providersAddresses[2]])
// }

// TestProviderOptimizerLatencySyncScore tests that a provider with 100ms latency and x sync block
// has the same score as a provider with 1100ms latency but x+1 sync block
// This is true since the average block time is 10sec and the default sync factor is 0.3. So
6 changes: 3 additions & 3 deletions protocol/rpcprovider/rpcprovider_server.go
@@ -581,12 +581,12 @@ func (rpcps *RPCProviderServer) verifyRelaySession(ctx context.Context, request
}
utils.LavaFormatInfo(errorMessage,
utils.Attribute{Key: "Info Type", Value: lavasession.EpochMismatchError},
utils.Attribute{Key: "current lava block", Value: latestBlock},
utils.Attribute{Key: "requested lava block", Value: request.RelaySession.Epoch},
utils.Attribute{Key: "provider lava block", Value: latestBlock},
utils.Attribute{Key: "consumer lava block", Value: request.RelaySession.Epoch},
utils.Attribute{Key: "threshold", Value: rpcps.providerSessionManager.GetBlockedEpochHeight()},
utils.Attribute{Key: "GUID", Value: ctx},
)
return nil, nil, lavasession.EpochMismatchError
return nil, nil, lavasession.EpochMismatchError.Wrapf("provider lava block %d, consumer lava block %d, threshold: %d", latestBlock, request.RelaySession.Epoch, rpcps.providerSessionManager.GetBlockedEpochHeight())
}

// Check data
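For illustration (the numbers are invented), the Wrapf above makes the returned EpochMismatchError carry the mismatch details directly in its message, roughly:

// sketch of the detail string appended by Wrapf, using made-up values
detail := fmt.Sprintf("provider lava block %d, consumer lava block %d, threshold: %d", 120, 118, 110)
// detail == "provider lava block 120, consumer lava block 118, threshold: 110"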
52 changes: 35 additions & 17 deletions scripts/test/httpServer.py
@@ -1,14 +1,38 @@
import json
import csv
from http.server import BaseHTTPRequestHandler, HTTPServer
import sys

payload_ret = "OK"


class RequestHandler(BaseHTTPRequestHandler):
def __init__(self, *args, **kwargs):
self.csv_file_name = kwargs.pop('csv_file_name', 'data.csv')
super().__init__(*args, **kwargs)

def do_GET(self):
self.print_request()

def do_POST(self):
self.print_request()
content_length = int(self.headers.get("Content-Length", 0))
if content_length > 0:
body = self.rfile.read(content_length)
data = json.loads(body.decode('utf-8'))
self.write_to_csv(data)
self.send_response(200)
self.send_header("Content-type", "text/html")
self.end_headers()
self.wfile.write(b"OK")

def write_to_csv(self, data):
with open(self.csv_file_name, 'a', newline='') as csvfile:
fieldnames = data[0].keys()
writer = csv.DictWriter(csvfile, fieldnames=fieldnames)
if csvfile.tell() == 0: # Check if file is empty to write header
writer.writeheader()
for entry in data:
writer.writerow(entry)

def print_request(self):
# Print request line
@@ -27,24 +51,18 @@ def print_request(self):
body = self.rfile.read(content_length)
print(f"Body:\n{body.decode('utf-8')}")

# Send a response back to the client
response = payload_ret.encode('utf-8')
self.send_response(200)
self.send_header("Content-type", "application/json")
self.end_headers()
self.wfile.write(response)

def run_server(port=8000):
def run_server(port=8000, csv_file_name='data.csv'):
server_address = ('', port)
httpd = HTTPServer(server_address, RequestHandler)
print(f"Server running on port {port}")

def handler(*args, **kwargs):
return RequestHandler(*args, csv_file_name=csv_file_name, **kwargs)
httpd = HTTPServer(server_address, handler)
print(f"Server running on port {port}, writing to {csv_file_name}")
httpd.serve_forever()


if __name__ == '__main__':
if len(sys.argv) > 1:
port = int(sys.argv[1])
if len(sys.argv) > 2:
payload_ret = sys.argv[2]
run_server(port)
else:
run_server()
port = int(sys.argv[1]) if len(sys.argv) > 1 else 8000
csv_file_name = sys.argv[2] if len(sys.argv) > 2 else 'data.csv'
run_server(port, csv_file_name)
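Note on usage, inferred from the argument handling above: the server is presumably still started with the port as the first argument, for example python3 scripts/test/httpServer.py 8000 reports.csv, where the second argument now names the CSV file that POSTed JSON reports are appended to (default data.csv). Previously that second argument set the returned payload (payload_ret), which is no longer configurable from the command line.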