Merge pull request #28417 from taosdata/fix/TS-5532-set-seperate-thread-update-status-main

Fix/ts 5532 set seperate thread update status main
This commit is contained in:
Shengliang Guan 2024-10-21 19:35:24 +08:00 committed by GitHub
commit 0e595fa768
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 110 additions and 3 deletions

View File

@ -28,6 +28,7 @@ typedef struct SDnodeMgmt {
const char *path;
const char *name;
TdThread statusThread;
TdThread statusInfoThread;
TdThread notifyThread;
TdThread monitorThread;
TdThread auditThread;
@ -49,6 +50,7 @@ typedef struct SDnodeMgmt {
// dmHandle.c
SArray *dmGetMsgHandles();
void dmSendStatusReq(SDnodeMgmt *pMgmt);
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt);
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq);
int32_t dmProcessConfigReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmProcessAuthRsp(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
@ -62,7 +64,9 @@ int32_t dmProcessCreateEncryptKeyReq(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
// dmWorker.c
int32_t dmPutNodeMsgToMgmtQueue(SDnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t dmStartStatusThread(SDnodeMgmt *pMgmt);
int32_t dmStartStatusInfoThread(SDnodeMgmt *pMgmt);
void dmStopStatusThread(SDnodeMgmt *pMgmt);
void dmStopStatusInfoThread(SDnodeMgmt *pMgmt);
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt);
void dmStopNotifyThread(SDnodeMgmt *pMgmt);
int32_t dmStartMonitorThread(SDnodeMgmt *pMgmt);

View File

@ -22,6 +22,8 @@
extern SConfig *tsCfg;
SMonVloadInfo tsVinfo = {0};
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
int32_t code = 0;
if (pMgmt->pData->dnodeId == 0 || pMgmt->pData->clusterId == 0) {
@ -163,9 +165,16 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
(void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
dDebug("send status req to mnode, statusSeq:%d, begin to get vnode loads", pMgmt->statusSeq);
SMonVloadInfo vinfo = {0};
(*pMgmt->getVnodeLoadsFp)(&vinfo);
req.pVloads = vinfo.pVloads;
if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
dError("failed to lock status info lock");
return;
}
req.pVloads = tsVinfo.pVloads;
tsVinfo.pVloads = NULL;
if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
dError("failed to unlock status info lock");
return;
}
dDebug("send status req to mnode, statusSeq:%d, begin to get mnode loads", pMgmt->statusSeq);
SMonMloadInfo minfo = {0};
@ -231,6 +240,28 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
dmProcessStatusRsp(pMgmt, &rpcRsp);
}
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
SMonVloadInfo vinfo = {0};
dDebug("begin to get vnode loads");
(*pMgmt->getVnodeLoadsFp)(&vinfo);
dDebug("begin to lock status info");
if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
dError("failed to lock status info lock");
return;
}
if (tsVinfo.pVloads == NULL) {
tsVinfo.pVloads = vinfo.pVloads;
vinfo.pVloads = NULL;
} else {
taosArrayDestroy(vinfo.pVloads);
vinfo.pVloads = NULL;
}
if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
dError("failed to unlock status info lock");
return;
}
}
void dmSendNotifyReq(SDnodeMgmt *pMgmt, SNotifyReq *pReq) {
int32_t contLen = tSerializeSNotifyReq(NULL, 0, pReq);
if (contLen < 0) {

View File

@ -22,6 +22,9 @@ static int32_t dmStartMgmt(SDnodeMgmt *pMgmt) {
if ((code = dmStartStatusThread(pMgmt)) != 0) {
return code;
}
if ((code = dmStartStatusInfoThread(pMgmt)) != 0) {
return code;
}
#if defined(TD_ENTERPRISE)
if ((code = dmStartNotifyThread(pMgmt)) != 0) {
return code;
@ -44,6 +47,7 @@ static void dmStopMgmt(SDnodeMgmt *pMgmt) {
dmStopMonitorThread(pMgmt);
dmStopAuditThread(pMgmt);
dmStopStatusThread(pMgmt);
dmStopStatusInfoThread(pMgmt);
#if defined(TD_ENTERPRISE)
dmStopNotifyThread(pMgmt);
#endif

View File

@ -47,6 +47,49 @@ static void *dmStatusThreadFp(void *param) {
return NULL;
}
extern SMonVloadInfo tsVinfo;
static void *dmStatusInfoThreadFp(void *param) {
SDnodeMgmt *pMgmt = param;
int64_t lastTime = taosGetTimestampMs();
setThreadName("dnode-status-info");
int32_t upTimeCount = 0;
int64_t upTime = 0;
while (1) {
taosMsleep(200);
if (pMgmt->pData->dropped || pMgmt->pData->stopped) break;
int64_t curTime = taosGetTimestampMs();
if (curTime < lastTime) lastTime = curTime;
float interval = (curTime - lastTime) / 1000.0f;
if (interval >= tsStatusInterval) {
dmUpdateStatusInfo(pMgmt);
lastTime = curTime;
if ((upTimeCount = ((upTimeCount + 1) & 63)) == 0) {
upTime = taosGetOsUptime() - tsDndStartOsUptime;
tsDndUpTime = TMAX(tsDndUpTime, upTime);
}
}
}
dDebug("begin to lock status info when thread exit");
if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
dError("failed to lock status info lock");
return NULL;
}
if (tsVinfo.pVloads != NULL) {
taosArrayDestroy(tsVinfo.pVloads);
tsVinfo.pVloads = NULL;
}
if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
dError("failed to unlock status info lock");
return NULL;
}
return NULL;
}
SDmNotifyHandle dmNotifyHdl = {.state = 0};
#define TIMESERIES_STASH_NUM 5
static void *dmNotifyThreadFp(void *param) {
@ -280,6 +323,22 @@ int32_t dmStartStatusThread(SDnodeMgmt *pMgmt) {
return 0;
}
int32_t dmStartStatusInfoThread(SDnodeMgmt *pMgmt) {
int32_t code = 0;
TdThreadAttr thAttr;
(void)taosThreadAttrInit(&thAttr);
(void)taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
if (taosThreadCreate(&pMgmt->statusInfoThread, &thAttr, dmStatusInfoThreadFp, pMgmt) != 0) {
code = TAOS_SYSTEM_ERROR(errno);
dError("failed to create status Info thread since %s", tstrerror(code));
return code;
}
(void)taosThreadAttrDestroy(&thAttr);
tmsgReportStartup("dnode-status-info", "initialized");
return 0;
}
void dmStopStatusThread(SDnodeMgmt *pMgmt) {
if (taosCheckPthreadValid(pMgmt->statusThread)) {
(void)taosThreadJoin(pMgmt->statusThread, NULL);
@ -287,6 +346,13 @@ void dmStopStatusThread(SDnodeMgmt *pMgmt) {
}
}
void dmStopStatusInfoThread(SDnodeMgmt *pMgmt) {
if (taosCheckPthreadValid(pMgmt->statusInfoThread)) {
(void)taosThreadJoin(pMgmt->statusInfoThread, NULL);
taosThreadClear(&pMgmt->statusInfoThread);
}
}
int32_t dmStartNotifyThread(SDnodeMgmt *pMgmt) {
int32_t code = 0;
TdThreadAttr thAttr;

View File

@ -214,6 +214,7 @@ int32_t dmInitVars(SDnode *pDnode) {
}
(void)taosThreadRwlockInit(&pData->lock, NULL);
(void)taosThreadMutexInit(&pData->statusInfolock, NULL);
(void)taosThreadMutexInit(&pDnode->mutex, NULL);
return 0;
}

View File

@ -144,6 +144,7 @@ typedef struct {
char machineId[TSDB_MACHINE_ID_LEN + 1];
EEncryptAlgor encryptAlgorigthm;
EEncryptScope encryptScope;
TdThreadMutex statusInfolock;
} SDnodeData;
typedef struct {