diff --git a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h index 85712d2797..c3b4bc0710 100644 --- a/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h +++ b/source/dnode/mgmt/mgmt_vnode/inc/vmInt.h @@ -34,12 +34,14 @@ typedef struct SVnodeMgmt { SAutoQWorkerPool streamPool; SWWorkerPool fetchPool; SSingleWorker mgmtWorker; + SSingleWorker mgmtMultiWorker; SHashObj *hash; TdThreadRwlock lock; SVnodesStat state; STfs *pTfs; TdThread thread; bool stop; + TdThreadMutex createLock; } SVnodeMgmt; typedef struct { @@ -69,6 +71,7 @@ typedef struct { STaosQueue *pQueryQ; STaosQueue *pStreamQ; STaosQueue *pFetchQ; + STaosQueue *pMultiMgmQ; } SVnodeObj; typedef struct { @@ -125,6 +128,7 @@ int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToMergeQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); +int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index fbe1925e3f..0669956fb8 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -377,11 +377,14 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { goto _OVER; } + taosThreadMutexLock(&pMgmt->createLock); code = vmWriteVnodeListToFile(pMgmt); if (code != 0) { code = terrno != 0 ? terrno : code; + taosThreadMutexUnlock(&pMgmt->createLock); goto _OVER; } + taosThreadMutexUnlock(&pMgmt->createLock); _OVER: if (code != 0) { @@ -985,7 +988,7 @@ SArray *vmGetMsgHandles() { if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_TRIM, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_VND_S3MIGRATE, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER; - if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; + if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutMsgToMultiMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_ALTER_VNODE_TYPE, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; if (dmSetMgmtHandle(pArray, TDMT_DND_CHECK_VNODE_LEARNER_CATCHUP, vmPutMsgToMgmtQueue, 0) == NULL) goto _OVER; diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c index 296372a955..6f60e6adcd 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmInt.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmInt.c @@ -580,6 +580,7 @@ static int32_t vmInit(SMgmtInputOpt *pInput, SMgmtOutputOpt *pOutput) { pMgmt->msgCb.qsizeFp = (GetQueueSizeFp)vmGetQueueSize; pMgmt->msgCb.mgmt = pMgmt; taosThreadRwlockInit(&pMgmt->lock, NULL); + taosThreadMutexInit(&pMgmt->createLock, NULL); pMgmt->pTfs = pInput->pTfs; if (pMgmt->pTfs == NULL) { diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 8c1b33cb14..c8fe03164b 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -28,6 +28,31 @@ static inline void vmSendRsp(SRpcMsg *pMsg, int32_t code) { tmsgSendRsp(&rsp); } +static void vmProcessMultiMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { + SVnodeMgmt *pMgmt = pInfo->ahandle; + int32_t code = -1; + const STraceId *trace = &pMsg->info.traceId; + + dGTrace("msg:%p, get from vnode-multi-mgmt queue", pMsg); + switch (pMsg->msgType) { + case TDMT_DND_CREATE_VNODE: + code = vmProcessCreateVnodeReq(pMgmt, pMsg); + break; + } + + if (IsReq(pMsg)) { + if (code != 0) { + if (terrno != 0) code = terrno; + dGError("msg:%p, failed to process since %s, type:%s", pMsg, tstrerror(code), TMSG_INFO(pMsg->msgType)); + } + vmSendRsp(pMsg, code); + } + + dGTrace("msg:%p, is freed, code:0x%x", pMsg, code); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); +} + static void vmProcessMgmtQueue(SQueueInfo *pInfo, SRpcMsg *pMsg) { SVnodeMgmt *pMgmt = pInfo->ahandle; int32_t code = -1; @@ -271,6 +296,13 @@ int32_t vmPutMsgToFetchQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsg int32_t vmPutMsgToStreamQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return vmPutMsgToQueue(pMgmt, pMsg, STREAM_QUEUE); } +int32_t vmPutMsgToMultiMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { + const STraceId *trace = &pMsg->info.traceId; + dGTrace("msg:%p, put into vnode-multi-mgmt queue", pMsg); + taosWriteQitem(pMgmt->mgmtMultiWorker.queue, pMsg); + return 0; +} + int32_t vmPutMsgToMgmtQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { const STraceId *trace = &pMsg->info.traceId; dGTrace("msg:%p, put into vnode-mgmt queue", pMsg); @@ -415,6 +447,20 @@ int32_t vmStartWorker(SVnodeMgmt *pMgmt) { if (tSingleWorkerInit(&pMgmt->mgmtWorker, &mgmtCfg) != 0) return -1; + int32_t threadNum = 0; + if (tsNumOfCores == 0) { + threadNum = 2; + } else { + threadNum = tsNumOfCores; + } + SSingleWorkerCfg multiMgmtCfg = {.min = threadNum, + .max = threadNum, + .name = "vnode-multi-mgmt", + .fp = (FItem)vmProcessMultiMgmtQueue, + .param = pMgmt}; + + if (tSingleWorkerInit(&pMgmt->mgmtMultiWorker, &multiMgmtCfg) != 0) return -1; + dDebug("vnode workers are initialized"); return 0; }