Merge pull request #21 from sunya-ch/v1.0.1
init V1.0.1
sunya-ch authored Jul 31, 2023
2 parents a8cf985 + da1625f commit 598320b
Showing 15 changed files with 282 additions and 17 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/controller-image.yaml
@@ -4,7 +4,7 @@ on:
push:
branches:
- main
- v1.0.0
- v1.0.1
paths:
- .github/workflows/**
- controllers/**
@@ -17,7 +17,7 @@ on:
- ./Makefile

env:
IMAGE_VERSION: '1.0.0'
IMAGE_VERSION: '1.0.1'

jobs:
build-push-controller:
4 changes: 2 additions & 2 deletions .github/workflows/parser-image.yaml
@@ -4,13 +4,13 @@ on:
push:
branches:
- main
- v1.0.0
- v1.0.1
paths:
- .github/workflows/**
- cpe-parser/**

env:
IMAGE_VERSION: '1.0.0'
IMAGE_VERSION: '1.0.1'

jobs:
build-push-parser:
2 changes: 1 addition & 1 deletion Makefile
@@ -7,7 +7,7 @@
# To re-generate a bundle for another specific version without changing the standard setup, you can:
# - use the VERSION as arg of the bundle target (e.g make bundle VERSION=0.0.2)
# - use environment variables to overwrite this value (e.g export VERSION=0.0.2)
export VERSION ?= 1.0.0
export VERSION ?= 1.0.1

# CHANNELS define the bundle channels used in the bundle.
# Add a new line here if you would like to change its default config. (E.g CHANNELS = "preview,fast,stable")
2 changes: 1 addition & 1 deletion config/parser/kustomization.yaml
@@ -8,6 +8,6 @@ resources:
images:
- name: parser
newName: ghcr.io/ibm/cpe-operator/parser
newTag: v1.0.0
newTag: v1.0.1
apiVersion: kustomize.config.k8s.io/v1beta1
kind: Kustomization
4 changes: 2 additions & 2 deletions config/samples/cpe-operator/default.yaml
@@ -810,7 +810,7 @@ spec:
fieldPath: metadata.namespace
- name: PARSER_SERVICE
value: http://cpe-operator-cpe-parser.$(OPERATOR_NAMESPACE)
image: ghcr.io/ibm/cpe-operator/controller:v1.0.0
image: ghcr.io/ibm/cpe-operator/controller:v1.0.1
imagePullPolicy: Always
livenessProbe:
httpGet:
@@ -857,7 +857,7 @@ spec:
app: cpe-parser
spec:
containers:
- image: ghcr.io/ibm/cpe-operator/parser:v1.0.0
- image: ghcr.io/ibm/cpe-operator/parser:v1.0.1
imagePullPolicy: IfNotPresent
name: parser
ports:
4 changes: 2 additions & 2 deletions config/samples/cpe-operator/recommended.yaml
@@ -882,7 +882,7 @@ spec:
fieldPath: metadata.namespace
- name: PARSER_SERVICE
value: http://cpe-operator-cpe-parser.$(OPERATOR_NAMESPACE)
image: ghcr.io/ibm/cpe-operator/controller:v1.0.0
image: ghcr.io/ibm/cpe-operator/controller:v1.0.1
imagePullPolicy: Always
livenessProbe:
httpGet:
@@ -929,7 +929,7 @@ spec:
app: cpe-parser
spec:
containers:
- image: ghcr.io/ibm/cpe-operator/parser:v1.0.0
- image: ghcr.io/ibm/cpe-operator/parser:v1.0.1
imagePullPolicy: IfNotPresent
name: parser
ports:
1 change: 1 addition & 0 deletions controllers/common.go
@@ -48,6 +48,7 @@ const (
JOBHASH_KEY = "cpe-jobhash"
HASH_DELIMIT = "-cpeh-"
INVALID_REGEX = "[^A-Za-z0-9]"
JOB_LOG_PATH = "/cpe-local-log"
)

func GetInformerFromGVK(dc *discovery.DiscoveryClient, dyn dynamic.Interface, gvk schema.GroupVersionKind) (cache.SharedIndexInformer, dynamicinformer.DynamicSharedInformerFactory) {
18 changes: 18 additions & 0 deletions controllers/job_tracker.go
@@ -25,6 +25,7 @@ import (
"fmt"
"io"
"io/ioutil"
"os"
"strconv"
"time"

@@ -317,6 +318,7 @@ func (r *JobTracker) ProcessJobQueue() {
r.Log.Info(fmt.Sprintf("PutLog Error #%v, parse raw log", err))
response, err = parseRawLog(parserKey, logBytes)
} else {
r.writeLogToFile(benchmarkName, CLUSTER_ID, jobName, podName, logBytes)
r.Log.Info("Parse remote put log")
response, err = parseAndPushLog(instance, benchmarkName, jobName, podName, parserKey, constLabels)
}
@@ -515,6 +517,22 @@ func (r *JobTracker) Init() {

}

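// writeLogToFile persists the raw job log locally as <benchmarkName>_<CLUSTER_ID>_<jobName>_<podName>.log under JOB_LOG_PATH, but only when that directory already exists (e.g. a mounted volume).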
func (r *JobTracker) writeLogToFile(benchmarkName, CLUSTER_ID, jobName, podName string, data []byte) (err error) {
if _, err = os.Stat(JOB_LOG_PATH); err == nil {
filePath := fmt.Sprintf("%s/%s_%s_%s_%s.log", JOB_LOG_PATH, benchmarkName, CLUSTER_ID, jobName, podName)
file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return err
}
defer file.Close()
_, err = file.Write(data)
if err == nil {
fmt.Printf("Successfully saved %s.\n", filePath)
}
return err
}
return err
}

func (r *JobTracker) indexOf(benchmarkName string) int {
index := -1
for index = 0; index < len(r.Subscribers); index++ {
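For context, a minimal self-contained sketch of the new local log persistence step, assuming the behavior shown in writeLogToFile above. A temporary directory stands in for the /cpe-local-log mount so the sketch runs anywhere, and names like "fmtrain-sample" are made up for illustration:

package main

import (
    "fmt"
    "os"
    "path/filepath"
)

func main() {
    // Stand-in for the JOB_LOG_PATH constant ("/cpe-local-log"); a temp dir keeps the sketch runnable anywhere.
    logDir, err := os.MkdirTemp("", "cpe-local-log")
    if err != nil {
        panic(err)
    }
    defer os.RemoveAll(logDir)

    // The helper only writes when the directory exists; otherwise the job log is
    // still parsed and pushed remotely, it just is not kept on local disk.
    if _, err := os.Stat(logDir); err == nil {
        // Same naming scheme as writeLogToFile: <benchmark>_<cluster>_<job>_<pod>.log
        filePath := filepath.Join(logDir, fmt.Sprintf("%s_%s_%s_%s.log", "fmtrain-sample", "cluster-a", "job-1", "pod-1"))
        if err := os.WriteFile(filePath, []byte("step = 10\ntrainloss = 10.77\n"), 0644); err != nil {
            panic(err)
        }
        fmt.Printf("Successfully saved %s.\n", filePath)
    }
}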
2 changes: 2 additions & 0 deletions cpe-parser/api.go
@@ -34,6 +34,7 @@ var timeParser parser.Parser = parser.NewTimeParser()
var stressParser parser.Parser = parser.NewStressParser()
var mlperfParser parser.Parser = parser.NewMlPerfParser()
var fmworkParser parser.Parser = parser.NewFMWorkParser()
var fmtrainParser parser.Parser = parser.NewFMTrainParser()

var parserMap map[string]parser.Parser = map[string]parser.Parser{
"codait": codaitParser,
@@ -51,6 +52,7 @@ var parserMap map[string]parser.Parser = map[string]parser.Parser{
"stress": stressParser,
"mlperf": mlperfParser,
"fmwork": fmworkParser,
"fmtrain": fmtrainParser,
}

/////////////////////////////////////////////
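As a rough sketch of what this registration enables: the parser is looked up by its key and the matching ParseValue implementation is invoked. The stand-in types below are assumptions made only to keep the snippet self-contained; the real parser.Parser interface and HTTP handler live in cpe-parser and are richer than this:

package main

import "fmt"

// Parser is a minimal stand-in for the cpe-parser parser.Parser interface,
// reduced to the one method exercised here.
type Parser interface {
    ParseValue(body []byte) (map[string]interface{}, error)
}

// fakeFMTrainParser stands in for parser.NewFMTrainParser(); it only echoes a
// canned value so the dispatch pattern can be shown without the real package.
type fakeFMTrainParser struct{}

func (p *fakeFMTrainParser) ParseValue(body []byte) (map[string]interface{}, error) {
    return map[string]interface{}{"total_hours": 0.1876769094996982}, nil
}

// parserMap mirrors the registration added in api.go: the "fmtrain" key now
// routes incoming logs to the FM-training parser.
var parserMap = map[string]Parser{
    "fmtrain": &fakeFMTrainParser{},
}

func main() {
    body := []byte("Job Complete!\ntotal_hours = 0.1876769094996982\n")
    if p, ok := parserMap["fmtrain"]; ok {
        values, err := p.ParseValue(body)
        fmt.Println(values, err) // map[total_hours:0.1876769094996982] <nil>
    }
}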
21 changes: 20 additions & 1 deletion cpe-parser/parser/common.go
@@ -5,6 +5,25 @@

package parser

import (
"fmt"
"strconv"
"strings"
)

const (
BIGLINE = "===================================="
defaultDelimit = "="
)

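// splitValue splits a "key <delimit> value" line into a trimmed key and its float64 value.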
func splitValue(linestr, delimit string) (key string, value float64, err error) {
splits := strings.Split(linestr, delimit)
if len(splits) != 2 {
err = fmt.Errorf("cannot split value %s with %s", linestr, delimit)
return
}
key = strings.TrimSpace(splits[0])
valueStr := strings.TrimSpace(splits[1])
value, err = strconv.ParseFloat(valueStr, 64)
return
}
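For reference, a quick usage sketch of the new helper on a line from the fmtrain log format; the function body below is copied from the hunk above, and only the small main wrapper is added for illustration:

package main

import (
    "fmt"
    "strconv"
    "strings"
)

const defaultDelimit = "="

// splitValue splits a "key = value" line and parses the value as a float64.
func splitValue(linestr, delimit string) (key string, value float64, err error) {
    splits := strings.Split(linestr, delimit)
    if len(splits) != 2 {
        err = fmt.Errorf("cannot split value %s with %s", linestr, delimit)
        return
    }
    key = strings.TrimSpace(splits[0])
    valueStr := strings.TrimSpace(splits[1])
    value, err = strconv.ParseFloat(valueStr, 64)
    return
}

func main() {
    key, value, err := splitValue("trainloss = 6.763322448730468", defaultDelimit)
    fmt.Println(key, value, err) // trainloss 6.763322448730468 <nil>
}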
171 changes: 171 additions & 0 deletions cpe-parser/parser/fmtrain.go
@@ -0,0 +1,171 @@
/*
* Copyright 2022- IBM Inc. All rights reserved
* SPDX-License-Identifier: Apache2.0
*/

package parser

import (
"bufio"
"bytes"
"fmt"
"io"
"strings"
"time"
)

const (
fmtrainCheckpointStartKey = "Started to load checkpoint"
fmtrainCheckpointEndKey = "Ended to load checkpoint"
fmtrainDataPrepareStartKey = "Started to prepare data"
fmtrainDataPrepareEndKey = "Datasets constructed"
fmtrainTrainStartKey = "Beginning training"
fmtrainTrainEndKey = "Writing"
fmtrainEndKey = "Job Complete!"
)

type FMTrainParser struct {
*BaseParser
}

/*
2023-07-27 16:31:18 Started to load checkpoint
2023-07-27 16:31:18 Ended to load checkpoint
2023-07-27 16:31:18 Started to prepare data
2023-07-27 16:31:19 Datasets constructed!
2023-07-27 16:31:19 Beginning training! If using a large dataset w/o aggressive caching, may take ~1 min per 20GB before starting.
step = 10
trainloss = 10.766671752929687
speed = 1.0044547319412231
step = 20
trainloss = 9.861251831054688
speed = 0.6618351697921753
...
step = 1000
trainloss = 6.763322448730468
speed = 0.6619813680648804
2023-07-27 16:42:26 Writing final checkpoint
step = 1000
2023-07-27 16:42:28 Training ended.
Job Complete!
total_hours = 0.1876769094996982
*/

func NewFMTrainParser() *FMTrainParser {
fmtrainParser := &FMTrainParser{}
abs := &BaseParser{
Parser: fmtrainParser,
}
fmtrainParser.BaseParser = abs
return fmtrainParser
}

func (p *FMTrainParser) getTimestamp(linestr string) string {
splits := strings.Fields(linestr)
if len(splits) < 2 {
return ""
}
dateString := fmt.Sprintf("%s %s", splits[0], splits[1])
return dateString
}

func (p *FMTrainParser) ParseValue(body []byte) (map[string]interface{}, error) {
values := make(map[string]interface{})
valuesWithLabels := make(map[string][]ValueWithLabels)
bytesReader := bytes.NewReader(body)
bufReader := bufio.NewReader(bytesReader)
var trainStart bool

stepLabel := make(map[string]string)
for {
line, _, err := bufReader.ReadLine()
linestr := string(line)
if err == io.EOF {
break
} else if err != nil {
return nil, err
}
if strings.Contains(linestr, fmtrainCheckpointStartKey) {
// set checkpointStart
stepLabel["checkpointStart"] = p.getTimestamp(linestr)
} else if strings.Contains(linestr, fmtrainCheckpointEndKey) {
// set checkpointEnd
stepLabel["checkpointEnd"] = p.getTimestamp(linestr)
} else if strings.Contains(linestr, fmtrainDataPrepareStartKey) {
// set dataPrepareStart
stepLabel["dataPrepareStart"] = p.getTimestamp(linestr)
} else if strings.Contains(linestr, fmtrainDataPrepareEndKey) {
// set dataPrepareEnd
stepLabel["dataPrepareEnd"] = p.getTimestamp(linestr)
} else if strings.Contains(linestr, fmtrainTrainStartKey) {
// set trainStart
stepLabel["trainStart"] = p.getTimestamp(linestr)
trainStart = true
} else if strings.Contains(linestr, fmtrainEndKey) {
// job completed
// read next line
line, _, err := bufReader.ReadLine()
linestr := string(line)
if err == io.EOF {
break
} else if err == nil {
// expect total_hours key
key, value, err := splitValue(linestr, defaultDelimit)
if err == nil {
values[key] = value
}
}
} else if strings.Contains(linestr, fmtrainTrainEndKey) {
dateString := p.getTimestamp(linestr)
layout := "2006-01-02 15:04:05"
timestamp, err := time.Parse(layout, dateString)
if err == nil {
values["trainEnd"] = float64(timestamp.Unix())
}
trainStart = false
} else if trainStart {
key, value, err := splitValue(linestr, defaultDelimit)
if err == nil {
// trainStepValues
if strings.Contains(linestr, "step") {
stepLabel[key] = fmt.Sprintf("%f", value)
} else {
copyLabels := make(map[string]string)
for labelKey, labelValue := range stepLabel {
copyLabels[labelKey] = labelValue
}
newValue := ValueWithLabels{
Labels: copyLabels,
Value: value,
}
if valueWithLabelsArr, ok := valuesWithLabels[key]; ok {
valuesWithLabels[key] = append(valueWithLabelsArr, newValue)
} else {
valuesWithLabels[key] = []ValueWithLabels{newValue}
}
}
}
}
}
for key, valueWithLabelsArr := range valuesWithLabels {
values[key] = valueWithLabelsArr
}
return values, nil
}

func (p *FMTrainParser) GetPerformanceValue(values map[string]interface{}) (string, float64) {
performanceKey := "trainloss"
if valuesWithLabelsInterface, ok := values[performanceKey]; ok {
valuesWithLabels := valuesWithLabelsInterface.([]ValueWithLabels)
if len(valuesWithLabels) == 0 {
return "NoValue", -1
}
lastValue := valuesWithLabels[len(valuesWithLabels)-1]
return performanceKey, lastValue.Value
}
return "NoKey", -1
}
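A hedged end-to-end usage sketch: feed a log shaped like the sample in the comment above into the new parser and read back the reported performance value. NewFMTrainParser, ParseValue, and GetPerformanceValue are the functions introduced in this file; the import path is an assumption:

package main

import (
    "fmt"

    // Assumed import path for the cpe-parser package in this repository.
    "github.com/IBM/cpe-operator/cpe-parser/parser"
)

func main() {
    log := []byte(`2023-07-27 16:31:18 Started to load checkpoint
2023-07-27 16:31:18 Ended to load checkpoint
2023-07-27 16:31:18 Started to prepare data
2023-07-27 16:31:19 Datasets constructed!
2023-07-27 16:31:19 Beginning training!
step = 10
trainloss = 10.766671752929687
speed = 1.0044547319412231
2023-07-27 16:42:26 Writing final checkpoint
step = 10
2023-07-27 16:42:28 Training ended.
Job Complete!
total_hours = 0.1876769094996982
`)

    p := parser.NewFMTrainParser()
    values, err := p.ParseValue(log)
    if err != nil {
        panic(err)
    }
    // values["trainloss"] and values["speed"] hold ValueWithLabels slices tagged
    // with the phase timestamps and the current step; "total_hours" and
    // "trainEnd" are plain floats.
    key, val := p.GetPerformanceValue(values)
    fmt.Println(key, val) // trainloss 10.766671752929687 (the last trainloss parsed)
}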
