From 5be21b10c3e7c07b69b4e4550e652a9f3a97be38 Mon Sep 17 00:00:00 2001 From: zipper-meng Date: Fri, 13 Jan 2023 11:52:05 +0800 Subject: [PATCH 01/13] Fit CnosDB v2.0.1 --- .gitignore | 1 + .../databases/cnosdb/common.go | 16 +-- cmd/generate_queries/databases/cnosdb/iot.go | 120 +++++++++--------- cmd/load_cnosdb/creator.go | 92 ++++++++------ cmd/load_cnosdb/http_writer.go | 3 +- cmd/run_queries_cnosdb/http_client.go | 47 ++++--- cmd/run_queries_cnosdb/main.go | 21 ++- 7 files changed, 161 insertions(+), 139 deletions(-) diff --git a/.gitignore b/.gitignore index caf0afa..aa4bbda 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ .idea .vscode *~ +/bin/ # High Dynamic Range (HDR) Histogram files *.hdr diff --git a/cmd/generate_queries/databases/cnosdb/common.go b/cmd/generate_queries/databases/cnosdb/common.go index f2e9e61..20bdec5 100644 --- a/cmd/generate_queries/databases/cnosdb/common.go +++ b/cmd/generate_queries/databases/cnosdb/common.go @@ -1,10 +1,8 @@ package cnosdb import ( - "fmt" - "net/url" "time" - + "github.com/cnosdb/tsdb-comparisons/cmd/generate_queries/uses/iot" "github.com/cnosdb/tsdb-comparisons/cmd/generate_queries/utils" "github.com/cnosdb/tsdb-comparisons/pkg/query" @@ -21,29 +19,27 @@ func (g *BaseGenerator) GenerateEmptyQuery() query.Query { // fillInQuery fills the query struct with data. func (g *BaseGenerator) fillInQuery(qi query.Query, humanLabel, humanDesc, cnosql string) { - v := url.Values{} - v.Set("q", cnosql) q := qi.(*query.HTTP) q.HumanLabel = []byte(humanLabel) q.RawQuery = []byte(cnosql) q.HumanDescription = []byte(humanDesc) q.Method = []byte("POST") - q.Path = []byte(fmt.Sprintf("/query?%s", v.Encode())) - q.Body = nil + q.Path = []byte("/api/v1/sql") + q.Body = []byte(cnosql) } // NewIoT creates a new iot use case query generator. func (g *BaseGenerator) NewIoT(start, end time.Time, scale int) (utils.QueryGenerator, error) { core, err := iot.NewCore(start, end, scale) - + if err != nil { return nil, err } - + devops := &IoT{ BaseGenerator: g, Core: core, } - + return devops, nil } diff --git a/cmd/generate_queries/databases/cnosdb/iot.go b/cmd/generate_queries/databases/cnosdb/iot.go index 73fc041..8da06fa 100644 --- a/cmd/generate_queries/databases/cnosdb/iot.go +++ b/cmd/generate_queries/databases/cnosdb/iot.go @@ -4,7 +4,7 @@ import ( "fmt" "strings" "time" - + "github.com/cnosdb/tsdb-comparisons/cmd/generate_queries/databases" "github.com/cnosdb/tsdb-comparisons/cmd/generate_queries/uses/iot" "github.com/cnosdb/tsdb-comparisons/pkg/query" @@ -31,7 +31,7 @@ func (i *IoT) getTrucksWhereWithNames(names []string) string { for _, s := range names { nameClauses = append(nameClauses, fmt.Sprintf("\"name\" = '%s'", s)) } - + combinedHostnameClause := strings.Join(nameClauses, " or ") return "(" + combinedHostnameClause + ")" } @@ -52,83 +52,83 @@ func (i *IoT) LastLocByTruck(qi query.Query, nTrucks int) { ORDER BY "time" LIMIT 1`, i.getTruckWhereString(nTrucks)) - + humanLabel := "cnosdb last location by specific truck" humanDesc := fmt.Sprintf("%s: random %4d trucks", humanLabel, nTrucks) - + i.fillInQuery(qi, humanLabel, humanDesc, cnosql) } // LastLocPerTruck finds all the truck locations along with truck and driver names. func (i *IoT) LastLocPerTruck(qi query.Query) { - - cnosql := fmt.Sprintf(`SELECT "latitude", "longitude" + + cnosql := fmt.Sprintf(`SELECT min("latitude"), min("longitude") FROM "readings" WHERE "fleet"='%s' GROUP BY "name","driver" - ORDER BY "time" + ORDER BY "name","driver" LIMIT 1`, i.GetRandomFleet()) - + humanLabel := "cnosdb last location per truck" humanDesc := humanLabel - + i.fillInQuery(qi, humanLabel, humanDesc, cnosql) } // TrucksWithLowFuel finds all trucks with low fuel (less than 10%). func (i *IoT) TrucksWithLowFuel(qi query.Query) { - cnosql := fmt.Sprintf(`SELECT "name", "driver", "fuel_state" + cnosql := fmt.Sprintf(`SELECT "name", min("driver"), min("fuel_state") FROM "diagnostics" WHERE "fuel_state" <= 0.1 AND "fleet" = '%s' GROUP BY "name" - ORDER BY "time" DESC + ORDER BY "name" DESC LIMIT 1`, i.GetRandomFleet()) - + humanLabel := "cnosdb trucks with low fuel" humanDesc := fmt.Sprintf("%s: under 10 percent", humanLabel) - + i.fillInQuery(qi, humanLabel, humanDesc, cnosql) } // TrucksWithHighLoad finds all trucks that have load over 90%. func (i *IoT) TrucksWithHighLoad(qi query.Query) { - cnosql := fmt.Sprintf(`SELECT "name", "driver", "current_load", "load_capacity" - FROM (SELECT "current_load", "load_capacity" + cnosql := fmt.Sprintf(`SELECT "name", min("driver"), min("current_load"), min("load_capacity") + FROM (SELECT "name", "driver", min("current_load") AS "current_load" ,min("load_capacity") AS "load_capacity" FROM "diagnostics" WHERE fleet = '%s' GROUP BY "name","driver" - ORDER BY "time" DESC + ORDER BY "name","driver" DESC LIMIT 1) WHERE "current_load" >= 0.9 * "load_capacity" GROUP BY "name" - ORDER BY "time" DESC`, + ORDER BY "name" DESC`, i.GetRandomFleet()) - + humanLabel := "cnosdb trucks with high load" humanDesc := fmt.Sprintf("%s: over 90 percent", humanLabel) - + i.fillInQuery(qi, humanLabel, humanDesc, cnosql) } // StationaryTrucks finds all trucks that have low average velocity in a time window. func (i *IoT) StationaryTrucks(qi query.Query) { interval := i.Interval.MustRandWindow(iot.StationaryDuration) - cnosql := fmt.Sprintf(`SELECT "name", "driver" - FROM(SELECT mean("velocity") as mean_velocity + cnosql := fmt.Sprintf(`SELECT "name", min("driver") + FROM(SELECT "name", "driver", "fleet", avg("velocity") as mean_velocity FROM "readings" WHERE time > '%s' AND time <= '%s' - GROUP BY time(10m),"name","driver","fleet" + GROUP BY DATE_BIN(INTERVAL '10 minutes', time, TIMESTAMP '1970-01-01T00:00:00Z'), "name", "driver", "fleet" LIMIT 1) - WHERE "fleet" = '%s' AND "mean_velocity" < 1 + WHERE "fleet" = '%s' AND "mean_velocity" < 1 GROUP BY "name"`, interval.Start().Format(time.RFC3339), interval.End().Format(time.RFC3339), i.GetRandomFleet()) - + humanLabel := "cnosdb stationary trucks" humanDesc := fmt.Sprintf("%s: with low avg velocity in last 10 minutes", humanLabel) - + i.fillInQuery(qi, humanLabel, humanDesc, cnosql) } @@ -136,11 +136,11 @@ func (i *IoT) StationaryTrucks(qi query.Query) { func (i *IoT) TrucksWithLongDrivingSessions(qi query.Query) { interval := i.Interval.MustRandWindow(iot.LongDrivingSessionDuration) cnosql := fmt.Sprintf(`SELECT "name","driver" - FROM(SELECT count(*) AS ten_min - FROM(SELECT mean("velocity") AS mean_velocity + FROM(SELECT "name", "driver", count(*) AS ten_min_mean_velocity + FROM(SELECT "name", "driver", avg("velocity") AS mean_velocity FROM readings WHERE "fleet" = '%s' AND time > '%s' AND time <= '%s' - GROUP BY time(10m),"name","driver") + GROUP BY DATE_BIN(INTERVAL '10 minutes', time, TIMESTAMP '1970-01-01T00:00:00Z'), "name", "driver") WHERE "mean_velocity" > 1 GROUP BY "name","driver") WHERE ten_min_mean_velocity > %d`, @@ -149,10 +149,10 @@ func (i *IoT) TrucksWithLongDrivingSessions(qi query.Query) { interval.End().Format(time.RFC3339), // Calculate number of 10 min intervals that is the max driving duration for the session if we rest 5 mins per hour. tenMinutePeriods(5, iot.LongDrivingSessionDuration)) - + humanLabel := "cnosdb trucks with longer driving sessions" humanDesc := fmt.Sprintf("%s: stopped less than 20 mins in 4 hour period", humanLabel) - + i.fillInQuery(qi, humanLabel, humanDesc, cnosql) } @@ -160,11 +160,11 @@ func (i *IoT) TrucksWithLongDrivingSessions(qi query.Query) { func (i *IoT) TrucksWithLongDailySessions(qi query.Query) { interval := i.Interval.MustRandWindow(iot.DailyDrivingDuration) cnosql := fmt.Sprintf(`SELECT "name","driver" - FROM(SELECT count(*) AS ten_min - FROM(SELECT mean("velocity") AS mean_velocity + FROM(SELECT "name", "driver", count(*) AS ten_min_mean_velocity + FROM(SELECT "name", "driver", avg("velocity") AS mean_velocity FROM readings WHERE "fleet" = '%s' AND time > '%s' AND time <= '%s' - GROUP BY time(10m),"name","driver") + GROUP BY DATE_BIN(INTERVAL '10 minutes', time, TIMESTAMP '1970-01-01T00:00:00Z'), "name", "driver") WHERE "mean_velocity" > 1 GROUP BY "name","driver") WHERE ten_min_mean_velocity > %d`, @@ -173,23 +173,23 @@ func (i *IoT) TrucksWithLongDailySessions(qi query.Query) { interval.End().Format(time.RFC3339), // Calculate number of 10 min intervals that is the max driving duration for the session if we rest 35 mins per hour. tenMinutePeriods(35, iot.DailyDrivingDuration)) - + humanLabel := "cnosdb trucks with longer daily sessions" humanDesc := fmt.Sprintf("%s: drove more than 10 hours in the last 24 hours", humanLabel) - + i.fillInQuery(qi, humanLabel, humanDesc, cnosql) } // AvgVsProjectedFuelConsumption calculates average and projected fuel consumption per fleet. func (i *IoT) AvgVsProjectedFuelConsumption(qi query.Query) { - cnosql := `SELECT mean("fuel_consumption") AS "mean_fuel_consumption", mean("nominal_fuel_consumption") AS "nominal_fuel_consumption" + cnosql := `SELECT avg("fuel_consumption") AS "mean_fuel_consumption", avg("nominal_fuel_consumption") AS "nominal_fuel_consumption" FROM "readings" WHERE "velocity" > 1 GROUP BY "fleet"` - + humanLabel := "cnosdb average vs projected fuel consumption per fleet" humanDesc := humanLabel - + i.fillInQuery(qi, humanLabel, humanDesc, cnosql) } @@ -198,21 +198,21 @@ func (i *IoT) AvgDailyDrivingDuration(qi query.Query) { start := i.Interval.Start().Format(time.RFC3339) end := i.Interval.End().Format(time.RFC3339) cnosql := fmt.Sprintf(`SELECT count("mv")/6 as "hours driven" - FROM (SELECT mean("velocity") as "mv" + FROM (SELECT DATE_BIN(INTERVAL '10 minutes', time, TIMESTAMP '1970-01-01T00:00:00Z') as "time", "fleet", "name", "driver", avg("velocity") as "mv" FROM "readings" WHERE time > '%s' AND time < '%s' - GROUP BY time(10m),"fleet", "name", "driver") + GROUP BY time, "fleet", "name", "driver") WHERE time > '%s' AND time < '%s' - GROUP BY time(1d),"fleet", "name", "driver"`, + GROUP BY DATE_BIN(INTERVAL '1 day', time, TIMESTAMP '1970-01-01T00:00:00Z'), "fleet", "name", "driver"`, start, end, start, end, ) - + humanLabel := "cnosdb average driver driving duration per day" humanDesc := humanLabel - + i.fillInQuery(qi, humanLabel, humanDesc, cnosql) } @@ -225,43 +225,43 @@ func (i *IoT) AvgDailyDrivingSession(qi query.Query) { FROM (SELECT difference("difka"), elapsed("difka", 1m) FROM (SELECT "difka" FROM (SELECT difference("mv") AS difka - FROM (SELECT floor(mean("velocity")/10)/floor(mean("velocity")/10) AS "mv" + FROM (SELECT floor(avg("velocity")/10)/floor(avg("velocity")/10) AS "mv" FROM "readings" WHERE "name"!='' AND time > '%s' AND time < '%s' - GROUP BY time(10m), "name" fill(0)) + GROUP BY DATE_BIN(INTERVAL '10 minutes', time, TIMESTAMP '1970-01-01T00:00:00Z'), "name") GROUP BY "name") WHERE "difka"!=0 GROUP BY "name") GROUP BY "name") WHERE "difference" = -2 GROUP BY "name"; - SELECT mean("elapsed") + SELECT avg("elapsed") FROM "random_measure2_1" WHERE time > '%s' AND time < '%s' - GROUP BY time(1d),"name"`, + GROUP BY DATE_BIN(INTERVAL '1 day', time, TIMESTAMP '1970-01-01T00:00:00Z'),"name"`, start, end, start, end, ) - + humanLabel := "cnosdb average driver driving session without stopping per day" humanDesc := humanLabel - + i.fillInQuery(qi, humanLabel, humanDesc, cnosql) } // AvgLoad finds the average load per truck model per fleet. func (i *IoT) AvgLoad(qi query.Query) { - cnosql := `SELECT mean("ml") AS mean_load_percentage + cnosql := `SELECT avg("ml") AS mean_load_percentage FROM (SELECT "current_load"/"load_capacity" AS "ml" FROM "diagnostics" GROUP BY "name", "fleet", "model") GROUP BY "fleet", "model"` - + humanLabel := "cnosdb average load per truck model per fleet" humanDesc := humanLabel - + i.fillInQuery(qi, humanLabel, humanDesc, cnosql) } @@ -270,21 +270,21 @@ func (i *IoT) DailyTruckActivity(qi query.Query) { start := i.Interval.Start().Format(time.RFC3339) end := i.Interval.End().Format(time.RFC3339) cnosql := fmt.Sprintf(`SELECT count("ms")/144 - FROM (SELECT mean("status") AS ms + FROM (SELECT avg("status") AS ms FROM "diagnostics" WHERE time >= '%s' AND time < '%s' - GROUP BY time(10m), "model", "fleet") + GROUP BY DATE_BIN(INTERVAL '10 minutes', time, TIMESTAMP '1970-01-01T00:00:00Z'), "model", "fleet") WHERE time >= '%s' AND time < '%s' AND "ms"<1 - GROUP BY time(1d), "model", "fleet"`, + GROUP BY DATE_BIN(INTERVAL '10 day', time, TIMESTAMP '1970-01-01T00:00:00Z'), "model", "fleet"`, start, end, start, end, ) - + humanLabel := "cnosdb daily truck activity per fleet per model" humanDesc := humanLabel - + i.fillInQuery(qi, humanLabel, humanDesc, cnosql) } @@ -299,7 +299,7 @@ func (i *IoT) TruckBreakdownFrequency(qi query.Query) { FROM "diagnostics" WHERE time >= '%s' AND time < '%s') WHERE time >= '%s' AND time < '%s' - GROUP BY time(10m),"model") + GROUP BY DATE_BIN(INTERVAL '10 minutes', time, TIMESTAMP '1970-01-01T00:00:00Z'), "model") GROUP BY "model") WHERE "state_changed" = 1 GROUP BY "model"`, @@ -308,10 +308,10 @@ func (i *IoT) TruckBreakdownFrequency(qi query.Query) { start, end, ) - + humanLabel := "cnosdb truck breakdown frequency per model" humanDesc := humanLabel - + i.fillInQuery(qi, humanLabel, humanDesc, cnosql) } diff --git a/cmd/load_cnosdb/creator.go b/cmd/load_cnosdb/creator.go index 28fc9a6..8f9cb87 100644 --- a/cmd/load_cnosdb/creator.go +++ b/cmd/load_cnosdb/creator.go @@ -1,13 +1,15 @@ package main import ( + "bytes" "encoding/json" "fmt" - "io/ioutil" + "io" "log" "net/http" - "net/url" "time" + + "github.com/valyala/fasthttp" ) type dbCreator struct { @@ -33,86 +35,98 @@ func (d *dbCreator) DBExists(dbName string) bool { } func (d *dbCreator) listDatabases() ([]string, error) { - u := fmt.Sprintf("%s/query?q=show%%20databases", d.daemonURL) - resp, err := http.Get(u) + u := fmt.Sprintf("%s/api/v1/sql", d.daemonURL) + sql := []byte("SHOW DATABASES") + req, err := http.NewRequest("POST", u, bytes.NewReader(sql)) + if err != nil { + return nil, err + } + req.Header.Add(fasthttp.HeaderAuthorization, basicAuth("root", "")) + + client := &http.Client{} + resp, err := client.Do(req) if err != nil { - return nil, fmt.Errorf("listDatabases error: %s", err.Error()) + return nil, fmt.Errorf("listDatabases db error: %s", err.Error()) } defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + body, err := io.ReadAll(resp.Body) if err != nil { return nil, err } // Do ad-hoc parsing to find existing database names: - // {"results":[{"series":[{"name":"databases","columns":["name"],"values":[["_internal"],["benchmark_db"]]}]}]}% + // [{"Database":"public"}]% type listingType struct { - Results []struct { - Series []struct { - Values [][]string - } - } + Database string } - var listing listingType - err = json.Unmarshal(body, &listing) + var listingValues []listingType + err = json.Unmarshal(body, &listingValues) if err != nil { return nil, err } ret := []string{} - for _, nestedName := range listing.Results[0].Series[0].Values { - name := nestedName[0] - // the _internal database is skipped: - if name == "_internal" { - continue - } - ret = append(ret, name) + for _, db := range listingValues { + ret = append(ret, db.Database) } return ret, nil } func (d *dbCreator) RemoveOldDB(dbName string) error { - u := fmt.Sprintf("%s/query?q=drop+database+%s", d.daemonURL, dbName) - resp, err := http.Post(u, "text/plain", nil) + u := fmt.Sprintf("%s/api/v1/sql", d.daemonURL) + sql := []byte("DROP DATABASE IF EXISTS " + dbName) + req, err := http.NewRequest("POST", u, bytes.NewReader(sql)) + if err != nil { + return err + } + req.Header.Add(fasthttp.HeaderAuthorization, basicAuth("root", "")) + + client := &http.Client{} + resp, err := client.Do(req) if err != nil { return fmt.Errorf("drop db error: %s", err.Error()) } + defer resp.Body.Close() + // does the body need to be read into the void? + if resp.StatusCode != 200 { - return fmt.Errorf("drop db returned non-200 code: %d", resp.StatusCode) + respMsg, err := io.ReadAll(resp.Body) + if err == nil { + return fmt.Errorf("drop db returned non-200 code: %d: %s", resp.StatusCode, respMsg) + } else { + return fmt.Errorf("drop db returned non-200 code: %d", resp.StatusCode) + } } + time.Sleep(time.Second) return nil } func (d *dbCreator) CreateDB(dbName string) error { - u, err := url.Parse(d.daemonURL) - if err != nil { - return err - } - - // serialize params the right way: - u.Path = "query" - v := u.Query() - v.Set("consistency", "all") - v.Set("q", fmt.Sprintf("CREATE DATABASE %s WITH REPLICATION %d", dbName, replicationFactor)) - u.RawQuery = v.Encode() - - req, err := http.NewRequest("GET", u.String(), nil) + u := fmt.Sprintf("%s/api/v1/sql", d.daemonURL) + sql := []byte("CREATE DATABASE " + dbName) + req, err := http.NewRequest("POST", u, bytes.NewReader(sql)) if err != nil { return err } + req.Header.Add(fasthttp.HeaderAuthorization, basicAuth("root", "")) client := &http.Client{} resp, err := client.Do(req) if err != nil { - return err + return fmt.Errorf("create db error: %s", err.Error()) } defer resp.Body.Close() // does the body need to be read into the void? if resp.StatusCode != 200 { - return fmt.Errorf("bad db create") + respMsg, err := io.ReadAll(resp.Body) + if err == nil { + return fmt.Errorf("drop db returned non-200 code: %d: %s", resp.StatusCode, respMsg) + } else { + return fmt.Errorf("create db returned non-200 code: %d", resp.StatusCode) + } } time.Sleep(time.Second) diff --git a/cmd/load_cnosdb/http_writer.go b/cmd/load_cnosdb/http_writer.go index f3dc46b..58ae942 100644 --- a/cmd/load_cnosdb/http_writer.go +++ b/cmd/load_cnosdb/http_writer.go @@ -70,11 +70,12 @@ func basicAuth(username, password string) string { auth := username + ":" + password return "Basic " + base64.StdEncoding.EncodeToString([]byte(auth)) } + func (w *HTTPWriter) initializeReq(req *fasthttp.Request, body []byte, isGzip bool) { req.Header.SetContentTypeBytes(textPlain) req.Header.SetMethodBytes(methodPost) req.Header.SetRequestURIBytes(w.url) - req.Header.Add(fasthttp.HeaderAuthorization, basicAuth("cnosdb", "")) + req.Header.Add(fasthttp.HeaderAuthorization, basicAuth("root", "")) if isGzip { req.Header.Add(headerContentEncoding, headerGzip) diff --git a/cmd/run_queries_cnosdb/http_client.go b/cmd/run_queries_cnosdb/http_client.go index d608777..abfbab4 100644 --- a/cmd/run_queries_cnosdb/http_client.go +++ b/cmd/run_queries_cnosdb/http_client.go @@ -1,16 +1,20 @@ package main import ( + "bytes" + "encoding/base64" "encoding/json" "fmt" + "io" "io/ioutil" "net/http" "net/url" "os" "sync" "time" - + "github.com/cnosdb/tsdb-comparisons/pkg/query" + "github.com/valyala/fasthttp" ) var bytesSlash = []byte("/") // heap optimization @@ -28,7 +32,7 @@ type HTTPClient struct { type HTTPClientDoOptions struct { Debug int PrettyPrintResponses bool - chunkSize uint64 + tenant string database string } @@ -55,6 +59,11 @@ func NewHTTPClient(host string) *HTTPClient { } } +func basicAuth(username, password string) string { + auth := username + ":" + password + return "Basic " + base64.StdEncoding.EncodeToString([]byte(auth)) +} + // Do performs the action specified by the given Query. It uses fasthttp, and // tries to minimize heap allocations. func (w *HTTPClient) Do(q *query.HTTP, opts *HTTPClientDoOptions) (lag float64, err error) { @@ -63,18 +72,15 @@ func (w *HTTPClient) Do(q *query.HTTP, opts *HTTPClientDoOptions) (lag float64, w.uri = append(w.uri, w.Host...) // w.uri = append(w.uri, bytesSlash...) w.uri = append(w.uri, q.Path...) - w.uri = append(w.uri, []byte("&db="+url.QueryEscape(opts.database))...) - if opts.chunkSize > 0 { - s := fmt.Sprintf("&chunked=true&chunk_size=%d", opts.chunkSize) - w.uri = append(w.uri, []byte(s)...) - } - + w.uri = append(w.uri, []byte("?tenant=cnosdb&db="+url.QueryEscape(opts.database))...) + // populate a request with data from the Query: - req, err := http.NewRequest(string(q.Method), string(w.uri), nil) + req, err := http.NewRequest(string(q.Method), string(w.uri), bytes.NewReader(q.Body)) + req.Header.Add(fasthttp.HeaderAuthorization, basicAuth("root", "")) if err != nil { panic(err) } - + // Perform the request while tracking latency: start := time.Now() resp, err := w.client.Do(req) @@ -83,18 +89,23 @@ func (w *HTTPClient) Do(q *query.HTTP, opts *HTTPClientDoOptions) (lag float64, } defer resp.Body.Close() if resp.StatusCode != http.StatusOK { - panic("http request did not return status 200 OK") + respMsg, err := io.ReadAll(resp.Body) + if err == nil { + panic(fmt.Sprintf("query request returned non-200 code: %d: %s", resp.StatusCode, respMsg)) + } else { + panic(fmt.Sprintf("query request returned non-200 code: %d", resp.StatusCode)) + } } - + var body []byte body, err = ioutil.ReadAll(resp.Body) - + if err != nil { panic(err) } - + lag = float64(time.Since(start).Nanoseconds()) / 1e6 // milliseconds - + if opts != nil { // Print debug messages, if applicable: switch opts.Debug { @@ -111,12 +122,12 @@ func (w *HTTPClient) Do(q *query.HTTP, opts *HTTPClientDoOptions) (lag float64, fmt.Fprintf(os.Stderr, "debug: response: %s\n", string(body)) default: } - + // Pretty print JSON responses, if applicable: if opts.PrettyPrintResponses { // Assumes the response is JSON! This holds for Influx // and Elastic. - + prefix := fmt.Sprintf("ID %d: ", q.GetID()) var v interface{} var line []byte @@ -131,6 +142,6 @@ func (w *HTTPClient) Do(q *query.HTTP, opts *HTTPClientDoOptions) (lag float64, fmt.Println(string(line) + "\n") } } - + return lag, err } diff --git a/cmd/run_queries_cnosdb/main.go b/cmd/run_queries_cnosdb/main.go index fdb30a3..03b2d50 100644 --- a/cmd/run_queries_cnosdb/main.go +++ b/cmd/run_queries_cnosdb/main.go @@ -9,7 +9,7 @@ import ( "fmt" "log" "strings" - + "github.com/blagojts/viper" "github.com/cnosdb/tsdb-comparisons/internal/utils" "github.com/cnosdb/tsdb-comparisons/pkg/query" @@ -32,30 +32,30 @@ func init() { var config query.BenchmarkRunnerConfig config.AddToFlagSet(pflag.CommandLine) var csvDaemonUrls string - - pflag.String("urls", "http://localhost:8086/", "Daemon URLs, comma-separated. Will be used in a round-robin fashion.") + + pflag.String("urls", "http://localhost:31007/", "Daemon URLs, comma-separated. Will be used in a round-robin fashion.") pflag.Uint64("chunk-response-size", 0, "Number of series to chunk results into. 0 means no chunking.") - + pflag.Parse() - + err := utils.SetupConfigFile() - + if err != nil { panic(fmt.Errorf("fatal error config file: %s", err)) } - + if err := viper.Unmarshal(&config); err != nil { panic(fmt.Errorf("unable to decode config: %s", err)) } - + csvDaemonUrls = viper.GetString("urls") chunkSize = viper.GetUint64("chunk-response-size") - + daemonUrls = strings.Split(csvDaemonUrls, ",") if len(daemonUrls) == 0 { log.Fatal("missing 'urls' flag") } - + runner = query.NewBenchmarkRunner(config) } @@ -74,7 +74,6 @@ func (p *processor) Init(workerNumber int) { p.opts = &HTTPClientDoOptions{ Debug: runner.DebugLevel(), PrettyPrintResponses: runner.DoPrintResponses(), - chunkSize: chunkSize, database: runner.DatabaseName(), } url := daemonUrls[workerNumber%len(daemonUrls)] From be24ab98a300ed844e155aaa0c29c291d7ff7e8d Mon Sep 17 00:00:00 2001 From: zipper-meng Date: Wed, 18 Jan 2023 10:56:34 +0800 Subject: [PATCH 02/13] fix: insert a null instead of empty slot for tdengine load-data --- pkg/data/serialize/util.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/data/serialize/util.go b/pkg/data/serialize/util.go index 3196d6a..5a394a3 100644 --- a/pkg/data/serialize/util.go +++ b/pkg/data/serialize/util.go @@ -29,6 +29,7 @@ func FastFormatAppend(v interface{}, buf []byte) []byte { buf = append(buf, v.(string)...) return buf case nil: + buf = append(buf, []byte("NULL")...) return buf default: panic(fmt.Sprintf("unknown field type for %#v", v)) From e7b9b80598e3087793e4d2d2a64b6bdf0512389c Mon Sep 17 00:00:00 2001 From: yanyunliu Date: Thu, 9 Feb 2023 11:51:02 +0800 Subject: [PATCH 03/13] write error not panic, print the error message --- cmd/load_cnosdb/process.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/load_cnosdb/process.go b/cmd/load_cnosdb/process.go index 0cd2db0..a383b50 100644 --- a/cmd/load_cnosdb/process.go +++ b/cmd/load_cnosdb/process.go @@ -60,7 +60,7 @@ func (p *processor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uint64) } } if err != nil { - fatal("Error writing: %s\n", err.Error()) + fmt.Printf("Error writing: %s\n", err.Error()) } } metricCnt := batch.metrics From b94fd87cffee914b6121d046a56a9f9ea221d7e3 Mon Sep 17 00:00:00 2001 From: ZuoTiJia <2239651886@qq.com> Date: Thu, 16 Mar 2023 17:15:30 +0800 Subject: [PATCH 04/13] fix: fix avg daily driving duration sql --- cmd/generate_queries/databases/cnosdb/iot.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/generate_queries/databases/cnosdb/iot.go b/cmd/generate_queries/databases/cnosdb/iot.go index 8da06fa..f2d161c 100644 --- a/cmd/generate_queries/databases/cnosdb/iot.go +++ b/cmd/generate_queries/databases/cnosdb/iot.go @@ -201,7 +201,7 @@ func (i *IoT) AvgDailyDrivingDuration(qi query.Query) { FROM (SELECT DATE_BIN(INTERVAL '10 minutes', time, TIMESTAMP '1970-01-01T00:00:00Z') as "time", "fleet", "name", "driver", avg("velocity") as "mv" FROM "readings" WHERE time > '%s' AND time < '%s' - GROUP BY time, "fleet", "name", "driver") + GROUP BY DATE_BIN(INTERVAL '10 minutes', time, TIMESTAMP '1970-01-01T00:00:00Z'), "fleet", "name", "driver") WHERE time > '%s' AND time < '%s' GROUP BY DATE_BIN(INTERVAL '1 day', time, TIMESTAMP '1970-01-01T00:00:00Z'), "fleet", "name", "driver"`, start, From a43d2b3c85c68e90546309b64d61d750508e854b Mon Sep 17 00:00:00 2001 From: zipper-meng Date: Tue, 4 Apr 2023 15:58:15 +0800 Subject: [PATCH 05/13] Upgrade iotdb_client to v1.1.0 --- cmd/load/main.go | 7 ++++++- cmd/load_cnosdb/scan.go | 2 +- go.mod | 3 +-- go.sum | 21 +++++++++++++-------- pkg/targets/iotdb/creator.go | 7 ++++--- pkg/targets/iotdb/process.go | 5 +++-- 6 files changed, 28 insertions(+), 17 deletions(-) diff --git a/cmd/load/main.go b/cmd/load/main.go index 414f6a5..e52b2a3 100644 --- a/cmd/load/main.go +++ b/cmd/load/main.go @@ -1,5 +1,10 @@ package main +import "log" + func main() { - rootCmd.Execute() + err := rootCmd.Execute() + if err != nil { + log.Fatal(err) + } } diff --git a/cmd/load_cnosdb/scan.go b/cmd/load_cnosdb/scan.go index 4b8c22c..42deba5 100644 --- a/cmd/load_cnosdb/scan.go +++ b/cmd/load_cnosdb/scan.go @@ -45,7 +45,7 @@ func (b *batch) Append(item data.LoadedPoint) { that := item.Data.([]byte) thatStr := string(that) b.rows++ - // Each cnosdb line is format "csv-tags csv-fields timestamp", so we split by space + // Each line is format "csv-tags csv-fields timestamp", so we split by space // and then on the middle element, we split by comma to count number of fields added args := strings.Split(thatStr, " ") if len(args) != 3 { diff --git a/go.mod b/go.mod index 35f8d1e..f39ffb7 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/HdrHistogram/hdrhistogram-go v1.0.0 github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 - github.com/apache/iotdb-client-go v0.12.0 + github.com/apache/iotdb-client-go v1.1.0 github.com/blagojts/viper v1.6.3-0.20200313094124-068f44cf5e69 github.com/go-ole/go-ole v1.2.4 // indirect github.com/go-sql-driver/mysql v1.5.0 // indirect @@ -30,5 +30,4 @@ require ( golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a // indirect golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e gopkg.in/yaml.v2 v2.3.0 - gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect ) diff --git a/go.sum b/go.sum index 495f956..bc914ca 100644 --- a/go.sum +++ b/go.sum @@ -29,10 +29,10 @@ github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883 h1:bvNMNQO63//z+xNg github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/andybalholm/brotli v1.0.0 h1:7UCwP93aiSfvWpapti8g88vVVGp2qqtGyePsSuDafo4= github.com/andybalholm/brotli v1.0.0/go.mod h1:loMXtMfwqflxFJPmdbJO0a3KNoPuLBgiu3qAvBg8x/Y= -github.com/apache/iotdb-client-go v0.12.0 h1:Cq9k2ndylXhRPZLO6ovQOU8CJ5UhC/RyYUdwxL2D0IE= -github.com/apache/iotdb-client-go v0.12.0/go.mod h1:oUQAIsitLK9aaQdWP0jkOybLtD9iI3QH7IFcbjl3VBQ= -github.com/apache/thrift v0.13.0 h1:5hryIiq9gtn+MiLVn0wP37kb/uTeRZgN08WoCsAhIhI= -github.com/apache/thrift v0.13.0/go.mod h1:cp2SuWMxlEZw2r+iP2GNCdIi4C1qmUzdZFSVb+bacwQ= +github.com/apache/iotdb-client-go v1.1.0 h1:QFrDc4YNqmQdWwaDBvENPrWirclw9QOmyeyH4zQyDgs= +github.com/apache/iotdb-client-go v1.1.0/go.mod h1:3D6QYkqRmASS/4HsjU+U/3fscyc5M9xKRfywZsKuoZY= +github.com/apache/thrift v0.15.0 h1:aGvdaR0v1t9XLgjtBYwxcBvBOTMqClzwE26CHOgjW1Y= +github.com/apache/thrift v0.15.0/go.mod h1:PHK3hniurgQaNMZYaCLEqXKsYK8upmhPbmdP2FXSqgU= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY= @@ -106,6 +106,7 @@ github.com/golang/groupcache v0.0.0-20190129154638-5b532d6fd5ef/go.mod h1:cIg4er github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= +github.com/golang/mock v1.5.0/go.mod h1:CWnOUgYIOo4TcNZ0wHX3YZCqsaM1I1Jvs6v3mP3KVu8= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -400,14 +401,18 @@ github.com/ssgreg/nlreturn/v2 v2.1.0/go.mod h1:E/iiPB78hV7Szg2YfRgyIrk1AD6JVMTRk github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/taosdata/driver-go/v2 v2.0.1 h1:7O3/gDASUUf07shVcelbp547hUT7Hxn3lYz7pMJqCIw= @@ -650,8 +655,8 @@ gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 h1:tQIYjPdBoyREyB9XMu+nnTclpTYkz2zFM+lzLJFO4gQ= -gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/targets/iotdb/creator.go b/pkg/targets/iotdb/creator.go index a5c36a7..b0de348 100644 --- a/pkg/targets/iotdb/creator.go +++ b/pkg/targets/iotdb/creator.go @@ -35,7 +35,7 @@ type dbCreator struct { ds targets.DataSource opts *LoadingOptions - session *client.Session + session client.Session } func (d *dbCreator) Init() { @@ -45,7 +45,8 @@ func (d *dbCreator) Init() { Host: d.opts.Host, Port: d.opts.Port, UserName: d.opts.User, - Password: d.opts.Pass} + Password: d.opts.Pass, + } d.session = client.NewSession(config) if err := d.session.Open(false, 0); err != nil { fmt.Printf("Connect to iotdb %+v failed %v\n", config, err) @@ -84,7 +85,7 @@ func (d *dbCreator) PostCreateDB(dbName string) error { tableCols[tableName] = columns path := "root." + dbName + "." + tableName - d.session.ExecuteStatement("delete from " + path) + d.session.DeleteTimeseries([]string{path}) //fmt.Printf("=== path %s\n", path) for i, name := range tagNames { diff --git a/pkg/targets/iotdb/process.go b/pkg/targets/iotdb/process.go index d3d2fb8..7f3b5e5 100644 --- a/pkg/targets/iotdb/process.go +++ b/pkg/targets/iotdb/process.go @@ -19,7 +19,7 @@ type processor struct { opts *LoadingOptions dbName string - session *client.Session + session client.Session } func newProcessor(opts *LoadingOptions, dbName string) *processor { @@ -34,7 +34,8 @@ func (p *processor) Init(_ int, doLoad, hashWorkers bool) { Host: p.opts.Host, Port: p.opts.Port, UserName: p.opts.User, - Password: p.opts.Pass} + Password: p.opts.Pass, + } p.session = client.NewSession(config) if err := p.session.Open(false, 0); err != nil { fmt.Printf("Connect to iotdb %+v failed %v\n", config, err) From 198a83225d24505ff9943bed44829fcc93e719bc Mon Sep 17 00:00:00 2001 From: zipper-meng Date: Mon, 17 Apr 2023 16:42:30 +0800 Subject: [PATCH 06/13] Fit cnosdb v2.2.0 --- cmd/run_queries_cnosdb/http_client.go | 69 ++++++++++++--------------- cmd/run_queries_cnosdb/main.go | 10 ++-- 2 files changed, 38 insertions(+), 41 deletions(-) diff --git a/cmd/run_queries_cnosdb/http_client.go b/cmd/run_queries_cnosdb/http_client.go index abfbab4..975c19d 100644 --- a/cmd/run_queries_cnosdb/http_client.go +++ b/cmd/run_queries_cnosdb/http_client.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" "io" - "io/ioutil" "net/http" "net/url" "os" @@ -14,25 +13,20 @@ import ( "time" "github.com/cnosdb/tsdb-comparisons/pkg/query" - "github.com/valyala/fasthttp" ) -var bytesSlash = []byte("/") // heap optimization - // HTTPClient is a reusable HTTP Client. type HTTPClient struct { - // client fasthttp.Client - client *http.Client - Host []byte - HostString string - uri []byte + client *http.Client + url []byte + urlPrefixLen int + basicAuth string } // HTTPClientDoOptions wraps options uses when calling `Do`. type HTTPClientDoOptions struct { - Debug int - PrettyPrintResponses bool - tenant string + debug int + prettyPrintResponses bool database string } @@ -50,12 +44,12 @@ func getHttpClient() *http.Client { } // NewHTTPClient creates a new HTTPClient. -func NewHTTPClient(host string) *HTTPClient { +func NewHTTPClient(url string) *HTTPClient { return &HTTPClient{ - client: getHttpClient(), - Host: []byte(host), - HostString: host, - uri: []byte{}, // heap optimization + client: getHttpClient(), + url: []byte(url), + urlPrefixLen: len(url), + basicAuth: basicAuth("root", ""), } } @@ -67,16 +61,15 @@ func basicAuth(username, password string) string { // Do performs the action specified by the given Query. It uses fasthttp, and // tries to minimize heap allocations. func (w *HTTPClient) Do(q *query.HTTP, opts *HTTPClientDoOptions) (lag float64, err error) { - // populate uri from the reusable byte slice: - w.uri = w.uri[:0] - w.uri = append(w.uri, w.Host...) - // w.uri = append(w.uri, bytesSlash...) - w.uri = append(w.uri, q.Path...) - w.uri = append(w.uri, []byte("?tenant=cnosdb&db="+url.QueryEscape(opts.database))...) + w.url = w.url[:w.urlPrefixLen] + w.url = append(w.url, []byte(url.QueryEscape(opts.database))...) // populate a request with data from the Query: - req, err := http.NewRequest(string(q.Method), string(w.uri), bytes.NewReader(q.Body)) - req.Header.Add(fasthttp.HeaderAuthorization, basicAuth("root", "")) + req, err := http.NewRequest(string(q.Method), string(w.url), bytes.NewReader(q.Body)) + if err != nil { + panic(err) + } + req.Header.Add("Authorization", w.basicAuth) if err != nil { panic(err) } @@ -98,7 +91,7 @@ func (w *HTTPClient) Do(q *query.HTTP, opts *HTTPClientDoOptions) (lag float64, } var body []byte - body, err = ioutil.ReadAll(resp.Body) + body, err = io.ReadAll(resp.Body) if err != nil { panic(err) @@ -108,23 +101,23 @@ func (w *HTTPClient) Do(q *query.HTTP, opts *HTTPClientDoOptions) (lag float64, if opts != nil { // Print debug messages, if applicable: - switch opts.Debug { + switch opts.debug { case 1: - fmt.Fprintf(os.Stderr, "debug: %s in %7.2fms\n", q.HumanLabel, lag) + _, _ = fmt.Fprintf(os.Stderr, "debug: %s in %7.2fms\n", q.HumanLabel, lag) case 2: - fmt.Fprintf(os.Stderr, "debug: %s in %7.2fms -- %s\n", q.HumanLabel, lag, q.HumanDescription) + _, _ = fmt.Fprintf(os.Stderr, "debug: %s in %7.2fms -- %s\n", q.HumanLabel, lag, q.HumanDescription) case 3: - fmt.Fprintf(os.Stderr, "debug: %s in %7.2fms -- %s\n", q.HumanLabel, lag, q.HumanDescription) - fmt.Fprintf(os.Stderr, "debug: request: %s\n", string(q.String())) + _, _ = fmt.Fprintf(os.Stderr, "debug: %s in %7.2fms -- %s\n", q.HumanLabel, lag, q.HumanDescription) + _, _ = fmt.Fprintf(os.Stderr, "debug: request: %s\n", string(q.String())) case 4: - fmt.Fprintf(os.Stderr, "debug: %s in %7.2fms -- %s\n", q.HumanLabel, lag, q.HumanDescription) - fmt.Fprintf(os.Stderr, "debug: request: %s\n", string(q.String())) - fmt.Fprintf(os.Stderr, "debug: response: %s\n", string(body)) + _, _ = fmt.Fprintf(os.Stderr, "debug: %s in %7.2fms -- %s\n", q.HumanLabel, lag, q.HumanDescription) + _, _ = fmt.Fprintf(os.Stderr, "debug: request: %s\n", string(q.String())) + _, _ = fmt.Fprintf(os.Stderr, "debug: response: %s\n", string(body)) default: } // Pretty print JSON responses, if applicable: - if opts.PrettyPrintResponses { + if opts.prettyPrintResponses { // Assumes the response is JSON! This holds for Influx // and Elastic. @@ -132,9 +125,9 @@ func (w *HTTPClient) Do(q *query.HTTP, opts *HTTPClientDoOptions) (lag float64, var v interface{} var line []byte full := make(map[string]interface{}) - full["influxql"] = string(q.RawQuery) - json.Unmarshal(body, &v) - full["response"] = v + full["sql"] = string(q.RawQuery) + _ = json.Unmarshal(body, &v) + full["result"] = v line, err = json.MarshalIndent(full, prefix, " ") if err != nil { return diff --git a/cmd/run_queries_cnosdb/main.go b/cmd/run_queries_cnosdb/main.go index 03b2d50..f81c04e 100644 --- a/cmd/run_queries_cnosdb/main.go +++ b/cmd/run_queries_cnosdb/main.go @@ -33,7 +33,7 @@ func init() { config.AddToFlagSet(pflag.CommandLine) var csvDaemonUrls string - pflag.String("urls", "http://localhost:31007/", "Daemon URLs, comma-separated. Will be used in a round-robin fashion.") + pflag.String("urls", "http://localhost:8902/", "Daemon URLs, comma-separated. Will be used in a round-robin fashion.") pflag.Uint64("chunk-response-size", 0, "Number of series to chunk results into. 0 means no chunking.") pflag.Parse() @@ -55,6 +55,10 @@ func init() { if len(daemonUrls) == 0 { log.Fatal("missing 'urls' flag") } + for i, url := range daemonUrls { + u := strings.TrimSuffix(url, "/") + daemonUrls[i] = u + "/api/v1/sql?tenant=cnosdb&db=" + } runner = query.NewBenchmarkRunner(config) } @@ -72,8 +76,8 @@ func newProcessor() query.Processor { return &processor{} } func (p *processor) Init(workerNumber int) { p.opts = &HTTPClientDoOptions{ - Debug: runner.DebugLevel(), - PrettyPrintResponses: runner.DoPrintResponses(), + debug: runner.DebugLevel(), + prettyPrintResponses: runner.DoPrintResponses(), database: runner.DatabaseName(), } url := daemonUrls[workerNumber%len(daemonUrls)] From b2580a03d667bfb7739cb1a923457c5eec0b186a Mon Sep 17 00:00:00 2001 From: "yukkit.zhang" Date: Sun, 23 Apr 2023 18:21:59 +0800 Subject: [PATCH 07/13] support AvgDailyDrivingSession,AvgLoad,DailyTruckActivity,TruckBreakdownFrequency --- cmd/generate_queries/databases/cnosdb/iot.go | 110 +++++++++++-------- 1 file changed, 62 insertions(+), 48 deletions(-) diff --git a/cmd/generate_queries/databases/cnosdb/iot.go b/cmd/generate_queries/databases/cnosdb/iot.go index f2d161c..86dc458 100644 --- a/cmd/generate_queries/databases/cnosdb/iot.go +++ b/cmd/generate_queries/databases/cnosdb/iot.go @@ -220,27 +220,28 @@ func (i *IoT) AvgDailyDrivingDuration(qi query.Query) { func (i *IoT) AvgDailyDrivingSession(qi query.Query) { start := i.Interval.Start().Format(time.RFC3339) end := i.Interval.End().Format(time.RFC3339) - cnosql := fmt.Sprintf(`SELECT "elapsed" - INTO "random_measure2_1" - FROM (SELECT difference("difka"), elapsed("difka", 1m) - FROM (SELECT "difka" - FROM (SELECT difference("mv") AS difka - FROM (SELECT floor(avg("velocity")/10)/floor(avg("velocity")/10) AS "mv" - FROM "readings" - WHERE "name"!='' AND time > '%s' AND time < '%s' - GROUP BY DATE_BIN(INTERVAL '10 minutes', time, TIMESTAMP '1970-01-01T00:00:00Z'), "name") - GROUP BY "name") - WHERE "difka"!=0 - GROUP BY "name") - GROUP BY "name") - WHERE "difference" = -2 - GROUP BY "name"; - SELECT avg("elapsed") - FROM "random_measure2_1" - WHERE time > '%s' AND time < '%s' - GROUP BY DATE_BIN(INTERVAL '1 day', time, TIMESTAMP '1970-01-01T00:00:00Z'),"name"`, - start, - end, + cnosql := fmt.Sprintf(`WITH driver_status + AS ( + SELECT name, time_window(time, '10m') as ten_minutes, avg(velocity) > 5 AS driving + FROM readings + WHERE name!='' AND time > '%s' AND time < '%s' + GROUP BY name, ten_minutes + ORDER BY name, ten_minutes.start + ), driver_status_change + AS ( + SELECT name, start, lead(start) OVER (PARTITION BY name ORDER BY start) AS stop, driving + FROM ( + SELECT name, ten_minutes.start AS start, driving, lag(driving) OVER (PARTITION BY name ORDER BY ten_minutes.start) AS prev_driving + FROM driver_status + ) x + WHERE x.driving <> x.prev_driving + ) + SELECT name, time_window(start, '24h') AS day, avg(stop::bigint - start::bigint) AS duration + FROM driver_status_change + WHERE name IS NOT NULL + AND driving = true + GROUP BY name, day + ORDER BY name, day.start`, start, end, ) @@ -253,11 +254,16 @@ func (i *IoT) AvgDailyDrivingSession(qi query.Query) { // AvgLoad finds the average load per truck model per fleet. func (i *IoT) AvgLoad(qi query.Query) { - cnosql := `SELECT avg("ml") AS mean_load_percentage - FROM (SELECT "current_load"/"load_capacity" AS "ml" - FROM "diagnostics" - GROUP BY "name", "fleet", "model") - GROUP BY "fleet", "model"` + start := i.Interval.Start().Format(time.RFC3339) + end := i.Interval.End().Format(time.RFC3339) + cnosql := fmt.Sprintf(`SELECT avg(current_load/load_capacity) + FROM diagnostics + WHERE time >= '%s' AND time < '%s' + GROUP BY fleet, model + limit 10`, + start, + end, + ) humanLabel := "cnosdb average load per truck model per fleet" humanDesc := humanLabel @@ -269,15 +275,14 @@ func (i *IoT) AvgLoad(qi query.Query) { func (i *IoT) DailyTruckActivity(qi query.Query) { start := i.Interval.Start().Format(time.RFC3339) end := i.Interval.End().Format(time.RFC3339) - cnosql := fmt.Sprintf(`SELECT count("ms")/144 - FROM (SELECT avg("status") AS ms - FROM "diagnostics" - WHERE time >= '%s' AND time < '%s' - GROUP BY DATE_BIN(INTERVAL '10 minutes', time, TIMESTAMP '1970-01-01T00:00:00Z'), "model", "fleet") - WHERE time >= '%s' AND time < '%s' AND "ms"<1 - GROUP BY DATE_BIN(INTERVAL '10 day', time, TIMESTAMP '1970-01-01T00:00:00Z'), "model", "fleet"`, - start, - end, + cnosql := fmt.Sprintf(`SELECT count(ms)/144 + FROM + (SELECT mean(status) AS ms, time_window(time, '10m') AS window, model, fleet + FROM diagnostics + WHERE time >= '%s' AND time < '%s' + GROUP BY window, model, fleet) + WHERE ms < 1 + GROUP BY time_window(window.start, '1d'), model, fleet`, start, end, ) @@ -292,19 +297,28 @@ func (i *IoT) DailyTruckActivity(qi query.Query) { func (i *IoT) TruckBreakdownFrequency(qi query.Query) { start := i.Interval.Start().Format(time.RFC3339) end := i.Interval.End().Format(time.RFC3339) - cnosql := fmt.Sprintf(`SELECT count("state_changed") - FROM (SELECT difference("broken_down") AS "state_changed" - FROM (SELECT floor(2*(sum("nzs")/count("nzs")))/floor(2*(sum("nzs")/count("nzs"))) AS "broken_down" - FROM (SELECT "model", "status"/"status" AS nzs - FROM "diagnostics" - WHERE time >= '%s' AND time < '%s') - WHERE time >= '%s' AND time < '%s' - GROUP BY DATE_BIN(INTERVAL '10 minutes', time, TIMESTAMP '1970-01-01T00:00:00Z'), "model") - GROUP BY "model") - WHERE "state_changed" = 1 - GROUP BY "model"`, - start, - end, + cnosql := fmt.Sprintf(`WITH base + AS ( + SELECT time, model, status/status AS nzs + FROM "diagnostics" + where model is not null + and time >= '%s' AND time < '%s' + ), breakdown_per_truck_per_ten_minutes + AS ( + SELECT time_window(TIME, '10m') AS ten_minutes, model, count(nzs) / count(*) < 0.5 AS broken_down + FROM base + GROUP BY ten_minutes, model + ), breakdowns_per_truck + AS ( + SELECT model, broken_down, lead(broken_down) OVER ( + PARTITION BY model ORDER BY ten_minutes.start + ) AS next_broken_down + FROM breakdown_per_truck_per_ten_minutes + ) + SELECT model, count(*) + FROM breakdowns_per_truck + WHERE broken_down = false AND next_broken_down = true + GROUP BY model`, start, end, ) From 065a90bfe9e70b6be53e2ccc2b12d1d2caf527be Mon Sep 17 00:00:00 2001 From: "yukkit.zhang" Date: Mon, 10 Jul 2023 13:50:32 +0800 Subject: [PATCH 08/13] adapt to cnosdb --- cmd/generate_queries/databases/cnosdb/iot.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/cmd/generate_queries/databases/cnosdb/iot.go b/cmd/generate_queries/databases/cnosdb/iot.go index 86dc458..ebf1876 100644 --- a/cmd/generate_queries/databases/cnosdb/iot.go +++ b/cmd/generate_queries/databases/cnosdb/iot.go @@ -222,7 +222,7 @@ func (i *IoT) AvgDailyDrivingSession(qi query.Query) { end := i.Interval.End().Format(time.RFC3339) cnosql := fmt.Sprintf(`WITH driver_status AS ( - SELECT name, time_window(time, '10m') as ten_minutes, avg(velocity) > 5 AS driving + SELECT name, time_window(time, interval '10 minutes') as ten_minutes, avg(velocity) > 5 AS driving FROM readings WHERE name!='' AND time > '%s' AND time < '%s' GROUP BY name, ten_minutes @@ -236,7 +236,7 @@ func (i *IoT) AvgDailyDrivingSession(qi query.Query) { ) x WHERE x.driving <> x.prev_driving ) - SELECT name, time_window(start, '24h') AS day, avg(stop::bigint - start::bigint) AS duration + SELECT name, time_window(start, interval '24 hours') AS day, avg(stop::bigint - start::bigint) AS duration FROM driver_status_change WHERE name IS NOT NULL AND driving = true @@ -277,12 +277,12 @@ func (i *IoT) DailyTruckActivity(qi query.Query) { end := i.Interval.End().Format(time.RFC3339) cnosql := fmt.Sprintf(`SELECT count(ms)/144 FROM - (SELECT mean(status) AS ms, time_window(time, '10m') AS window, model, fleet + (SELECT mean(status) AS ms, time_window(time, interval '10 minutes') AS window, model, fleet FROM diagnostics WHERE time >= '%s' AND time < '%s' GROUP BY window, model, fleet) WHERE ms < 1 - GROUP BY time_window(window.start, '1d'), model, fleet`, + GROUP BY time_window(window.start, interval '1 day'), model, fleet`, start, end, ) @@ -305,7 +305,7 @@ func (i *IoT) TruckBreakdownFrequency(qi query.Query) { and time >= '%s' AND time < '%s' ), breakdown_per_truck_per_ten_minutes AS ( - SELECT time_window(TIME, '10m') AS ten_minutes, model, count(nzs) / count(*) < 0.5 AS broken_down + SELECT time_window(TIME, interval '10 minutes') AS ten_minutes, model, count(nzs) / count(*) < 0.5 AS broken_down FROM base GROUP BY ten_minutes, model ), breakdowns_per_truck From e394061fe0f397abd76484ea6fa255b973e8c101 Mon Sep 17 00:00:00 2001 From: zipper-meng Date: Fri, 3 Nov 2023 19:34:36 +0800 Subject: [PATCH 09/13] Support gzip encoding in load_cnosdb, copied from load_influxdb --- cmd/load_cnosdb/process.go | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/cmd/load_cnosdb/process.go b/cmd/load_cnosdb/process.go index a383b50..df888f4 100644 --- a/cmd/load_cnosdb/process.go +++ b/cmd/load_cnosdb/process.go @@ -1,7 +1,9 @@ package main import ( + "bytes" "fmt" + "github.com/valyala/fasthttp" "time" "github.com/cnosdb/tsdb-comparisons/pkg/targets" @@ -48,8 +50,16 @@ func (p *processor) ProcessBatch(b targets.Batch, doLoad bool) (uint64, uint64) if doLoad { var err error for { - - _, err = p.httpWriter.WriteLineProtocol(batch.buf.Bytes(), false) + if useGzip { + compressedBatch := bufPool.Get().(*bytes.Buffer) + fasthttp.WriteGzip(compressedBatch, batch.buf.Bytes()) + _, err = p.httpWriter.WriteLineProtocol(compressedBatch.Bytes(), true) + // Return the compressed batch buffer to the pool. + compressedBatch.Reset() + bufPool.Put(compressedBatch) + } else { + _, err = p.httpWriter.WriteLineProtocol(batch.buf.Bytes(), false) + } if err == errBackoff { p.backingOffChan <- true From 5892d1ee14a8d36ea5544756e72c2035d605f174 Mon Sep 17 00:00:00 2001 From: zipper-meng Date: Wed, 15 May 2024 11:38:29 +0800 Subject: [PATCH 10/13] Add arg scanner-buffer-size, by defautl is 1024*1024, for reading long lines --- cmd/load_influx/main.go | 29 +++++++++++++++++++---------- 1 file changed, 19 insertions(+), 10 deletions(-) diff --git a/cmd/load_influx/main.go b/cmd/load_influx/main.go index a0884b0..4ac3e28 100644 --- a/cmd/load_influx/main.go +++ b/cmd/load_influx/main.go @@ -12,7 +12,7 @@ import ( "strings" "sync" "time" - + "github.com/blagojts/viper" "github.com/cnosdb/tsdb-comparisons/internal/utils" "github.com/cnosdb/tsdb-comparisons/load" @@ -30,6 +30,7 @@ var ( useGzip bool doAbortOnExist bool consistency string + scannerBufferSize int ) // Global vars @@ -57,41 +58,49 @@ func init() { config.AddToFlagSet(pflag.CommandLine) target.TargetSpecificFlags("", pflag.CommandLine) var csvDaemonURLs string - + pflag.Parse() - + err := utils.SetupConfigFile() - + if err != nil { panic(fmt.Errorf("fatal error config file: %s", err)) } - + if err := viper.Unmarshal(&config); err != nil { panic(fmt.Errorf("unable to decode config: %s", err)) } - + csvDaemonURLs = viper.GetString("urls") replicationFactor = viper.GetInt("replication-factor") consistency = viper.GetString("consistency") backoff = viper.GetDuration("backoff") useGzip = viper.GetBool("gzip") - + if _, ok := consistencyChoices[consistency]; !ok { log.Fatalf("invalid consistency settings") } - + daemonURLs = strings.Split(csvDaemonURLs, ",") if len(daemonURLs) == 0 { log.Fatal("missing 'urls' flag") } config.HashWorkers = false loader = load.GetBenchmarkRunner(config) + + scannerBufferSize = viper.GetInt("scanner-buffer-size") + if scannerBufferSize == 0 { + scannerBufferSize = 1024 * 1024 + } } type benchmark struct{} func (b *benchmark) GetDataSource() targets.DataSource { - return &fileDataSource{scanner: bufio.NewScanner(load.GetBufferedReader(config.FileName))} + scanner := bufio.NewScanner(load.GetBufferedReader(config.FileName)) + buf := make([]byte, 0, scannerBufferSize) + scanner.Buffer(buf, scannerBufferSize*4) + return &fileDataSource{scanner: scanner} } func (b *benchmark) GetBatchFactory() targets.BatchFactory { @@ -116,6 +125,6 @@ func main() { return bytes.NewBuffer(make([]byte, 0, 4*1024*1024)) }, } - + loader.RunBenchmark(&benchmark{}) } From 139c0c688244315fa5992125afba31bd77d126f9 Mon Sep 17 00:00:00 2001 From: zipper-meng Date: Sat, 1 Jun 2024 11:01:44 +0800 Subject: [PATCH 11/13] fix: back pressure messages for influxdb1.8 --- cmd/load_influx/http_writer.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/cmd/load_influx/http_writer.go b/cmd/load_influx/http_writer.go index fa0fbce..6f23434 100644 --- a/cmd/load_influx/http_writer.go +++ b/cmd/load_influx/http_writer.go @@ -7,7 +7,7 @@ import ( "fmt" "net/url" "time" - + "github.com/valyala/fasthttp" ) @@ -20,22 +20,22 @@ const ( var ( errBackoff = fmt.Errorf("backpressure is needed") backoffMagicWords0 = []byte("engine: cache maximum memory size exceeded") - backoffMagicWords1 = []byte("write failed: hinted handoff queue not empty") - backoffMagicWords2a = []byte("write failed: read message type: read tcp") + backoffMagicWords1 = []byte("hinted handoff queue not empty") + backoffMagicWords2a = []byte("read message type: read tcp") backoffMagicWords2b = []byte("i/o timeout") - backoffMagicWords3 = []byte("write failed: engine: cache-max-memory-size exceeded") + backoffMagicWords3 = []byte("engine: cache-max-memory-size exceeded") backoffMagicWords4 = []byte("timeout") - backoffMagicWords5 = []byte("write failed: can not exceed max connections of 500") + backoffMagicWords5 = []byte("can not exceed max connections of 500") ) // HTTPWriterConfig is the configuration used to create an HTTPWriter. type HTTPWriterConfig struct { // URL of the host, in form "http://example.com:8086/" Host string - + // Name of the target database into which points will be written. Database string - + // Debug label for more informative errors. DebugInfo string } @@ -43,7 +43,7 @@ type HTTPWriterConfig struct { // HTTPWriter is a Writer that writes to an InfluxDB HTTP server. type HTTPWriter struct { client fasthttp.Client - + c HTTPWriterConfig url []byte } @@ -54,7 +54,7 @@ func NewHTTPWriter(c HTTPWriterConfig, consistency string) *HTTPWriter { client: fasthttp.Client{ Name: httpClientName, }, - + c: c, url: []byte(c.Host + "/write?consistency=" + consistency + "&db=" + url.QueryEscape(c.Database)), } @@ -97,10 +97,10 @@ func (w *HTTPWriter) WriteLineProtocol(body []byte, isGzip bool) (int64, error) req := fasthttp.AcquireRequest() defer fasthttp.ReleaseRequest(req) w.initializeReq(req, body, isGzip) - + resp := fasthttp.AcquireResponse() defer fasthttp.ReleaseResponse(resp) - + return w.executeReq(req, resp) } From 51c88d9a9b394819cd0570c6d7e32a361bf91a22 Mon Sep 17 00:00:00 2001 From: zipper-meng Date: Fri, 14 Jun 2024 10:59:37 +0800 Subject: [PATCH 12/13] fix: back pressure messages for cnosdb-2.3 --- cmd/load_cnosdb/http_writer.go | 21 ++++----------------- cmd/load_cnosdb/http_writer_test.go | 26 +++----------------------- 2 files changed, 7 insertions(+), 40 deletions(-) diff --git a/cmd/load_cnosdb/http_writer.go b/cmd/load_cnosdb/http_writer.go index 58ae942..bea1dca 100644 --- a/cmd/load_cnosdb/http_writer.go +++ b/cmd/load_cnosdb/http_writer.go @@ -19,14 +19,9 @@ const ( ) var ( - errBackoff = fmt.Errorf("backpressure is needed") - backoffMagicWords0 = []byte("engine: cache maximum memory size exceeded") - backoffMagicWords1 = []byte("write failed: hinted handoff queue not empty") - backoffMagicWords2a = []byte("write failed: read message type: read tcp") - backoffMagicWords2b = []byte("i/o timeout") - backoffMagicWords3 = []byte("write failed: engine: cache-max-memory-size exceeded") - backoffMagicWords4 = []byte("timeout") - backoffMagicWords5 = []byte("write failed: can not exceed max connections of 500") + errBackoff = fmt.Errorf("backpressure is needed") + backoffMagicWords0 = []byte("Memory Exhausted Retry Later") + backoffMagicWords4 = []byte("timeout") ) // HTTPWriterConfig is the configuration used to create an HTTPWriter. @@ -89,7 +84,7 @@ func (w *HTTPWriter) executeReq(req *fasthttp.Request, resp *fasthttp.Response) lat := time.Since(start).Nanoseconds() if err == nil { sc := resp.StatusCode() - if sc == 500 && backpressurePred(resp.Body()) { + if sc == 422 && backpressurePred(resp.Body()) { err = errBackoff } else if sc != fasthttp.StatusOK { err = fmt.Errorf("[DebugInfo: %s] Invalid write response (status %d): %s", w.c.DebugInfo, sc, resp.Body()) @@ -115,16 +110,8 @@ func (w *HTTPWriter) WriteLineProtocol(body []byte, isGzip bool) (int64, error) func backpressurePred(body []byte) bool { if bytes.Contains(body, backoffMagicWords0) { return true - } else if bytes.Contains(body, backoffMagicWords1) { - return true - } else if bytes.Contains(body, backoffMagicWords2a) && bytes.Contains(body, backoffMagicWords2b) { - return true - } else if bytes.Contains(body, backoffMagicWords3) { - return true } else if bytes.Contains(body, backoffMagicWords4) { return true - } else if bytes.Contains(body, backoffMagicWords5) { - return true } else { return false } diff --git a/cmd/load_cnosdb/http_writer_test.go b/cmd/load_cnosdb/http_writer_test.go index 170ae4e..0bead1a 100644 --- a/cmd/load_cnosdb/http_writer_test.go +++ b/cmd/load_cnosdb/http_writer_test.go @@ -37,7 +37,7 @@ func runHTTPServer(c chan struct{}) { coinflip := atomic.AddInt64(&i, 1) if coinflip%2 == 1 { w.WriteHeader(http.StatusInternalServerError) - fmt.Fprintf(w, string(backoffMagicWords1)) + fmt.Fprintf(w, string(backoffMagicWords0)) } else { w.WriteHeader(http.StatusNoContent) fmt.Fprintf(w, "") @@ -198,33 +198,13 @@ func TestBackpressurePred(t *testing.T) { want: true, }, { - body: "yadda" + string(backoffMagicWords1), - want: true, - }, - { - body: string(backoffMagicWords2a), - want: false, // need both magic strings or it fails - }, - { - body: string(backoffMagicWords2a) + " AND " + string(backoffMagicWords2b), - want: true, - }, - { - body: string(backoffMagicWords3) + " yadda", - want: true, + body: string(backoffMagicWords0[2:]), + want: false, }, { body: "yadda " + string(backoffMagicWords4) + " yadda", want: true, }, - { - body: "foo " + string(backoffMagicWords5) + " yadda", - want: true, - }, - { - body: string(backoffMagicWords0[2:]), - want: false, - }, } for _, c := range cases { From b78fb3101379c73a7127a7dcb3622906f1311bd9 Mon Sep 17 00:00:00 2001 From: zipper-meng Date: Thu, 7 Nov 2024 17:40:02 +0800 Subject: [PATCH 13/13] feat: add arg --username and --password for load_cnosdb and load_queries_cnosdb --- cmd/load_cnosdb/creator.go | 12 ++++++++--- cmd/load_cnosdb/http_writer.go | 10 +++------ cmd/load_cnosdb/main.go | 29 ++++++++++++++++++--------- cmd/run_queries_cnosdb/http_client.go | 13 +++--------- cmd/run_queries_cnosdb/main.go | 10 +++++++++ 5 files changed, 45 insertions(+), 29 deletions(-) diff --git a/cmd/load_cnosdb/creator.go b/cmd/load_cnosdb/creator.go index 8f9cb87..272996b 100644 --- a/cmd/load_cnosdb/creator.go +++ b/cmd/load_cnosdb/creator.go @@ -41,7 +41,9 @@ func (d *dbCreator) listDatabases() ([]string, error) { if err != nil { return nil, err } - req.Header.Add(fasthttp.HeaderAuthorization, basicAuth("root", "")) + if basicAuth != "" { + req.Header.Add(fasthttp.HeaderAuthorization, basicAuth) + } client := &http.Client{} resp, err := client.Do(req) @@ -80,7 +82,9 @@ func (d *dbCreator) RemoveOldDB(dbName string) error { if err != nil { return err } - req.Header.Add(fasthttp.HeaderAuthorization, basicAuth("root", "")) + if basicAuth != "" { + req.Header.Add(fasthttp.HeaderAuthorization, basicAuth) + } client := &http.Client{} resp, err := client.Do(req) @@ -110,7 +114,9 @@ func (d *dbCreator) CreateDB(dbName string) error { if err != nil { return err } - req.Header.Add(fasthttp.HeaderAuthorization, basicAuth("root", "")) + if basicAuth != "" { + req.Header.Add(fasthttp.HeaderAuthorization, basicAuth) + } client := &http.Client{} resp, err := client.Do(req) diff --git a/cmd/load_cnosdb/http_writer.go b/cmd/load_cnosdb/http_writer.go index bea1dca..469a8bb 100644 --- a/cmd/load_cnosdb/http_writer.go +++ b/cmd/load_cnosdb/http_writer.go @@ -4,7 +4,6 @@ package main import ( "bytes" - "encoding/base64" "fmt" "net/url" "time" @@ -61,16 +60,13 @@ var ( textPlain = []byte("text/plain") ) -func basicAuth(username, password string) string { - auth := username + ":" + password - return "Basic " + base64.StdEncoding.EncodeToString([]byte(auth)) -} - func (w *HTTPWriter) initializeReq(req *fasthttp.Request, body []byte, isGzip bool) { req.Header.SetContentTypeBytes(textPlain) req.Header.SetMethodBytes(methodPost) req.Header.SetRequestURIBytes(w.url) - req.Header.Add(fasthttp.HeaderAuthorization, basicAuth("root", "")) + if basicAuth != "" { + req.Header.Add(fasthttp.HeaderAuthorization, basicAuth) + } if isGzip { req.Header.Add(headerContentEncoding, headerGzip) diff --git a/cmd/load_cnosdb/main.go b/cmd/load_cnosdb/main.go index b269d0c..773697c 100644 --- a/cmd/load_cnosdb/main.go +++ b/cmd/load_cnosdb/main.go @@ -7,12 +7,13 @@ package main import ( "bufio" "bytes" + "encoding/base64" "fmt" "log" "strings" "sync" "time" - + "github.com/blagojts/viper" "github.com/cnosdb/tsdb-comparisons/internal/utils" "github.com/cnosdb/tsdb-comparisons/load" @@ -30,6 +31,7 @@ var ( useGzip bool doAbortOnExist bool consistency string + basicAuth string ) // Global vars @@ -55,31 +57,34 @@ func init() { target = initializers.GetTarget(constants.FormatCnosDB) config = load.BenchmarkRunnerConfig{} config.AddToFlagSet(pflag.CommandLine) + pflag.CommandLine.String("username", "root", "Basic access authentication username") + pflag.CommandLine.String("password", "", "Basic access authentication password") + target.TargetSpecificFlags("", pflag.CommandLine) var csvDaemonURLs string - + pflag.Parse() - + err := utils.SetupConfigFile() - + if err != nil { panic(fmt.Errorf("fatal error config file: %s", err)) } - + if err := viper.Unmarshal(&config); err != nil { panic(fmt.Errorf("unable to decode config: %s", err)) } - + csvDaemonURLs = viper.GetString("urls") replicationFactor = viper.GetInt("replication-factor") consistency = viper.GetString("consistency") backoff = viper.GetDuration("backoff") useGzip = viper.GetBool("gzip") - + if _, ok := consistencyChoices[consistency]; !ok { log.Fatalf("invalid consistency settings") } - + daemonURLs = strings.Split(csvDaemonURLs, ",") if len(daemonURLs) == 0 { log.Fatal("missing 'urls' flag") @@ -116,6 +121,12 @@ func main() { return bytes.NewBuffer(make([]byte, 0, 4*1024*1024)) }, } - + + username := viper.GetString("username") + password := viper.GetString("password") + if username != "" || password != "" { + basicAuth = "Basic " + base64.StdEncoding.EncodeToString([]byte(username+":"+password)) + } + loader.RunBenchmark(&benchmark{}) } diff --git a/cmd/run_queries_cnosdb/http_client.go b/cmd/run_queries_cnosdb/http_client.go index 975c19d..7413c9d 100644 --- a/cmd/run_queries_cnosdb/http_client.go +++ b/cmd/run_queries_cnosdb/http_client.go @@ -2,9 +2,9 @@ package main import ( "bytes" - "encoding/base64" "encoding/json" "fmt" + "github.com/valyala/fasthttp" "io" "net/http" "net/url" @@ -49,15 +49,9 @@ func NewHTTPClient(url string) *HTTPClient { client: getHttpClient(), url: []byte(url), urlPrefixLen: len(url), - basicAuth: basicAuth("root", ""), } } -func basicAuth(username, password string) string { - auth := username + ":" + password - return "Basic " + base64.StdEncoding.EncodeToString([]byte(auth)) -} - // Do performs the action specified by the given Query. It uses fasthttp, and // tries to minimize heap allocations. func (w *HTTPClient) Do(q *query.HTTP, opts *HTTPClientDoOptions) (lag float64, err error) { @@ -69,9 +63,8 @@ func (w *HTTPClient) Do(q *query.HTTP, opts *HTTPClientDoOptions) (lag float64, if err != nil { panic(err) } - req.Header.Add("Authorization", w.basicAuth) - if err != nil { - panic(err) + if basicAuth != "" { + req.Header.Add(fasthttp.HeaderAuthorization, basicAuth) } // Perform the request while tracking latency: diff --git a/cmd/run_queries_cnosdb/main.go b/cmd/run_queries_cnosdb/main.go index f81c04e..1b88235 100644 --- a/cmd/run_queries_cnosdb/main.go +++ b/cmd/run_queries_cnosdb/main.go @@ -6,6 +6,7 @@ package main import ( + "encoding/base64" "fmt" "log" "strings" @@ -20,6 +21,7 @@ import ( var ( daemonUrls []string chunkSize uint64 + basicAuth string ) // Global vars: @@ -31,6 +33,8 @@ var ( func init() { var config query.BenchmarkRunnerConfig config.AddToFlagSet(pflag.CommandLine) + pflag.CommandLine.String("username", "root", "Basic access authentication username") + pflag.CommandLine.String("password", "", "Basic access authentication password") var csvDaemonUrls string pflag.String("urls", "http://localhost:8902/", "Daemon URLs, comma-separated. Will be used in a round-robin fashion.") @@ -64,6 +68,12 @@ func init() { } func main() { + username := viper.GetString("username") + password := viper.GetString("password") + if username != "" || password != "" { + basicAuth = "Basic " + base64.StdEncoding.EncodeToString([]byte(username+":"+password)) + } + runner.Run(&query.HTTPPool, newProcessor) }