
set default params for quick test and add scenario5 #130

Merged: 7 commits, Apr 19, 2025
Changes from all commits
15 changes: 11 additions & 4 deletions cmd/tsbs_load_influx3/creator.go
@@ -33,7 +33,12 @@ func (d *dbCreator) DBExists(dbName string) bool {

func (d *dbCreator) listDatabases() ([]string, error) {
u := fmt.Sprintf("%s/api/v3/configure/database?show_deleted=true&format=csv", d.daemonURL)
resp, err := http.Get(u)
req, err := http.NewRequest("GET", u, nil)
req.Header = http.Header{
headerAuthorization: []string{fmt.Sprintf("Token %s", authToken)},
}
client := http.Client{}
resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("listDatabases error: %s", err.Error())
}
@@ -72,10 +77,11 @@ func (d *dbCreator) listDatabases() ([]string, error) {
func (d *dbCreator) RemoveOldDB(dbName string) error {
u := fmt.Sprintf("%s/api/v3/configure/database?db=%s", d.daemonURL, dbName)
req, err := http.NewRequest("DELETE", u, nil)
if err != nil {
return fmt.Errorf("drop db error: %s", err.Error())
req.Header = http.Header{
"Content-Type": []string{"text/plain"},
headerAuthorization: []string{fmt.Sprintf("Token %s", authToken)},
}
client := &http.Client{}
client := http.Client{}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("drop db error: %s", err.Error())
@@ -100,6 +106,7 @@ func (d *dbCreator) CreateDB(dbName string) error {

// Set the content type to application/json
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Authorization", "Bearer "+authToken)

client := &http.Client{}
resp, err := client.Do(req)
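A note on creator.go: both rewritten helpers assign req.Header before checking the error returned by http.NewRequest, and RemoveOldDB loses its old error check entirely. http.NewRequest fails only on a malformed URL, so this is unlikely to bite in practice; a sketch of the same construction with the check kept, using a hypothetical helper that is not part of this PR:

package example

import (
	"fmt"
	"net/http"
)

// newAuthRequest is a hypothetical helper (not in this PR) that builds the
// same authenticated request but checks the error from http.NewRequest
// before touching req.Header.
func newAuthRequest(method, url, authToken string) (*http.Request, error) {
	req, err := http.NewRequest(method, url, nil)
	if err != nil {
		return nil, err
	}
	// Same scheme the diff uses: "Authorization: Token <token>".
	req.Header.Set("Authorization", fmt.Sprintf("Token %s", authToken))
	return req, nil
}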
6 changes: 4 additions & 2 deletions cmd/tsbs_load_influx3/http_writer.go
@@ -15,6 +15,7 @@ const (
httpClientName = "tsbs_load_influx"
headerContentEncoding = "Content-Encoding"
headerGzip = "gzip"
headerAuthorization = "Authorization"
)

var (
@@ -65,13 +66,14 @@ var (
textPlain = []byte("text/plain")
)

func (w *HTTPWriter) initializeReq(req *fasthttp.Request, body []byte, isGzip bool) {
func (w *HTTPWriter) initializeReq(req *fasthttp.Request, body []byte, isGzip bool, authToken string) {
req.Header.SetContentTypeBytes(textPlain)
req.Header.SetMethodBytes(methodPost)
req.Header.SetRequestURIBytes(w.url)
if isGzip {
req.Header.Add(headerContentEncoding, headerGzip)
}
req.Header.Add(headerAuthorization, fmt.Sprintf("Token %s", authToken))
req.SetBody(body)
}

@@ -101,7 +103,7 @@ func (w *HTTPWriter) executeReq(req *fasthttp.Request, resp *fasthttp.Response)
func (w *HTTPWriter) WriteLineProtocol(body []byte, isGzip bool) (int64, error) {
req := fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(req)
w.initializeReq(req, body, isGzip)
w.initializeReq(req, body, isGzip, authToken)

resp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(resp)
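The writer now attaches the Authorization header unconditionally, so an empty token still produces a bare "Token " value; that is part of why the loader's main.go (below) refuses to start without a token. A standalone sketch of the request initializeReq builds after this change, where the URL, database name, and token are made-up values:

package example

import (
	"fmt"

	"github.com/valyala/fasthttp"
)

// buildWriteReq mirrors what initializeReq produces after this change; all
// concrete values are assumptions for illustration.
func buildWriteReq(body []byte, gzipped bool, authToken string) *fasthttp.Request {
	req := fasthttp.AcquireRequest()
	req.Header.SetContentType("text/plain")
	req.Header.SetMethod("POST")
	req.Header.SetRequestURI("http://localhost:8181/write?db=benchmark") // assumed endpoint
	if gzipped {
		req.Header.Add("Content-Encoding", "gzip")
	}
	// Added unconditionally, so an empty token still sends "Token ".
	req.Header.Add("Authorization", fmt.Sprintf("Token %s", authToken))
	req.SetBody(body)
	return req
}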
10 changes: 5 additions & 5 deletions cmd/tsbs_load_influx3/http_writer_test.go
@@ -114,7 +114,7 @@ func TestHTTPWriterInitializeReq(t *testing.T) {
defer fasthttp.ReleaseRequest(req)
w := NewHTTPWriter(testConf, testConsistency)
body := "this is a test body"
w.initializeReq(req, []byte(body), false)
w.initializeReq(req, []byte(body), false, "")

if got := string(req.Body()); got != body {
t.Errorf("non-gzip: body not correct: got '%s' want '%s'", got, body)
@@ -129,7 +129,7 @@
t.Errorf("non-gzip: Content-Encoding is not empty: got %s", got)
}

w.initializeReq(req, []byte(body), true)
w.initializeReq(req, []byte(body), true, "")
if got := string(req.Header.Peek(headerContentEncoding)); got != headerGzip {
t.Errorf("gzip: Content-Encoding is not correct: got %s want %s", got, headerGzip)
}
@@ -144,7 +144,7 @@ func TestHTTPWriterExecuteReq(t *testing.T) {
w := NewHTTPWriter(testConf, testConsistency)
body := "this is a test body"
normalURL := w.url // save for later modification
w.initializeReq(req, []byte(body), false)
w.initializeReq(req, []byte(body), false, "")
resp := fasthttp.AcquireResponse()
defer fasthttp.ReleaseResponse(resp)
lat, err := w.executeReq(req, resp)
@@ -161,7 +161,7 @@
w.url = []byte(fmt.Sprintf("%s&%s=true", string(normalURL), shouldBackoffParam))
req = fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(req)
w.initializeReq(req, []byte(body), false)
w.initializeReq(req, []byte(body), false, "")
lat, err = w.executeReq(req, resp)
if err != errBackoff {
t.Errorf("unexpected error response received (not backoff error): %v", err)
@@ -176,7 +176,7 @@
w.url = []byte(fmt.Sprintf("%s&%s=true", string(normalURL), shouldInvalidParam))
req = fasthttp.AcquireRequest()
defer fasthttp.ReleaseRequest(req)
w.initializeReq(req, []byte(body), false)
w.initializeReq(req, []byte(body), false, "")
lat, err = w.executeReq(req, resp)
if err == nil {
t.Errorf("unexpected non-error response received")
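These tests only ever thread an empty token through initializeReq. A hypothetical companion test in the same file, asserting that a real token actually lands in the header, could look like:

func TestHTTPWriterInitializeReqAuth(t *testing.T) {
	req := fasthttp.AcquireRequest()
	defer fasthttp.ReleaseRequest(req)
	w := NewHTTPWriter(testConf, testConsistency)

	w.initializeReq(req, []byte("a test body"), false, "secret")

	// initializeReq formats the header as "Token <token>".
	if got, want := string(req.Header.Peek(headerAuthorization)), "Token secret"; got != want {
		t.Errorf("Authorization header: got %q want %q", got, want)
	}
}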
5 changes: 5 additions & 0 deletions cmd/tsbs_load_influx3/main.go
@@ -30,6 +30,7 @@ var (
useGzip bool
doAbortOnExist bool
consistency string
authToken string
)

// Global vars
@@ -73,13 +74,17 @@ func init() {
csvDaemonURLs = viper.GetString("urls")
replicationFactor = viper.GetInt("replication-factor")
consistency = viper.GetString("consistency")
authToken = viper.GetString("auth-token")
backoff = viper.GetDuration("backoff")
useGzip = viper.GetBool("gzip")

if _, ok := consistencyChoices[consistency]; !ok {
log.Fatalf("invalid consistency settings")
}

if authToken == "" {
log.Fatalf("invalid auth token settings")
}
daemonURLs = strings.Split(csvDaemonURLs, ",")
if len(daemonURLs) == 0 {
log.Fatal("missing 'urls' flag")
7 changes: 6 additions & 1 deletion cmd/tsbs_run_queries_influx3/http_client.go
@@ -14,6 +14,7 @@ import (
)

var bytesSlash = []byte("/") // heap optimization
var headerAuthorization = "Authorization"

// HTTPClient is a reusable HTTP Client.
type HTTPClient struct {
@@ -22,6 +23,7 @@
Host []byte
HostString string
uri []byte
authToken string
}

// HTTPClientDoOptions wraps options used when calling `Do`.
@@ -46,12 +48,14 @@ func getHttpClient() *http.Client {
}

// NewHTTPClient creates a new HTTPClient.
func NewHTTPClient(host string) *HTTPClient {
func NewHTTPClient(host string, authToken string) *HTTPClient {
token := fmt.Sprintf("Token %s", authToken)
return &HTTPClient{
client: getHttpClient(),
Host: []byte(host),
HostString: host,
uri: []byte{}, // heap optimization
authToken: token,
}
}

@@ -74,6 +78,7 @@ func (w *HTTPClient) Do(q *query.HTTP, opts *HTTPClientDoOptions) (lag float64,
if err != nil {
panic(err)
}
req.Header.Add(headerAuthorization, w.authToken)

// Perform the request while tracking latency:
start := time.Now()
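A small design note: NewHTTPClient formats the "Token %s" string once at construction, so the per-query Do path only pays a header Add rather than a fmt.Sprintf per request. The round trip Do performs, expressed as a standalone net/http sketch where the endpoint path and query text are assumptions, not taken from this PR:

package example

import (
	"fmt"
	"io"
	"net/http"
	"net/url"
)

// queryOnce sketches the authenticated round trip that HTTPClient.Do
// performs; the endpoint path and SQL text are illustrative assumptions.
func queryOnce(host, authToken, sql string) ([]byte, error) {
	u := fmt.Sprintf("%s/api/v3/query_sql?db=benchmark&q=%s", host, url.QueryEscape(sql))
	req, err := http.NewRequest("GET", u, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Add("Authorization", fmt.Sprintf("Token %s", authToken))

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	return io.ReadAll(resp.Body)
}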
8 changes: 7 additions & 1 deletion cmd/tsbs_run_queries_influx3/main.go
@@ -20,6 +20,7 @@ import (
var (
daemonUrls []string
chunkSize uint64
authToken string
)

// Global vars:
@@ -35,6 +36,7 @@ func init() {

pflag.String("urls", "http://localhost:8086", "Daemon URLs, comma-separated. Will be used in a round-robin fashion.")
pflag.Uint64("chunk-response-size", 0, "Number of series to chunk results into. 0 means no chunking.")
pflag.String("auth-token", "", "Use the Authorization header with the Token scheme to provide your token to InfluxDB3.")

pflag.Parse()

@@ -49,7 +51,11 @@
}

csvDaemonUrls = viper.GetString("urls")
authToken = viper.GetString("auth-token")
chunkSize = viper.GetUint64("chunk-response-size")
if authToken == "" {
log.Fatalf("invalid auth token settings")
}

daemonUrls = strings.Split(csvDaemonUrls, ",")
if len(daemonUrls) == 0 {
@@ -78,7 +84,7 @@ func (p *processor) Init(workerNumber int) {
database: runner.DatabaseName(),
}
url := daemonUrls[workerNumber%len(daemonUrls)]
p.w = NewHTTPClient(url)
p.w = NewHTTPClient(url, authToken)
}

func (p *processor) ProcessQuery(q query.Query, _ bool) ([]*query.Stat, error) {
1 change: 1 addition & 0 deletions pkg/targets/influx3/implemented_target.go
@@ -24,6 +24,7 @@ func (t *influxTarget) TargetSpecificFlags(flagPrefix string, flagSet *pflag.Fla
flagSet.String(flagPrefix+"consistency", "all", "Write consistency. Must be one of: any, one, quorum, all.")
flagSet.Duration(flagPrefix+"backoff", time.Second, "Time to sleep between requests when server indicates backpressure is needed.")
flagSet.Bool(flagPrefix+"gzip", true, "Whether to gzip encode requests (default true).")
flagSet.String(flagPrefix+"auth-token", "", "Authentication token for InfluxDB 3.0")
}

func (t *influxTarget) TargetName() string {
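Because the flag is registered through TargetSpecificFlags, its final command-line name carries whatever flagPrefix the harness supplies. A toy sketch of that prefix pattern, where the "influx3." prefix is an assumption rather than the value TSBS actually passes:

package example

import "github.com/spf13/pflag"

// PrefixedFlags shows the flagPrefix pattern used by TargetSpecificFlags:
// each target registers its flags under its own prefix so several targets
// can share one FlagSet. The "influx3." prefix is an assumption.
func PrefixedFlags(args []string) (string, error) {
	fs := pflag.NewFlagSet("targets", pflag.ContinueOnError)
	fs.String("influx3.auth-token", "", "Authentication token for InfluxDB 3.0")
	if err := fs.Parse(args); err != nil {
		return "", err
	}
	return fs.GetString("influx3.auth-token")
}

Called as PrefixedFlags([]string{"--influx3.auth-token", "abc"}), this returns "abc".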
29 changes: 29 additions & 0 deletions scripts/tsdbComp/common.sh
@@ -360,4 +360,33 @@ function get_db_set() {
fi

echo "${!db_set[@]}" # Return the keys of the associative array
}

function get_influxdb3_token() {
local db_host=$1
local db_port=$2
local influxdb3_token
if [ -n "$INFLUXDB3_AUTH_TOKEN" ]; then
influxdb3_token=$INFLUXDB3_AUTH_TOKEN
echo "$influxdb3_token"
return 0
fi
# if influxdb3_auth_token in test.ini is set, use it
if [ -n "$influxdb3_auth_token" ]; then
influxdb3_token=$influxdb3_auth_token
echo "$influxdb3_token"
return 0
fi
# if influxdb3_auth_token is not set, get it from the server
response=$(curl -s -X POST "http://$db_host:$db_port/api/v3/configure/token/admin" \
-H "accept: application/json" \
-H "Content-Type: application/json")

influxdb3_token=$(echo "$response" | grep -o '"token":"[^"]*"' | sed 's/"token":"//;s/"//')
if [ -z "$influxdb3_token" ]; then
echo "Error: Failed to retrieve token. Response: $response"
return 1
fi

echo "$influxdb3_token"
}
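get_influxdb3_token resolves the token in three steps: the INFLUXDB3_AUTH_TOKEN environment variable, then influxdb3_auth_token from test.ini, then a bootstrap POST to /api/v3/configure/token/admin whose JSON reply is parsed with grep/sed (which assumes the token value contains no escaped quotes). The same bootstrap expressed as a Go sketch; the function name and error handling are illustrative, not part of the scripts:

package example

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// fetchAdminToken is a Go rendering of the curl fallback above: POST to the
// admin-token endpoint and read "token" from the JSON reply.
func fetchAdminToken(host string, port int) (string, error) {
	u := fmt.Sprintf("http://%s:%d/api/v3/configure/token/admin", host, port)
	req, err := http.NewRequest("POST", u, nil)
	if err != nil {
		return "", err
	}
	req.Header.Set("Accept", "application/json")
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	var out struct {
		Token string `json:"token"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return "", err
	}
	if out.Token == "" {
		return "", fmt.Errorf("no token in response")
	}
	return out.Token, nil
}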
39 changes: 25 additions & 14 deletions scripts/tsdbComp/full_cycle_minitest_loading.sh
@@ -106,7 +106,7 @@ if [ "${FORMAT}" == "timescaledb" ];then
DATABASE_PORT=${timescaledb_port:-5432}
PGPASSWORD=${DATABASE_PWD} psql -U postgres -h $DATABASE_HOST -d postgres -c "drop database IF EXISTS ${DATABASE_NAME} "
if [ -d "${TimePath}" ]; then
disk_usage_before=$(set_command "du -s ${TimePath} --exclude="pgsql_tmp" | cut -f 1 " )
disk_usage_before=$(set_command "du -sk ${TimePath} --exclude="pgsql_tmp" | cut -f 1 " )
else
disk_usage_before=0
fi
@@ -148,7 +148,7 @@
do
tempCompressNum=$(PGPASSWORD=password psql -U postgres -d ${DATABASE_NAME} -h ${DATABASE_HOST} -c "SELECT chunk_name, is_compressed FROM timescaledb_information.chunks WHERE is_compressed = true" |grep row |awk '{print $1}')

disk_usage_after=$(set_command "du -s ${TimePath} --exclude="pgsql_tmp"| cut -f 1 " )
disk_usage_after=$(set_command "du -sk ${TimePath} --exclude="pgsql_tmp"| cut -f 1 " )

tempCompressNum=`echo ${tempCompressNum} | sed 's/(//g' `
log_debug "Compression count: ${tempCompressNum}, disk usage after load: ${disk_usage_after}, expected compressed data Block count :${timesHours}"
@@ -162,9 +162,9 @@
speeds_rows=`cat ${BULK_DATA_DIR_RES_LOAD}/${RESULT_NAME}|grep loaded |awk '{print $11" "$12}'| awk '{print $0"\b \t"}' |tail -1 |awk '{print $1}' `
times_rows=`cat ${BULK_DATA_DIR_RES_LOAD}/${RESULT_NAME}|grep loaded |awk '{print $5}'|head -1 |awk '{print $1}' |sed "s/sec//g" `
log_info "${FORMAT} ${USE_CASE} data compression has been completed"
disk_usage_after=$(set_command "du -s ${TimePath} --exclude="pgsql_tmp"| cut -f 1 " )
disk_usage_after=$(set_command "du -ks ${TimePath} --exclude="pgsql_tmp"| cut -f 1 " )
log_debug "disk usage before load :$disk_usage_before and disk usage after load: ${disk_usage_after}"
disk_usage=`expr ${disk_usage_after} - ${disk_usage_before}`
disk_usage=$((disk_usage_after - disk_usage_before))
log_debug "${FORMAT},${USE_CASE},${SCALE},${BATCH_SIZE},${NUM_WORKER},${speeds_rows},${times_rows},${speed_metrics},${disk_usage},0,${records_per_table}"
log_debug "target file: ${BULK_DATA_DIR_RES_LOAD}/load_input.csv"
echo ${FORMAT},${USE_CASE},${SCALE},${BATCH_SIZE},${NUM_WORKER},${speeds_rows},${times_rows},${speed_metrics},${disk_usage},0,${records_per_table} >> ${BULK_DATA_DIR_RES_LOAD}/load_input.csv
@@ -227,15 +227,26 @@ elif [ ${FORMAT} == "influx" ] || [ ${FORMAT} == "influx3" ]; then
log_error "influxdb3 failed to start"
exit 0
fi
log_debug "influxdb3 started successfully"
unset influxdb3_auth_token
token=$(get_influxdb3_token ${serverIP} ${DATABASE_PORT})
if [ $? -ne 0 ]; then
log_error "Failed to get influxdb3 token"
exit 0
fi
export influxdb3_auth_token=${token}
log_debug "Get influxdb3 token successfully"
fi
if [ -d "${InfPath}" ]; then
disk_usage_before=`set_command "du -s ${InfPath} | cut -f 1 " `
disk_usage_before=`set_command "du -sk ${InfPath} | cut -f 1 " `
else
disk_usage_before=0
fi
log_debug "COMMAND:${load_command} BATCH_SIZE:${BATCH_SIZE} USE_CASE:${USE_CASE} FORMAT:${FORMAT} NUM_WORKER:${NUM_WORKER} SCALE:${SCALE}"
log_debug "cat ${BULK_DATA_DIR}/${INSERT_DATA_FILE_NAME}| gunzip | ${load_command} --workers=${NUM_WORKER} --batch-size=${BATCH_SIZE} --db-name=${DATABASE_NAME} --urls=http://${DATABASE_HOST}:${DATABASE_PORT} --hash-workers=true > ${BULK_DATA_DIR_RES_LOAD}/${RESULT_NAME}"
cat ${BULK_DATA_DIR}/${INSERT_DATA_FILE_NAME} | gunzip | ${load_command} --workers=${NUM_WORKER} --batch-size=${BATCH_SIZE} --db-name=${DATABASE_NAME} --urls=http://${DATABASE_HOST}:${DATABASE_PORT} --hash-workers=true > ${BULK_DATA_DIR_RES_LOAD}/${RESULT_NAME}
load_params="--workers=${NUM_WORKER} --batch-size=${BATCH_SIZE} --db-name=${DATABASE_NAME} --urls=http://${DATABASE_HOST}:${DATABASE_PORT} --hash-workers=true"
[ "${FORMAT}" == "influx3" ] && load_params+=" --auth-token ${token}"
log_debug "cat ${BULK_DATA_DIR}/${INSERT_DATA_FILE_NAME}| gunzip | ${load_command} ${load_params} > ${BULK_DATA_DIR_RES_LOAD}/${RESULT_NAME}"
cat ${BULK_DATA_DIR}/${INSERT_DATA_FILE_NAME} | gunzip | ${load_command} ${load_params} > ${BULK_DATA_DIR_RES_LOAD}/${RESULT_NAME}

speed_metrics=`cat ${BULK_DATA_DIR_RES_LOAD}/${RESULT_NAME}|grep loaded |awk '{print $11" "$12}'| awk '{print $0"\b \t"}' |head -1 |awk '{print $1}'`
speeds_rows=`cat ${BULK_DATA_DIR_RES_LOAD}/${RESULT_NAME}|grep loaded |awk '{print $11" "$12}'| awk '{print $0"\b \t"}' |tail -1 |awk '{print $1}' `
times_rows=`cat ${BULK_DATA_DIR_RES_LOAD}/${RESULT_NAME}|grep loaded |awk '{print $5}'|head -1 |awk '{print $1}' |sed "s/sec//g" `
@@ -261,7 +272,7 @@
fi
log_debug "influxdb data compression has been completed"
set_command "rm -rf /usr/local/src/teststatus.log"
disk_usage_after=`set_command "du -s ${InfPath} | cut -f 1 " `
disk_usage_after=`set_command "du -sk ${InfPath} | cut -f 1 " `
log_debug "disk_usage_before: ${disk_usage_before}, disk_usage_after: ${disk_usage_after}"
disk_usage=$((disk_usage_after - disk_usage_before))
log_debug "${FORMAT},${USE_CASE},${SCALE},${BATCH_SIZE},${NUM_WORKER},${speeds_rows},${times_rows},${speed_metrics},${disk_usage},0,${records_per_table}"
@@ -300,7 +311,7 @@ elif [ ${FORMAT} == "TDengine" ] || [ ${FORMAT} == "TDengineStmt2" ]; then
sleep 2"

if [ -d "${TDPath}" ]; then
disk_usage_before=`set_command "du -s ${TDPath}/vnode | cut -f 1 " `
disk_usage_before=`set_command "du -sk ${TDPath}/vnode | cut -f 1 " `
else
disk_usage_before=0
fi
@@ -342,11 +353,11 @@
fi
log_debug "TDengine data writing to disk has been completed "
set_command "rm -rf /usr/local/src/teststatus.log"
disk_usage_after=`set_command "du -s ${TDPath}/vnode | cut -f 1 " `
log_debug "${disk_usage_before},${disk_usage_after}"
wal_uasge=`set_command "du ${TDPath}/vnode/*/wal/ -cs|tail -1 | cut -f 1 " `
disk_usage_after=`set_command "du -ks ${TDPath}/vnode | cut -f 1 " `
wal_uasge=`set_command "du ${TDPath}/vnode/*/wal/ -cs -k |tail -1 | cut -f 1 " `
disk_usage_nowal=`expr ${disk_usage_after} - ${disk_usage_before} - ${wal_uasge}`
disk_usage=`expr ${disk_usage_after} - ${disk_usage_before}`
log_debug "disk_usage_before: ${disk_usage_before}, disk_usage_after: ${disk_usage_after}"
disk_usage=$((disk_usage_after - disk_usage_before))
log_debug "${FORMAT},${USE_CASE},${SCALE},${BATCH_SIZE},${NUM_WORKER},${speeds_rows},${times_rows},${speed_metrics},${disk_usage},${disk_usage_nowal},${records_per_table}"
echo ${FORMAT},${USE_CASE},${SCALE},${BATCH_SIZE},${NUM_WORKER},${speeds_rows},${times_rows},${speed_metrics},${disk_usage},${disk_usage_nowal},${records_per_table} >> ${BULK_DATA_DIR_RES_LOAD}/load_input.csv
else