From daa2c2238ae4d3b1502634926b2c46b8f4ba5379 Mon Sep 17 00:00:00 2001 From: dmchen Date: Wed, 6 Dec 2023 07:48:40 +0000 Subject: [PATCH 01/25] fix/TS-4251 --- include/common/tglobal.h | 5 +-- include/libs/audit/audit.h | 19 +++++++++-- source/common/src/tglobal.c | 8 +++-- source/dnode/mgmt/mgmt_dnode/inc/dmInt.h | 3 ++ source/dnode/mgmt/mgmt_dnode/src/dmInt.c | 4 +++ source/dnode/mgmt/mgmt_dnode/src/dmWorker.c | 35 +++++++++++++++++++++ source/dnode/mgmt/node_mgmt/inc/dmMgmt.h | 1 + source/dnode/mgmt/node_mgmt/src/dmEnv.c | 2 ++ source/dnode/mgmt/node_mgmt/src/dmMonitor.c | 6 ++++ source/dnode/mgmt/node_util/inc/dmUtil.h | 2 ++ source/dnode/vnode/src/vnd/vnodeSvr.c | 8 +++-- source/libs/audit/inc/auditInt.h | 3 ++ source/libs/audit/src/auditMain.c | 33 +++++++++++++++++++ 13 files changed, 121 insertions(+), 8 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 33cfada338..91f8bbc7f3 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -107,8 +107,9 @@ extern int32_t tsMonitorMaxLogs; extern bool tsMonitorComp; // audit -extern bool tsEnableAudit; -extern bool tsEnableAuditCreateTable; +extern bool tsEnableAudit; +extern bool tsEnableAuditCreateTable; +extern int32_t tsAuditInterval; // telem extern bool tsEnableTelem; diff --git a/include/libs/audit/audit.h b/include/libs/audit/audit.h index 85d462b96b..dd3df27866 100644 --- a/include/libs/audit/audit.h +++ b/include/libs/audit/audit.h @@ -23,13 +23,13 @@ #include "tjson.h" #include "tmsgcb.h" #include "trpc.h" -#include "mnode.h" #ifdef __cplusplus extern "C" { #endif #define AUDIT_DETAIL_MAX 65472 +#define AUDIT_OPERATION_LEN 20 typedef struct { const char *server; @@ -37,13 +37,28 @@ typedef struct { bool comp; } SAuditCfg; +typedef struct { + int64_t curTime; + char strClusterId[TSDB_CLUSTER_ID_LEN]; + char clientAddress[50]; + char user[TSDB_USER_LEN]; + char operation[AUDIT_OPERATION_LEN]; + char target1[TSDB_DB_NAME_LEN]; //put db name + char target2[TSDB_STREAM_NAME_LEN]; //put stb name, table name, topic name, user name, stream name, use max + char* detail; +} SAuditRecord; + int32_t auditInit(const SAuditCfg *pCfg); +void auditCleanup(); void auditSend(SJson *pJson); void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, char *detail, int32_t len); +void auditAddRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, + char *detail, int32_t len); +void auditSendRecordsInBatch(); #ifdef __cplusplus } #endif -#endif /*_TD_MONITOR_H_*/ +#endif /*_TD_AUDIT_H_*/ diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 39cd2d604b..44acab73e2 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -95,8 +95,9 @@ int32_t tsMonitorMaxLogs = 100; bool tsMonitorComp = false; // audit -bool tsEnableAudit = true; -bool tsEnableAuditCreateTable = true; +bool tsEnableAudit = true; +bool tsEnableAuditCreateTable = true; +int32_t tsAuditInterval = 100; // telem #ifdef TD_ENTERPRISE @@ -686,6 +687,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "auditCreateTable", tsEnableAuditCreateTable, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; + if (cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) + return -1; if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "telemetryReporting", tsEnableTelem, CFG_SCOPE_BOTH, CFG_DYN_ENT_SERVER) != 0) return -1; @@ -1137,6 +1140,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsEnableAudit = cfgGetItem(pCfg, "audit")->bval; tsEnableAuditCreateTable = cfgGetItem(pCfg, "auditCreateTable")->bval; + tsAuditInterval = cfgGetItem(pCfg, "auditInterval")->i32; tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval; tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval; diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index 9e43c2af47..a0d16cc4ea 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -30,12 +30,14 @@ typedef struct SDnodeMgmt { TdThread statusThread; TdThread notifyThread; TdThread monitorThread; + TdThread auditThread; TdThread crashReportThread; SSingleWorker mgmtWorker; ProcessCreateNodeFp processCreateNodeFp; ProcessAlterNodeTypeFp processAlterNodeTypeFp; ProcessDropNodeFp processDropNodeFp; SendMonitorReportFp sendMonitorReportFp; + SendAuditRecordsFp sendAuditRecordsFp; GetVnodeLoadsFp getVnodeLoadsFp; GetVnodeLoadsFp getVnodeLoadsLiteFp; GetMnodeLoadsFp getMnodeLoadsFp; @@ -62,6 +64,7 @@ void dmStopStatusThread(SDnodeMgmt *pMgmt); int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt); void dmStopNotifyThread(SDnodeMgmt *pMgmt); int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt); +int32_t dmStartAuditThread(SDnodeMgmt *pMgmt); void dmStopMonitorThread(SDnodeMgmt *pMgmt); int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt); void dmStopCrashReportThread(SDnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c index 4bd32cac20..960d4afd8b 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c @@ -29,6 +29,9 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) { if (dmStartMonitorThread(pMgmt) != 0) { return -1; } + if (dmStartAuditThread(pMgmt) != 0) { + return -1; + } if (dmStartCrashReportThread(pMgmt) != 0) { return -1; } @@ -60,6 +63,7 @@ static int32_t dmOpenMgmt(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->processAlterNodeTypeFp = pInput->processAlterNodeTypeFp; pMgmt->processDropNodeFp = pInput->processDropNodeFp; pMgmt->sendMonitorReportFp = pInput->sendMonitorReportFp; + pMgmt->sendAuditRecordsFp = pInput->sendAuditRecordFp; pMgmt->getVnodeLoadsFp = pInput->getVnodeLoadsFp; pMgmt->getVnodeLoadsLiteFp = pInput->getVnodeLoadsLiteFp; pMgmt->getMnodeLoadsFp = pInput->getMnodeLoadsFp; diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index d6bdaf51bc..99c9d52cc9 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -99,6 +99,27 @@ static void *dmMonitorThreadFp(void *param) { return NULL; } +static void *dmAuditThreadFp(void *param) { + SDnodeMgmt *pMgmt = param; + int64_t lastTime = taosGetTimestampMs(); + setThreadName("dnode-audit"); + + while (1) { + taosMsleep(100); + if (pMgmt->pData->dropped || pMgmt->pData->stopped) break; + + int64_t curTime = taosGetTimestampMs(); + if (curTime < lastTime) lastTime = curTime; + float interval = curTime - lastTime; + if (interval >= tsAuditInterval) { + (*pMgmt->sendAuditRecordsFp)(); + lastTime = curTime; + } + } + + return NULL; +} + static void *dmCrashReportThreadFp(void *param) { SDnodeMgmt *pMgmt = param; int64_t lastTime = taosGetTimestampMs(); @@ -218,6 +239,20 @@ int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt) { return 0; } +int32_t dmStartAuditThread(SDnodeMgmt *pMgmt) { + TdThreadAttr thAttr; + taosThreadAttrInit(&thAttr); + taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE); + if (taosThreadCreate(&pMgmt->auditThread, &thAttr, dmAuditThreadFp, pMgmt) != 0) { + dError("failed to create audit thread since %s", strerror(errno)); + return -1; + } + + taosThreadAttrDestroy(&thAttr); + tmsgReportStartup("dnode-audit", "initialized"); + return 0; +} + void dmStopMonitorThread(SDnodeMgmt *pMgmt) { if (taosCheckPthreadValid(pMgmt->monitorThread)) { taosThreadJoin(pMgmt->monitorThread, NULL); diff --git a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h index 36097438a2..83f9f13c82 100644 --- a/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h +++ b/source/dnode/mgmt/node_mgmt/inc/dmMgmt.h @@ -124,6 +124,7 @@ int32_t dmProcessNodeMsg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); // dmMonitor.c void dmSendMonitorReport(); +void dmSendAuditRecords(); void dmGetVnodeLoads(SMonVloadInfo *pInfo); void dmGetVnodeLoadsLite(SMonVloadInfo *pInfo); void dmGetMnodeLoads(SMonMloadInfo *pInfo); diff --git a/source/dnode/mgmt/node_mgmt/src/dmEnv.c b/source/dnode/mgmt/node_mgmt/src/dmEnv.c index e0503c83c6..f9bba19fbb 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmEnv.c +++ b/source/dnode/mgmt/node_mgmt/src/dmEnv.c @@ -189,6 +189,7 @@ void dmCleanup() { if (dmCheckRepeatCleanup(pDnode) != 0) return; dmCleanupDnode(pDnode); monCleanup(); + auditCleanup(); syncCleanUp(); walCleanUp(); udfcClose(); @@ -396,6 +397,7 @@ SMgmtInputOpt dmBuildMgmtInputOpt(SMgmtWrapper *pWrapper) { .processAlterNodeTypeFp = dmProcessAlterNodeTypeReq, .processDropNodeFp = dmProcessDropNodeReq, .sendMonitorReportFp = dmSendMonitorReport, + .sendAuditRecordFp = auditSendRecordsInBatch, .getVnodeLoadsFp = dmGetVnodeLoads, .getVnodeLoadsLiteFp = dmGetVnodeLoadsLite, .getMnodeLoadsFp = dmGetMnodeLoads, diff --git a/source/dnode/mgmt/node_mgmt/src/dmMonitor.c b/source/dnode/mgmt/node_mgmt/src/dmMonitor.c index b3db7c3058..c42aa6a1ae 100644 --- a/source/dnode/mgmt/node_mgmt/src/dmMonitor.c +++ b/source/dnode/mgmt/node_mgmt/src/dmMonitor.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "dmMgmt.h" #include "dmNodes.h" +#include "audit.h" static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) { pInfo->protocol = 1; @@ -108,6 +109,11 @@ void dmSendMonitorReport() { monSendReport(); } +//Todo: put this in seperate file in the future +void dmSendAuditRecords() { + auditSendRecordsInBatch(); +} + void dmGetVnodeLoads(SMonVloadInfo *pInfo) { SDnode *pDnode = dmInstance(); SMgmtWrapper *pWrapper = &pDnode->wrappers[VNODE]; diff --git a/source/dnode/mgmt/node_util/inc/dmUtil.h b/source/dnode/mgmt/node_util/inc/dmUtil.h index 0a52c578a5..4769ef8538 100644 --- a/source/dnode/mgmt/node_util/inc/dmUtil.h +++ b/source/dnode/mgmt/node_util/inc/dmUtil.h @@ -86,6 +86,7 @@ typedef enum { typedef int32_t (*ProcessCreateNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef int32_t (*ProcessDropNodeFp)(EDndNodeType ntype, SRpcMsg *pMsg); typedef void (*SendMonitorReportFp)(); +typedef void (*SendAuditRecordsFp)(); typedef void (*GetVnodeLoadsFp)(SMonVloadInfo *pInfo); typedef void (*GetMnodeLoadsFp)(SMonMloadInfo *pInfo); typedef void (*GetQnodeLoadsFp)(SQnodeLoad *pInfo); @@ -120,6 +121,7 @@ typedef struct { ProcessAlterNodeTypeFp processAlterNodeTypeFp; ProcessDropNodeFp processDropNodeFp; SendMonitorReportFp sendMonitorReportFp; + SendAuditRecordsFp sendAuditRecordFp; GetVnodeLoadsFp getVnodeLoadsFp; GetVnodeLoadsFp getVnodeLoadsLiteFp; GetMnodeLoadsFp getMnodeLoadsFp; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 1f951097a4..5da6aabf65 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1008,11 +1008,15 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, taosMemoryFreeClear(*key); } + taosStringBuilderAppendStringLen(&sb, "specailkey", strlen("specailkey")); + size_t len = 0; char* keyJoined = taosStringBuilderGetResult(&sb, &len); + vInfo("create table %s", keyJoined); + if(pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0){ - auditRecord(pOriginRpc, clusterId, "createTable", name.dbname, "", keyJoined, len); + auditAddRecord(pOriginRpc, clusterId, "createTable", name.dbname, "", keyJoined, len); } taosStringBuilderDestroy(&sb); @@ -1236,7 +1240,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in char *keyJoined = taosStringBuilderGetResult(&sb, &len); if(pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0){ - auditRecord(pOriginRpc, clusterId, "dropTable", name.dbname, "", keyJoined, len); + auditAddRecord(pOriginRpc, clusterId, "dropTable", name.dbname, "", keyJoined, len); } taosStringBuilderDestroy(&sb); diff --git a/source/libs/audit/inc/auditInt.h b/source/libs/audit/inc/auditInt.h index b6c6ec87e8..e5fed2e473 100644 --- a/source/libs/audit/inc/auditInt.h +++ b/source/libs/audit/inc/auditInt.h @@ -17,9 +17,12 @@ #define _TD_AUDIT_INT_H_ #include "audit.h" +#include "tarray.h" typedef struct { SAuditCfg cfg; + SArray *records; + TdThreadMutex lock; } SAudit; #endif /*_TD_AUDIT_INT_H_*/ diff --git a/source/libs/audit/src/auditMain.c b/source/libs/audit/src/auditMain.c index c408f0d87b..7616617ff0 100644 --- a/source/libs/audit/src/auditMain.c +++ b/source/libs/audit/src/auditMain.c @@ -14,6 +14,8 @@ */ #define _DEFAULT_SOURCE + +#include "tarray.h" #include "auditInt.h" #include "taoserror.h" #include "thttp.h" @@ -21,25 +23,56 @@ #include "tjson.h" #include "tglobal.h" #include "mnode.h" +#include "audit.h" SAudit tsAudit = {0}; char* tsAuditUri = "/audit"; +char* tsAuditBatchUri = "/audit-batch"; int32_t auditInit(const SAuditCfg *pCfg) { tsAudit.cfg = *pCfg; + tsAudit.records = taosArrayInit(0, sizeof(SAuditRecord *)); + taosThreadMutexInit(&tsAudit.lock, NULL); return 0; } +void auditCleanup() { + tsLogFp = NULL; + taosArrayDestroy(tsAudit.records); + tsAudit.records = NULL; + taosThreadMutexDestroy(&tsAudit.lock); +} + extern void auditRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, char *detail, int32_t len); +extern void auditAddRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, + char *detail, int32_t len); +extern void auditSendRecordsInBatchImp(); void auditRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, char *detail, int32_t len) { auditRecordImp(pReq, clusterId, operation, target1, target2, detail, len); } +void auditAddRecord(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, + char *detail, int32_t len) { + auditAddRecordImp(pReq, clusterId, operation, target1, target2, detail, len); +} + +void auditSendRecordsInBatch(){ + auditSendRecordsInBatchImp(); +} + #ifndef TD_ENTERPRISE void auditRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, char *detail, int32_t len) { } + +void auditAddRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, + char *detail, int32_t len) { +} + +void auditSendRecordsInBatchImp(){ + +} #endif From 67e9f695c92431c03dacd023ad409291b1581a47 Mon Sep 17 00:00:00 2001 From: dmchen Date: Thu, 7 Dec 2023 03:13:23 +0000 Subject: [PATCH 02/25] debug info and default setting --- source/common/src/tglobal.c | 4 ++-- source/dnode/vnode/src/vnd/vnodeSvr.c | 4 ---- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 44acab73e2..2b7ad21cc7 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -97,7 +97,7 @@ bool tsMonitorComp = false; // audit bool tsEnableAudit = true; bool tsEnableAuditCreateTable = true; -int32_t tsAuditInterval = 100; +int32_t tsAuditInterval = 500; // telem #ifdef TD_ENTERPRISE @@ -687,7 +687,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddBool(pCfg, "audit", tsEnableAudit, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0) return -1; if (cfgAddBool(pCfg, "auditCreateTable", tsEnableAuditCreateTable, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; - if (cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) + if (cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 500, 200000, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0) return -1; if (cfgAddBool(pCfg, "crashReporting", tsEnableCrashReport, CFG_SCOPE_BOTH, CFG_DYN_NONE) != 0) return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 5da6aabf65..f77dbc5eee 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1008,13 +1008,9 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, taosMemoryFreeClear(*key); } - taosStringBuilderAppendStringLen(&sb, "specailkey", strlen("specailkey")); - size_t len = 0; char* keyJoined = taosStringBuilderGetResult(&sb, &len); - vInfo("create table %s", keyJoined); - if(pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0){ auditAddRecord(pOriginRpc, clusterId, "createTable", name.dbname, "", keyJoined, len); } From 1547d95742f448e5280ab20b8ffda79279dd00b2 Mon Sep 17 00:00:00 2001 From: dmchen Date: Fri, 8 Dec 2023 04:24:33 +0000 Subject: [PATCH 03/25] break dependency --- source/dnode/vnode/src/vnd/vnodeSvr.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index f77dbc5eee..1f951097a4 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1012,7 +1012,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, char* keyJoined = taosStringBuilderGetResult(&sb, &len); if(pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0){ - auditAddRecord(pOriginRpc, clusterId, "createTable", name.dbname, "", keyJoined, len); + auditRecord(pOriginRpc, clusterId, "createTable", name.dbname, "", keyJoined, len); } taosStringBuilderDestroy(&sb); @@ -1236,7 +1236,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in char *keyJoined = taosStringBuilderGetResult(&sb, &len); if(pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0){ - auditAddRecord(pOriginRpc, clusterId, "dropTable", name.dbname, "", keyJoined, len); + auditRecord(pOriginRpc, clusterId, "dropTable", name.dbname, "", keyJoined, len); } taosStringBuilderDestroy(&sb); From dad76804e8df96ff35b7c8d2d36227b41a0d9dd3 Mon Sep 17 00:00:00 2001 From: dmchen Date: Fri, 8 Dec 2023 07:26:20 +0000 Subject: [PATCH 04/25] mem leak --- source/dnode/mgmt/mgmt_dnode/inc/dmInt.h | 1 + source/dnode/mgmt/mgmt_dnode/src/dmInt.c | 1 + source/dnode/mgmt/mgmt_dnode/src/dmWorker.c | 7 +++++++ 3 files changed, 9 insertions(+) diff --git a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h index a0d16cc4ea..80502e2662 100644 --- a/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h +++ b/source/dnode/mgmt/mgmt_dnode/inc/dmInt.h @@ -66,6 +66,7 @@ void dmStopNotifyThread(SDnodeMgmt *pMgmt); int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt); int32_t dmStartAuditThread(SDnodeMgmt *pMgmt); void dmStopMonitorThread(SDnodeMgmt *pMgmt); +void dmStopAuditThread(SDnodeMgmt *pMgmt); int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt); void dmStopCrashReportThread(SDnodeMgmt *pMgmt); int32_t dmStartWorker(SDnodeMgmt *pMgmt); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c index 960d4afd8b..b9dd45f1c0 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmInt.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmInt.c @@ -41,6 +41,7 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) { static void dmStopMgmt(SDnodeMgmt *pMgmt) { pMgmt->pData->stopped = true; dmStopMonitorThread(pMgmt); + dmStopAuditThread(pMgmt); dmStopStatusThread(pMgmt); #if defined(TD_ENTERPRISE) dmStopNotifyThread(pMgmt); diff --git a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c index 99c9d52cc9..af43804db4 100644 --- a/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c +++ b/source/dnode/mgmt/mgmt_dnode/src/dmWorker.c @@ -260,6 +260,13 @@ void dmStopMonitorThread(SDnodeMgmt *pMgmt) { } } +void dmStopAuditThread(SDnodeMgmt *pMgmt) { + if (taosCheckPthreadValid(pMgmt->auditThread)) { + taosThreadJoin(pMgmt->auditThread, NULL); + taosThreadClear(&pMgmt->auditThread); + } +} + int32_t dmStartCrashReportThread(SDnodeMgmt *pMgmt) { if (!tsEnableCrashReport) { return 0; From b1dd76b9248973af0b417e42a5c59be2547bd4b6 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 12 Dec 2023 18:33:42 +0800 Subject: [PATCH 05/25] fix: assert on offset leq file.size in tsdbDataFileRAWWriterCloseCommit --- source/dnode/vnode/src/tsdb/tsdbDataFileRAW.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRAW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRAW.c index 3f448379c9..d2e3cb08e5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRAW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRAW.c @@ -115,7 +115,7 @@ static int32_t tsdbDataFileRAWWriterDoClose(SDataFileRAWWriter *writer) { return static int32_t tsdbDataFileRAWWriterCloseCommit(SDataFileRAWWriter *writer, TFileOpArray *opArr) { int32_t code = 0; int32_t lino = 0; - ASSERT(writer->ctx->offset == writer->file.size); + ASSERT(writer->ctx->offset <= writer->file.size); ASSERT(writer->config->fid == writer->file.fid); STFileOp op = (STFileOp){ From c5cde7ffe8f3da786326f2056ffc79c2f2cd1ae3 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 12 Dec 2023 18:34:59 +0800 Subject: [PATCH 06/25] enh: resend new snap replication msg on all acked if not finished yet --- source/libs/sync/src/syncSnapshot.c | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 93e81fd8e2..98cabca521 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -338,6 +338,12 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { pBlk->sendTimeMs = nowMs; } + if (pSender->seq != SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) { + if (snapshotSend(pSender) != 0) { + goto _out; + } + } + if (pSender->seq == SYNC_SNAPSHOT_SEQ_END && pSndBuf->end <= pSndBuf->start) { if (syncSnapSendMsg(pSender, pSender->seq, NULL, 0, 0) != 0) { goto _out; From af8a5c0ada376acca68166450901d8fa5c843098 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Tue, 12 Dec 2023 19:22:34 +0800 Subject: [PATCH 07/25] enh: protect stop of snap sender and receiver with the mutex of its buffer --- source/libs/sync/src/syncSnapshot.c | 55 ++++++++++++++++------------- 1 file changed, 31 insertions(+), 24 deletions(-) diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 98cabca521..d7b19a75cd 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -24,7 +24,6 @@ #include "syncUtil.h" static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) { - taosThreadMutexLock(&pBuf->mutex); for (int64_t i = pBuf->start; i < pBuf->end; ++i) { if (pBuf->entryDeleteCb) { pBuf->entryDeleteCb(pBuf->entries[i % pBuf->size]); @@ -34,7 +33,6 @@ static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) { pBuf->start = SYNC_SNAPSHOT_SEQ_BEGIN + 1; pBuf->end = pBuf->start; pBuf->cursor = pBuf->start - 1; - taosThreadMutexUnlock(&pBuf->mutex); } static void syncSnapBufferDestroy(SSyncSnapBuffer **ppBuf) { @@ -198,20 +196,23 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { // update flag int8_t stopped = !atomic_val_compare_exchange_8(&pSender->start, true, false); if (stopped) return; + taosThreadMutexLock(&pSender->pSndBuf->mutex); + { + pSender->finish = finish; + pSender->waitTime = -1; - pSender->finish = finish; - pSender->waitTime = -1; + // close reader + if (pSender->pReader != NULL) { + pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); + pSender->pReader = NULL; + } - // close reader - if (pSender->pReader != NULL) { - pSender->pSyncNode->pFsm->FpSnapshotStopRead(pSender->pSyncNode->pFsm, pSender->pReader); - pSender->pReader = NULL; + syncSnapBufferReset(pSender->pSndBuf); + + SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; + sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish); } - - syncSnapBufferReset(pSender->pSndBuf); - - SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; - sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish); + taosThreadMutexUnlock(&pSender->pSndBuf->mutex); } int32_t syncSnapSendMsg(SSyncSnapshotSender *pSender, int32_t seq, void *pBlock, int32_t blockLen, int32_t typ) { @@ -324,6 +325,9 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) { SSyncSnapBuffer *pSndBuf = pSender->pSndBuf; int32_t code = -1; taosThreadMutexLock(&pSndBuf->mutex); + if (pSender->pReader == NULL || pSender->finish || !snapshotSenderIsStart(pSender)) { + goto _out; + } for (int32_t seq = pSndBuf->cursor + 1; seq < pSndBuf->end; ++seq) { SyncSnapBlock *pBlk = pSndBuf->entries[seq % pSndBuf->size]; @@ -520,19 +524,22 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { int8_t stopped = !atomic_val_compare_exchange_8(&pReceiver->start, true, false); if (stopped) return; - - if (pReceiver->pWriter != NULL) { - int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, false, - &pReceiver->snapshot); - if (ret != 0) { - sRError(pReceiver, "snapshot receiver stop write failed since %s", terrstr()); + taosThreadMutexLock(&pReceiver->pRcvBuf->mutex); + { + if (pReceiver->pWriter != NULL) { + int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStopWrite(pReceiver->pSyncNode->pFsm, pReceiver->pWriter, + false, &pReceiver->snapshot); + if (ret != 0) { + sRError(pReceiver, "snapshot receiver stop write failed since %s", terrstr()); + } + pReceiver->pWriter = NULL; + } else { + sRInfo(pReceiver, "snapshot receiver stop, writer is null"); } - pReceiver->pWriter = NULL; - } else { - sRInfo(pReceiver, "snapshot receiver stop, writer is null"); - } - syncSnapBufferReset(pReceiver->pRcvBuf); + syncSnapBufferReset(pReceiver->pRcvBuf); + } + taosThreadMutexUnlock(&pReceiver->pRcvBuf->mutex); } static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *pMsg) { From c74c4fe20844ed7b877466430e9a20b286b4bd96 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Wed, 13 Dec 2023 09:57:12 +0800 Subject: [PATCH 08/25] enh: clean WAL logs based on size if one follower is offline. --- include/libs/sync/sync.h | 4 +++- include/libs/wal/wal.h | 1 + source/libs/sync/inc/syncRaftLog.h | 1 + source/libs/sync/src/syncMain.c | 9 +++++++-- source/libs/sync/src/syncRaftLog.c | 10 ++++++++++ source/libs/wal/src/walMeta.c | 17 +++++++++++++++++ 6 files changed, 39 insertions(+), 3 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index a428a9ae6a..e54237fe8b 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -33,11 +33,12 @@ extern "C" { #define SYNC_MAX_PROGRESS_WAIT_MS 4000 #define SYNC_MAX_START_TIME_RANGE_MS (1000 * 20) #define SYNC_MAX_RECV_TIME_RANGE_MS 1200 -#define SYNC_DEL_WAL_MS (1000 * 60) #define SYNC_ADD_QUORUM_COUNT 3 #define SYNC_VNODE_LOG_RETENTION (TSDB_SYNC_LOG_BUFFER_RETENTION + 1) #define SNAPSHOT_WAIT_MS 1000 * 5 +#define SYNC_WAL_LOG_RETENTION_SIZE (8LL * 1024 * 1024 * 1024) + #define SYNC_MAX_RETRY_BACKOFF 5 #define SYNC_LOG_REPL_RETRY_WAIT_MS 100 #define SYNC_APPEND_ENTRIES_TIMEOUT_MS 10000 @@ -219,6 +220,7 @@ typedef struct SSyncLogStore { SyncIndex (*syncLogWriteIndex)(struct SSyncLogStore* pLogStore); SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore); + SyncIndex (*syncLogIndexRetention)(struct SSyncLogStore* pLogStore, int64_t bytes); SyncTerm (*syncLogLastTerm)(struct SSyncLogStore* pLogStore); int32_t (*syncLogAppendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry, bool forcSync); diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index a56a5567eb..7c00ff5178 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -225,6 +225,7 @@ bool walIsEmpty(SWal *); int64_t walGetFirstVer(SWal *); int64_t walGetSnapshotVer(SWal *); int64_t walGetLastVer(SWal *); +int64_t walGetVerRetention(SWal *pWal, int64_t bytes); int64_t walGetCommittedVer(SWal *); int64_t walGetAppliedVer(SWal *); diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index de8bd81b30..137baab558 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -46,6 +46,7 @@ SyncIndex raftLogBeginIndex(struct SSyncLogStore* pLogStore); SyncIndex raftLogEndIndex(struct SSyncLogStore* pLogStore); int32_t raftLogEntryCount(struct SSyncLogStore* pLogStore); SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore); +SyncIndex raftLogIndexRetention(struct SSyncLogStore* pLogStore, int64_t bytes); SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore); int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 199c7a1445..b8740a2858 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -305,6 +305,10 @@ SyncIndex syncMinMatchIndex(SSyncNode* pSyncNode) { return minMatchIndex; } +static SyncIndex syncLogRetentionIndex(SSyncNode* pSyncNode, int64_t bytes) { + return pSyncNode->pLogStore->syncLogIndexRetention(pSyncNode->pLogStore, bytes); +} + int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { SSyncNode* pSyncNode = syncNodeAcquire(rid); if (pSyncNode == NULL) { @@ -331,7 +335,6 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { } else { // vnode if (pSyncNode->replicaNum > 1) { - // multi replicas logRetention = SYNC_VNODE_LOG_RETENTION; } } @@ -344,7 +347,9 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { syncNodeRelease(pSyncNode); return 0; } - logRetention = TMAX(logRetention, lastApplyIndex - pSyncNode->minMatchIndex + logRetention); + SyncIndex retentionIndex = + TMAX(pSyncNode->minMatchIndex, syncLogRetentionIndex(pSyncNode, SYNC_WAL_LOG_RETENTION_SIZE)); + logRetention += TMAX(0, lastApplyIndex - retentionIndex); } _DEL_WAL: diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index b167f2ecb6..b9c6838fda 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -70,6 +70,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { pLogStore->syncLogIsEmpty = raftLogIsEmpty; pLogStore->syncLogEntryCount = raftLogEntryCount; pLogStore->syncLogLastIndex = raftLogLastIndex; + pLogStore->syncLogIndexRetention = raftLogIndexRetention; pLogStore->syncLogLastTerm = raftLogLastTerm; pLogStore->syncLogAppendEntry = raftLogAppendEntry; pLogStore->syncLogGetEntry = raftLogGetEntry; @@ -154,6 +155,15 @@ SyncIndex raftLogLastIndex(struct SSyncLogStore* pLogStore) { return lastVer; } +SyncIndex raftLogIndexRetention(struct SSyncLogStore* pLogStore, int64_t bytes) { + SyncIndex lastIndex; + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + SyncIndex lastVer = walGetVerRetention(pWal, bytes); + + return lastVer; +} + SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 933014466a..b897eb4922 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -654,6 +654,23 @@ _err: return -1; } +int64_t walGetVerRetention(SWal* pWal, int64_t bytes) { + int64_t ver = -1; + int64_t totSize = 0; + taosThreadMutexLock(&pWal->mutex); + int32_t fileIdx = taosArrayGetSize(pWal->fileInfoSet); + while (--fileIdx) { + SWalFileInfo* pInfo = taosArrayGet(pWal->fileInfoSet, fileIdx); + if (totSize >= bytes) { + ver = pInfo->lastVer; + break; + } + totSize += pInfo->fileSize; + } + taosThreadMutexUnlock(&pWal->mutex); + return ver + 1; +} + int walCheckAndRepairIdx(SWal* pWal) { int32_t sz = taosArrayGetSize(pWal->fileInfoSet); int32_t fileIdx = sz; From 6eb64fcd9b75eaac10b0a2958de654fe2f4558c7 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Wed, 13 Dec 2023 13:48:45 +0800 Subject: [PATCH 09/25] fix: set start of snap replication incremental properly --- source/libs/sync/src/syncSnapshot.c | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index d7b19a75cd..353e28890b 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -23,6 +23,8 @@ #include "syncReplication.h" #include "syncUtil.h" +static SyncIndex syncNodeGetSnapBeginIndex(SSyncNode *ths); + static void syncSnapBufferReset(SSyncSnapBuffer *pBuf) { for (int64_t i = pBuf->start; i < pBuf->end; ++i) { if (pBuf->entryDeleteCb) { @@ -514,7 +516,9 @@ void snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver, SyncSnapshotSend *p pReceiver->term = pPreMsg->term; pReceiver->fromId = pPreMsg->srcId; pReceiver->startTime = pPreMsg->startTime; - ASSERT(pReceiver->startTime); + + pReceiver->snapshotParam.start = syncNodeGetSnapBeginIndex(pReceiver->pSyncNode); + pReceiver->snapshotParam.end = -1; sRInfo(pReceiver, "snapshot receiver start, from dnode:%d.", DID(&pReceiver->fromId)); } From 0a7ef098baf664cb64978f9d5da23fdbb47f45d2 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Wed, 13 Dec 2023 14:01:52 +0800 Subject: [PATCH 10/25] enh: reduce wait time to start a new snap replication --- source/libs/sync/inc/syncSnapshot.h | 1 - source/libs/sync/src/syncSnapshot.c | 11 +---------- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index f8ee99e8a0..66d8edfdfc 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -63,7 +63,6 @@ typedef struct SSyncSnapshotSender { int64_t sendingMS; SyncTerm term; int64_t startTime; - int64_t waitTime; int64_t lastSendTime; bool finish; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 353e28890b..cbdc60b2b3 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -81,7 +81,6 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI pSender->replicaIndex = replicaIndex; pSender->term = raftStoreGetTerm(pSyncNode); pSender->startTime = -1; - pSender->waitTime = -1; pSender->pSyncNode->pFsm->FpGetSnapshotInfo(pSender->pSyncNode->pFsm, &pSender->snapshot); pSender->finish = false; @@ -201,7 +200,6 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { taosThreadMutexLock(&pSender->pSndBuf->mutex); { pSender->finish = finish; - pSender->waitTime = -1; // close reader if (pSender->pReader != NULL) { @@ -373,14 +371,7 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) { return 0; } - int64_t timeNow = taosGetTimestampMs(); - if (pSender->waitTime <= 0) { - pSender->waitTime = timeNow + SNAPSHOT_WAIT_MS; - } - if (timeNow < pSender->waitTime) { - sSDebug(pSender, "snapshot sender waitTime not expired yet, ignore"); - return 0; - } + taosMsleep(1); int32_t code = snapshotSenderStart(pSender); if (code != 0) { From 7cb32da33645f9e1904bc0cf0eb98f804ab0580f Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Wed, 13 Dec 2023 15:14:49 +0800 Subject: [PATCH 11/25] fix(cos/put): seek to part offset --- source/common/src/cos.c | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/source/common/src/cos.c b/source/common/src/cos.c index d494d6f175..fcc777ac99 100644 --- a/source/common/src/cos.c +++ b/source/common/src/cos.c @@ -671,6 +671,13 @@ upload: continue; } + if (i > 0 && cp.parts[i - 1].completed) { + if (taosLSeekFile(data->infileFD, cp.parts[i].offset, SEEK_SET) < 0) { + code = TAOS_SYSTEM_ERROR(errno); + goto clean; + } + } + int seq = cp.parts[i].index + 1; partData.manager = &manager; From 04366f2e682819527ef5747a156fe678e7b2bc8b Mon Sep 17 00:00:00 2001 From: dmchen Date: Mon, 11 Dec 2023 11:24:31 +0000 Subject: [PATCH 12/25] revert dependency --- source/dnode/vnode/src/vnd/vnodeSvr.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 1f951097a4..f77dbc5eee 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -1012,7 +1012,7 @@ static int32_t vnodeProcessCreateTbReq(SVnode *pVnode, int64_t ver, void *pReq, char* keyJoined = taosStringBuilderGetResult(&sb, &len); if(pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0){ - auditRecord(pOriginRpc, clusterId, "createTable", name.dbname, "", keyJoined, len); + auditAddRecord(pOriginRpc, clusterId, "createTable", name.dbname, "", keyJoined, len); } taosStringBuilderDestroy(&sb); @@ -1236,7 +1236,7 @@ static int32_t vnodeProcessDropTbReq(SVnode *pVnode, int64_t ver, void *pReq, in char *keyJoined = taosStringBuilderGetResult(&sb, &len); if(pOriginRpc->info.conn.user != NULL && strlen(pOriginRpc->info.conn.user) > 0){ - auditRecord(pOriginRpc, clusterId, "dropTable", name.dbname, "", keyJoined, len); + auditAddRecord(pOriginRpc, clusterId, "dropTable", name.dbname, "", keyJoined, len); } taosStringBuilderDestroy(&sb); From e2d2ffc00eb89c96606f78588e6705b5fa9b92fd Mon Sep 17 00:00:00 2001 From: factosea <285808407@qq.com> Date: Wed, 13 Dec 2023 16:07:10 +0800 Subject: [PATCH 13/25] fix: % wildcard --- source/util/src/tcompare.c | 3 ++- source/util/test/utilTests.cpp | 5 +++++ 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index 4bacda48d2..b6b71855f3 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -1097,7 +1097,8 @@ int32_t patternMatch(const char *pattern, size_t psize, const char *str, size_t c1 = str[j++]; ++nMatchChar; - if (c == '\\' && pattern[i] == '_' && c1 == '_') { + if (c == '\\' && pattern[i] == c1 && + (c1 == '_' || c1 == '%')) { i++; continue; } diff --git a/source/util/test/utilTests.cpp b/source/util/test/utilTests.cpp index ff1d91aa9d..01a55ae710 100644 --- a/source/util/test/utilTests.cpp +++ b/source/util/test/utilTests.cpp @@ -204,6 +204,11 @@ TEST(utilTest, char_pattern_match_test) { const char* str12 = NULL; ret = patternMatch(pattern12, 4, str12, 0, &pInfo); ASSERT_EQ(ret, TSDB_PATTERN_NOMATCH); + + const char* pattern13 = "a\\%c"; + const char* str13 = "a%c"; + ret = patternMatch(pattern13, 5, str13, strlen(str13), &pInfo); + ASSERT_EQ(ret, TSDB_PATTERN_MATCH); } TEST(utilTest, char_pattern_match_no_terminated) { From 840b457308676f5bc7fd573e5368386d5e6ffc69 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 13 Dec 2023 17:00:39 +0800 Subject: [PATCH 14/25] fix(tsdb): fix error in tsdb read. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 26 ++++++++++++---------- source/dnode/vnode/src/tsdb/tsdbReadUtil.c | 1 + 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index a2ef109800..6d7841116f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -67,7 +67,7 @@ static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond static int32_t doBuildDataBlock(STsdbReader* pReader); static TSDBKEY getCurrentKeyInBuf(STableBlockScanInfo* pScanInfo, STsdbReader* pReader); static bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo); -static bool hasDataInSttBlock(SSttBlockReader* pSttBlockReader); +static bool hasDataInSttBlock(STableBlockScanInfo *pInfo); static void initBlockDumpInfo(STsdbReader* pReader, SDataBlockIter* pBlockIter); static int32_t getInitialDelIndex(const SArray* pDelSkyline, int32_t order); static void resetTableListIndex(SReaderStatus* pStatus); @@ -1466,7 +1466,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* SFileBlockDumpInfo* pDumpInfo = &pReader->status.fBlockDumpInfo; int64_t tsLast = INT64_MIN; - if (hasDataInSttBlock(pSttBlockReader)) { + if (hasDataInSttBlock(pBlockScanInfo)) { tsLast = getCurrentKeyInSttBlock(pSttBlockReader); } @@ -1485,7 +1485,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* int64_t minKey = 0; if (pReader->info.order == TSDB_ORDER_ASC) { minKey = INT64_MAX; // chosen the minimum value - if (minKey > tsLast && hasDataInSttBlock(pSttBlockReader)) { + if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo)) { minKey = tsLast; } @@ -1498,7 +1498,7 @@ static int32_t doMergeBufAndFileRows(STsdbReader* pReader, STableBlockScanInfo* } } else { minKey = INT64_MIN; - if (minKey < tsLast && hasDataInSttBlock(pSttBlockReader)) { + if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo)) { minKey = tsLast; } @@ -1705,7 +1705,7 @@ static int32_t mergeFileBlockAndSttBlock(STsdbReader* pReader, SSttBlockReader* } bool dataInDataFile = hasDataInFileBlock(pBlockData, pDumpInfo); - bool dataInSttFile = hasDataInSttBlock(pSttBlockReader); + bool dataInSttFile = hasDataInSttBlock(pBlockScanInfo); if (dataInDataFile && (!dataInSttFile)) { // no stt file block available, only data block exists return mergeRowsInFileBlocks(pBlockData, pBlockScanInfo, key, pReader); @@ -1791,7 +1791,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* TSDBROW* piRow = getValidMemRow(&pBlockScanInfo->iiter, pDelList, pReader); int64_t tsLast = INT64_MIN; - if (hasDataInSttBlock(pSttBlockReader)) { + if (hasDataInSttBlock(pBlockScanInfo)) { tsLast = getCurrentKeyInSttBlock(pSttBlockReader); } @@ -1840,7 +1840,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* minKey = key; } - if (minKey > tsLast && hasDataInSttBlock(pSttBlockReader)) { + if (minKey > tsLast && hasDataInSttBlock(pBlockScanInfo)) { minKey = tsLast; } } else { @@ -1857,7 +1857,7 @@ static int32_t doMergeMultiLevelRows(STsdbReader* pReader, STableBlockScanInfo* minKey = key; } - if (minKey < tsLast && hasDataInSttBlock(pSttBlockReader)) { + if (minKey < tsLast && hasDataInSttBlock(pBlockScanInfo)) { minKey = tsLast; } } @@ -2065,7 +2065,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan // the stt block reader has been initialized for this table. if (pSttBlockReader->uid == pScanInfo->uid) { - return hasDataInSttBlock(pSttBlockReader); + return hasDataInSttBlock(pScanInfo); } if (pSttBlockReader->uid != 0) { @@ -2158,7 +2158,9 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan return hasData; } -static bool hasDataInSttBlock(SSttBlockReader* pSttBlockReader) { return pSttBlockReader->mergeTree.pIter != NULL; } +static bool hasDataInSttBlock(STableBlockScanInfo *pInfo, SSttBlockReader* pSttBlockReader) { + return pInfo->sttKeyInfo.status == STT_FILE_HAS_DATA; +} bool hasDataInFileBlock(const SBlockData* pBlockData, const SFileBlockDumpInfo* pDumpInfo) { if ((pBlockData->nRow > 0) && (pBlockData->nRow != pDumpInfo->totalRows)) { @@ -2733,7 +2735,7 @@ static int32_t doLoadSttBlockSequentially(STsdbReader* pReader) { int64_t st = taosGetTimestampUs(); while (1) { // no data in stt block and block, no need to proceed. - if (!hasDataInSttBlock(pSttBlockReader)) { + if (!hasDataInSttBlock(pScanInfo)) { break; } @@ -2850,7 +2852,7 @@ static int32_t doBuildDataBlock(STsdbReader* pReader) { initSttBlockReader(pSttBlockReader, pScanInfo, pReader); // no data in stt block, no need to proceed. - while (hasDataInSttBlock(pSttBlockReader)) { + while (hasDataInSttBlock(pScanInfo)) { ASSERT(pScanInfo->sttKeyInfo.status == STT_FILE_HAS_DATA); code = buildComposedDataBlockImpl(pReader, pScanInfo, &pReader->status.fileBlockData, pSttBlockReader); diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 3c26badc0e..a223a2dc2d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -252,6 +252,7 @@ static void doCleanupInfoForNextFileset(STableBlockScanInfo* pScanInfo) { taosArrayClear(pScanInfo->pFileDelData); // del data from each file set pScanInfo->cleanSttBlocks = false; pScanInfo->numOfRowsInStt = 0; + pScanInfo->sttBlockReturned = false; INIT_TIMEWINDOW(&pScanInfo->sttWindow); INIT_TIMEWINDOW(&pScanInfo->filesetWindow); pScanInfo->sttKeyInfo.status = STT_FILE_READER_UNINIT; From 88a8e4e9f0805ac3d66a9896d6a55df90d9cc2c2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 13 Dec 2023 17:01:55 +0800 Subject: [PATCH 15/25] fix(tsdb): fix syntax error. --- source/dnode/vnode/src/tsdb/tsdbRead2.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 6d7841116f..52ee6d0b14 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -2158,7 +2158,7 @@ static bool initSttBlockReader(SSttBlockReader* pSttBlockReader, STableBlockScan return hasData; } -static bool hasDataInSttBlock(STableBlockScanInfo *pInfo, SSttBlockReader* pSttBlockReader) { +static bool hasDataInSttBlock(STableBlockScanInfo *pInfo) { return pInfo->sttKeyInfo.status == STT_FILE_HAS_DATA; } From 105542bb211ef59c8c8d334c40975c7744c78034 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 13 Dec 2023 17:44:00 +0800 Subject: [PATCH 16/25] fix(stream):handle the case of delete orphaned tasks. --- source/dnode/mnode/impl/src/mndStream.c | 19 +++++++++++++------ source/dnode/mnode/impl/src/mndStreamTrans.c | 3 +-- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index e8d5dfd1f5..bd67af712a 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -704,10 +704,10 @@ static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask pReq->streamId = pTask->id.streamId; STransAction action = {0}; - SEpSet epset = {0}; - if(pTask->info.nodeId == SNODE_HANDLE){ + SEpSet epset = {0}; + if (pTask->info.nodeId == SNODE_HANDLE) { SSnodeObj *pObj = NULL; - void *pIter = NULL; + void *pIter = NULL; while (1) { pIter = sdbFetch(pMnode->pSdb, SDB_SNODE, pIter, (void **)&pObj); if (pIter == NULL) { @@ -717,10 +717,16 @@ static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask addEpIntoEpSet(&epset, pObj->pDnode->fqdn, pObj->pDnode->port); sdbRelease(pMnode->pSdb, pObj); } - }else{ + } else { SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); - epset = mndGetVgroupEpset(pMnode, pVgObj); - mndReleaseVgroup(pMnode, pVgObj); + if (pVgObj != NULL) { + epset = mndGetVgroupEpset(pMnode, pVgObj); + mndReleaseVgroup(pMnode, pVgObj); + } else { + mDebug("orphaned task:0x%x need to be dropped, nodeId:%d, no redo action", pTask->id.taskId, pTask->info.nodeId); + taosMemoryFree(pReq); + return 0; + } } // The epset of nodeId of this task may have been expired now, let's use the newest epset from mnode. @@ -1657,6 +1663,7 @@ static void setTaskAttrInResBlock(SStreamObj *pStream, SStreamTask *pTask, SSDat STaskStatusEntry *pe = taosHashGet(execInfo.pTaskMap, &id, sizeof(id)); if (pe == NULL) { + mError("task:0x%" PRIx64 " not exists in vnode, no valid status/stage info", id.taskId); return; } diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index fa36d69d6e..43e85a9405 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -48,8 +48,7 @@ int32_t clearFinishedTrans(SMnode* pMnode) { void* pKey = taosHashGetKey(pEntry, &keyLen); // key is the name of src/dst db name SKeyInfo info = {.pKey = pKey, .keyLen = keyLen}; - - mDebug("transId:%d %s startTs:%" PRId64 "cleared due to finished", pEntry->transId, pEntry->name, + mDebug("transId:%d %s startTs:%" PRId64 " cleared since finished", pEntry->transId, pEntry->name, pEntry->startTime); taosArrayPush(pList, &info); } else { From 926c28f62f740d8194f8885a5f6e59d6982e0a14 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Mon, 11 Dec 2023 19:08:22 +0800 Subject: [PATCH 17/25] enh: clear info data of snap sender and receiver at stop --- source/libs/sync/src/syncSnapshot.c | 49 ++++++++++++++++++++--------- 1 file changed, 34 insertions(+), 15 deletions(-) diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index cbdc60b2b3..53d91d15ec 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -107,6 +107,19 @@ void syncSnapBlockDestroy(void *ptr) { taosMemoryFree(pBlk); } +static int32_t snapshotSenderClearInfoData(SSyncSnapshotSender *pSender) { + if (pSender->snapshotParam.data) { + taosMemoryFree(pSender->snapshotParam.data); + pSender->snapshotParam.data = NULL; + } + + if (pSender->snapshot.data) { + taosMemoryFree(pSender->snapshot.data); + pSender->snapshot.data = NULL; + } + return 0; +} + void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { if (pSender == NULL) return; @@ -121,10 +134,8 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) { syncSnapBufferDestroy(&pSender->pSndBuf); } - if (pSender->snapshotParam.data) { - taosMemoryFree(pSender->snapshotParam.data); - pSender->snapshotParam.data = NULL; - } + snapshotSenderClearInfoData(pSender); + // free sender taosMemoryFree(pSender); } @@ -209,6 +220,8 @@ void snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish) { syncSnapBufferReset(pSender->pSndBuf); + snapshotSenderClearInfoData(pSender); + SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; sSInfo(pSender, "snapshot sender stop, to dnode:%d, finish:%d", DID(&destId), finish); } @@ -419,6 +432,19 @@ SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode, SRaftId from return pReceiver; } +static int32_t snapshotReceiverClearInfoData(SSyncSnapshotReceiver *pReceiver) { + if (pReceiver->snapshotParam.data) { + taosMemoryFree(pReceiver->snapshotParam.data); + pReceiver->snapshotParam.data = NULL; + } + + if (pReceiver->snapshot.data) { + taosMemoryFree(pReceiver->snapshot.data); + pReceiver->snapshot.data = NULL; + } + return 0; +} + void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { if (pReceiver == NULL) return; @@ -432,22 +458,13 @@ void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) { pReceiver->pWriter = NULL; } - // free data of snapshot info - if (pReceiver->snapshotParam.data) { - taosMemoryFree(pReceiver->snapshotParam.data); - pReceiver->snapshotParam.data = NULL; - } - - if (pReceiver->snapshot.data) { - taosMemoryFree(pReceiver->snapshot.data); - pReceiver->snapshot.data = NULL; - } - // free snap buf if (pReceiver->pRcvBuf) { syncSnapBufferDestroy(&pReceiver->pRcvBuf); } + snapshotReceiverClearInfoData(pReceiver); + // free receiver taosMemoryFree(pReceiver); } @@ -533,6 +550,8 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { } syncSnapBufferReset(pReceiver->pRcvBuf); + + snapshotReceiverClearInfoData(pReceiver); } taosThreadMutexUnlock(&pReceiver->pRcvBuf->mutex); } From 41ef6075e2c807ae6867ddebfc28ee6b05c13246 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Mon, 11 Dec 2023 15:48:06 +0800 Subject: [PATCH 18/25] enh: check result of FpGetSnapshotInfo for exchange snap info --- source/libs/sync/src/syncSnapshot.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 53d91d15ec..f060e9da13 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -683,7 +683,10 @@ static int32_t syncSnapReceiverExchgSnapInfo(SSyncNode *pSyncNode, SSyncSnapshot memcpy(pInfo->data, pMsg->data, pMsg->dataLen); // exchange snap info - pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, pInfo); + if (pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, pInfo) != 0) { + sRError(pReceiver, "failed to get snapshot info. type: %d", pMsg->payloadType); + goto _out; + } SSyncTLV *datHead = pInfo->data; if (datHead->typ != TDMT_SYNC_PREP_SNAPSHOT_REPLY) { sRError(pReceiver, "unexpected data typ in data of snapshot info. typ: %d", datHead->typ); From 81f96603b071d71c63ed8c0e57e7c5b2cf8c86e9 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 14 Dec 2023 09:33:51 +0800 Subject: [PATCH 19/25] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 3 +- source/dnode/vnode/src/tq/tqStreamStateSnap.c | 2 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 16 ++--- source/libs/stream/src/streamMeta.c | 65 ++++--------------- 4 files changed, 20 insertions(+), 66 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 8f3e100db6..3c475d0a03 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -840,11 +840,10 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int3 int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); SStreamTask* streamMetaAcquireTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); void streamMetaReleaseTask(SStreamMeta* pMeta, SStreamTask* pTask); -int32_t streamMetaReopen(SStreamMeta* pMeta); +void streamMetaClear(SStreamMeta* pMeta); void streamMetaInitBackend(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta); -int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta); int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta); void streamMetaNotifyClose(SStreamMeta* pMeta); int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); diff --git a/source/dnode/vnode/src/tq/tqStreamStateSnap.c b/source/dnode/vnode/src/tq/tqStreamStateSnap.c index 2ab710176d..50f413bcc9 100644 --- a/source/dnode/vnode/src/tq/tqStreamStateSnap.c +++ b/source/dnode/vnode/src/tq/tqStreamStateSnap.c @@ -181,5 +181,5 @@ int32_t streamStateRebuildFromSnap(SStreamStateWriter* pWriter, int64_t chkpId) } int32_t streamStateLoadTasks(SStreamStateWriter* pWriter) { - return streamMetaReloadAllTasks(pWriter->pTq->pStreamMeta); + return streamMetaLoadAllTasks(pWriter->pTq->pStreamMeta); } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index b1d49bf31b..a761d15eff 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -712,9 +712,9 @@ int32_t resetStreamTaskStatus(SStreamMeta* pMeta) { } static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { - int32_t vgId = pMeta->vgId; - int32_t code = 0; - int64_t st = taosGetTimestampMs(); + int32_t vgId = pMeta->vgId; + int32_t code = 0; + int64_t st = taosGetTimestampMs(); while(1) { int32_t startVal = atomic_val_compare_exchange_32(&pMeta->startInfo.taskStarting, 0, 1); @@ -736,17 +736,9 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { } streamMetaWLock(pMeta); - code = streamMetaReopen(pMeta); - if (code != TSDB_CODE_SUCCESS) { - tqError("vgId:%d failed to reopen stream meta", vgId); - streamMetaWUnLock(pMeta); - code = terrno; - return code; - } + streamMetaClear(pMeta); - streamMetaInitBackend(pMeta); int64_t el = taosGetTimestampMs() - st; - tqInfo("vgId:%d close&reload state elapsed time:%.3fs", vgId, el/1000.); code = streamMetaLoadAllTasks(pMeta); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 807f120cb7..23cb6f5a35 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -31,7 +31,6 @@ int32_t streamMetaId = 0; int32_t taskDbWrapperId = 0; static void metaHbToMnode(void* param, void* tmrId); -static void streamMetaClear(SStreamMeta* pMeta); static int32_t streamMetaBegin(SStreamMeta* pMeta); static void streamMetaCloseImpl(void* arg); @@ -395,41 +394,6 @@ _err: return NULL; } -int32_t streamMetaReopen(SStreamMeta* pMeta) { - streamMetaClear(pMeta); - - // NOTE: role should not be changed during reopen meta - pMeta->streamBackendRid = -1; - pMeta->streamBackend = NULL; - - char* defaultPath = taosMemoryCalloc(1, strlen(pMeta->path) + 128); - sprintf(defaultPath, "%s%s%s", pMeta->path, TD_DIRSEP, "state"); - taosRemoveDir(defaultPath); - - char* newPath = taosMemoryCalloc(1, strlen(pMeta->path) + 128); - sprintf(newPath, "%s%s%s", pMeta->path, TD_DIRSEP, "received"); - - int32_t code = taosStatFile(newPath, NULL, NULL, NULL); - if (code == 0) { - // directory exists - code = taosRenameFile(newPath, defaultPath); - if (code != 0) { - terrno = TAOS_SYSTEM_ERROR(code); - stError("vgId:%d failed to rename file, from %s to %s, code:%s", pMeta->vgId, newPath, defaultPath, - tstrerror(terrno)); - - taosMemoryFree(defaultPath); - taosMemoryFree(newPath); - return -1; - } - } - - taosMemoryFree(defaultPath); - taosMemoryFree(newPath); - - return 0; -} - // todo refactor: the lock shoud be restricted in one function void streamMetaInitBackend(SStreamMeta* pMeta) { pMeta->streamBackend = streamBackendInit(pMeta->path, pMeta->chkpId, pMeta->vgId); @@ -829,28 +793,27 @@ static void doClear(void* pKey, void* pVal, TBC* pCur, SArray* pRecycleList) { taosArrayDestroy(pRecycleList); } -int32_t streamMetaReloadAllTasks(SStreamMeta* pMeta) { - if (pMeta == NULL) return 0; - - return streamMetaLoadAllTasks(pMeta); -} int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) { TBC* pCur = NULL; - int32_t vgId = pMeta->vgId; - - stInfo("vgId:%d load stream tasks from meta files", vgId); - - if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { - stError("vgId:%d failed to open stream meta, code:%s", vgId, tstrerror(terrno)); - return -1; - } - void* pKey = NULL; int32_t kLen = 0; void* pVal = NULL; int32_t vLen = 0; SDecoder decoder; - SArray* pRecycleList = taosArrayInit(4, sizeof(STaskId)); + + if (pMeta == NULL) { + return TSDB_CODE_SUCCESS; + } + + SArray* pRecycleList = taosArrayInit(4, sizeof(STaskId)); + int32_t vgId = pMeta->vgId; + stInfo("vgId:%d load stream tasks from meta files", vgId); + + if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { + stError("vgId:%d failed to open stream meta, code:%s", vgId, tstrerror(terrno)); + taosArrayDestroy(pRecycleList); + return -1; + } tdbTbcMoveToFirst(pCur); while (tdbTbcNext(pCur, &pKey, &kLen, &pVal, &vLen) == 0) { From 8e042e34cb01651c4a68538564917d65fa01030b Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 14 Dec 2023 09:48:14 +0800 Subject: [PATCH 20/25] refactor: do some internal refactor. --- source/dnode/vnode/src/tq/tq.c | 54 ++++++++++++++++------------------ 1 file changed, 25 insertions(+), 29 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index ee76a27414..4834924fe0 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -699,7 +699,23 @@ end: return ret; } -void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } +static void freePtr(void* ptr) { taosMemoryFree(*(void**)ptr); } + +static STaskId replaceStreamTaskId(SStreamTask* pTask) { + ASSERT(pTask->info.fillHistory); + STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId}; + + pTask->id.streamId = pTask->streamTaskId.streamId; + pTask->id.taskId = pTask->streamTaskId.taskId; + + return id; +} + +static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) { + ASSERT(pTask->info.fillHistory); + pTask->streamTaskId.taskId = pId->taskId; + pTask->streamTaskId.streamId = pId->streamId; +} int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { int32_t vgId = TD_VID(pTq->pVnode); @@ -713,15 +729,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { streamTaskOpenAllUpstreamInput(pTask); if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) { - SStreamTask* pStateTask = pTask; - - STaskId taskId = {.streamId = 0, .taskId = 0}; + STaskId taskId = {0}; if (pTask->info.fillHistory) { - taskId.streamId = pTask->id.streamId; - taskId.taskId = pTask->id.taskId; - - pTask->id.streamId = pTask->streamTaskId.streamId; - pTask->id.taskId = pTask->streamTaskId.taskId; + taskId = replaceStreamTaskId(pTask); } pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); @@ -731,9 +741,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { } else { tqDebug("s-task:%s state:%p", pTask->id.idStr, pTask->pState); } + if (pTask->info.fillHistory) { - pTask->id.streamId = taskId.streamId; - pTask->id.taskId = taskId.taskId; + restoreStreamTaskId(pTask, &taskId); } SReadHandle handle = { @@ -754,15 +764,9 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { qSetTaskId(pTask->exec.pExecutor, pTask->id.taskId, pTask->id.streamId); } else if (pTask->info.taskLevel == TASK_LEVEL__AGG) { - SStreamTask* pSateTask = pTask; - // SStreamTask task = {0}; - - STaskId taskId = {.streamId = 0, .taskId = 0}; + STaskId taskId = {0}; if (pTask->info.fillHistory) { - taskId.streamId = pTask->id.streamId; - taskId.taskId = pTask->id.taskId; - pTask->id.streamId = pTask->streamTaskId.streamId; - pTask->id.taskId = pTask->streamTaskId.taskId; + taskId = replaceStreamTaskId(pTask); } pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask, false, -1, -1); @@ -774,15 +778,13 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { } if (pTask->info.fillHistory) { - pTask->id.streamId = taskId.streamId; - pTask->id.taskId = taskId.taskId; + restoreStreamTaskId(pTask, &taskId); } - int32_t numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList); SReadHandle handle = { .checkpointId = pTask->chkInfo.checkpointId, .vnode = NULL, - .numOfVgroups = numOfVgroups, + .numOfVgroups = (int32_t)taosArrayGetSize(pTask->upstreamInfo.pList), .pStateBackend = pTask->pState, .fillHistory = pTask->info.fillHistory, .winRange = pTask->dataRange.window, @@ -828,12 +830,6 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { pTask->exec.pWalReader = walOpenReader(pTq->pVnode->pWal, &cond, pTask->id.taskId); } - // // reset the task status from unfinished transaction - // if (pTask->status.taskStatus == TASK_STATUS__PAUSE) { - // tqWarn("s-task:%s reset task status to be normal, status kept in taskMeta: Paused", pTask->id.idStr); - // pTask->status.taskStatus = TASK_STATUS__READY; - // } - streamTaskResetUpstreamStageInfo(pTask); streamSetupScheduleTrigger(pTask); SCheckpointInfo* pChkInfo = &pTask->chkInfo; From 239b2d4f006c7d9bd81a08f7f1caef335a6b1eaf Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 14 Dec 2023 09:51:27 +0800 Subject: [PATCH 21/25] refactor: do some internal refactor. --- include/libs/stream/tstream.h | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 3c475d0a03..f6737b4e27 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -509,11 +509,8 @@ typedef struct SStreamMeta { SArray* chkpSaved; SArray* chkpInUse; SRWLatch chkpDirLock; - - void* qHandle; - int32_t pauseTaskNum; - - void* bkdChkptMgt; + void* qHandle; + void* bkdChkptMgt; } SStreamMeta; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); From f0d63a977a913c20f956921746f11d09c446b80f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 14 Dec 2023 10:01:57 +0800 Subject: [PATCH 22/25] refactor: do some internal refactor. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index a761d15eff..a106fe148b 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -750,10 +750,10 @@ static int32_t restartStreamTasks(SStreamMeta* pMeta, bool isLeader) { } if (isLeader && !tsDisableStream) { - tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); resetStreamTaskStatus(pMeta); - streamMetaWUnLock(pMeta); + tqInfo("vgId:%d restart all stream tasks after all tasks being updated", vgId); + startStreamTasks(pMeta); } else { streamMetaResetStartInfo(&pMeta->startInfo); From 38d7ae3cd7206ead35e30699586ad39cf9a2e3c3 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 14 Dec 2023 10:36:15 +0800 Subject: [PATCH 23/25] fix(query): add one more row for table rows distributed --- source/libs/function/src/builtinsimpl.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 25a3c509a7..93f3b6c109 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -5466,13 +5466,12 @@ bool blockDistSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { } int32_t blockDistFunction(SqlFunctionCtx* pCtx) { - const int32_t BLOCK_DIST_RESULT_ROWS = 24; + const int32_t BLOCK_DIST_RESULT_ROWS = 25; SInputColumnInfoData* pInput = &pCtx->input; SColumnInfoData* pInputCol = pInput->pData[0]; - - SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); - STableBlockDistInfo* pDistInfo = GET_ROWCELL_INTERBUF(pResInfo); + SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); + STableBlockDistInfo* pDistInfo = GET_ROWCELL_INTERBUF(pResInfo); STableBlockDistInfo p1 = {0}; tDeserializeBlockDistInfo(varDataVal(pInputCol->pData), varDataLen(pInputCol->pData), &p1); From 7cf0add513d812c687a83bb1712ef53830d2a234 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 14 Dec 2023 11:28:03 +0800 Subject: [PATCH 24/25] fix(stream): fix error caused by refactor. --- source/dnode/vnode/src/tq/tq.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 4834924fe0..138c58b45f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -713,8 +713,8 @@ static STaskId replaceStreamTaskId(SStreamTask* pTask) { static void restoreStreamTaskId(SStreamTask* pTask, STaskId* pId) { ASSERT(pTask->info.fillHistory); - pTask->streamTaskId.taskId = pId->taskId; - pTask->streamTaskId.streamId = pId->streamId; + pTask->id.taskId = pId->taskId; + pTask->id.streamId = pId->streamId; } int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t nextProcessVer) { From 6b9ba26f37faab15318fb9ecfdee9f2a92a31f28 Mon Sep 17 00:00:00 2001 From: dmchen Date: Wed, 13 Dec 2023 10:11:15 +0000 Subject: [PATCH 25/25] auditinterval default --- source/common/src/tglobal.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 2b7ad21cc7..798916d3b5 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -97,7 +97,7 @@ bool tsMonitorComp = false; // audit bool tsEnableAudit = true; bool tsEnableAuditCreateTable = true; -int32_t tsAuditInterval = 500; +int32_t tsAuditInterval = 5000; // telem #ifdef TD_ENTERPRISE