From aa7cb880bb9b3c285f1eade915decf3d60d39a85 Mon Sep 17 00:00:00 2001 From: dmchen Date: Fri, 28 Feb 2025 11:52:27 +0800 Subject: [PATCH] fix/remove-get-queue-size-when-print-heartbeat-log --- source/dnode/mgmt/mgmt_vnode/inc/vmInt.h | 2 +- source/dnode/mgmt/mgmt_vnode/src/vmFile.c | 18 +++++------ source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 12 ++++---- source/dnode/mgmt/mgmt_vnode/src/vmInt.c | 34 ++++++++++----------- source/libs/sync/src/syncUtil.c | 30 +++++++++++------- source/libs/transport/src/tmsgcb.c | 2 +- 6 files changed, 53 insertions(+), 45 deletions(-) diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 84f5149624..3d415e63c6 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -39,7 +39,7 @@ typedef struct SVnodeMgmt { SHashObj *runngingHash; SHashObj *closedHash; SHashObj *creatingHash; - TdThreadRwlock lock; + TdThreadRwlock hashLock; TdThreadMutex mutex; SVnodesStat state; STfs *pTfs; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c index dbef048c23..cb14155b1c 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c @@ -20,7 +20,7 @@ #define MAX_CONTENT_LEN 2 * 1024 * 1024 int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) { - (void)taosThreadRwlockRdlock(&pMgmt->lock); + (void)taosThreadRwlockRdlock(&pMgmt->hashLock); int32_t num = 0; int32_t size = taosHashGetSize(pMgmt->runngingHash); @@ -28,7 +28,7 @@ int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnod size += closedSize; SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *)); if (pVnodes == NULL) { - (void)taosThreadRwlockUnlock(&pMgmt->lock); + (void)taosThreadRwlockUnlock(&pMgmt->hashLock); return terrno; } @@ -60,7 +60,7 @@ int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnod } } - (void)taosThreadRwlockUnlock(&pMgmt->lock); + (void)taosThreadRwlockUnlock(&pMgmt->hashLock); *numOfVnodes = num; *ppVnodes = pVnodes; @@ -68,7 +68,7 @@ int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnod } int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) { - (void)taosThreadRwlockRdlock(&pMgmt->lock); + (void)taosThreadRwlockRdlock(&pMgmt->hashLock); int32_t num = 0; int32_t size = taosHashGetSize(pMgmt->runngingHash); @@ -76,7 +76,7 @@ int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfV size += creatingSize; SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *)); if (pVnodes == NULL) { - (void)taosThreadRwlockUnlock(&pMgmt->lock); + (void)taosThreadRwlockUnlock(&pMgmt->hashLock); return terrno; } @@ -107,7 +107,7 @@ int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfV taosHashCancelIterate(pMgmt->creatingHash, pIter); } } - (void)taosThreadRwlockUnlock(&pMgmt->lock); + (void)taosThreadRwlockUnlock(&pMgmt->hashLock); *numOfVnodes = num; *ppVnodes = pVnodes; @@ -116,13 +116,13 @@ int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfV } int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) { - (void)taosThreadRwlockRdlock(&pMgmt->lock); + (void)taosThreadRwlockRdlock(&pMgmt->hashLock); int32_t num = 0; int32_t size = taosHashGetSize(pMgmt->runngingHash); SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *)); if (pVnodes == NULL) { - (void)taosThreadRwlockUnlock(&pMgmt->lock); + (void)taosThreadRwlockUnlock(&pMgmt->hashLock); return terrno; } @@ -140,7 +140,7 @@ int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeOb } } - (void)taosThreadRwlockUnlock(&pMgmt->lock); + (void)taosThreadRwlockUnlock(&pMgmt->hashLock); *numOfVnodes = num; *ppVnodes = pVnodes; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 234d4f41e1..846b76d3ce 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -25,7 +25,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) { tfsUpdateSize(pMgmt->pTfs); - (void)taosThreadRwlockRdlock(&pMgmt->lock); + (void)taosThreadRwlockRdlock(&pMgmt->hashLock); void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); while (pIter) { @@ -46,14 +46,14 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) { pIter = taosHashIterate(pMgmt->runngingHash, pIter); } - (void)taosThreadRwlockUnlock(&pMgmt->lock); + (void)taosThreadRwlockUnlock(&pMgmt->hashLock); } void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite)); if (!pInfo->pVloads) return; - (void)taosThreadRwlockRdlock(&pMgmt->lock); + (void)taosThreadRwlockRdlock(&pMgmt->hashLock); void *pIter = taosHashIterate(pMgmt->runngingHash, NULL); while (pIter) { @@ -74,7 +74,7 @@ void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) { pIter = taosHashIterate(pMgmt->runngingHash, pIter); } - (void)taosThreadRwlockUnlock(&pMgmt->lock); + (void)taosThreadRwlockUnlock(&pMgmt->hashLock); } void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) { @@ -137,7 +137,7 @@ void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) { dError("failed to get vgroup ids"); return; } - (void)taosThreadRwlockRdlock(&pMgmt->lock); + (void)taosThreadRwlockRdlock(&pMgmt->hashLock); for (int i = 0; i < list_size; i++) { int32_t vgroup_id = vgroup_ids[i]; void *vnode = taosHashGet(pMgmt->runngingHash, &vgroup_id, sizeof(int32_t)); @@ -148,7 +148,7 @@ void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) { } } } - (void)taosThreadRwlockUnlock(&pMgmt->lock); + (void)taosThreadRwlockUnlock(&pMgmt->hashLock); if (vgroup_ids) taosMemoryFree(vgroup_ids); if (keys) taosMemoryFree(keys); return; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index d71e0b02c4..f4925c348c 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -24,12 +24,12 @@ int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { int32_t diskId = -1; SVnodeObj *pVnode = NULL; - (void)taosThreadRwlockRdlock(&pMgmt->lock); + (void)taosThreadRwlockRdlock(&pMgmt->hashLock); int32_t r = taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode); if (pVnode != NULL) { diskId = pVnode->diskPrimary; } - (void)taosThreadRwlockUnlock(&pMgmt->lock); + (void)taosThreadRwlockUnlock(&pMgmt->hashLock); return diskId; } @@ -62,7 +62,7 @@ static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t pCreatingVnode->vgId = vgId; pCreatingVnode->diskPrimary = diskId; - code = taosThreadRwlockWrlock(&pMgmt->lock); + code = taosThreadRwlockWrlock(&pMgmt->hashLock); if (code != 0) { taosMemoryFree(pCreatingVnode); return code; @@ -75,7 +75,7 @@ static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t taosMemoryFree(pCreatingVnode); } - int32_t r = taosThreadRwlockUnlock(&pMgmt->lock); + int32_t r = taosThreadRwlockUnlock(&pMgmt->hashLock); if (r != 0) { dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r)); } @@ -86,7 +86,7 @@ static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t static void vmUnRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId) { SVnodeObj *pOld = NULL; - (void)taosThreadRwlockWrlock(&pMgmt->lock); + (void)taosThreadRwlockWrlock(&pMgmt->hashLock); int32_t r = taosHashGetDup(pMgmt->creatingHash, &vgId, sizeof(int32_t), (void *)&pOld); if (r != 0) { dError("vgId:%d, failed to get vnode from creating Hash", vgId); @@ -96,7 +96,7 @@ static void vmUnRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId) { if (r != 0) { dError("vgId:%d, failed to remove vnode from creatingHash", vgId); } - (void)taosThreadRwlockUnlock(&pMgmt->lock); + (void)taosThreadRwlockUnlock(&pMgmt->hashLock); if (pOld) { dTrace("vgId:%d, free vnode pOld:%p", vgId, &pOld); @@ -205,7 +205,7 @@ void vmCleanPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { vmUnRegisterCreatingS SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) { SVnodeObj *pVnode = NULL; - (void)taosThreadRwlockRdlock(&pMgmt->lock); + (void)taosThreadRwlockRdlock(&pMgmt->hashLock); int32_t r = taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode); if (pVnode == NULL || strict && (pVnode->dropped || pVnode->failed)) { terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; @@ -214,7 +214,7 @@ SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) { int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount); } - (void)taosThreadRwlockUnlock(&pMgmt->lock); + (void)taosThreadRwlockUnlock(&pMgmt->hashLock); return pVnode; } @@ -334,10 +334,10 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) { pVnode->failed = 1; } - (void)taosThreadRwlockWrlock(&pMgmt->lock); + (void)taosThreadRwlockWrlock(&pMgmt->hashLock); int32_t code = vmRegisterRunningState(pMgmt, pVnode); vmUnRegisterClosedState(pMgmt, pVnode); - (void)taosThreadRwlockUnlock(&pMgmt->lock); + (void)taosThreadRwlockUnlock(&pMgmt->hashLock); return code; } @@ -350,15 +350,15 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal, vnodeProposeCommitOnNeed(pVnode->pImpl, atExit); } - (void)taosThreadRwlockWrlock(&pMgmt->lock); + (void)taosThreadRwlockWrlock(&pMgmt->hashLock); vmUnRegisterRunningState(pMgmt, pVnode->vgId); if (keepClosed) { if (vmRegisterClosedState(pMgmt, pVnode) != 0) { - (void)taosThreadRwlockUnlock(&pMgmt->lock); + (void)taosThreadRwlockUnlock(&pMgmt->hashLock); return; }; } - (void)taosThreadRwlockUnlock(&pMgmt->lock); + (void)taosThreadRwlockUnlock(&pMgmt->hashLock); vmReleaseVnode(pMgmt, pVnode); @@ -450,14 +450,14 @@ _closed: void vmCloseFailedVnode(SVnodeMgmt *pMgmt, int32_t vgId) { int32_t r = 0; - r = taosThreadRwlockWrlock(&pMgmt->lock); + r = taosThreadRwlockWrlock(&pMgmt->hashLock); if (r != 0) { dError("vgId:%d, failed to lock since %s", vgId, tstrerror(r)); } if (r == 0) { vmUnRegisterRunningState(pMgmt, vgId); } - r = taosThreadRwlockUnlock(&pMgmt->lock); + r = taosThreadRwlockUnlock(&pMgmt->hashLock); if (r != 0) { dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r)); } @@ -792,7 +792,7 @@ static void vmCleanup(SVnodeMgmt *pMgmt) { vmCloseVnodes(pMgmt); vmStopWorker(pMgmt); vnodeCleanup(); - (void)taosThreadRwlockDestroy(&pMgmt->lock); + (void)taosThreadRwlockDestroy(&pMgmt->hashLock); (void)taosThreadMutexDestroy(&pMgmt->mutex); (void)taosThreadMutexDestroy(&pMgmt->fileLock); taosMemoryFree(pMgmt); @@ -880,7 +880,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize; pMgmt->msgCb.mgmt = pMgmt; - code = taosThreadRwlockInit(&pMgmt->lock, NULL); + code = taosThreadRwlockInit(&pMgmt->hashLock, NULL); if (code != 0) { code = TAOS_SYSTEM_ERROR(errno); goto _OVER; diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 5a2571a291..44e8d97aca 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -215,7 +215,7 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool fo SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) { - (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); + (void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); // vnodeSyncGetSnapshotInfo } SyncIndex logLastIndex = SYNC_INDEX_INVALID; @@ -253,13 +253,15 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool fo va_end(argpointer); int32_t aqItems = 0; + /* if (pNode != NULL && pNode->pFsm != NULL && pNode->pFsm->FpApplyQueueItems != NULL) { - aqItems = pNode->pFsm->FpApplyQueueItems(pNode->pFsm); + aqItems = pNode->pFsm->FpApplyQueueItems(pNode->pFsm); // vnodeApplyQueueItems } + */ // restore error code terrno = errCode; - SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm); + SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm); // vnodeSyncAppliedIndex if (pNode != NULL) { taosPrintLog( @@ -426,19 +428,25 @@ void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool } void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff, const char* s) { + char pBuf[TD_TIME_STR_LEN] = {0}; + if (pMsg->timeStamp > 0) { + if (formatTimestampLocal(pBuf, pMsg->timeStamp, TSDB_TIME_PRECISION_MILLI) == NULL) { + pBuf[0] = '\0'; + } + } if (timeDiff > SYNC_HEARTBEAT_SLOW_MS) { pSyncNode->hbSlowNum++; sNTrace(pSyncNode, - "recv sync-heartbeat from dnode:%d slow {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64 - ", ts:%" PRId64 "}, QID:%s, net elapsed:%" PRId64, - DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff); + "recv sync-heartbeat from dnode:%d slow(%d ms) {term:%" PRId64 ", commit-index:%" PRId64 + ", min-match:%" PRId64 ", ts:%s}, QID:%s, net elapsed:%" PRId64 "ms", + DID(&pMsg->srcId), SYNC_HEARTBEAT_SLOW_MS, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pBuf, s, timeDiff); + } else { + sNTrace(pSyncNode, + "recv sync-heartbeat from dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64 + ", ts:%s}, QID:%s, net elapsed:%" PRId64 "ms", + DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pBuf, s, timeDiff); } - - sNTrace(pSyncNode, - "recv sync-heartbeat from dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64 - ", ts:%" PRId64 "}, QID:%s, net elapsed:%" PRId64, - DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff); } void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) { diff --git a/source/libs/transport/src/tmsgcb.c b/source/libs/transport/src/tmsgcb.c index e87011f097..0632e21d20 100644 --- a/source/libs/transport/src/tmsgcb.c +++ b/source/libs/transport/src/tmsgcb.c @@ -37,7 +37,7 @@ int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg) { } int32_t tmsgGetQueueSize(const SMsgCb* msgcb, int32_t vgId, EQueueType qtype) { - return (*msgcb->qsizeFp)(msgcb->mgmt, vgId, qtype); + return (*msgcb->qsizeFp)(msgcb->mgmt, vgId, qtype); // vmGetQueueSize } int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) {