From 57e48a5ae613961ef835f6b8200e95eba98b5c66 Mon Sep 17 00:00:00 2001 From: qevolg <2227465945@qq.com> Date: Mon, 6 Jan 2025 22:35:08 +0800 Subject: [PATCH 1/2] refactor(keeper): modify log and optimize code --- tools/keeper/api/report.go | 2 +- tools/keeper/cmd/command.go | 6 ++---- tools/keeper/monitor/system.go | 14 ++++++-------- 3 files changed, 9 insertions(+), 13 deletions(-) diff --git a/tools/keeper/api/report.go b/tools/keeper/api/report.go index 876d99da2d..86658c3ea6 100644 --- a/tools/keeper/api/report.go +++ b/tools/keeper/api/report.go @@ -332,7 +332,7 @@ func (r *Reporter) handlerFunc() gin.HandlerFunc { logger.Tracef("report data:%s", string(data)) if e := json.Unmarshal(data, &report); e != nil { - logger.Errorf("error occurred while unmarshal request, data:%s, error:%s", data, err) + logger.Errorf("error occurred while unmarshal request, data:%s, error:%v", data, e) return } var sqls []string diff --git a/tools/keeper/cmd/command.go b/tools/keeper/cmd/command.go index 33d2482628..25230bdaef 100644 --- a/tools/keeper/cmd/command.go +++ b/tools/keeper/cmd/command.go @@ -58,7 +58,7 @@ func NewCommand(conf *config.Config) *Command { panic(err) } - imp := &Command{ + return &Command{ client: client, conn: conn, username: conf.TDengine.Username, @@ -70,7 +70,6 @@ func NewCommand(conf *config.Config) *Command { RawQuery: fmt.Sprintf("db=%s&precision=ms", conf.Metrics.Database.Name), }, } - return imp } func (cmd *Command) Process(conf *config.Config) { @@ -101,7 +100,7 @@ func (cmd *Command) Process(conf *config.Config) { } func (cmd *Command) ProcessTransfer(conf *config.Config) { - fromTime, err := time.Parse("2006-01-02T15:04:05Z07:00", conf.FromTime) + fromTime, err := time.Parse(time.RFC3339, conf.FromTime) if err != nil { logger.Errorf("parse fromTime error, msg:%s", err) return @@ -401,7 +400,6 @@ func (cmd *Command) TransferTaosdClusterBasicInfo() error { // cluster_info func (cmd *Command) TransferTableToDst(sql string, dstTable string, tagNum int) error { - ctx := context.Background() endTime := time.Now() diff --git a/tools/keeper/monitor/system.go b/tools/keeper/monitor/system.go index 7d5ef5bd54..e088be5fc1 100644 --- a/tools/keeper/monitor/system.go +++ b/tools/keeper/monitor/system.go @@ -41,33 +41,31 @@ func (s *sysMonitor) collect() { } s.Lock() + defer s.Unlock() + for output := range s.outputs { select { case output <- *s.status: default: } } - s.Unlock() } func (s *sysMonitor) Register(c chan<- SysStatus) { s.Lock() + defer s.Unlock() if s.outputs == nil { - s.outputs = map[chan<- SysStatus]struct{}{ - c: {}, - } - } else { - s.outputs[c] = struct{}{} + s.outputs = map[chan<- SysStatus]struct{}{} } - s.Unlock() + s.outputs[c] = struct{}{} } func (s *sysMonitor) Deregister(c chan<- SysStatus) { s.Lock() + defer s.Unlock() if s.outputs != nil { delete(s.outputs, c) } - s.Unlock() } var SysMonitor = &sysMonitor{status: &SysStatus{}} From c47c62165978f2fdbb91c11897e39ccef2c7855f Mon Sep 17 00:00:00 2001 From: qevolg <2227465945@qq.com> Date: Thu, 16 Jan 2025 09:17:16 +0800 Subject: [PATCH 2/2] refactor(keeper): optimize code --- tools/keeper/cmd/command.go | 17 +++++++---------- tools/keeper/infrastructure/log/log.go | 3 +-- tools/keeper/util/util.go | 11 +++-------- 3 files changed, 11 insertions(+), 20 deletions(-) diff --git a/tools/keeper/cmd/command.go b/tools/keeper/cmd/command.go index 25230bdaef..bebb098bec 100644 --- a/tools/keeper/cmd/command.go +++ b/tools/keeper/cmd/command.go @@ -155,6 +155,7 @@ func (cmd *Command) TransferTaosdDnodesInfo() error { dstTable := "taosd_dnodes_info" return cmd.TransferTableToDst(sql, dstTable, 3) } + func (cmd *Command) TransferTaosdDnodesStatus() error { sql := "select cluster_id, dnode_id, dnode_ep, CASE status WHEN 'ready' THEN 1 ELSE 0 END as status, ts from d_info a where " dstTable := "taosd_dnodes_status" @@ -166,6 +167,7 @@ func (cmd *Command) TransferTaosdDnodesLogDirs1() error { dstTable := "taosd_dnodes_log_dirs" return cmd.TransferTableToDst(sql, dstTable, 4) } + func (cmd *Command) TransferTaosdDnodesLogDirs2() error { sql := "select cluster_id, dnode_id, dnode_ep, name as log_dir_name, avail, used, total, ts from temp_dir a where " dstTable := "taosd_dnodes_log_dirs" @@ -223,13 +225,12 @@ func (cmd *Command) ProcessDrop(conf *config.Config) { } func (cmd *Command) TransferDataToDest(data *db.Data, dstTable string, tagNum int) { - - var buf bytes.Buffer - if len(data.Data) < 1 { return } + var buf bytes.Buffer + for _, row := range data.Data { // get one row here buf.WriteString(dstTable) @@ -261,7 +262,6 @@ func (cmd *Command) TransferDataToDest(data *db.Data, dstTable string, tagNum in // write metrics for j := tagNum; j < len(row)-1; j++ { - switch v := row[j].(type) { case int: buf.WriteString(fmt.Sprintf("%s=%ff64", data.Head[j], float64(v))) @@ -291,8 +291,7 @@ func (cmd *Command) TransferDataToDest(data *db.Data, dstTable string, tagNum in if logger.Logger.IsLevelEnabled(logrus.TraceLevel) { logger.Tracef("buf:%v", buf.String()) } - err := cmd.lineWriteBody(&buf) - if err != nil { + if err := cmd.lineWriteBody(&buf); err != nil { logger.Errorf("insert data error, msg:%s", err) panic(err) } @@ -304,8 +303,7 @@ func (cmd *Command) TransferDataToDest(data *db.Data, dstTable string, tagNum in if logger.Logger.IsLevelEnabled(logrus.TraceLevel) { logger.Tracef("buf:%v", buf.String()) } - err := cmd.lineWriteBody(&buf) - if err != nil { + if err := cmd.lineWriteBody(&buf); err != nil { logger.Errorf("insert data error, msg:%s", err) panic(err) } @@ -443,13 +441,12 @@ func (cmd *Command) lineWriteBody(buf *bytes.Buffer) error { req.Body = io.NopCloser(buf) resp, err := cmd.client.Do(req) - if err != nil { logger.Errorf("writing metrics exception, msg:%s", err) return err } - defer resp.Body.Close() + if resp.StatusCode != http.StatusNoContent { body, _ := io.ReadAll(resp.Body) return fmt.Errorf("unexpected status code %d:body:%s", resp.StatusCode, string(body)) diff --git a/tools/keeper/infrastructure/log/log.go b/tools/keeper/infrastructure/log/log.go index bd7b026897..32a28bd1a7 100644 --- a/tools/keeper/infrastructure/log/log.go +++ b/tools/keeper/infrastructure/log/log.go @@ -112,8 +112,7 @@ var once sync.Once func ConfigLog() { once.Do(func() { - err := SetLevel(config.Conf.LogLevel) - if err != nil { + if err := SetLevel(config.Conf.LogLevel); err != nil { panic(err) } writer, err := rotatelogs.New( diff --git a/tools/keeper/util/util.go b/tools/keeper/util/util.go index 6711db71e1..e5d92b8291 100644 --- a/tools/keeper/util/util.go +++ b/tools/keeper/util/util.go @@ -60,7 +60,7 @@ func EscapeInfluxProtocol(s string) string { } func GetCfg() *config.Config { - c := &config.Config{ + return &config.Config{ InstanceID: 64, Port: 6043, LogLevel: "trace", @@ -87,7 +87,6 @@ func GetCfg() *config.Config { ReservedDiskSize: 1073741824, }, } - return c } func SafeSubstring(s string, n int) string { @@ -123,8 +122,7 @@ func GetQidOwn() uint64 { atomic.StoreUint64(&globalCounter64, 1) id = 1 } - qid64 := uint64(config.Conf.InstanceID)<<56 | id - return qid64 + return uint64(config.Conf.InstanceID)<<56 | id } func GetMd5HexStr(str string) string { @@ -138,7 +136,6 @@ func isValidChar(r rune) bool { func ToValidTableName(input string) string { var builder strings.Builder - for _, r := range input { if isValidChar(r) { builder.WriteRune(unicode.ToLower(r)) @@ -146,7 +143,5 @@ func ToValidTableName(input string) string { builder.WriteRune('_') } } - - result := builder.String() - return result + return builder.String() }