fix/remove-get-queue-size-when-print-heartbeat-log

This commit is contained in:
dmchen 2025-02-28 11:52:27 +08:00
parent f6a472ae68
commit 1237e762c1
6 changed files with 53 additions and 45 deletions

View File

@ -39,7 +39,7 @@ typedef struct SVnodeMgmt {
SHashObj *runngingHash;
SHashObj *closedHash;
SHashObj *creatingHash;
TdThreadRwlock lock;
TdThreadRwlock hashLock;
TdThreadMutex mutex;
SVnodesStat state;
STfs *pTfs;

View File

@ -20,7 +20,7 @@
#define MAX_CONTENT_LEN 2 * 1024 * 1024
int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) {
(void)taosThreadRwlockRdlock(&pMgmt->lock);
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);
int32_t num = 0;
int32_t size = taosHashGetSize(pMgmt->runngingHash);
@ -28,7 +28,7 @@ int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnod
size += closedSize;
SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *));
if (pVnodes == NULL) {
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
return terrno;
}
@ -60,7 +60,7 @@ int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnod
}
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
*numOfVnodes = num;
*ppVnodes = pVnodes;
@ -68,7 +68,7 @@ int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnod
}
int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) {
(void)taosThreadRwlockRdlock(&pMgmt->lock);
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);
int32_t num = 0;
int32_t size = taosHashGetSize(pMgmt->runngingHash);
@ -76,7 +76,7 @@ int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfV
size += creatingSize;
SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *));
if (pVnodes == NULL) {
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
return terrno;
}
@ -107,7 +107,7 @@ int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfV
taosHashCancelIterate(pMgmt->creatingHash, pIter);
}
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
*numOfVnodes = num;
*ppVnodes = pVnodes;
@ -116,13 +116,13 @@ int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfV
}
int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) {
(void)taosThreadRwlockRdlock(&pMgmt->lock);
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);
int32_t num = 0;
int32_t size = taosHashGetSize(pMgmt->runngingHash);
SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *));
if (pVnodes == NULL) {
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
return terrno;
}
@ -140,7 +140,7 @@ int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeOb
}
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
*numOfVnodes = num;
*ppVnodes = pVnodes;

View File

@ -25,7 +25,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
tfsUpdateSize(pMgmt->pTfs);
(void)taosThreadRwlockRdlock(&pMgmt->lock);
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);
void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
while (pIter) {
@ -46,14 +46,14 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
pIter = taosHashIterate(pMgmt->runngingHash, pIter);
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
if (!pInfo->pVloads) return;
(void)taosThreadRwlockRdlock(&pMgmt->lock);
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);
void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
while (pIter) {
@ -74,7 +74,7 @@ void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
pIter = taosHashIterate(pMgmt->runngingHash, pIter);
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
}
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
@ -137,7 +137,7 @@ void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
dError("failed to get vgroup ids");
return;
}
(void)taosThreadRwlockRdlock(&pMgmt->lock);
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);
for (int i = 0; i < list_size; i++) {
int32_t vgroup_id = vgroup_ids[i];
void *vnode = taosHashGet(pMgmt->runngingHash, &vgroup_id, sizeof(int32_t));
@ -148,7 +148,7 @@ void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
}
}
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
if (vgroup_ids) taosMemoryFree(vgroup_ids);
if (keys) taosMemoryFree(keys);
return;

View File

