fix/remove-get-queue-size-when-print-heartbeat-log
This commit is contained in:
parent
4dd0622594
commit
aa7cb880bb
|
@ -39,7 +39,7 @@ typedef struct SVnodeMgmt {
|
||||||
SHashObj *runngingHash;
|
SHashObj *runngingHash;
|
||||||
SHashObj *closedHash;
|
SHashObj *closedHash;
|
||||||
SHashObj *creatingHash;
|
SHashObj *creatingHash;
|
||||||
TdThreadRwlock lock;
|
TdThreadRwlock hashLock;
|
||||||
TdThreadMutex mutex;
|
TdThreadMutex mutex;
|
||||||
SVnodesStat state;
|
SVnodesStat state;
|
||||||
STfs *pTfs;
|
STfs *pTfs;
|
||||||
|
|
|
@ -20,7 +20,7 @@
|
||||||
#define MAX_CONTENT_LEN 2 * 1024 * 1024
|
#define MAX_CONTENT_LEN 2 * 1024 * 1024
|
||||||
|
|
||||||
int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) {
|
int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) {
|
||||||
(void)taosThreadRwlockRdlock(&pMgmt->lock);
|
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);
|
||||||
|
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
int32_t size = taosHashGetSize(pMgmt->runngingHash);
|
int32_t size = taosHashGetSize(pMgmt->runngingHash);
|
||||||
|
@ -28,7 +28,7 @@ int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnod
|
||||||
size += closedSize;
|
size += closedSize;
|
||||||
SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *));
|
SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *));
|
||||||
if (pVnodes == NULL) {
|
if (pVnodes == NULL) {
|
||||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,7 +60,7 @@ int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnod
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
|
||||||
*numOfVnodes = num;
|
*numOfVnodes = num;
|
||||||
*ppVnodes = pVnodes;
|
*ppVnodes = pVnodes;
|
||||||
|
|
||||||
|
@ -68,7 +68,7 @@ int32_t vmGetAllVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnod
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) {
|
int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) {
|
||||||
(void)taosThreadRwlockRdlock(&pMgmt->lock);
|
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);
|
||||||
|
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
int32_t size = taosHashGetSize(pMgmt->runngingHash);
|
int32_t size = taosHashGetSize(pMgmt->runngingHash);
|
||||||
|
@ -76,7 +76,7 @@ int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfV
|
||||||
size += creatingSize;
|
size += creatingSize;
|
||||||
SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *));
|
SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *));
|
||||||
if (pVnodes == NULL) {
|
if (pVnodes == NULL) {
|
||||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,7 +107,7 @@ int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfV
|
||||||
taosHashCancelIterate(pMgmt->creatingHash, pIter);
|
taosHashCancelIterate(pMgmt->creatingHash, pIter);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
|
||||||
|
|
||||||
*numOfVnodes = num;
|
*numOfVnodes = num;
|
||||||
*ppVnodes = pVnodes;
|
*ppVnodes = pVnodes;
|
||||||
|
@ -116,13 +116,13 @@ int32_t vmGetAllVnodeListFromHashWithCreating(SVnodeMgmt *pMgmt, int32_t *numOfV
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) {
|
int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeObj ***ppVnodes) {
|
||||||
(void)taosThreadRwlockRdlock(&pMgmt->lock);
|
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);
|
||||||
|
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
int32_t size = taosHashGetSize(pMgmt->runngingHash);
|
int32_t size = taosHashGetSize(pMgmt->runngingHash);
|
||||||
SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *));
|
SVnodeObj **pVnodes = taosMemoryCalloc(size, sizeof(SVnodeObj *));
|
||||||
if (pVnodes == NULL) {
|
if (pVnodes == NULL) {
|
||||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
|
||||||
return terrno;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -140,7 +140,7 @@ int32_t vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes, SVnodeOb
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
|
||||||
*numOfVnodes = num;
|
*numOfVnodes = num;
|
||||||
*ppVnodes = pVnodes;
|
*ppVnodes = pVnodes;
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,7 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
|
||||||
|
|
||||||
tfsUpdateSize(pMgmt->pTfs);
|
tfsUpdateSize(pMgmt->pTfs);
|
||||||
|
|
||||||
(void)taosThreadRwlockRdlock(&pMgmt->lock);
|
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);
|
||||||
|
|
||||||
void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
|
void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
|
@ -46,14 +46,14 @@ void vmGetVnodeLoads(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo, bool isReset) {
|
||||||
pIter = taosHashIterate(pMgmt->runngingHash, pIter);
|
pIter = taosHashIterate(pMgmt->runngingHash, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
|
void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
|
||||||
pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
|
pInfo->pVloads = taosArrayInit(pMgmt->state.totalVnodes, sizeof(SVnodeLoadLite));
|
||||||
if (!pInfo->pVloads) return;
|
if (!pInfo->pVloads) return;
|
||||||
|
|
||||||
(void)taosThreadRwlockRdlock(&pMgmt->lock);
|
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);
|
||||||
|
|
||||||
void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
|
void *pIter = taosHashIterate(pMgmt->runngingHash, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
|
@ -74,7 +74,7 @@ void vmGetVnodeLoadsLite(SVnodeMgmt *pMgmt, SMonVloadInfo *pInfo) {
|
||||||
pIter = taosHashIterate(pMgmt->runngingHash, pIter);
|
pIter = taosHashIterate(pMgmt->runngingHash, pIter);
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
|
||||||
}
|
}
|
||||||
|
|
||||||
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
|
void vmGetMonitorInfo(SVnodeMgmt *pMgmt, SMonVmInfo *pInfo) {
|
||||||
|
@ -137,7 +137,7 @@ void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
|
||||||
dError("failed to get vgroup ids");
|
dError("failed to get vgroup ids");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
(void)taosThreadRwlockRdlock(&pMgmt->lock);
|
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);
|
||||||
for (int i = 0; i < list_size; i++) {
|
for (int i = 0; i < list_size; i++) {
|
||||||
int32_t vgroup_id = vgroup_ids[i];
|
int32_t vgroup_id = vgroup_ids[i];
|
||||||
void *vnode = taosHashGet(pMgmt->runngingHash, &vgroup_id, sizeof(int32_t));
|
void *vnode = taosHashGet(pMgmt->runngingHash, &vgroup_id, sizeof(int32_t));
|
||||||
|
@ -148,7 +148,7 @@ void vmCleanExpriedSamples(SVnodeMgmt *pMgmt) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
|
||||||
if (vgroup_ids) taosMemoryFree(vgroup_ids);
|
if (vgroup_ids) taosMemoryFree(vgroup_ids);
|
||||||
if (keys) taosMemoryFree(keys);
|
if (keys) taosMemoryFree(keys);
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -24,12 +24,12 @@ int32_t vmGetPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) {
|
||||||
int32_t diskId = -1;
|
int32_t diskId = -1;
|
||||||
SVnodeObj *pVnode = NULL;
|
SVnodeObj *pVnode = NULL;
|
||||||
|
|
||||||
(void)taosThreadRwlockRdlock(&pMgmt->lock);
|
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);
|
||||||
int32_t r = taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode);
|
int32_t r = taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode);
|
||||||
if (pVnode != NULL) {
|
if (pVnode != NULL) {
|
||||||
diskId = pVnode->diskPrimary;
|
diskId = pVnode->diskPrimary;
|
||||||
}
|
}
|
||||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
|
||||||
return diskId;
|
return diskId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -62,7 +62,7 @@ static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t
|
||||||
pCreatingVnode->vgId = vgId;
|
pCreatingVnode->vgId = vgId;
|
||||||
pCreatingVnode->diskPrimary = diskId;
|
pCreatingVnode->diskPrimary = diskId;
|
||||||
|
|
||||||
code = taosThreadRwlockWrlock(&pMgmt->lock);
|
code = taosThreadRwlockWrlock(&pMgmt->hashLock);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
taosMemoryFree(pCreatingVnode);
|
taosMemoryFree(pCreatingVnode);
|
||||||
return code;
|
return code;
|
||||||
|
@ -75,7 +75,7 @@ static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t
|
||||||
taosMemoryFree(pCreatingVnode);
|
taosMemoryFree(pCreatingVnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t r = taosThreadRwlockUnlock(&pMgmt->lock);
|
int32_t r = taosThreadRwlockUnlock(&pMgmt->hashLock);
|
||||||
if (r != 0) {
|
if (r != 0) {
|
||||||
dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
|
dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
|
||||||
}
|
}
|
||||||
|
@ -86,7 +86,7 @@ static int32_t vmRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId, int32_t
|
||||||
static void vmUnRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId) {
|
static void vmUnRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId) {
|
||||||
SVnodeObj *pOld = NULL;
|
SVnodeObj *pOld = NULL;
|
||||||
|
|
||||||
(void)taosThreadRwlockWrlock(&pMgmt->lock);
|
(void)taosThreadRwlockWrlock(&pMgmt->hashLock);
|
||||||
int32_t r = taosHashGetDup(pMgmt->creatingHash, &vgId, sizeof(int32_t), (void *)&pOld);
|
int32_t r = taosHashGetDup(pMgmt->creatingHash, &vgId, sizeof(int32_t), (void *)&pOld);
|
||||||
if (r != 0) {
|
if (r != 0) {
|
||||||
dError("vgId:%d, failed to get vnode from creating Hash", vgId);
|
dError("vgId:%d, failed to get vnode from creating Hash", vgId);
|
||||||
|
@ -96,7 +96,7 @@ static void vmUnRegisterCreatingState(SVnodeMgmt *pMgmt, int32_t vgId) {
|
||||||
if (r != 0) {
|
if (r != 0) {
|
||||||
dError("vgId:%d, failed to remove vnode from creatingHash", vgId);
|
dError("vgId:%d, failed to remove vnode from creatingHash", vgId);
|
||||||
}
|
}
|
||||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
|
||||||
|
|
||||||
if (pOld) {
|
if (pOld) {
|
||||||
dTrace("vgId:%d, free vnode pOld:%p", vgId, &pOld);
|
dTrace("vgId:%d, free vnode pOld:%p", vgId, &pOld);
|
||||||
|
@ -205,7 +205,7 @@ void vmCleanPrimaryDisk(SVnodeMgmt *pMgmt, int32_t vgId) { vmUnRegisterCreatingS
|
||||||
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
|
SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
|
||||||
SVnodeObj *pVnode = NULL;
|
SVnodeObj *pVnode = NULL;
|
||||||
|
|
||||||
(void)taosThreadRwlockRdlock(&pMgmt->lock);
|
(void)taosThreadRwlockRdlock(&pMgmt->hashLock);
|
||||||
int32_t r = taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode);
|
int32_t r = taosHashGetDup(pMgmt->runngingHash, &vgId, sizeof(int32_t), (void *)&pVnode);
|
||||||
if (pVnode == NULL || strict && (pVnode->dropped || pVnode->failed)) {
|
if (pVnode == NULL || strict && (pVnode->dropped || pVnode->failed)) {
|
||||||
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
|
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
|
||||||
|
@ -214,7 +214,7 @@ SVnodeObj *vmAcquireVnodeImpl(SVnodeMgmt *pMgmt, int32_t vgId, bool strict) {
|
||||||
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
|
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
|
||||||
dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
|
dTrace("vgId:%d, acquire vnode, vnode:%p, ref:%d", pVnode->vgId, pVnode, refCount);
|
||||||
}
|
}
|
||||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
|
||||||
|
|
||||||
return pVnode;
|
return pVnode;
|
||||||
}
|
}
|
||||||
|
@ -334,10 +334,10 @@ int32_t vmOpenVnode(SVnodeMgmt *pMgmt, SWrapperCfg *pCfg, SVnode *pImpl) {
|
||||||
pVnode->failed = 1;
|
pVnode->failed = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)taosThreadRwlockWrlock(&pMgmt->lock);
|
(void)taosThreadRwlockWrlock(&pMgmt->hashLock);
|
||||||
int32_t code = vmRegisterRunningState(pMgmt, pVnode);
|
int32_t code = vmRegisterRunningState(pMgmt, pVnode);
|
||||||
vmUnRegisterClosedState(pMgmt, pVnode);
|
vmUnRegisterClosedState(pMgmt, pVnode);
|
||||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
@ -350,15 +350,15 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal,
|
||||||
vnodeProposeCommitOnNeed(pVnode->pImpl, atExit);
|
vnodeProposeCommitOnNeed(pVnode->pImpl, atExit);
|
||||||
}
|
}
|
||||||
|
|
||||||
(void)taosThreadRwlockWrlock(&pMgmt->lock);
|
(void)taosThreadRwlockWrlock(&pMgmt->hashLock);
|
||||||
vmUnRegisterRunningState(pMgmt, pVnode->vgId);
|
vmUnRegisterRunningState(pMgmt, pVnode->vgId);
|
||||||
if (keepClosed) {
|
if (keepClosed) {
|
||||||
if (vmRegisterClosedState(pMgmt, pVnode) != 0) {
|
if (vmRegisterClosedState(pMgmt, pVnode) != 0) {
|
||||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
(void)taosThreadRwlockUnlock(&pMgmt->lock);
|
(void)taosThreadRwlockUnlock(&pMgmt->hashLock);
|
||||||
|
|
||||||
vmReleaseVnode(pMgmt, pVnode);
|
vmReleaseVnode(pMgmt, pVnode);
|
||||||
|
|
||||||
|
@ -450,14 +450,14 @@ _closed:
|
||||||
|
|
||||||
void vmCloseFailedVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
|
void vmCloseFailedVnode(SVnodeMgmt *pMgmt, int32_t vgId) {
|
||||||
int32_t r = 0;
|
int32_t r = 0;
|
||||||
r = taosThreadRwlockWrlock(&pMgmt->lock);
|
r = taosThreadRwlockWrlock(&pMgmt->hashLock);
|
||||||
if (r != 0) {
|
if (r != 0) {
|
||||||
dError("vgId:%d, failed to lock since %s", vgId, tstrerror(r));
|
dError("vgId:%d, failed to lock since %s", vgId, tstrerror(r));
|
||||||
}
|
}
|
||||||
if (r == 0) {
|
if (r == 0) {
|
||||||
vmUnRegisterRunningState(pMgmt, vgId);
|
vmUnRegisterRunningState(pMgmt, vgId);
|
||||||
}
|
}
|
||||||
r = taosThreadRwlockUnlock(&pMgmt->lock);
|
r = taosThreadRwlockUnlock(&pMgmt->hashLock);
|
||||||
if (r != 0) {
|
if (r != 0) {
|
||||||
dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
|
dError("vgId:%d, failed to unlock since %s", vgId, tstrerror(r));
|
||||||
}
|
}
|
||||||
|
@ -792,7 +792,7 @@ static void vmCleanup(SVnodeMgmt *pMgmt) {
|
||||||
vmCloseVnodes(pMgmt);
|
vmCloseVnodes(pMgmt);
|
||||||
vmStopWorker(pMgmt);
|
vmStopWorker(pMgmt);
|
||||||
vnodeCleanup();
|
vnodeCleanup();
|
||||||
(void)taosThreadRwlockDestroy(&pMgmt->lock);
|
(void)taosThreadRwlockDestroy(&pMgmt->hashLock);
|
||||||
(void)taosThreadMutexDestroy(&pMgmt->mutex);
|
(void)taosThreadMutexDestroy(&pMgmt->mutex);
|
||||||
(void)taosThreadMutexDestroy(&pMgmt->fileLock);
|
(void)taosThreadMutexDestroy(&pMgmt->fileLock);
|
||||||
taosMemoryFree(pMgmt);
|
taosMemoryFree(pMgmt);
|
||||||
|
@ -880,7 +880,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) {
|
||||||
pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
|
pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize;
|
||||||
pMgmt->msgCb.mgmt = pMgmt;
|
pMgmt->msgCb.mgmt = pMgmt;
|
||||||
|
|
||||||
code = taosThreadRwlockInit(&pMgmt->lock, NULL);
|
code = taosThreadRwlockInit(&pMgmt->hashLock, NULL);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
code = TAOS_SYSTEM_ERROR(errno);
|
code = TAOS_SYSTEM_ERROR(errno);
|
||||||
goto _OVER;
|
goto _OVER;
|
||||||
|
|
|
@ -215,7 +215,7 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool fo
|
||||||
|
|
||||||
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0};
|
||||||
if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
|
if (pNode->pFsm != NULL && pNode->pFsm->FpGetSnapshotInfo != NULL) {
|
||||||
(void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot);
|
(void)pNode->pFsm->FpGetSnapshotInfo(pNode->pFsm, &snapshot); // vnodeSyncGetSnapshotInfo
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncIndex logLastIndex = SYNC_INDEX_INVALID;
|
SyncIndex logLastIndex = SYNC_INDEX_INVALID;
|
||||||
|
@ -253,13 +253,15 @@ void syncPrintNodeLog(const char* flags, ELogLevel level, int32_t dflag, bool fo
|
||||||
va_end(argpointer);
|
va_end(argpointer);
|
||||||
|
|
||||||
int32_t aqItems = 0;
|
int32_t aqItems = 0;
|
||||||
|
/*
|
||||||
if (pNode != NULL && pNode->pFsm != NULL && pNode->pFsm->FpApplyQueueItems != NULL) {
|
if (pNode != NULL && pNode->pFsm != NULL && pNode->pFsm->FpApplyQueueItems != NULL) {
|
||||||
aqItems = pNode->pFsm->FpApplyQueueItems(pNode->pFsm);
|
aqItems = pNode->pFsm->FpApplyQueueItems(pNode->pFsm); // vnodeApplyQueueItems
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
// restore error code
|
// restore error code
|
||||||
terrno = errCode;
|
terrno = errCode;
|
||||||
SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm);
|
SyncIndex appliedIndex = pNode->pFsm->FpAppliedIndexCb(pNode->pFsm); // vnodeSyncAppliedIndex
|
||||||
|
|
||||||
if (pNode != NULL) {
|
if (pNode != NULL) {
|
||||||
taosPrintLog(
|
taosPrintLog(
|
||||||
|
@ -426,19 +428,25 @@ void syncLogSendHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, bool
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff, const char* s) {
|
void syncLogRecvHeartbeat(SSyncNode* pSyncNode, const SyncHeartbeat* pMsg, int64_t timeDiff, const char* s) {
|
||||||
|
char pBuf[TD_TIME_STR_LEN] = {0};
|
||||||
|
if (pMsg->timeStamp > 0) {
|
||||||
|
if (formatTimestampLocal(pBuf, pMsg->timeStamp, TSDB_TIME_PRECISION_MILLI) == NULL) {
|
||||||
|
pBuf[0] = '\0';
|
||||||
|
}
|
||||||
|
}
|
||||||
if (timeDiff > SYNC_HEARTBEAT_SLOW_MS) {
|
if (timeDiff > SYNC_HEARTBEAT_SLOW_MS) {
|
||||||
pSyncNode->hbSlowNum++;
|
pSyncNode->hbSlowNum++;
|
||||||
|
|
||||||
sNTrace(pSyncNode,
|
sNTrace(pSyncNode,
|
||||||
"recv sync-heartbeat from dnode:%d slow {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
|
"recv sync-heartbeat from dnode:%d slow(%d ms) {term:%" PRId64 ", commit-index:%" PRId64
|
||||||
", ts:%" PRId64 "}, QID:%s, net elapsed:%" PRId64,
|
", min-match:%" PRId64 ", ts:%s}, QID:%s, net elapsed:%" PRId64 "ms",
|
||||||
DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff);
|
DID(&pMsg->srcId), SYNC_HEARTBEAT_SLOW_MS, pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pBuf, s, timeDiff);
|
||||||
|
} else {
|
||||||
|
sNTrace(pSyncNode,
|
||||||
|
"recv sync-heartbeat from dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
|
||||||
|
", ts:%s}, QID:%s, net elapsed:%" PRId64 "ms",
|
||||||
|
DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pBuf, s, timeDiff);
|
||||||
}
|
}
|
||||||
|
|
||||||
sNTrace(pSyncNode,
|
|
||||||
"recv sync-heartbeat from dnode:%d {term:%" PRId64 ", commit-index:%" PRId64 ", min-match:%" PRId64
|
|
||||||
", ts:%" PRId64 "}, QID:%s, net elapsed:%" PRId64,
|
|
||||||
DID(&pMsg->srcId), pMsg->term, pMsg->commitIndex, pMsg->minMatchIndex, pMsg->timeStamp, s, timeDiff);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
|
void syncLogSendHeartbeatReply(SSyncNode* pSyncNode, const SyncHeartbeatReply* pMsg, const char* s) {
|
||||||
|
|
|
@ -37,7 +37,7 @@ int32_t tmsgPutToQueue(const SMsgCb* msgcb, EQueueType qtype, SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmsgGetQueueSize(const SMsgCb* msgcb, int32_t vgId, EQueueType qtype) {
|
int32_t tmsgGetQueueSize(const SMsgCb* msgcb, int32_t vgId, EQueueType qtype) {
|
||||||
return (*msgcb->qsizeFp)(msgcb->mgmt, vgId, qtype);
|
return (*msgcb->qsizeFp)(msgcb->mgmt, vgId, qtype); // vmGetQueueSize
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) {
|
int32_t tmsgSendReq(const SEpSet* epSet, SRpcMsg* pMsg) {
|
||||||
|
|
Loading…
Reference in New Issue