diff --git a/source/dnode/mgmt/impl/src/dndBnode.c b/source/dnode/mgmt/impl/src/dndBnode.c index 61b220158f..15be59a419 100644 --- a/source/dnode/mgmt/impl/src/dndBnode.c +++ b/source/dnode/mgmt/impl/src/dndBnode.c @@ -42,18 +42,13 @@ static SBnode *dndAcquireBnode(SDnode *pDnode) { } static void dndReleaseBnode(SDnode *pDnode, SBnode *pBnode) { + if (pBnode == NULL) return; + SBnodeMgmt *pMgmt = &pDnode->bmgmt; - int32_t refCount = 0; - taosRLockLatch(&pMgmt->latch); - if (pBnode != NULL) { - refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); - } + int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); taosRUnLockLatch(&pMgmt->latch); - - if (pBnode != NULL) { - dTrace("release bnode, refCount:%d", refCount); - } + dTrace("release bnode, refCount:%d", refCount); } static int32_t dndReadBnodeFile(SDnode *pDnode) { diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index ae37967b3d..6c23af7f00 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -43,18 +43,13 @@ static SMnode *dndAcquireMnode(SDnode *pDnode) { } static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) { + if (pMnode == NULL) return; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; - int32_t refCount = 0; - taosRLockLatch(&pMgmt->latch); - if (pMnode != NULL) { - refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); - } + int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); taosRUnLockLatch(&pMgmt->latch); - - if (pMnode != NULL) { - dTrace("release mnode, refCount:%d", refCount); - } + dTrace("release mnode, refCount:%d", refCount); } static int32_t dndReadMnodeFile(SDnode *pDnode) { diff --git a/source/dnode/mgmt/impl/src/dndQnode.c b/source/dnode/mgmt/impl/src/dndQnode.c index 3deee93e29..9d2f623c45 100644 --- a/source/dnode/mgmt/impl/src/dndQnode.c +++ b/source/dnode/mgmt/impl/src/dndQnode.c @@ -42,18 +42,13 @@ static SQnode *dndAcquireQnode(SDnode *pDnode) { } static void dndReleaseQnode(SDnode *pDnode, SQnode *pQnode) { + if (pQnode == NULL) return; + SQnodeMgmt *pMgmt = &pDnode->qmgmt; - int32_t refCount = 0; - taosRLockLatch(&pMgmt->latch); - if (pQnode != NULL) { - refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); - } + int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); taosRUnLockLatch(&pMgmt->latch); - - if (pQnode != NULL) { - dTrace("release qnode, refCount:%d", refCount); - } + dTrace("release qnode, refCount:%d", refCount); } static int32_t dndReadQnodeFile(SDnode *pDnode) { diff --git a/source/dnode/mgmt/impl/src/dndSnode.c b/source/dnode/mgmt/impl/src/dndSnode.c index ab4e38bfb2..00435d4c3e 100644 --- a/source/dnode/mgmt/impl/src/dndSnode.c +++ b/source/dnode/mgmt/impl/src/dndSnode.c @@ -42,18 +42,13 @@ static SSnode *dndAcquireSnode(SDnode *pDnode) { } static void dndReleaseSnode(SDnode *pDnode, SSnode *pSnode) { + if (pSnode == NULL) return; + SSnodeMgmt *pMgmt = &pDnode->smgmt; - int32_t refCount = 0; - taosRLockLatch(&pMgmt->latch); - if (pSnode != NULL) { - refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); - } + int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); taosRUnLockLatch(&pMgmt->latch); - - if (pSnode != NULL) { - dTrace("release snode, refCount:%d", refCount); - } + dTrace("release snode, refCount:%d", refCount); } static int32_t dndReadSnodeFile(SDnode *pDnode) { diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 1269562968..6d90da8b31 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -40,7 +40,7 @@ typedef struct { STaosQueue *pSyncQ; STaosQueue *pApplyQ; STaosQueue *pQueryQ; - STaosQueue* pFetchQ; + STaosQueue *pFetchQ; } SVnodeObj; typedef struct { @@ -53,22 +53,8 @@ typedef struct { SWrapperCfg *pCfgs; } SVnodeThread; -static int32_t dndInitVnodeReadWorker(SDnode *pDnode); -static int32_t dndInitVnodeWriteWorker(SDnode *pDnode); -static int32_t dndInitVnodeSyncWorker(SDnode *pDnode); -static void dndCleanupVnodeReadWorker(SDnode *pDnode); -static void dndCleanupVnodeWriteWorker(SDnode *pDnode); -static void dndCleanupVnodeSyncWorker(SDnode *pDnode); -static int32_t dndAllocVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode); -static int32_t dndAllocVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode); -static int32_t dndAllocVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode); -static int32_t dndAllocVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode); -static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode); -static void dndFreeVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode); -static void dndFreeVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode); -static void dndFreeVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode); -static void dndFreeVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode); -static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode); +static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode); +static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode); static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg); static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg); @@ -117,11 +103,9 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) { if (pVnode == NULL) return; SVnodesMgmt *pMgmt = &pDnode->vmgmt; - taosRLockLatch(&pMgmt->latch); int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); taosRUnLockLatch(&pMgmt->latch); - dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); } @@ -134,7 +118,7 @@ static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) { } pVnode->vgId = pCfg->vgId; - pVnode->refCount = 1; + pVnode->refCount = 0; pVnode->dropped = 0; pVnode->accessState = TSDB_VN_ALL_ACCCESS; pVnode->pImpl = pImpl; @@ -148,23 +132,8 @@ static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) { return -1; } - if (dndAllocVnodeQueryQueue(pDnode, pVnode) != 0) { - return -1; - } - - if (dndAllocVnodeFetchQueue(pDnode, pVnode) != 0) { - return -1; - } - - if (dndAllocVnodeWriteQueue(pDnode, pVnode) != 0) { - return -1; - } - - if (dndAllocVnodeApplyQueue(pDnode, pVnode) != 0) { - return -1; - } - - if (dndAllocVnodeSyncQueue(pDnode, pVnode) != 0) { + if (dndAllocVnodeQueue(pDnode, pVnode) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -192,12 +161,7 @@ static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode) { while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); - dndFreeVnodeQueryQueue(pDnode, pVnode); - dndFreeVnodeFetchQueue(pDnode, pVnode); - dndFreeVnodeWriteQueue(pDnode, pVnode); - dndFreeVnodeApplyQueue(pDnode, pVnode); - dndFreeVnodeSyncQueue(pDnode, pVnode); - + dndFreeVnodeQueue(pDnode, pVnode); vnodeClose(pVnode->pImpl); pVnode->pImpl = NULL; @@ -527,8 +491,8 @@ static void dndCloseVnodes(SDnode *pDnode) { dInfo("total vnodes:%d are all closed", numOfVnodes); } -static SCreateVnodeReq *dndParseCreateVnodeReq(SRpcMsg *rpcMsg) { - SCreateVnodeReq *pCreate = rpcMsg->pCont; +static SCreateVnodeReq *dndParseCreateVnodeReq(SRpcMsg *pReq) { + SCreateVnodeReq *pCreate = pReq->pCont; pCreate->vgId = htonl(pCreate->vgId); pCreate->dnodeId = htonl(pCreate->dnodeId); pCreate->dbUid = htobe64(pCreate->dbUid); @@ -585,14 +549,14 @@ static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeReq *pCreate, SWra pCfg->vgVersion = pCreate->vgVersion; } -static SDropVnodeReq *vnodeParseDropVnodeReq(SRpcMsg *rpcMsg) { - SDropVnodeReq *pDrop = rpcMsg->pCont; +static SDropVnodeReq *vnodeParseDropVnodeReq(SRpcMsg *pReq) { + SDropVnodeReq *pDrop = pReq->pCont; pDrop->vgId = htonl(pDrop->vgId); return pDrop; } -static SAuthVnodeReq *vnodeParseAuthVnodeReq(SRpcMsg *rpcMsg) { - SAuthVnodeReq *pAuth = rpcMsg->pCont; +static SAuthVnodeReq *vnodeParseAuthVnodeReq(SRpcMsg *pReq) { + SAuthVnodeReq *pAuth = pReq->pCont; pAuth->vgId = htonl(pAuth->vgId); return pAuth; } @@ -612,7 +576,7 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { dDebug("vgId:%d, already exist, return success", pCreate->vgId); dndReleaseVnode(pDnode, pVnode); terrno = TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED; - return -1; + return 0; } SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg); @@ -648,16 +612,13 @@ int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { SVnodeCfg vnodeCfg = {0}; dndGenerateVnodeCfg(pAlter, &vnodeCfg); - SWrapperCfg wrapperCfg = {0}; - dndGenerateWrapperCfg(pDnode, pAlter, &wrapperCfg); - SVnodeObj *pVnode = dndAcquireVnode(pDnode, pAlter->vgId); if (pVnode == NULL) { dDebug("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr()); return -1; } - if (wrapperCfg.vgVersion == pVnode->vgVersion) { + if (pAlter->vgVersion == pVnode->vgVersion) { dndReleaseVnode(pDnode, pVnode); dDebug("vgId:%d, no need to alter vnode cfg for version unchanged ", pAlter->vgId); return 0; @@ -670,7 +631,7 @@ int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { } int32_t oldVersion = pVnode->vgVersion; - pVnode->vgVersion = wrapperCfg.vgVersion; + pVnode->vgVersion = pAlter->vgVersion; int32_t code = dndWriteVnodesToFile(pDnode); if (code != 0) { pVnode->vgVersion = oldVersion; @@ -689,7 +650,7 @@ int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); if (pVnode == NULL) { dDebug("vgId:%d, failed to drop since %s", vgId, terrstr()); - return -1; + return 0; } pVnode->dropped = 1; @@ -717,7 +678,7 @@ int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); if (pVnode == NULL) { dDebug("vgId:%d, failed to auth since %s", vgId, terrstr()); - return terrno; + return -1; } pVnode->accessState = pAuth->accessState; @@ -821,6 +782,7 @@ static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_ for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pMsg); + // todo SRpcMsg *pRsp = NULL; (void)vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp); } @@ -832,6 +794,7 @@ static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pMsg); + // todo SRpcMsg *pRsp = NULL; (void)vnodeProcessSyncReq(pVnode->pImpl, pMsg, &pRsp); } @@ -855,21 +818,25 @@ static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg) } if (code != TSDB_CODE_SUCCESS) { - SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code}; - rpcSendResponse(&rsp); + if (pRpcMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code}; + rpcSendResponse(&rsp); + } rpcFreeCont(pRpcMsg->pCont); } } static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { - SMsgHead *pHead = (SMsgHead *)pMsg->pCont; + SMsgHead *pHead = pMsg->pCont; pHead->contLen = htonl(pHead->contLen); pHead->vgId = htonl(pHead->vgId); SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId); if (pVnode == NULL) { - SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID}; - rpcSendResponse(&rsp); + if (pMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID}; + rpcSendResponse(&rsp); + } rpcFreeCont(pMsg->pCont); } @@ -910,193 +877,96 @@ void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) { SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); - if (pVnode == NULL) { - return -1; - } + if (pVnode == NULL) return -1; int32_t code = taosWriteQitem(pVnode->pApplyQ, pMsg); dndReleaseVnode(pDnode, pVnode); return code; } -static int32_t dndAllocVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - pVnode->pQueryQ = tWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue); - if (pVnode->pQueryQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static void dndFreeVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); - pVnode->pQueryQ = NULL; -} - -static int32_t dndAllocVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - pVnode->pFetchQ = tWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FProcessItem)dndProcessVnodeFetchQueue); - if (pVnode->pFetchQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static void dndFreeVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); - pVnode->pFetchQ = NULL; -} - -static int32_t dndInitVnodeReadWorker(SDnode *pDnode) { +static int32_t dndInitVnodeWorkers(SDnode *pDnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; int32_t maxFetchThreads = 4; - float threadsForQuery = MAX(pDnode->opt.numOfCores * pDnode->opt.ratioOfQueryCores, 1); + int32_t minFetchThreads = MIN(maxFetchThreads, pDnode->opt.numOfCores); + int32_t minQueryThreads = MAX((int32_t)(pDnode->opt.numOfCores * pDnode->opt.ratioOfQueryCores), 1); + int32_t maxQueryThreads = minQueryThreads; + int32_t maxWriteThreads = MAX(pDnode->opt.numOfCores, 1); + int32_t maxSyncThreads = MAX(pDnode->opt.numOfCores / 2, 1); SWorkerPool *pPool = &pMgmt->queryPool; pPool->name = "vnode-query"; - pPool->min = (int32_t)threadsForQuery; - pPool->max = pPool->min; - if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } + pPool->min = minQueryThreads; + pPool->max = maxQueryThreads; + if (tWorkerInit(pPool) != 0) return -1; pPool = &pMgmt->fetchPool; pPool->name = "vnode-fetch"; - pPool->min = MIN(maxFetchThreads, pDnode->opt.numOfCores); - pPool->max = pPool->min; - if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } + pPool->min = minFetchThreads; + pPool->max = maxFetchThreads; + if (tWorkerInit(pPool) != 0) return -1; - dDebug("vnode read worker is initialized"); + SMWorkerPool *pMPool = &pMgmt->writePool; + pMPool->name = "vnode-write"; + pMPool->max = maxWriteThreads; + if (tMWorkerInit(pMPool) != 0) return -1; + + pMPool = &pMgmt->syncPool; + pMPool->name = "vnode-sync"; + pMPool->max = maxSyncThreads; + if (tMWorkerInit(pMPool) != 0) return -1; + + dDebug("vnode workers is initialized"); return 0; } -static void dndCleanupVnodeReadWorker(SDnode *pDnode) { +static void dndCleanupVnodeWorkers(SDnode *pDnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; tWorkerCleanup(&pMgmt->fetchPool); tWorkerCleanup(&pMgmt->queryPool); - dDebug("vnode close worker is initialized"); -} - -static int32_t dndAllocVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - pVnode->pWriteQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeWriteQueue); - if (pVnode->pWriteQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static void dndFreeVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); - pVnode->pWriteQ = NULL; -} - -static int32_t dndAllocVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - pVnode->pApplyQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeApplyQueue); - if (pVnode->pApplyQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static void dndFreeVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ); - pVnode->pApplyQ = NULL; -} - -static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - SMWorkerPool *pPool = &pMgmt->writePool; - pPool->name = "vnode-write"; - pPool->max = pDnode->opt.numOfCores; - if (tMWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - dDebug("vnode write worker is initialized"); - return 0; -} - -static void dndCleanupVnodeWriteWorker(SDnode *pDnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; tMWorkerCleanup(&pMgmt->writePool); - dDebug("vnode write worker is closed"); -} - -static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - pVnode->pSyncQ = tMWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue); - if (pVnode->pSyncQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tMWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); - pVnode->pSyncQ = NULL; -} - -static int32_t dndInitVnodeSyncWorker(SDnode *pDnode) { - int32_t maxThreads = pDnode->opt.numOfCores / 2; - if (maxThreads < 1) maxThreads = 1; - - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - SMWorkerPool *pPool = &pMgmt->syncPool; - pPool->name = "vnode-sync"; - pPool->max = maxThreads; - if (tMWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - dDebug("vnode sync worker is initialized"); - return 0; -} - -static void dndCleanupVnodeSyncWorker(SDnode *pDnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; tMWorkerCleanup(&pMgmt->syncPool); - dDebug("vnode sync worker is closed"); + dDebug("vnode workers is closed"); +} + +static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) { + SVnodesMgmt *pMgmt = &pDnode->vmgmt; + + pVnode->pWriteQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeWriteQueue); + pVnode->pApplyQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeApplyQueue); + pVnode->pSyncQ = tMWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue); + pVnode->pFetchQ = tWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FProcessItem)dndProcessVnodeFetchQueue); + pVnode->pQueryQ = tWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue); + + if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL || + pVnode->pQueryQ == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + return 0; +} + +static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) { + SVnodesMgmt *pMgmt = &pDnode->vmgmt; + tWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); + tWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); + tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); + tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ); + tMWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); + pVnode->pWriteQ = NULL; + pVnode->pApplyQ = NULL; + pVnode->pSyncQ = NULL; + pVnode->pFetchQ = NULL; + pVnode->pQueryQ = NULL; } int32_t dndInitVnodes(SDnode *pDnode) { dInfo("dnode-vnodes start to init"); - if (dndInitVnodeReadWorker(pDnode) != 0) { - dError("failed to init vnodes read worker since %s", terrstr()); - return -1; - } - - if (dndInitVnodeWriteWorker(pDnode) != 0) { - dError("failed to init vnodes write worker since %s", terrstr()); - return -1; - } - - if (dndInitVnodeSyncWorker(pDnode) != 0) { - dError("failed to init vnodes sync worker since %s", terrstr()); + if (dndInitVnodeWorkers(pDnode) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + dError("failed to init vnode workers since %s", terrstr()); return -1; } @@ -1112,9 +982,7 @@ int32_t dndInitVnodes(SDnode *pDnode) { void dndCleanupVnodes(SDnode *pDnode) { dInfo("dnode-vnodes start to clean up"); dndCloseVnodes(pDnode); - dndCleanupVnodeReadWorker(pDnode); - dndCleanupVnodeWriteWorker(pDnode); - dndCleanupVnodeSyncWorker(pDnode); + dndCleanupVnodeWorkers(pDnode); dInfo("dnode-vnodes is cleaned up"); } diff --git a/source/dnode/vnode/impl/src/vnodeMain.c b/source/dnode/vnode/impl/src/vnodeMain.c index c87749203a..88175646ae 100644 --- a/source/dnode/vnode/impl/src/vnodeMain.c +++ b/source/dnode/vnode/impl/src/vnodeMain.c @@ -20,12 +20,13 @@ static void vnodeFree(SVnode *pVnode); static int vnodeOpenImpl(SVnode *pVnode); static void vnodeCloseImpl(SVnode *pVnode); -SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfgInput) { +SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { SVnode *pVnode = NULL; // Set default options - SVnodeCfg *pVnodeCfg = &defaultVnodeOptions; - pVnodeCfg->vgId = pVnodeCfg->vgId; + if (pVnodeCfg == NULL) { + pVnodeCfg = &defaultVnodeOptions; + } // Validate options if (vnodeValidateOptions(pVnodeCfg) < 0) {