From ddc3b100fe26c7eeed426e87be9e42e76e443bd5 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Fri, 25 Mar 2022 20:43:58 +0800 Subject: [PATCH] fix --- source/dnode/mgmt/vnode/src/vmWorker.c | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/source/dnode/mgmt/vnode/src/vmWorker.c b/source/dnode/mgmt/vnode/src/vmWorker.c index ab1a45feb2..d736626a71 100644 --- a/source/dnode/mgmt/vnode/src/vmWorker.c +++ b/source/dnode/mgmt/vnode/src/vmWorker.c @@ -191,6 +191,10 @@ static int32_t vmPutNodeMsgToQueue(SVnodesMgmt *pMgmt, SNodeMsg *pMsg, EQueueTyp dTrace("msg:%p, will be written into vnode-sync queue", pMsg); code = taosWriteQitem(pVnode->pSyncQ, pMsg); break; + case MERGE_QUEUE: + dTrace("msg:%p, will be written into vnode-merge queue", pMsg); + code = taosWriteQitem(pVnode->pMergeQ, pMsg); + break; default: terrno = TSDB_CODE_INVALID_PARA; break; @@ -304,13 +308,13 @@ int32_t vmGetQueueSize(SMgmtWrapper *pWrapper, int32_t vgId, EQueueType qtype) { int32_t vmAllocQueue(SVnodesMgmt *pMgmt, SVnodeObj *pVnode) { pVnode->pWriteQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessWriteQueue); pVnode->pApplyQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue); - pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->writePool, pVnode, (FItems)vmProcessApplyQueue); + pVnode->pMergeQ = tWWorkerAllocQueue(&pMgmt->mergePool, pVnode, (FItems)vmProcessMergeMsg); pVnode->pSyncQ = tWWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FItems)vmProcessSyncQueue); pVnode->pFetchQ = tQWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FItem)vmProcessFetchQueue); pVnode->pQueryQ = tQWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FItem)vmProcessQueryQueue); if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL || - pVnode->pQueryQ == NULL || pVnode->pMergeQ) { + pVnode->pQueryQ == NULL || pVnode->pMergeQ == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -342,6 +346,7 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { int32_t maxQueryThreads = minQueryThreads; int32_t maxWriteThreads = TMAX(tsNumOfCores, 1); int32_t maxSyncThreads = TMAX(tsNumOfCores / 2, 1); + int32_t maxMergeThreads = 1; SQWorkerPool *pQPool = &pMgmt->queryPool; pQPool->name = "vnode-query"; @@ -365,6 +370,11 @@ int32_t vmStartWorker(SVnodesMgmt *pMgmt) { pWPool->max = maxSyncThreads; if (tWWorkerInit(pWPool) != 0) return -1; + pWPool = &pMgmt->mergePool; + pWPool->name = "vnode-merge"; + pWPool->max = maxMergeThreads; + if (tWWorkerInit(pWPool) != 0) return -1; + SSingleWorkerCfg cfg = { .minNum = 1, .maxNum = 1, .name = "vnode-mgmt", .fp = (FItem)vmProcessMgmtQueue, .param = pMgmt}; if (tSingleWorkerInit(&pMgmt->mgmtWorker, &cfg) != 0) { @@ -382,5 +392,6 @@ void vmStopWorker(SVnodesMgmt *pMgmt) { tQWorkerCleanup(&pMgmt->queryPool); tWWorkerCleanup(&pMgmt->writePool); tWWorkerCleanup(&pMgmt->syncPool); + tWWorkerCleanup(&pMgmt->mergePool); dDebug("vnode workers are closed"); }