From daa2c2238ae4d3b1502634926b2c46b8f4ba5379 Mon Sep 17 00:00:00 2001 From: dmchen Date: Wed, 6 Dec 2023 07:48:40 +0000 Subject: [PATCH 1/4] fix/TS-4251 --- include/common/tglobal.h | 5 +-- include/libs/audit/audit.h | 19 +++++++++-- source/common/src/tglobal.c | 8 +++-- source/dnode/mgmt/mgmt_dnode/inc/dmInt.h | 3 ++ source/dnode/mgmt/mgmt_dnode/src/dmInt.c | 4 +++ source/dnode/mgmt/mgmt_dnode/src/dmWorker.c | 35 +++++++++++++++++++++ source/dnode/mgmt/node_mgmt/inc/dmMgmt.h | 1 + source/dnode/mgmt/node_mgmt/src/dmEnv.c | 2 ++ source/dnode/mgmt/node_mgmt/src/dmMonitor.c | 6 ++++ source/dnode/mgmt/node_util/inc/dmUtil.h | 2 ++ source/dnode/vnode/src/vnd/vnodeSvr.c | 8 +++-- source/libs/audit/inc/auditInt.h | 3 ++ source/libs/audit/src/auditMain.c | 33 +++++++++++++++++++ 13 files changed, 121 insertions(+), 8 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 33cfada338..91f8bbc7f3 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -107,8 +107,9 @@ extern int32_t tsMonitorMaxLogs; extern bool tsMonitorComp; // audit -extern bool tsEnableAudit; -extern bool tsEnableAuditCreateTable; +extern bool tsEnableAudit; +extern bool tsEnableAuditCreateTable; +extern int32_t tsAuditInterval; // telem extern bool tsEnableTelem; diff --git a/include/libs/audit/audit.h b/include/libs/audit/audit.h index 85d462b96b..dd3df27866 100644 --- a/include/libs/audit/audit.h +++ b/include/libs/audit/audit.h @@ -23,13 +23,13 @@ #include "tjson.h" #include "tmsgcb.h" #include "trpc.h" -#include "mnode.h" #ifdef __cplusplus extern "C" { #endif #define AUDIT_DETAIL_MAX 65472 +#define AUDIT_OPERATION_LEN 20 typedef struct { const char *server; @@ -37,13 +37,28 @@ typedef struct { bool comp; } SAuditCfg; +typedef struct { + int64_t curTime; + char strClusterId[TSDB_CLUSTER_ID_LEN]; + char clientAddress[50]; + char user[TSDB_USER_LEN]; + char operation[AUDIT_OPERATION_LEN]; + char target1[TSDB_DB_NAME_LEN]; //put db name + char target2[TSDB_STREAM_NAME_LEN]; //put stb name, table name, topic name, user name, stream name, use max + char* detail; +} SAuditRecord; + int32_t auditInit(const SAuditCfg *pCfg); +void auditCleanup(); void auditSend(SJson *pJson); void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, char *detail, int32_t len); +void auditAddRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, + char *detail, int32_t len); +void auditSendRecordsInBatch(); #ifdef __cplusplus } #endif -#endif /*_TD_MONITOR_H_*/ +#endif /*_TD_AUDIT_H_*/ diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 39cd2d604b..44acab73e2 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -95,8 +95,9 @@ int32_t tsMonitorMaxLogs = 100; bool tsMonitorComp = false; // audit -bool tsEnableAudit = true; -bool tsEnableAuditCreateTable = true; +bool tsEnableAudit = true; +bool tsEnableAuditCreateTable = true; +int32_t tsAuditInterval = 100; // telem #ifdef TD_ENTERPRISE @@ -686,6 +687,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "auditCreateTable", tsEnableAuditCreateTable, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) + return -1; if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelem, CFG_SCOPE_BOTH, CFG_DYN_ENT_SERVER) != 0) return -1; @@ -1137,6 +1140,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsEnableAudit = cfgGetItem(pCfg, "audit")->bval; tsEnableAuditCreateTable = cfgGetItem(pCfg, "auditCreateTable")->bval; + tsAuditInterval = cfgGetItem(pCfg, "auditInterval")->i32; tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval; tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval; diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index 9e43c2af47..a0d16cc4ea 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -30,12 +30,14 @@ typedef struct SDnodeMgmt { TdThread statusThread; TdThread notifyThread; TdThread monitorThread; + TdThread auditThread; TdThread crashReportThread; SSingleWorker mgmtWorker; ProcessCreateNodeFp processCreateNodeFp; ProcessAlterNodeTypeFp processAlterNodeTypeFp; ProcessDropNodeFp processDropNodeFp; SendMonitorReportFp sendMonitorReportFp; + SendAuditRecordsFp sendAuditRecordsFp; GetVnodeLoadsFp getVnodeLoadsFp; GetVnodeLoadsFp getVnodeLoadsLiteFp; GetMnodeLoadsFp getMnodeLoadsFp; @@ -62,6 +64,7 @@ void dmStopStatusThread(SDnodeMgmt *pMgmt); int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt); void dmStopNotifyThread(SDnodeMgmt *pMgmt); int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt); +int32_t dmStartAuditThread(SDnodeMgmt *pMgmt); void dmStopMonitorThread(SDnodeMgmt *pMgmt); int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt); void dmStopCrashReportThread(SDnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c index 4bd32cac20..960d4afd8b 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c @@ -29,6 +29,9 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) { if (dmStartMonitorThread(pMgmt) != 0) { return -1; } + if (dmStartAuditThread(pMgmt) != 0) { + return -1; + } if (dmStartCrashReportThread(pMgmt) != 0) { return -1; } @@ -60,6 +63,7 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->processAlterNodeTypeFp = pInput->processAlterNodeTypeFp; pMgmt->processDropNodeFp = pInput->processDropNodeFp; pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp; + pMgmt->sendAuditRecordsFp = pInput->sendAuditRecordFp; pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp; pMgmt->getVnodeLoadsLiteFp = pInput->getVnodeLoadsLiteFp; pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index d6bdaf51bc..99c9d52cc9 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -99,6 +99,27 @@ static void *dmMonitorThreadFp(void *param) { return NULL; } +static void *dmAuditThreadFp(void *param) { + SDnodeMgmt *pMgmt = param; + int64_t lastTime = taosGetTimestampMs(); + setThreadName("dnode-audit"); + + while (1) { + taosMsleep(100); + if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; + + int64_t curTime = taosGetTimestampMs(); + if (curTime < lastTime) lastTime = curTime; + float interval = curTime - lastTime; + if (interval >= tsAuditInterval) { + (*pMgmt->sendAuditRecordsFp)(); + lastTime = curTime; + } + } + + return NULL; +} + static void *dmCrashReportThreadFp(void *param) { SDnodeMgmt *pMgmt = param; int64_t lastTime = taosGetTimestampMs(); @@ -218,6 +239,20 @@ int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) { return 0; } +int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) { + TdThreadAttr thAttr; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + if (taosThreadCreate(&pMgmt->auditThread, &thAttr, dmAuditThreadFp, pMgmt) != 0) { + dError("failed to create audit thread since %s", strerror(errno)); + return -1; + } + + taosThreadAttrDestroy(&thAttr); + tmsgReportStartup("dnode-audit", "initialized"); + return 0; +} + void dmStopMonitorThread(SDnodeMgmt *pMgmt) { if (taosCheckPthreadValid(pMgmt->monitorThread)) { taosThreadJoin(pMgmt->monitorThread, NULL); diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index 36097438a2..83f9f13c82 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -124,6 +124,7 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); // dmMonitor.c void dmSendMonitorReport(); +void dmSendAuditRecords(); void dmGetVnodeLoads(SMonVloadInfo *pInfo); void dmGetVnodeLoadsLite(SMonVloadInfo *pInfo); void dmGetMnodeLoads(SMonMloadInfo *pInfo); diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index e0503c83c6..f9bba19fbb 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -189,6 +189,7 @@ void dmCleanup() { if (dmCheckRepeatCleanup(pDnode) != 0) return; dmCleanupDnode(pDnode); monCleanup(); + auditCleanup(); syncCleanUp(); walCleanUp(); udfcClose(); @@ -396,6 +397,7 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) { .processAlterNodeTypeFp = dmProcessAlterNodeTypeReq, .processDropNodeFp = dmProcessDropNodeReq, .sendMonitorReportFp = dmSendMonitorReport, + .sendAuditRecordFp = auditSendRecordsInBatch, .getVnodeLoadsFp = dmGetVnodeLoads, .getVnodeLoadsLiteFp = dmGetVnodeLoadsLite, .getMnodeLoadsFp = dmGetMnodeLoads, diff --git a/source/dnode/mgmt/node_mgmt/src/dmMonitor.c b/source/dnode/mgmt/node_mgmt/src/dmMonitor.c index b3db7c3058..c42aa6a1ae 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMonitor.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMonitor.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "dmMgmt.h" #include "dmNodes.h" +#include "audit.h" static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) { pInfo->protocol = 1; @@ -108,6 +109,11 @@ void dmSendMonitorReport() { monSendReport(); } +//Todo: put this in seperate file in the future +void dmSendAuditRecords() { + auditSendRecordsInBatch(); +} + void dmGetVnodeLoads(SMonVloadInfo *pInfo) { SDnode *pDnode = dmInstance(); SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE]; diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 0a52c578a5..4769ef8538 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -86,6 +86,7 @@ typedef enum { typedef int32_t (*ProcessCreateNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef void (*SendMonitorReportFp)(); +typedef void (*SendAuditRecordsFp)(); typedef void (*GetVnodeLoadsFp)(SMonVloadInfo *pInfo); typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo); typedef void (*GetQnodeLoadsFp)(SQnodeLoad *pInfo); @@ -120,6 +121,7 @@ typedef struct { ProcessAlterNodeTypeFp processAlterNodeTypeFp; ProcessDropNodeFp processDropNodeFp; SendMonitorReportFp sendMonitorReportFp; + SendAuditRecordsFp sendAuditRecordFp; GetVnodeLoadsFp getVnodeLoadsFp; GetVnodeLoadsFp getVnodeLoadsLiteFp; GetMnodeLoadsFp getMnodeLoadsFp; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 1f951097a4..5da6aabf65 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1008,11 +1008,15 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, taosMemoryFreeClear(*key); } + taosStringBuilderAppendStringLen(&sb, "specailkey", strlen("specailkey")); + size_t len = 0; char* keyJoined = taosStringBuilderGetResult(&sb, &len); + vInfo("create table %s", keyJoined); + if(pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0){ - auditRecord(pOriginRpc, clusterId, "createTable", name.dbname, "", keyJoined, len); + auditAddRecord(pOriginRpc, clusterId, "createTable", name.dbname, "", keyJoined, len); } taosStringBuilderDestroy(&sb); @@ -1236,7 +1240,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in char *keyJoined = taosStringBuilderGetResult(&sb, &len); if(pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0){ - auditRecord(pOriginRpc, clusterId, "dropTable", name.dbname, "", keyJoined, len); + auditAddRecord(pOriginRpc, clusterId, "dropTable", name.dbname, "", keyJoined, len); } taosStringBuilderDestroy(&sb); diff --git a/source/libs/audit/inc/auditInt.h b/source/libs/audit/inc/auditInt.h index b6c6ec87e8..e5fed2e473 100644 --- a/source/libs/audit/inc/auditInt.h +++ b/source/libs/audit/inc/auditInt.h @@ -17,9 +17,12 @@ #define _TD_AUDIT_INT_H_ #include "audit.h" +#include "tarray.h" typedef struct { SAuditCfg cfg; + SArray *records; + TdThreadMutex lock; } SAudit; #endif /*_TD_AUDIT_INT_H_*/ diff --git a/source/libs/audit/src/auditMain.c b/source/libs/audit/src/auditMain.c index c408f0d87b..7616617ff0 100644 --- a/source/libs/audit/src/auditMain.c +++ b/source/libs/audit/src/auditMain.c @@ -14,6 +14,8 @@ */ #define _DEFAULT_SOURCE + +#include "tarray.h" #include "auditInt.h" #include "taoserror.h" #include "thttp.h" @@ -21,25 +23,56 @@ #include "tjson.h" #include "tglobal.h" #include "mnode.h" +#include "audit.h" SAudit tsAudit = {0}; char* tsAuditUri = "/audit"; +char* tsAuditBatchUri = "/audit-batch"; int32_t auditInit(const SAuditCfg *pCfg) { tsAudit.cfg = *pCfg; + tsAudit.records = taosArrayInit(0, sizeof(SAuditRecord *)); + taosThreadMutexInit(&tsAudit.lock, NULL); return 0; } +void auditCleanup() { + tsLogFp = NULL; + taosArrayDestroy(tsAudit.records); + tsAudit.records = NULL; + taosThreadMutexDestroy(&tsAudit.lock); +} + extern void auditRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, char *detail, int32_t len); +extern void auditAddRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, + char *detail, int32_t len); +extern void auditSendRecordsInBatchImp(); void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, char *detail, int32_t len) { auditRecordImp(pReq, clusterId, operation, target1, target2, detail, len); } +void auditAddRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, + char *detail, int32_t len) { + auditAddRecordImp(pReq, clusterId, operation, target1, target2, detail, len); +} + +void auditSendRecordsInBatch(){ + auditSendRecordsInBatchImp(); +} + #ifndef TD_ENTERPRISE void auditRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, char *detail, int32_t len) { } + +void auditAddRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, + char *detail, int32_t len) { +} + +void auditSendRecordsInBatchImp(){ + +} #endif From 67e9f695c92431c03dacd023ad409291b1581a47 Mon Sep 17 00:00:00 2001 From: dmchen Date: Thu, 7 Dec 2023 03:13:23 +0000 Subject: [PATCH 2/4] debug info and default setting --- source/common/src/tglobal.c | 4 ++-- source/dnode/vnode/src/vnd/vnodeSvr.c | 4 ---- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 44acab73e2..2b7ad21cc7 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -97,7 +97,7 @@ bool tsMonitorComp = false; // audit bool tsEnableAudit = true; bool tsEnableAuditCreateTable = true; -int32_t tsAuditInterval = 100; +int32_t tsAuditInterval = 500; // telem #ifdef TD_ENTERPRISE @@ -687,7 +687,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "auditCreateTable", tsEnableAuditCreateTable, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) + if (cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 500, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 5da6aabf65..f77dbc5eee 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1008,13 +1008,9 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, taosMemoryFreeClear(*key); } - taosStringBuilderAppendStringLen(&sb, "specailkey", strlen("specailkey")); - size_t len = 0; char* keyJoined = taosStringBuilderGetResult(&sb, &len); - vInfo("create table %s", keyJoined); - if(pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0){ auditAddRecord(pOriginRpc, clusterId, "createTable", name.dbname, "", keyJoined, len); } From 1547d95742f448e5280ab20b8ffda79279dd00b2 Mon Sep 17 00:00:00 2001 From: dmchen Date: Fri, 8 Dec 2023 04:24:33 +0000 Subject: [PATCH 3/4] break dependency --- source/dnode/vnode/src/vnd/vnodeSvr.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index f77dbc5eee..1f951097a4 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1012,7 +1012,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, char* keyJoined = taosStringBuilderGetResult(&sb, &len); if(pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0){ - auditAddRecord(pOriginRpc, clusterId, "createTable", name.dbname, "", keyJoined, len); + auditRecord(pOriginRpc, clusterId, "createTable", name.dbname, "", keyJoined, len); } taosStringBuilderDestroy(&sb); @@ -1236,7 +1236,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in char *keyJoined = taosStringBuilderGetResult(&sb, &len); if(pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0){ - auditAddRecord(pOriginRpc, clusterId, "dropTable", name.dbname, "", keyJoined, len); + auditRecord(pOriginRpc, clusterId, "dropTable", name.dbname, "", keyJoined, len); } taosStringBuilderDestroy(&sb); From dad76804e8df96ff35b7c8d2d36227b41a0d9dd3 Mon Sep 17 00:00:00 2001 From: dmchen Date: Fri, 8 Dec 2023 07:26:20 +0000 Subject: [PATCH 4/4] mem leak --- source/dnode/mgmt/mgmt_dnode/inc/dmInt.h | 1 + source/dnode/mgmt/mgmt_dnode/src/dmInt.c | 1 + source/dnode/mgmt/mgmt_dnode/src/dmWorker.c | 7 +++++++ 3 files changed, 9 insertions(+) diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index a0d16cc4ea..80502e2662 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -66,6 +66,7 @@ void dmStopNotifyThread(SDnodeMgmt *pMgmt); int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt); int32_t dmStartAuditThread(SDnodeMgmt *pMgmt); void dmStopMonitorThread(SDnodeMgmt *pMgmt); +void dmStopAuditThread(SDnodeMgmt *pMgmt); int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt); void dmStopCrashReportThread(SDnodeMgmt *pMgmt); int32_t dmStartWorker(SDnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c index 960d4afd8b..b9dd45f1c0 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c @@ -41,6 +41,7 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) { static void dmStopMgmt(SDnodeMgmt *pMgmt) { pMgmt->pData->stopped = true; dmStopMonitorThread(pMgmt); + dmStopAuditThread(pMgmt); dmStopStatusThread(pMgmt); #if defined(TD_ENTERPRISE) dmStopNotifyThread(pMgmt); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 99c9d52cc9..af43804db4 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -260,6 +260,13 @@ void dmStopMonitorThread(SDnodeMgmt *pMgmt) { } } +void dmStopAuditThread(SDnodeMgmt *pMgmt) { + if (taosCheckPthreadValid(pMgmt->auditThread)) { + taosThreadJoin(pMgmt->auditThread, NULL); + taosThreadClear(&pMgmt->auditThread); + } +} + int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) { if (!tsEnableCrashReport) { return 0;