Merge pull request #29958 from taosdata/fix/TD-33872-improve-status-sync-heartbeat

fix: [td-33872] improve status sync heartbeat
Simon Guan 2025-03-03 13:51:44 +08:00 committed by GitHub
commit eee9976a44
26 changed files with 325 additions and 103 deletions

View File

@ -1839,6 +1839,7 @@ typedef struct {
int32_t statusSeq;
int64_t ipWhiteVer;
int64_t analVer;
int64_t timestamp;
} SStatusReq;
int32_t tSerializeSStatusReq(void* buf, int32_t bufLen, SStatusReq* pReq);
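A minimal sketch of how the new timestamp field is intended to be filled, based on the sender-side change further down in this diff (taosGetTimestampMs is the existing millisecond-clock helper):

// the dnode stamps the request with its send time just before serializing it;
// the mnode later formats this value for trace logging
SStatusReq req = {0};
req.timestamp = taosGetTimestampMs();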

View File

@ -31,6 +31,7 @@ typedef enum {
QUERY_QUEUE,
FETCH_QUEUE,
READ_QUEUE,
STATUS_QUEUE,
WRITE_QUEUE,
APPLY_QUEUE,
SYNC_QUEUE,

View File

@ -82,7 +82,7 @@ int32_t convertStringToTimestamp(int16_t type, char* inputData, int64_t timePrec
int32_t getDuration(int64_t val, char unit, int64_t* result, int32_t timePrecision);
int32_t taosFormatUtcTime(char* buf, int32_t bufLen, int64_t ts, int32_t precision);
char* formatTimestampLocal(char* buf, int64_t val, int precision);
struct STm {
struct tm tm;
int64_t fsec; // in NANOSECOND

View File

@ -1471,6 +1471,8 @@ int32_t tSerializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->analVer));
TAOS_CHECK_EXIT(tSerializeSMonitorParas(&encoder, &pReq->clusterCfg.monitorParas));
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->timestamp));
tEndEncode(&encoder);
_exit:
@ -1600,6 +1602,10 @@ int32_t tDeserializeSStatusReq(void *buf, int32_t bufLen, SStatusReq *pReq) {
TAOS_CHECK_EXIT(tDeserializeSMonitorParas(&decoder, &pReq->clusterCfg.monitorParas));
}
if (!tDecodeIsEnd(&decoder)) {
TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->timestamp));
}
tEndDecode(&decoder);
_exit:
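The field is encoded last and decoded only when bytes remain, so the mnode can still parse status requests from older dnodes that do not send a timestamp. A sketch of the same append-at-the-end pattern for a hypothetical future field (newField is illustrative only, not part of this change):

// encode side: new fields are always appended after the existing ones
TAOS_CHECK_EXIT(tEncodeI64(&encoder, pReq->newField));
// decode side: read the field only if the sender actually wrote it
if (!tDecodeIsEnd(&decoder)) {
  TAOS_CHECK_EXIT(tDecodeI64(&decoder, &pReq->newField));
}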

View File

@ -1000,6 +1000,34 @@ int32_t taosFormatUtcTime(char* buf, int32_t bufLen, int64_t t, int32_t precisio
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
char* formatTimestampLocal(char* buf, int64_t val, int precision) {
time_t tt;
if (precision == TSDB_TIME_PRECISION_MICRO) {
tt = (time_t)(val / 1000000);
} else if (precision == TSDB_TIME_PRECISION_NANO) {
tt = (time_t)(val / 1000000000);
} else {
tt = (time_t)(val / 1000);
}
struct tm tm;
if (taosLocalTime(&tt, &tm, NULL, 0, NULL) == NULL) {
return NULL;
}
size_t pos = taosStrfTime(buf, 32, "%Y-%m-%d %H:%M:%S", &tm);
if (precision == TSDB_TIME_PRECISION_MICRO) {
sprintf(buf + pos, ".%06d", (int)(val % 1000000));
} else if (precision == TSDB_TIME_PRECISION_NANO) {
sprintf(buf + pos, ".%09d", (int)(val % 1000000000));
} else {
sprintf(buf + pos, ".%03d", (int)(val % 1000));
}
return buf;
}
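A small usage sketch for the new formatTimestampLocal helper, assuming a caller-provided buffer of at least TD_TIME_STR_LEN bytes (the size used by the callers later in this diff) and a millisecond-precision value:

// format a millisecond timestamp into local time for a log line
char tbuf[TD_TIME_STR_LEN] = {0};
if (formatTimestampLocal(tbuf, taosGetTimestampMs(), TSDB_TIME_PRECISION_MILLI) == NULL) {
  tbuf[0] = '\0';  // fall back to an empty string on failure, as the sync log code below does
}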
int32_t taosTs2Tm(int64_t ts, int32_t precision, struct STm* tm, timezone_t tz) {
tm->fsec = ts % TICK_PER_SECOND[precision] * (TICK_PER_SECOND[TSDB_TIME_PRECISION_NANO] / TICK_PER_SECOND[precision]);
time_t t = ts / TICK_PER_SECOND[precision];

View File

@ -25,6 +25,8 @@
extern SConfig *tsCfg;
SMonVloadInfo tsVinfo = {0};
SMnodeLoad tsMLoad = {0};
SDnodeData tsDnodeData = {0};
static void dmUpdateDnodeCfg(SDnodeMgmt *pMgmt, SDnodeCfg *pCfg) {
int32_t code = 0;
@ -167,23 +169,30 @@ static void dmProcessStatusRsp(SDnodeMgmt *pMgmt, SRpcMsg *pRsp) {
void dmSendStatusReq(SDnodeMgmt *pMgmt) {
int32_t code = 0;
SStatusReq req = {0};
req.timestamp = taosGetTimestampMs();
pMgmt->statusSeq++;
dDebug("send status req to mnode, statusSeq:%d, begin to mgnt lock", pMgmt->statusSeq);
(void)taosThreadRwlockRdlock(&pMgmt->pData->lock);
dDebug("send status req to mnode, statusSeq:%d, begin to mgnt statusInfolock", pMgmt->statusSeq);
if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
dError("failed to lock status info lock");
return;
}
dDebug("send status req to mnode, statusSeq:%d, begin to get dnode info", pMgmt->statusSeq);
req.sver = tsVersion;
req.dnodeVer = pMgmt->pData->dnodeVer;
req.dnodeId = pMgmt->pData->dnodeId;
req.clusterId = pMgmt->pData->clusterId;
req.dnodeVer = tsDnodeData.dnodeVer;
req.dnodeId = tsDnodeData.dnodeId;
req.clusterId = tsDnodeData.clusterId;
if (req.clusterId == 0) req.dnodeId = 0;
req.rebootTime = pMgmt->pData->rebootTime;
req.updateTime = pMgmt->pData->updateTime;
req.rebootTime = tsDnodeData.rebootTime;
req.updateTime = tsDnodeData.updateTime;
req.numOfCores = tsNumOfCores;
req.numOfSupportVnodes = tsNumOfSupportVnodes;
req.numOfDiskCfg = tsDiskCfgNum;
req.memTotal = tsTotalMemoryKB * 1024;
req.memAvail = req.memTotal - tsQueueMemoryAllowed - tsApplyMemoryAllowed - 16 * 1024 * 1024;
tstrncpy(req.dnodeEp, tsLocalEp, TSDB_EP_LEN);
tstrncpy(req.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
tstrncpy(req.machineId, tsDnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);
req.clusterCfg.statusInterval = tsStatusInterval;
req.clusterCfg.checkTime = 0;
@ -205,29 +214,23 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
memcpy(req.clusterCfg.timezone, tsTimezoneStr, TD_TIMEZONE_LEN);
memcpy(req.clusterCfg.locale, tsLocale, TD_LOCALE_LEN);
memcpy(req.clusterCfg.charset, tsCharset, TD_LOCALE_LEN);
(void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
dDebug("send status req to mnode, statusSeq:%d, begin to get vnode loads", pMgmt->statusSeq);
if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
dError("failed to lock status info lock");
return;
}
req.pVloads = tsVinfo.pVloads;
tsVinfo.pVloads = NULL;
dDebug("send status req to mnode, statusSeq:%d, begin to get mnode loads", pMgmt->statusSeq);
req.mload = tsMLoad;
if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
dError("failed to unlock status info lock");
return;
}
dDebug("send status req to mnode, statusSeq:%d, begin to get mnode loads", pMgmt->statusSeq);
SMonMloadInfo minfo = {0};
(*pMgmt->getMnodeLoadsFp)(&minfo);
req.mload = minfo.load;
dDebug("send status req to mnode, statusSeq:%d, begin to get qnode loads", pMgmt->statusSeq);
(*pMgmt->getQnodeLoadsFp)(&req.qload);
pMgmt->statusSeq++;
req.statusSeq = pMgmt->statusSeq;
req.ipWhiteVer = pMgmt->pData->ipWhiteVer;
req.analVer = taosAnalGetVersion();
@ -267,7 +270,7 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
code =
rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000);
if (code != 0) {
dError("failed to send status req since %s", tstrerror(code));
dError("failed to SendRecv with timeout %d status req since %s", tsStatusInterval * 5 * 1000, tstrerror(code));
return;
}
@ -275,7 +278,8 @@ void dmSendStatusReq(SDnodeMgmt *pMgmt) {
dmRotateMnodeEpSet(pMgmt->pData);
char tbuf[512];
dmEpSetToStr(tbuf, sizeof(tbuf), &epSet);
dError("failed to send status req since %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code), tbuf, epSet.inUse);
dInfo("Rotate mnode ep set since failed to SendRecv status req %s, epSet:%s, inUse:%d", tstrerror(rpcRsp.code),
tbuf, epSet.inUse);
} else {
if (epUpdated == 1) {
dmSetMnodeEpSet(pMgmt->pData, &epSet);
@ -403,7 +407,7 @@ void dmSendConfigReq(SDnodeMgmt *pMgmt) {
code =
rpcSendRecvWithTimeout(pMgmt->msgCb.statusRpc, &epSet, &rpcMsg, &rpcRsp, &epUpdated, tsStatusInterval * 5 * 1000);
if (code != 0) {
dError("failed to send status req since %s", tstrerror(code));
dError("failed to SendRecv config req with timeout %d since %s", tsStatusInterval * 5 * 1000, tstrerror(code));
return;
}
if (rpcRsp.code != 0) {
@ -414,14 +418,37 @@ void dmSendConfigReq(SDnodeMgmt *pMgmt) {
}
void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
SMonVloadInfo vinfo = {0};
dDebug("begin to get dnode info");
SDnodeData dnodeData = {0};
(void)taosThreadRwlockRdlock(&pMgmt->pData->lock);
dnodeData.dnodeVer = pMgmt->pData->dnodeVer;
dnodeData.dnodeId = pMgmt->pData->dnodeId;
dnodeData.clusterId = pMgmt->pData->clusterId;
dnodeData.rebootTime = pMgmt->pData->rebootTime;
dnodeData.updateTime = pMgmt->pData->updateTime;
tstrncpy(dnodeData.machineId, pMgmt->pData->machineId, TSDB_MACHINE_ID_LEN + 1);
(void)taosThreadRwlockUnlock(&pMgmt->pData->lock);
dDebug("begin to get vnode loads");
(*pMgmt->getVnodeLoadsFp)(&vinfo);
SMonVloadInfo vinfo = {0};
(*pMgmt->getVnodeLoadsFp)(&vinfo); // dmGetVnodeLoads
dDebug("begin to get mnode loads");
SMonMloadInfo minfo = {0};
(*pMgmt->getMnodeLoadsFp)(&minfo); // dmGetMnodeLoads
dDebug("begin to lock status info");
if (taosThreadMutexLock(&pMgmt->pData->statusInfolock) != 0) {
dError("failed to lock status info lock");
return;
}
tsDnodeData.dnodeVer = dnodeData.dnodeVer;
tsDnodeData.dnodeId = dnodeData.dnodeId;
tsDnodeData.clusterId = dnodeData.clusterId;
tsDnodeData.rebootTime = dnodeData.rebootTime;
tsDnodeData.updateTime = dnodeData.updateTime;
tstrncpy(tsDnodeData.machineId, dnodeData.machineId, TSDB_MACHINE_ID_LEN + 1);
if (tsVinfo.pVloads == NULL) {
tsVinfo.pVloads = vinfo.pVloads;
vinfo.pVloads = NULL;
@ -429,6 +456,9 @@ void dmUpdateStatusInfo(SDnodeMgmt *pMgmt) {
taosArrayDestroy(vinfo.pVloads);
vinfo.pVloads = NULL;
}
tsMLoad = minfo.load;
if (taosThreadMutexUnlock(&pMgmt->pData->statusInfolock) != 0) {
dError("failed to unlock status info lock");
return;
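The reworked dmUpdateStatusInfo never holds two locks at once: it copies the dnode identity under the read lock, gathers vnode and mnode loads with no lock held, and only then publishes the snapshot into tsDnodeData/tsVinfo/tsMLoad under statusInfolock, the only lock dmSendStatusReq now takes. A generic copy/compute/publish sketch of that ordering, written with plain pthreads rather than the taosThread wrappers:

#include <pthread.h>

typedef struct { long ver; char id[64]; } Snapshot;

static pthread_rwlock_t srcLock = PTHREAD_RWLOCK_INITIALIZER; /* stands in for pData->lock */
static pthread_mutex_t  pubLock = PTHREAD_MUTEX_INITIALIZER;  /* stands in for statusInfolock */
static Snapshot src, published;

static void updateStatusInfo(void) {
  Snapshot local;
  pthread_rwlock_rdlock(&srcLock);  /* 1. copy the shared fields under the read lock */
  local = src;
  pthread_rwlock_unlock(&srcLock);
  /* 2. slow gathering work (vnode/mnode loads) runs here with no lock held */
  pthread_mutex_lock(&pubLock);     /* 3. publish the snapshot under the second lock only */
  published = local;
  pthread_mutex_unlock(&pubLock);
}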

View File

@ -32,6 +32,7 @@ typedef struct SMnodeMgmt {
SSingleWorker queryWorker;
SSingleWorker fetchWorker;
SSingleWorker readWorker;
SSingleWorker statusWorker;
SSingleWorker writeWorker;
SSingleWorker arbWorker;
SSingleWorker syncWorker;
@ -58,6 +59,7 @@ int32_t mmPutMsgToArbQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToSyncQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToSyncRdQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToStatusQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToFetchQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg);
int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc);

View File

@ -202,7 +202,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_QUERY, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_KILL_CONN, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_HEARTBEAT, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_STATUS, mmPutMsgToStatusQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CONFIG, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_NOTIFY, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_SYSTABLE_RETRIEVE, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;

View File

@ -133,6 +133,10 @@ int32_t mmPutMsgToReadQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->readWorker, pMsg);
}
int32_t mmPutMsgToStatusQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return mmPutMsgToWorker(pMgmt, &pMgmt->statusWorker, pMsg);
}
int32_t mmPutMsgToQueryQueue(SMnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t code = 0;
if (NULL == pMgmt->pMnode) {
@ -172,6 +176,9 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
case READ_QUEUE:
pWorker = &pMgmt->readWorker;
break;
case STATUS_QUEUE:
pWorker = &pMgmt->statusWorker;
break;
case ARB_QUEUE:
pWorker = &pMgmt->arbWorker;
break;
@ -246,6 +253,18 @@ int32_t mmStartWorker(SMnodeMgmt *pMgmt) {
return code;
}
SSingleWorkerCfg statusCfg = {
.min = 1,
.max = 1,
.name = "mnode-status",
.fp = (FItem)mmProcessRpcMsg,
.param = pMgmt,
};
if ((code = tSingleWorkerInit(&pMgmt->statusWorker, &statusCfg)) != 0) {
dError("failed to start mnode-status worker since %s", tstrerror(code));
return code;
}
SSingleWorkerCfg wCfg = {
.min = 1,
.max = 1,
@ -304,6 +323,7 @@ void mmStopWorker(SMnodeMgmt *pMgmt) {
tSingleWorkerCleanup(&pMgmt->queryWorker);
tSingleWorkerCleanup(&pMgmt->fetchWorker);
tSingleWorkerCleanup(&pMgmt->readWorker);
tSingleWorkerCleanup(&pMgmt->statusWorker);
tSingleWorkerCleanup(&pMgmt->writeWorker);
tSingleWorkerCleanup(&pMgmt->arbWorker);
tSingleWorkerCleanup(&pMgmt->syncWorker);

View File

@ -71,6 +71,7 @@ int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) {
code = taosWriteQitem(pMgmt->queryWorker.queue, pMsg);
return code;
case READ_QUEUE:
case STATUS_QUEUE:
case FETCH_QUEUE:
dTrace("msg:%p, is created and will put into qnode-fetch queue, len:%d", pMsg, pRpc->contLen);
code = taosWriteQitem(pMgmt->fetchWorker.queue, pMsg);

View File

@ -40,7 +40,7 @@ typedef struct SVnodeMgmt {
SHashObj *runngingHash;
SHashObj *closedHash;
SHashObj *creatingHash;
TdThreadRwlock lock;
TdThreadRwlock hashLock;
TdThreadMutex mutex;
SVnodesStat state;
STfs *pTfs;

View File

@ -20,7 +20,7 @@
#define MAX_CONTENT_LEN 2 * 1024 * 1024
int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) {
(void)taosThreadRwlockRdlock(&pMgmt->lock);
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);
int32_t num = 0;
int32_t size = taosHashGetSize(pMgmt->runngingHash);
@ -28,7 +28,7 @@ int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnod
size += closedSize;
SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *));
if (pVnodes == NULL) {
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
return terrno;
}
@ -60,7 +60,7 @@ int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnod
}
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
*numOfVnodes = num;
*ppVnodes = pVnodes;
@ -68,7 +68,7 @@ int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnod
}
int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) {
(void)taosThreadRwlockRdlock(&pMgmt->lock);
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);
int32_t num = 0;
int32_t size = taosHashGetSize(pMgmt->runngingHash);
@ -76,7 +76,7 @@ int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfV
size += creatingSize;
SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *));
if (pVnodes == NULL) {
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
return terrno;
}
@ -107,7 +107,7 @@ int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfV
taosHashCancelIterate(pMgmt->creatingHash, pIter);
}
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
*numOfVnodes = num;
*ppVnodes = pVnodes;
@ -116,13 +116,13 @@ int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfV
}
int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) {
(void)taosThreadRwlockRdlock(&pMgmt->lock);
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);
int32_t num = 0;
int32_t size = taosHashGetSize(pMgmt->runngingHash);
SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *));
if (pVnodes == NULL) {
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
return terrno;
}
@ -140,7 +140,7 @@ int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeOb
}
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
*numOfVnodes = num;
*ppVnodes = pVnodes;

View File

@ -25,7 +25,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
tfsUpdateSize(pMgmt->pTfs);
(void)taosThreadRwlockRdlock(&pMgmt->lock);
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);
void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
while (pIter) {
@ -46,14 +46,14 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
pIter = taosHashIterate(pMgmt->runngingHash, pIter);
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
if (!pInfo->pVloads) return;
(void)taosThreadRwlockRdlock(&pMgmt->lock);
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);
void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
while (pIter) {
@ -74,7 +74,7 @@ void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
pIter = taosHashIterate(pMgmt->runngingHash, pIter);
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
@ -137,7 +137,7 @@ void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
dError("failed to get vgroup ids");
return;
}
(void)taosThreadRwlockRdlock(&pMgmt->lock);
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);
for (int i = 0; i < list_size; i++) {
int32_t vgroup_id = vgroup_ids[i];
void *vnode = taosHashGet(pMgmt->runngingHash, &vgroup_id, sizeof(int32_t));
@ -148,7 +148,7 @@ void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
}
}
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
if (vgroup_ids) taosMemoryFree(vgroup_ids);
if (keys) taosMemoryFree(keys);
return;

View File

@ -24,12 +24,12 @@ int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
int32_t diskId = -1;
SVnodeObj *pVnode = NULL;
(void)taosThreadRwlockRdlock(&pMgmt->lock);
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);
int32_t r = taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode);
if (pVnode != NULL) {
diskId = pVnode->diskPrimary;
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
return diskId;
}
@ -62,7 +62,7 @@ static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t
pCreatingVnode->vgId = vgId;
pCreatingVnode->diskPrimary = diskId;
code = taosThreadRwlockWrlock(&pMgmt->lock);
code = taosThreadRwlockWrlock(&pMgmt->hashLock);
if (code != 0) {
taosMemoryFree(pCreatingVnode);
return code;
@ -75,7 +75,7 @@ static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t
taosMemoryFree(pCreatingVnode);
}
int32_t r = taosThreadRwlockUnlock(&pMgmt->lock);
int32_t r = taosThreadRwlockUnlock(&pMgmt->hashLock);
if (r != 0) {
dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
}
@ -86,7 +86,7 @@ static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t
static void vmUnRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId) {
SVnodeObj *pOld = NULL;
(void)taosThreadRwlockWrlock(&pMgmt->lock);
(void)taosThreadRwlockWrlock(&pMgmt->hashLock);
int32_t r = taosHashGetDup(pMgmt->creatingHash, &vgId, sizeof(int32_t), (void *)&pOld);
if (r != 0) {
dError("vgId:%d, failed to get vnode from creating Hash", vgId);
@ -96,7 +96,7 @@ static void vmUnRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId) {
if (r != 0) {
dError("vgId:%d, failed to remove vnode from creatingHash", vgId);
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
if (pOld) {
dTrace("vgId:%d, free vnode pOld:%p", vgId, &pOld);
@ -205,7 +205,7 @@ void vmCleanPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { vmUnRegisterCreatingS
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
SVnodeObj *pVnode = NULL;
(void)taosThreadRwlockRdlock(&pMgmt->lock);
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);
int32_t r = taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode);
if (pVnode == NULL || strict && (pVnode->dropped || pVnode->failed)) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
@ -214,7 +214,7 @@ SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
return pVnode;
}
@ -334,10 +334,10 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
pVnode->failed = 1;
}
(void)taosThreadRwlockWrlock(&pMgmt->lock);
(void)taosThreadRwlockWrlock(&pMgmt->hashLock);
int32_t code = vmRegisterRunningState(pMgmt, pVnode);
vmUnRegisterClosedState(pMgmt, pVnode);
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
return code;
}
@ -350,15 +350,15 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal,
vnodeProposeCommitOnNeed(pVnode->pImpl, atExit);
}
(void)taosThreadRwlockWrlock(&pMgmt->lock);
(void)taosThreadRwlockWrlock(&pMgmt->hashLock);
vmUnRegisterRunningState(pMgmt, pVnode->vgId);
if (keepClosed) {
if (vmRegisterClosedState(pMgmt, pVnode) != 0) {
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
return;
};
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
vmReleaseVnode(pMgmt, pVnode);
@ -454,14 +454,14 @@ _closed:
void vmCloseFailedVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
int32_t r = 0;
r = taosThreadRwlockWrlock(&pMgmt->lock);
r = taosThreadRwlockWrlock(&pMgmt->hashLock);
if (r != 0) {
dError("vgId:%d, failed to lock since %s", vgId, tstrerror(r));
}
if (r == 0) {
vmUnRegisterRunningState(pMgmt, vgId);
}
r = taosThreadRwlockUnlock(&pMgmt->lock);
r = taosThreadRwlockUnlock(&pMgmt->hashLock);
if (r != 0) {
dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
}
@ -796,7 +796,7 @@ static void vmCleanup(SVnodeMgmt *pMgmt) {
vmCloseVnodes(pMgmt);
vmStopWorker(pMgmt);
vnodeCleanup();
(void)taosThreadRwlockDestroy(&pMgmt->lock);
(void)taosThreadRwlockDestroy(&pMgmt->hashLock);
(void)taosThreadMutexDestroy(&pMgmt->mutex);
(void)taosThreadMutexDestroy(&pMgmt->fileLock);
taosMemoryFree(pMgmt);
@ -884,7 +884,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
pMgmt->msgCb.mgmt = pMgmt;
code = taosThreadRwlockInit(&pMgmt->lock, NULL);
code = taosThreadRwlockInit(&pMgmt->hashLock, NULL);
if (code != 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _OVER;

View File

@ -756,8 +756,12 @@ static int32_t mndProcessStatusReq(SRpcMsg *pReq) {
bool needCheck = !online || dnodeChanged || reboot || supportVnodesChanged || analVerChanged ||
pMnode->ipWhiteVer != statusReq.ipWhiteVer || encryptKeyChanged || enableWhiteListChanged;
const STraceId *trace = &pReq->info.traceId;
mGTrace("dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d", pDnode->id,
pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq);
char timestamp[TD_TIME_STR_LEN] = {0};
if (mDebugFlag & DEBUG_TRACE) (void)formatTimestampLocal(timestamp, statusReq.timestamp, TSDB_TIME_PRECISION_MILLI);
mGTrace(
"dnode:%d, status received, accessTimes:%d check:%d online:%d reboot:%d changed:%d statusSeq:%d "
"timestamp:%s",
pDnode->id, pDnode->accessTimes, needCheck, online, reboot, dnodeChanged, statusReq.statusSeq, timestamp);
if (reboot) {
tsGrantHBInterval = GRANT_HEART_BEAT_MIN;

View File

@ -1174,6 +1174,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
}
int32_t mndGetLoad(SMnode *pMnode, SMnodeLoad *pLoad) {
mTrace("mnode get load");
SSyncState state = syncGetState(pMnode->syncMgmt.sync);
pLoad->syncState = state.state;
pLoad->syncRestore = state.restored;

View File

@ -66,6 +66,7 @@ class MndTestTrans2 : public ::testing::Test {
msgCb.queueFps[SYNC_QUEUE] = putToQueue;
msgCb.queueFps[WRITE_QUEUE] = putToQueue;
msgCb.queueFps[READ_QUEUE] = putToQueue;
msgCb.queueFps[STATUS_QUEUE] = putToQueue;
msgCb.mgmt = (SMgmtWrapper *)(&msgCb); // hack
tmsgSetDefault(&msgCb);

View File

@ -40,6 +40,7 @@ typedef struct SSyncLogReplMgr {
int32_t retryBackoff;
int32_t peerId;
int32_t sendCount;
TdThreadMutex mutex;
} SSyncLogReplMgr;
typedef struct SSyncLogBufEntry {

View File

@ -35,6 +35,7 @@ void raftStoreClearVote(SSyncNode *pNode);
void raftStoreNextTerm(SSyncNode *pNode);
void raftStoreSetTerm(SSyncNode *pNode, SyncTerm term);
SyncTerm raftStoreGetTerm(SSyncNode *pNode);
SyncTerm raftStoreTryGetTerm(SSyncNode *pNode);
#ifdef __cplusplus
}

View File

@ -50,8 +50,15 @@ extern "C" {
#define sNError(pNode, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintNodeLog("SYN ERROR ", DEBUG_ERROR, 255, true, pNode, __VA_ARGS__); }
#define sNWarn(pNode, ...) if (sDebugFlag & DEBUG_WARN) { syncPrintNodeLog("SYN WARN ", DEBUG_WARN, 255, true, pNode, __VA_ARGS__); }
#define sNInfo(pNode, ...) if (sDebugFlag & DEBUG_INFO) { syncPrintNodeLog("SYN ", DEBUG_INFO, 255, true, pNode, __VA_ARGS__); }
#define sNDebug(pNode, ...) if (sDebugFlag & DEBUG_DEBUG) { syncPrintNodeLog("SYN ", DEBUG_DEBUG, sDebugFlag, false, pNode, __VA_ARGS__); }
#define sNTrace(pNode, ...) if (sDebugFlag & DEBUG_TRACE) { syncPrintNodeLog("SYN ", DEBUG_TRACE, sDebugFlag, false, pNode, __VA_ARGS__); }
#define sNDebug(pNode, ...) if (sDebugFlag & DEBUG_DEBUG) { syncPrintNodeLog("SYN ", DEBUG_DEBUG, sDebugFlag, true, pNode, __VA_ARGS__); }
#define sNTrace(pNode, ...) if (sDebugFlag & DEBUG_TRACE) { syncPrintNodeLog("SYN ", DEBUG_TRACE, sDebugFlag, true, pNode, __VA_ARGS__); }
#define sHFatal(pNode, ...) if (sDebugFlag & DEBUG_FATAL) { syncPrintHbLog("SYN FATAL ", DEBUG_FATAL, 255, true, pNode, __VA_ARGS__); }
#define sHError(pNode, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintHbLog("SYN ERROR ", DEBUG_ERROR, 255, true, pNode, __VA_ARGS__); }
#define sHWarn(pNode, ...) if (sDebugFlag & DEBUG_WARN) { syncPrintHbLog("SYN WARN ", DEBUG_WARN, 255, true, pNode, __VA_ARGS__); }
#define sHInfo(pNode, ...) if (sDebugFlag & DEBUG_INFO) { syncPrintHbLog("SYN ", DEBUG_INFO, 255, true, pNode, __VA_ARGS__); }
#define sHDebug(pNode, ...) if (sDebugFlag & DEBUG_DEBUG) { syncPrintHbLog("SYN ", DEBUG_DEBUG, sDebugFlag, true, pNode, __VA_ARGS__); }
#define sHTrace(pNode, ...) if (sDebugFlag & DEBUG_TRACE) { syncPrintHbLog("SYN ", DEBUG_TRACE, sDebugFlag, true, pNode, __VA_ARGS__); }
#define sSFatal(pSender, ...) if (sDebugFlag & DEBUG_FATAL) { syncPrintSnapshotSenderLog("SYN FATAL ", DEBUG_FATAL, 255, pSender, __VA_ARGS__); }
#define sSError(pSender, ...) if (sDebugFlag & DEBUG_ERROR) { syncPrintSnapshotSenderLog("SYN ERROR ", DEBUG_ERROR, 255, pSender, __VA_ARGS__); }
@ -87,6 +94,8 @@ void syncUtilGenerateArbToken(int32_t nodeId, int32_t groupId, char* buf);
void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool formatTime, SSyncNode* pNode,
const char* format, ...);
void syncPrintHbLog(const char* flags, ELogLevel level, int32_t dflag, bool formatTime, SSyncNode* pNode,
const char* format, ...);
void syncPrintSnapshotSenderLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotSender* pSender,
const char* format, ...);
void syncPrintSnapshotReceiverLog(const char* flags, ELogLevel level, int32_t dflag, SSyncSnapshotReceiver* pReceiver,

View File

@ -1140,26 +1140,29 @@ int32_t syncLogReplRecover(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEn
int32_t syncLogReplProcessHeartbeatReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncHeartbeatReply* pMsg) {
SSyncLogBuffer* pBuf = pNode->pLogBuf;
(void)taosThreadMutexLock(&pBuf->mutex);
(void)taosThreadMutexLock(&pMgr->mutex);
if (pMsg->startTime != 0 && pMsg->startTime != pMgr->peerStartTime) {
sInfo("vgId:%d, reset sync log repl in heartbeat. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64 "",
pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
syncLogReplReset(pMgr);
pMgr->peerStartTime = pMsg->startTime;
}
(void)taosThreadMutexUnlock(&pBuf->mutex);
(void)taosThreadMutexUnlock(&pMgr->mutex);
return 0;
}
int32_t syncLogReplProcessReply(SSyncLogReplMgr* pMgr, SSyncNode* pNode, SyncAppendEntriesReply* pMsg) {
SSyncLogBuffer* pBuf = pNode->pLogBuf;
(void)taosThreadMutexLock(&pBuf->mutex);
(void)taosThreadMutexLock(&pMgr->mutex);
if (pMsg->startTime != pMgr->peerStartTime) {
sInfo("vgId:%d, reset sync log repl in appendlog reply. peer:%" PRIx64 ", start time:%" PRId64 ", old:%" PRId64,
pNode->vgId, pMsg->srcId.addr, pMsg->startTime, pMgr->peerStartTime);
syncLogReplReset(pMgr);
pMgr->peerStartTime = pMsg->startTime;
}
(void)taosThreadMutexUnlock(&pMgr->mutex);
(void)taosThreadMutexLock(&pBuf->mutex);
int32_t code = 0;
if (pMgr->restored) {
@ -1324,6 +1327,12 @@ SSyncLogReplMgr* syncLogReplCreate() {
return NULL;
}
int32_t code = taosThreadMutexInit(&pMgr->mutex, NULL);
if (code) {
terrno = code;
return NULL;
}
return pMgr;
}
@ -1331,6 +1340,7 @@ void syncLogReplDestroy(SSyncLogReplMgr* pMgr) {
if (pMgr == NULL) {
return;
}
(void)taosThreadMutexDestroy(&pMgr->mutex);
taosMemoryFree(pMgr);
return;
}

View File

@ -213,3 +213,13 @@ SyncTerm raftStoreGetTerm(SSyncNode *pNode) {
(void)taosThreadMutexUnlock(&pNode->raftStore.mutex);
return term;
}
SyncTerm raftStoreTryGetTerm(SSyncNode *pNode) {
SyncTerm term = 0;
if (taosThreadMutexTryLock(&pNode->raftStore.mutex) == 0) {
term = pNode->raftStore.currentTerm;
(void)taosThreadMutexUnlock(&pNode->raftStore.mutex);
}
return term;
}
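raftStoreTryGetTerm returns 0 when the raft-store mutex is already held, so a caller that must not block (the new syncPrintHbLog below uses it in place of raftStoreGetTerm) reports an unknown term instead of waiting:

// non-blocking read used on the heartbeat logging path
SyncTerm term = raftStoreTryGetTerm(pNode);  // 0 means the mutex was busy and the term is unknown right now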

View File

@ -116,7 +116,7 @@ static void syncPrintTime(bool formatTime, int32_t* len, int64_t tsMs, int32_t i
if (formatTime) {
char pBuf[TD_TIME_STR_LEN] = {0};
if (tsMs > 0) {
if (taosFormatUtcTime(pBuf, TD_TIME_STR_LEN, tsMs, TSDB_TIME_PRECISION_MILLI) != 0) {
if (formatTimestampLocal(pBuf, tsMs, TSDB_TIME_PRECISION_MILLI) == NULL) {
pBuf[0] = '\0';
}
}
@ -215,7 +215,7 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool fo
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
(void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
(void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); // vnodeSyncGetSnapshotInfo
}
SyncIndex logLastIndex = SYNC_INDEX_INVALID;
@ -254,12 +254,12 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool fo
int32_t aqItems = 0;
if (pNode != NULL && pNode->pFsm != NULL && pNode->pFsm->FpApplyQueueItems != NULL) {
aqItems = pNode->pFsm->FpApplyQueueItems(pNode->pFsm);
aqItems = pNode->pFsm->FpApplyQueueItems(pNode->pFsm); // vnodeApplyQueueItems
}
// restore error code
terrno = errCode;
SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm); // vnodeSyncAppliedIndex
if (pNode != NULL) {
taosPrintLog(
@ -270,15 +270,71 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool fo
", elect-times:%d, as-leader-times:%d, as-assigned-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, "
"aq-items:%d, snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64
", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64
", buffer:%s, repl-mgrs:%s, members:%s, send hb:%s, recv hb:%s, recv hb-reply:%s, arb-token:%s, msg[sent:%d, recv:%d, slow-recev:%d]",
", buffer:%s, repl-mgrs:%s, members:%s, send hb:%s, recv hb:%s, recv hb-reply:%s, arb-token:%s, msg[sent:%d, "
"recv:%d, slow-recv:%d]",
pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, pNode->assignedCommitIndex,
appliedIndex, logBeginIndex, logLastIndex, pNode->minMatchIndex, snapshot.lastApplyIndex,
snapshot.lastApplyTerm, pNode->electNum, pNode->becomeLeaderNum, pNode->becomeAssignedLeaderNum,
pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, aqItems, pNode->snapshottingIndex,
pNode->replicaNum, pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish,
syncNodeDynamicQuorum(pNode), pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, bufferStatesStr,
replMgrStatesStr, cfgStr, sentHbTimeStr, hbTimeStr, hbrTimeStr, pNode->arbToken, pNode->sendCount, pNode->recvCount,
pNode->slowCount);
replMgrStatesStr, cfgStr, sentHbTimeStr, hbTimeStr, hbrTimeStr, pNode->arbToken, pNode->sendCount,
pNode->recvCount, pNode->slowCount);
}
}
void syncPrintHbLog(const char* flags, ELogLevel level, int32_t dflag, bool formatTime, SSyncNode* pNode,
const char* format, ...) {
if (pNode == NULL || pNode->pLogStore == NULL) return;
int64_t currentTerm = raftStoreTryGetTerm(pNode);
// save error code, otherwise it will be overwritten
int32_t errCode = terrno;
int32_t cacheHit = pNode->pLogStore->cacheHit;
int32_t cacheMiss = pNode->pLogStore->cacheMiss;
char cfgStr[1024] = "";
syncCfg2SimpleStr(&pNode->raftCfg.cfg, cfgStr, sizeof(cfgStr));
char replMgrStatesStr[1024] = "";
syncLogReplStates2Str(pNode, replMgrStatesStr, sizeof(replMgrStatesStr));
char bufferStatesStr[256] = "";
syncLogBufferStates2Str(pNode, bufferStatesStr, sizeof(bufferStatesStr));
char hbrTimeStr[256] = "";
syncHearbeatReplyTime2Str(pNode, hbrTimeStr, sizeof(hbrTimeStr), formatTime);
char hbTimeStr[256] = "";
syncHearbeatTime2Str(pNode, hbTimeStr, sizeof(hbTimeStr), formatTime);
char sentHbTimeStr[512] = "";
syncSentHearbeatTime2Str(pNode, sentHbTimeStr, sizeof(sentHbTimeStr), formatTime);
char eventLog[512]; // {0};
va_list argpointer;
va_start(argpointer, format);
int32_t writeLen = vsnprintf(eventLog, sizeof(eventLog), format, argpointer);
va_end(argpointer);
terrno = errCode;
if (pNode != NULL) {
taosPrintLog(
flags, level, dflag,
"vgId:%d, %s, sync:%s, term:%" PRIu64 ", commit-index:%" PRId64 ", assigned-index:%" PRId64 ", min:%" PRId64
", elect-times:%d, as-leader-times:%d, as-assigned-leader-times:%d, cfg-ch-times:%d, hb-slow:%d, hbr-slow:%d, "
", snaping:%" PRId64 ", replicas:%d, last-cfg:%" PRId64
", chging:%d, restore:%d, quorum:%d, elect-lc-timer:%" PRId64 ", hb:%" PRId64
", buffer:%s, repl-mgrs:%s, members:%s, send hb:%s, recv hb:%s, recv hb-reply:%s, arb-token:%s, msg[sent:%d, "
"recv:%d, slow-recv:%d]",
pNode->vgId, eventLog, syncStr(pNode->state), currentTerm, pNode->commitIndex, pNode->assignedCommitIndex,
pNode->minMatchIndex, pNode->electNum, pNode->becomeLeaderNum, pNode->becomeAssignedLeaderNum,
pNode->configChangeNum, pNode->hbSlowNum, pNode->hbrSlowNum, pNode->snapshottingIndex, pNode->replicaNum,
pNode->raftCfg.lastConfigIndex, pNode->changing, pNode->restoreFinish, syncNodeDynamicQuorum(pNode),
pNode->electTimerLogicClock, pNode->heartbeatTimerLogicClockUser, bufferStatesStr, replMgrStatesStr, cfgStr,
sentHbTimeStr, hbTimeStr, hbrTimeStr, pNode->arbToken, pNode->sendCount, pNode->recvCount, pNode->slowCount);
}
}
@ -411,17 +467,24 @@ void syncLogRecvAppendEntriesReply(SSyncNode* pSyncNode, const SyncAppendEntries
void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool printX, int64_t timerElapsed,
int64_t execTime) {
if (sDebugFlag & DEBUG_TRACE) {
char pBuf[TD_TIME_STR_LEN] = {0};
if (pMsg->timeStamp > 0) {
if (formatTimestampLocal(pBuf, pMsg->timeStamp, TSDB_TIME_PRECISION_MILLI) == NULL) {
pBuf[0] = '\0';
}
}
if (printX) {
sNTrace(pSyncNode,
sHTrace(pSyncNode,
"send sync-heartbeat to dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
", ts:%" PRId64 "}, x",
DID(&pMsg->destId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp);
", ts:%s}, x",
DID(&pMsg->destId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pBuf);
} else {
sNTrace(pSyncNode,
sHTrace(pSyncNode,
"send sync-heartbeat to dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
", ts:%" PRId64 "}, timer-elapsed:%" PRId64 ", next-exec:%" PRId64,
DID(&pMsg->destId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, timerElapsed,
execTime);
", ts:%s}, timer-elapsed:%" PRId64 ", next-exec:%" PRId64,
DID(&pMsg->destId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pBuf, timerElapsed, execTime);
}
}
}
@ -429,20 +492,36 @@ void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64
if (timeDiff > SYNC_HEARTBEAT_SLOW_MS) {
pSyncNode->hbSlowNum++;
sNTrace(pSyncNode,
"recv sync-heartbeat from dnode:%d slow {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
", ts:%" PRId64 "}, QID:%s, net elapsed:%" PRId64,
DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff);
char pBuf[TD_TIME_STR_LEN] = {0};
if (pMsg->timeStamp > 0) {
if (formatTimestampLocal(pBuf, pMsg->timeStamp, TSDB_TIME_PRECISION_MILLI) == NULL) {
pBuf[0] = '\0';
}
}
sNTrace(pSyncNode,
sHError(pSyncNode,
"recv sync-heartbeat from dnode:%d slow(%d ms) {term:%" PRId64 ", commit-index:%" PRId64
", min-match:%" PRId64 ", ts:%s}, QID:%s, net elapsed:%" PRId64 "ms",
DID(&pMsg->srcId), SYNC_HEARTBEAT_SLOW_MS, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pBuf, s,
timeDiff);
} else {
if (sDebugFlag & DEBUG_TRACE) {
char pBuf[TD_TIME_STR_LEN] = {0};
if (pMsg->timeStamp > 0) {
if (formatTimestampLocal(pBuf, pMsg->timeStamp, TSDB_TIME_PRECISION_MILLI) == NULL) {
pBuf[0] = '\0';
}
}
sHTrace(pSyncNode,
"recv sync-heartbeat from dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
", ts:%" PRId64 "}, QID:%s, net elapsed:%" PRId64,
DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff);
", ts:%s}, QID:%s, net elapsed:%" PRId64 "ms",
DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pBuf, s, timeDiff);
}
}
}
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
sNTrace(pSyncNode, "send sync-heartbeat-reply from dnode:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s",
sHTrace(pSyncNode, "send sync-heartbeat-reply from dnode:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s",
DID(&pMsg->destId), pMsg->term, pMsg->timeStamp, s);
}
@ -450,14 +529,29 @@ void syncLogRecvHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* p
if (timeDiff > SYNC_HEARTBEAT_REPLY_SLOW_MS) {
pSyncNode->hbrSlowNum++;
sNTrace(pSyncNode,
"recv sync-heartbeat-reply from dnode:%d slow {term:%" PRId64 ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64,
DID(&pMsg->srcId), pMsg->term, pMsg->timeStamp, s, timeDiff);
char pBuf[TD_TIME_STR_LEN] = {0};
if (pMsg->timeStamp > 0) {
if (formatTimestampLocal(pBuf, pMsg->timeStamp, TSDB_TIME_PRECISION_MILLI) == NULL) {
pBuf[0] = '\0';
}
}
sNTrace(pSyncNode,
sHError(pSyncNode,
"recv sync-heartbeat-reply from dnode:%d slow(%d ms) {term:%" PRId64 ", ts:%s}, %s, net elapsed:%" PRId64,
DID(&pMsg->srcId), SYNC_HEARTBEAT_REPLY_SLOW_MS, pMsg->term, pBuf, s, timeDiff);
} else {
if (sDebugFlag & DEBUG_TRACE) {
char pBuf[TD_TIME_STR_LEN] = {0};
if (pMsg->timeStamp > 0) {
if (formatTimestampLocal(pBuf, pMsg->timeStamp, TSDB_TIME_PRECISION_MILLI) == NULL) {
pBuf[0] = '\0';
}
}
sHTrace(pSyncNode,
"recv sync-heartbeat-reply from dnode:%d {term:%" PRId64 ", ts:%" PRId64 "}, %s, net elapsed:%" PRId64,
DID(&pMsg->srcId), pMsg->term, pMsg->timeStamp, s, timeDiff);
}
}
}
void syncLogSendSyncSnapshotSend(SSyncNode* pSyncNode, const SyncSnapshotSend* pMsg, const char* s) {

View File

@ -37,7 +37,7 @@ int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg) {
}
int32_t tmsgGetQueueSize(const SMsgCb* msgcb, int32_t vgId, EQueueType qtype) {
return (*msgcb->qsizeFp)(msgcb->mgmt, vgId, qtype);
return (*msgcb->qsizeFp)(msgcb->mgmt, vgId, qtype); // vmGetQueueSize
}
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) {

View File

@ -48,6 +48,7 @@ class TDTestCase:
tdSql.checkData(4,1,'%s:6430'%self.host)
tdSql.checkData(0,4,'ready')
tdSql.checkData(4,4,'ready')
time.sleep(1)
tdSql.query("select * from information_schema.ins_mnodes;")
tdSql.checkData(0,1,'%s:6030'%self.host)
tdSql.checkData(0,2,'leader')

View File

@ -42,6 +42,7 @@ class TDTestCase:
return buildPath
def check_setup_cluster_status(self):
time.sleep(1)
tdSql.query("select * from information_schema.ins_mnodes")
for mnode in tdSql.queryResult:
name = mnode[1]