diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 7947624451..34d37981ef 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -412,8 +412,12 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 2); if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1; +#if 0 tsNumOfVnodeFetchThreads = tsNumOfCores / 4; tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4); +#else + tsNumOfVnodeFetchThreads = 1; +#endif if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1; tsNumOfVnodeWriteThreads = tsNumOfCores; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c index 90f852eed1..cbcb541200 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmFile.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmFile.c @@ -31,7 +31,7 @@ SVnodeObj **vmGetVnodeListFromHash(SVnodeMgmt *pMgmt, int32_t *numOfVnodes) { SVnodeObj *pVnode = *ppVnode; if (pVnode && num < size) { int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); - // dTrace("vgId:%d, acquire vnode, refCount:%d", pVnode->vgId, refCount); + // dTrace("vgId:%d, acquire vnode list, ref:%d", pVnode->vgId, refCount); pVnodes[num++] = (*ppVnode); pIter = taosHashIterate(pMgmt->hash, pIter); } else { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 051e5defb0..037086ac02 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -23,6 +23,7 @@ SVnodeObj *vmAcquireVnode(SVnodeMgmt *pMgmt, int32_t vgId) { taosHashGetDup(pMgmt->hash, &vgId, sizeof(int32_t), (void *)&pVnode); if (pVnode == NULL || pVnode->dropped) { terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; + pVnode = NULL; } else { int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); // dTrace("vgId:%d, acquire vnode, ref:%d", pVnode->vgId, refCount); @@ -80,6 +81,8 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { taosThreadRwlockUnlock(&pMgmt->lock); vmReleaseVnode(pMgmt, pVnode); + + dTrace("vgId:%d, wait for vnode ref become 0", pVnode->vgId); while (pVnode->refCount > 0) taosMsleep(10); dTrace("vgId:%d, wait for vnode queue is empty", pVnode->vgId); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index e5b268a6a2..93f93b1ab7 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -206,9 +206,9 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { int32_t code = vmPutMsgToQueue(pMgmt, pMsg, qtype); if (code != 0) { dTrace("msg:%p, is freed", pMsg); - taosFreeQitem(pMsg); rpcFreeCont(pMsg->pCont); pRpc->pCont = NULL; + taosFreeQitem(pMsg); } return code; @@ -237,8 +237,8 @@ int32_t vmGetQueueSize(SVnodeMgmt *pMgmt, int32_t vgId, EQueueType qtype) { default: break; } + vmReleaseVnode(pMgmt, pVnode); } - vmReleaseVnode(pMgmt, pVnode); return size; } @@ -255,7 +255,11 @@ int32_t vmAllocQueue(SVnodeMgmt *pMgmt, SVnodeObj *pVnode) { return -1; } - dDebug("vgId:%d, queue is alloced", pVnode->vgId); + dDebug("vgId:%d, write-queue:%p is alloced", pVnode->vgId, pVnode->pWriteQ); + dDebug("vgId:%d, sync-queue:%p is alloced", pVnode->vgId, pVnode->pSyncQ); + dDebug("vgId:%d, apply-queue:%p is alloced", pVnode->vgId, pVnode->pApplyQ); + dDebug("vgId:%d, query-queue:%p is alloced", pVnode->vgId, pVnode->pQueryQ); + dDebug("vgId:%d, fetch-queue:%p is alloced", pVnode->vgId, pVnode->pFetchQ); return 0; }