Merge pull request #29493 from taosdata/refactor/code-readability

refactor(keeper): modify log and optimize code
This commit is contained in:
She Yanjie 2025-02-07 10:36:48 +08:00 committed by GitHub
commit c91c8c57e9
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 20 additions and 33 deletions

View File

@ -332,7 +332,7 @@ func (r *Reporter) handlerFunc() gin.HandlerFunc {
logger.Tracef("report data:%s", string(data)) logger.Tracef("report data:%s", string(data))
if e := json.Unmarshal(data, &report); e != nil { if e := json.Unmarshal(data, &report); e != nil {
logger.Errorf("error occurred while unmarshal request, data:%s, error:%s", data, err) logger.Errorf("error occurred while unmarshal request, data:%s, error:%v", data, e)
return return
} }
var sqls []string var sqls []string

View File

@ -58,7 +58,7 @@ func NewCommand(conf *config.Config) *Command {
panic(err) panic(err)
} }
imp := &Command{ return &Command{
client: client, client: client,
conn: conn, conn: conn,
username: conf.TDengine.Username, username: conf.TDengine.Username,
@ -70,7 +70,6 @@ func NewCommand(conf *config.Config) *Command {
RawQuery: fmt.Sprintf("db=%s&precision=ms", conf.Metrics.Database.Name), RawQuery: fmt.Sprintf("db=%s&precision=ms", conf.Metrics.Database.Name),
}, },
} }
return imp
} }
func (cmd *Command) Process(conf *config.Config) { func (cmd *Command) Process(conf *config.Config) {
@ -101,7 +100,7 @@ func (cmd *Command) Process(conf *config.Config) {
} }
func (cmd *Command) ProcessTransfer(conf *config.Config) { func (cmd *Command) ProcessTransfer(conf *config.Config) {
fromTime, err := time.Parse("2006-01-02T15:04:05Z07:00", conf.FromTime) fromTime, err := time.Parse(time.RFC3339, conf.FromTime)
if err != nil { if err != nil {
logger.Errorf("parse fromTime error, msg:%s", err) logger.Errorf("parse fromTime error, msg:%s", err)
return return
@ -156,6 +155,7 @@ func (cmd *Command) TransferTaosdDnodesInfo() error {
dstTable := "taosd_dnodes_info" dstTable := "taosd_dnodes_info"
return cmd.TransferTableToDst(sql, dstTable, 3) return cmd.TransferTableToDst(sql, dstTable, 3)
} }
func (cmd *Command) TransferTaosdDnodesStatus() error { func (cmd *Command) TransferTaosdDnodesStatus() error {
sql := "select cluster_id, dnode_id, dnode_ep, CASE status WHEN 'ready' THEN 1 ELSE 0 END as status, ts from d_info a where " sql := "select cluster_id, dnode_id, dnode_ep, CASE status WHEN 'ready' THEN 1 ELSE 0 END as status, ts from d_info a where "
dstTable := "taosd_dnodes_status" dstTable := "taosd_dnodes_status"
@ -167,6 +167,7 @@ func (cmd *Command) TransferTaosdDnodesLogDirs1() error {
dstTable := "taosd_dnodes_log_dirs" dstTable := "taosd_dnodes_log_dirs"
return cmd.TransferTableToDst(sql, dstTable, 4) return cmd.TransferTableToDst(sql, dstTable, 4)
} }
func (cmd *Command) TransferTaosdDnodesLogDirs2() error { func (cmd *Command) TransferTaosdDnodesLogDirs2() error {
sql := "select cluster_id, dnode_id, dnode_ep, name as log_dir_name, avail, used, total, ts from temp_dir a where " sql := "select cluster_id, dnode_id, dnode_ep, name as log_dir_name, avail, used, total, ts from temp_dir a where "
dstTable := "taosd_dnodes_log_dirs" dstTable := "taosd_dnodes_log_dirs"
@ -224,13 +225,12 @@ func (cmd *Command) ProcessDrop(conf *config.Config) {
} }
func (cmd *Command) TransferDataToDest(data *db.Data, dstTable string, tagNum int) { func (cmd *Command) TransferDataToDest(data *db.Data, dstTable string, tagNum int) {
var buf bytes.Buffer
if len(data.Data) < 1 { if len(data.Data) < 1 {
return return
} }
var buf bytes.Buffer
for _, row := range data.Data { for _, row := range data.Data {
// get one row here // get one row here
buf.WriteString(dstTable) buf.WriteString(dstTable)
@ -262,7 +262,6 @@ func (cmd *Command) TransferDataToDest(data *db.Data, dstTable string, tagNum in
// write metrics // write metrics
for j := tagNum; j < len(row)-1; j++ { for j := tagNum; j < len(row)-1; j++ {
switch v := row[j].(type) { switch v := row[j].(type) {
case int: case int:
buf.WriteString(fmt.Sprintf("%s=%ff64", data.Head[j], float64(v))) buf.WriteString(fmt.Sprintf("%s=%ff64", data.Head[j], float64(v)))
@ -292,8 +291,7 @@ func (cmd *Command) TransferDataToDest(data *db.Data, dstTable string, tagNum in
if logger.Logger.IsLevelEnabled(logrus.TraceLevel) { if logger.Logger.IsLevelEnabled(logrus.TraceLevel) {
logger.Tracef("buf:%v", buf.String()) logger.Tracef("buf:%v", buf.String())
} }
err := cmd.lineWriteBody(&buf) if err := cmd.lineWriteBody(&buf); err != nil {
if err != nil {
logger.Errorf("insert data error, msg:%s", err) logger.Errorf("insert data error, msg:%s", err)
panic(err) panic(err)
} }
@ -305,8 +303,7 @@ func (cmd *Command) TransferDataToDest(data *db.Data, dstTable string, tagNum in
if logger.Logger.IsLevelEnabled(logrus.TraceLevel) { if logger.Logger.IsLevelEnabled(logrus.TraceLevel) {
logger.Tracef("buf:%v", buf.String()) logger.Tracef("buf:%v", buf.String())
} }
err := cmd.lineWriteBody(&buf) if err := cmd.lineWriteBody(&buf); err != nil {
if err != nil {
logger.Errorf("insert data error, msg:%s", err) logger.Errorf("insert data error, msg:%s", err)
panic(err) panic(err)
} }
@ -401,7 +398,6 @@ func (cmd *Command) TransferTaosdClusterBasicInfo() error {
// cluster_info // cluster_info
func (cmd *Command) TransferTableToDst(sql string, dstTable string, tagNum int) error { func (cmd *Command) TransferTableToDst(sql string, dstTable string, tagNum int) error {
ctx := context.Background() ctx := context.Background()
endTime := time.Now() endTime := time.Now()
@ -445,13 +441,12 @@ func (cmd *Command) lineWriteBody(buf *bytes.Buffer) error {
req.Body = io.NopCloser(buf) req.Body = io.NopCloser(buf)
resp, err := cmd.client.Do(req) resp, err := cmd.client.Do(req)
if err != nil { if err != nil {
logger.Errorf("writing metrics exception, msg:%s", err) logger.Errorf("writing metrics exception, msg:%s", err)
return err return err
} }
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode != http.StatusNoContent { if resp.StatusCode != http.StatusNoContent {
body, _ := io.ReadAll(resp.Body) body, _ := io.ReadAll(resp.Body)
return fmt.Errorf("unexpected status code %d:body:%s", resp.StatusCode, string(body)) return fmt.Errorf("unexpected status code %d:body:%s", resp.StatusCode, string(body))

View File

@ -112,8 +112,7 @@ var once sync.Once
func ConfigLog() { func ConfigLog() {
once.Do(func() { once.Do(func() {
err := SetLevel(config.Conf.LogLevel) if err := SetLevel(config.Conf.LogLevel); err != nil {
if err != nil {
panic(err) panic(err)
} }
writer, err := rotatelogs.New( writer, err := rotatelogs.New(

View File

@ -41,33 +41,31 @@ func (s *sysMonitor) collect() {
} }
s.Lock() s.Lock()
defer s.Unlock()
for output := range s.outputs { for output := range s.outputs {
select { select {
case output <- *s.status: case output <- *s.status:
default: default:
} }
} }
s.Unlock()
} }
func (s *sysMonitor) Register(c chan<- SysStatus) { func (s *sysMonitor) Register(c chan<- SysStatus) {
s.Lock() s.Lock()
defer s.Unlock()
if s.outputs == nil { if s.outputs == nil {
s.outputs = map[chan<- SysStatus]struct{}{ s.outputs = map[chan<- SysStatus]struct{}{}
c: {},
}
} else {
s.outputs[c] = struct{}{}
} }
s.Unlock() s.outputs[c] = struct{}{}
} }
func (s *sysMonitor) Deregister(c chan<- SysStatus) { func (s *sysMonitor) Deregister(c chan<- SysStatus) {
s.Lock() s.Lock()
defer s.Unlock()
if s.outputs != nil { if s.outputs != nil {
delete(s.outputs, c) delete(s.outputs, c)
} }
s.Unlock()
} }
var SysMonitor = &sysMonitor{status: &SysStatus{}} var SysMonitor = &sysMonitor{status: &SysStatus{}}

View File

@ -60,7 +60,7 @@ func EscapeInfluxProtocol(s string) string {
} }
func GetCfg() *config.Config { func GetCfg() *config.Config {
c := &config.Config{ return &config.Config{
InstanceID: 64, InstanceID: 64,
Port: 6043, Port: 6043,
LogLevel: "trace", LogLevel: "trace",
@ -87,7 +87,6 @@ func GetCfg() *config.Config {
ReservedDiskSize: 1073741824, ReservedDiskSize: 1073741824,
}, },
} }
return c
} }
func SafeSubstring(s string, n int) string { func SafeSubstring(s string, n int) string {
@ -123,8 +122,7 @@ func GetQidOwn() uint64 {
atomic.StoreUint64(&globalCounter64, 1) atomic.StoreUint64(&globalCounter64, 1)
id = 1 id = 1
} }
qid64 := uint64(config.Conf.InstanceID)<<56 | id return uint64(config.Conf.InstanceID)<<56 | id
return qid64
} }
func GetMd5HexStr(str string) string { func GetMd5HexStr(str string) string {
@ -138,7 +136,6 @@ func isValidChar(r rune) bool {
func ToValidTableName(input string) string { func ToValidTableName(input string) string {
var builder strings.Builder var builder strings.Builder
for _, r := range input { for _, r := range input {
if isValidChar(r) { if isValidChar(r) {
builder.WriteRune(unicode.ToLower(r)) builder.WriteRune(unicode.ToLower(r))
@ -146,7 +143,5 @@ func ToValidTableName(input string) string {
builder.WriteRune('_') builder.WriteRune('_')
} }
} }
return builder.String()
result := builder.String()
return result
} }