@ -24,12 +24,12 @@ int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
int32_t diskId = -1;
SVnodeObj *pVnode = NULL;
(void)taosThreadRwlockRdlock(&pMgmt->lock);
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);
int32_t r = taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode);
if (pVnode != NULL) {
diskId = pVnode->diskPrimary;
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
return diskId;
}
@ -62,7 +62,7 @@ static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t
pCreatingVnode->vgId = vgId;
pCreatingVnode->diskPrimary = diskId;
code = taosThreadRwlockWrlock(&pMgmt->lock);
code = taosThreadRwlockWrlock(&pMgmt->hashLock);
if (code != 0) {
taosMemoryFree(pCreatingVnode);
return code;
@ -75,7 +75,7 @@ static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t
taosMemoryFree(pCreatingVnode);
}
int32_t r = taosThreadRwlockUnlock(&pMgmt->lock);
int32_t r = taosThreadRwlockUnlock(&pMgmt->hashLock);
if (r != 0) {
dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
}
@ -86,7 +86,7 @@ static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t
static void vmUnRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId) {
SVnodeObj *pOld = NULL;
(void)taosThreadRwlockWrlock(&pMgmt->lock);
(void)taosThreadRwlockWrlock(&pMgmt->hashLock);
int32_t r = taosHashGetDup(pMgmt->creatingHash, &vgId, sizeof(int32_t), (void *)&pOld);
if (r != 0) {
dError("vgId:%d, failed to get vnode from creating Hash", vgId);
@ -96,7 +96,7 @@ static void vmUnRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId) {
if (r != 0) {
dError("vgId:%d, failed to remove vnode from creatingHash", vgId);
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
if (pOld) {
dTrace("vgId:%d, free vnode pOld:%p", vgId, &pOld);
@ -205,7 +205,7 @@ void vmCleanPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { vmUnRegisterCreatingS
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
SVnodeObj *pVnode = NULL;
(void)taosThreadRwlockRdlock(&pMgmt->lock);
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);
int32_t r = taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode);
if (pVnode == NULL || strict && (pVnode->dropped || pVnode->failed)) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
@ -214,7 +214,7 @@ SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
return pVnode;
}
@ -334,10 +334,10 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
pVnode->failed = 1;
}
(void)taosThreadRwlockWrlock(&pMgmt->lock);
(void)taosThreadRwlockWrlock(&pMgmt->hashLock);
int32_t code = vmRegisterRunningState(pMgmt, pVnode);
vmUnRegisterClosedState(pMgmt, pVnode);
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
return code;
}
@ -350,15 +350,15 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal,
vnodeProposeCommitOnNeed(pVnode->pImpl, atExit);
}
(void)taosThreadRwlockWrlock(&pMgmt->lock);
(void)taosThreadRwlockWrlock(&pMgmt->hashLock);
vmUnRegisterRunningState(pMgmt, pVnode->vgId);
if (keepClosed) {
if (vmRegisterClosedState(pMgmt, pVnode) != 0) {
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
return;
};
}
(void)taosThreadRwlockUnlock(&pMgmt->lock);
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
vmReleaseVnode(pMgmt, pVnode);
@ -450,14 +450,14 @@ _closed:
void vmCloseFailedVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
int32_t r = 0;
r = taosThreadRwlockWrlock(&pMgmt->lock);
r = taosThreadRwlockWrlock(&pMgmt->hashLock);
if (r != 0) {
dError("vgId:%d, failed to lock since %s", vgId, tstrerror(r));
}
if (r == 0) {
vmUnRegisterRunningState(pMgmt, vgId);
}
r = taosThreadRwlockUnlock(&pMgmt->lock);
r = taosThreadRwlockUnlock(&pMgmt->hashLock);
if (r != 0) {
dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
}
@ -792,7 +792,7 @@ static void vmCleanup(SVnodeMgmt *pMgmt) {
vmCloseVnodes(pMgmt);
vmStopWorker(pMgmt);
vnodeCleanup();
(void)taosThreadRwlockDestroy(&pMgmt->lock);
(void)taosThreadRwlockDestroy(&pMgmt->hashLock);
(void)taosThreadMutexDestroy(&pMgmt->mutex);
(void)taosThreadMutexDestroy(&pMgmt->fileLock);
taosMemoryFree(pMgmt);
@ -880,7 +880,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
pMgmt->msgCb.mgmt = pMgmt;
code = taosThreadRwlockInit(&pMgmt->lock, NULL);
code = taosThreadRwlockInit(&pMgmt->hashLock, NULL);
if (code != 0) {
code = TAOS_SYSTEM_ERROR(errno);
goto _OVER;

View File

@ -215,7 +215,7 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool fo
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
(void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
(void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); // vnodeSyncGetSnapshotInfo
}
SyncIndex logLastIndex = SYNC_INDEX_INVALID;
@ -253,13 +253,15 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool fo
va_end(argpointer);
int32_t aqItems = 0;
/*
if (pNode != NULL && pNode->pFsm != NULL && pNode->pFsm->FpApplyQueueItems != NULL) {
aqItems = pNode->pFsm->FpApplyQueueItems(pNode->pFsm);
aqItems = pNode->pFsm->FpApplyQueueItems(pNode->pFsm); // vnodeApplyQueueItems
}
*/
// restore error code
terrno = errCode;
SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm); // vnodeSyncAppliedIndex
if (pNode != NULL) {
taosPrintLog(
@ -426,19 +428,25 @@ void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool
}
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff, const char* s) {
char pBuf[TD_TIME_STR_LEN] = {0};
if (pMsg->timeStamp > 0) {
if (formatTimestampLocal(pBuf, pMsg->timeStamp, TSDB_TIME_PRECISION_MILLI) == NULL) {
pBuf[0] = '\0';
}
}
if (timeDiff > SYNC_HEARTBEAT_SLOW_MS) {
pSyncNode->hbSlowNum++;
sNTrace(pSyncNode,
"recv sync-heartbeat from dnode:%d slow {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
", ts:%" PRId64 "}, QID:%s, net elapsed:%" PRId64,
DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff);
"recv sync-heartbeat from dnode:%d slow(%d ms) {term:%" PRId64 ", commit-index:%" PRId64
", min-match:%" PRId64 ", ts:%s}, QID:%s, net elapsed:%" PRId64 "ms",
DID(&pMsg->srcId), SYNC_HEARTBEAT_SLOW_MS, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pBuf, s, timeDiff);
} else {
sNTrace(pSyncNode,
"recv sync-heartbeat from dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
", ts:%s}, QID:%s, net elapsed:%" PRId64 "ms",
DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pBuf, s, timeDiff);
}
sNTrace(pSyncNode,
"recv sync-heartbeat from dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
", ts:%" PRId64 "}, QID:%s, net elapsed:%" PRId64,
DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff);
}
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {

View File

@ -37,7 +37,7 @@ int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg) {
}
int32_t tmsgGetQueueSize(const SMsgCb* msgcb, int32_t vgId, EQueueType qtype) {
return (*msgcb->qsizeFp)(msgcb->mgmt, vgId, qtype);
return (*msgcb->qsizeFp)(msgcb->mgmt, vgId, qtype); // vmGetQueueSize
}
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) {