From 6160aa6e07e03124a8a1130284a44bf65e8d4631 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 26 Dec 2021 19:56:37 -0800 Subject: [PATCH] mgmt queue --- source/dnode/mgmt/impl/inc/dndDnode.h | 9 +- source/dnode/mgmt/impl/inc/dndInt.h | 28 ++--- source/dnode/mgmt/impl/inc/dndMnode.h | 5 +- source/dnode/mgmt/impl/src/dndDnode.c | 145 +++++++++++++++++----- source/dnode/mgmt/impl/src/dndMnode.c | 108 +--------------- source/dnode/mgmt/impl/src/dndTransport.c | 18 +-- 6 files changed, 152 insertions(+), 161 deletions(-) diff --git a/source/dnode/mgmt/impl/inc/dndDnode.h b/source/dnode/mgmt/impl/inc/dndDnode.h index e242334be5..a2015913a7 100644 --- a/source/dnode/mgmt/impl/inc/dndDnode.h +++ b/source/dnode/mgmt/impl/inc/dndDnode.h @@ -23,15 +23,16 @@ extern "C" { int32_t dndInitDnode(SDnode *pDnode); void dndCleanupDnode(SDnode *pDnode); -void dndProcessDnodeReq(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -void dndProcessDnodeRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); int32_t dndGetDnodeId(SDnode *pDnode); int64_t dndGetClusterId(SDnode *pDnode); void dndGetDnodeEp(SDnode *pDnode, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort); void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet); -void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg); -void dndSendStatusMsg(SDnode *pDnode); + +void dndSendRedirectMsg(SDnode *pDnode, SRpcMsg *pMsg); +void dndSendStatusMsg(SDnode *pDnode); +void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet); +void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 032471c5c5..c352b37ef0 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -59,18 +59,20 @@ typedef struct { } SDnodeDir; typedef struct { - int32_t dnodeId; - int32_t dropped; - int64_t clusterId; - int64_t rebootTime; - int64_t updateTime; - int8_t statusSent; - SEpSet mnodeEpSet; - char *file; - SHashObj *dnodeHash; - SDnodeEps *dnodeEps; - pthread_t *threadId; - SRWLatch latch; + int32_t dnodeId; + int32_t dropped; + int64_t clusterId; + int64_t rebootTime; + int64_t updateTime; + int8_t statusSent; + SEpSet mnodeEpSet; + char *file; + SHashObj *dnodeHash; + SDnodeEps *dnodeEps; + pthread_t *threadId; + SRWLatch latch; + taos_queue pMgmtQ; + SWorkerPool mgmtPool; } SDnodeMgmt; typedef struct { @@ -86,8 +88,6 @@ typedef struct { taos_queue pReadQ; taos_queue pWriteQ; taos_queue pSyncQ; - taos_queue pMgmtQ; - SWorkerPool mgmtPool; SWorkerPool readPool; SWorkerPool writePool; SWorkerPool syncPool; diff --git a/source/dnode/mgmt/impl/inc/dndMnode.h b/source/dnode/mgmt/impl/inc/dndMnode.h index 67c51e51a8..0f7cec230e 100644 --- a/source/dnode/mgmt/impl/inc/dndMnode.h +++ b/source/dnode/mgmt/impl/inc/dndMnode.h @@ -23,11 +23,14 @@ extern "C" { int32_t dndInitMnode(SDnode *pDnode); void dndCleanupMnode(SDnode *pDnode); + int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey); -void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); +int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); +int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index 3362632b15..777d6f77d9 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -18,6 +18,21 @@ #include "dndTransport.h" #include "dndVnodes.h" +static int32_t dndInitMgmtWorker(SDnode *pDnode); +static void dndCleanupMgmtWorker(SDnode *pDnode); +static int32_t dndAllocMgmtQueue(SDnode *pDnode); +static void dndFreeMgmtQueue(SDnode *pDnode); +static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg); + +static int32_t dndReadDnodes(SDnode *pDnode); +static int32_t dndWriteDnodes(SDnode *pDnode); +static void *dnodeThreadRoutine(void *param); + +static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg); +static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg); +static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg); +static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg); + int32_t dndGetDnodeId(SDnode *pDnode) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; taosRLockLatch(&pMgmt->latch); @@ -164,7 +179,7 @@ static int32_t dndReadDnodes(SDnode *pDnode) { int32_t code = TSDB_CODE_DND_DNODE_READ_FILE_ERROR; int32_t len = 0; - int32_t maxLen = 256 *1024; + int32_t maxLen = 256 * 1024; char *content = calloc(1, maxLen + 1); cJSON *root = NULL; FILE *fp = NULL; @@ -302,7 +317,7 @@ static int32_t dndWriteDnodes(SDnode *pDnode) { } int32_t len = 0; - int32_t maxLen = 256 *1024; + int32_t maxLen = 256 * 1024; char *content = calloc(1, maxLen + 1); len += snprintf(content + len, maxLen - len, "{\n"); @@ -409,13 +424,9 @@ static void dndUpdateDnodeEps(SDnode *pDnode, SDnodeEps *pDnodeEps) { taosWUnLockLatch(&pMgmt->latch); } -static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { +static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; - if (pEpSet && pEpSet->numOfEps > 0) { - dndUpdateMnodeEpSet(pDnode, pEpSet); - } - if (pMsg->code != TSDB_CODE_SUCCESS) { pMgmt->statusSent = 0; return; @@ -443,9 +454,9 @@ static void dndProcessStatusRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { pMgmt->statusSent = 0; } -static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); } +static void dndProcessAuthRsp(SDnode *pDnode, SRpcMsg *pMsg) { assert(1); } -static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { assert(1); } +static void dndProcessGrantRsp(SDnode *pDnode, SRpcMsg *pMsg) { assert(1); } static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) { dError("config msg is received, but not supported yet"); @@ -456,7 +467,7 @@ static void dndProcessConfigDnodeReq(SDnode *pDnode, SRpcMsg *pMsg) { rpcSendResponse(&rspMsg); } -static void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg) { +void dndProcessStartupReq(SDnode *pDnode, SRpcMsg *pMsg) { dDebug("startup msg is received"); SStartupMsg *pStartup = rpcMallocCont(sizeof(SStartupMsg)); @@ -490,6 +501,7 @@ int32_t dndInitDnode(SDnode *pDnode) { pMgmt->rebootTime = taosGetTimestampMs(); pMgmt->dropped = 0; pMgmt->clusterId = 0; + taosInitRWLatch(&pMgmt->latch); char path[PATH_MAX]; snprintf(path, PATH_MAX, "%s/dnode.json", pDnode->dir.dnode); @@ -511,7 +523,15 @@ int32_t dndInitDnode(SDnode *pDnode) { return -1; } - taosInitRWLatch(&pMgmt->latch); + if (dndInitMgmtWorker(pDnode) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + if (dndAllocMgmtQueue(pDnode) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } pMgmt->threadId = taosCreateThread(dnodeThreadRoutine, pDnode); if (pMgmt->threadId == NULL) { @@ -527,6 +547,9 @@ int32_t dndInitDnode(SDnode *pDnode) { void dndCleanupDnode(SDnode *pDnode) { SDnodeMgmt *pMgmt = &pDnode->dmgmt; + dndCleanupMgmtWorker(pDnode); + dndFreeMgmtQueue(pDnode); + if (pMgmt->threadId != NULL) { taosDestoryThread(pMgmt->threadId); pMgmt->threadId = NULL; @@ -553,39 +576,105 @@ void dndCleanupDnode(SDnode *pDnode) { dInfo("dnode-dnode is cleaned up"); } -void dndProcessDnodeReq(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { +static int32_t dndInitMgmtWorker(SDnode *pDnode) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; + SWorkerPool *pPool = &pMgmt->mgmtPool; + pPool->name = "dnode-mgmt"; + pPool->min = 1; + pPool->max = 1; + if (tWorkerInit(pPool) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + dDebug("dnode mgmt worker is initialized"); + return 0; +} + +static void dndCleanupMgmtWorker(SDnode *pDnode) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; + tWorkerCleanup(&pMgmt->mgmtPool); + dDebug("dnode mgmt worker is closed"); +} + +static int32_t dndAllocMgmtQueue(SDnode *pDnode) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; + pMgmt->pMgmtQ = tWorkerAllocQueue(&pMgmt->mgmtPool, pDnode, (FProcessItem)dndProcessMgmtQueue); + if (pMgmt->pMgmtQ == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + return 0; +} + +static void dndFreeMgmtQueue(SDnode *pDnode) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; + tWorkerFreeQueue(&pMgmt->mgmtPool, pMgmt->pMgmtQ); + pMgmt->pMgmtQ = NULL; +} + +void dndProcessMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) { + SDnodeMgmt *pMgmt = &pDnode->dmgmt; + + if (pEpSet && pEpSet->numOfEps > 0 && pRpcMsg->msgType == TDMT_MND_STATUS_RSP) { + dndUpdateMnodeEpSet(pDnode, pEpSet); + } + + SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg)); + if (pMsg != NULL) *pMsg = *pRpcMsg; + + if (pMsg == NULL || taosWriteQitem(pMgmt->pMgmtQ, pMsg) != 0) { + if (pRpcMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY}; + rpcSendResponse(&rsp); + } + rpcFreeCont(pRpcMsg->pCont); + taosFreeQitem(pMsg); + } +} + +static void dndProcessMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { + int32_t code = 0; + switch (pMsg->msgType) { + case TDMT_DND_CREATE_MNODE: + code = dndProcessCreateMnodeReq(pDnode, pMsg); + break; + case TDMT_DND_ALTER_MNODE: + code = dndProcessAlterMnodeReq(pDnode, pMsg); + break; + case TDMT_DND_DROP_MNODE: + code = dndProcessDropMnodeReq(pDnode, pMsg); + break; case TDMT_DND_NETWORK_TEST: dndProcessStartupReq(pDnode, pMsg); break; case TDMT_DND_CONFIG_DNODE: dndProcessConfigDnodeReq(pDnode, pMsg); break; - default: - dError("RPC %p, dnode req:%s not processed", pMsg->handle, TMSG_INFO(pMsg->msgType)); - SRpcMsg rspMsg = {.handle = pMsg->handle, .code = TSDB_CODE_MSG_NOT_PROCESSED}; - rpcSendResponse(&rspMsg); - } - - rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; -} - -void dndProcessDnodeRsp(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - switch (pMsg->msgType) { case TDMT_MND_STATUS_RSP: - dndProcessStatusRsp(pDnode, pMsg, pEpSet); + dndProcessStatusRsp(pDnode, pMsg); break; case TDMT_MND_AUTH_RSP: - dndProcessAuthRsp(pDnode, pMsg, pEpSet); + dndProcessAuthRsp(pDnode, pMsg); break; case TDMT_MND_GRANT_RSP: - dndProcessGrantRsp(pDnode, pMsg, pEpSet); + dndProcessGrantRsp(pDnode, pMsg); break; default: - dError("RPC %p, dnode rsp:%s not processed", pMsg->handle, TMSG_INFO(pMsg->msgType)); + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + code = -1; + dError("RPC %p, dnode req:%s not processed", pMsg->handle, TMSG_INFO(pMsg->msgType)); + break; + } + + if (pMsg->msgType & 1u) { + if (code != 0) code = terrno; + SRpcMsg rsp = {.code = code, .handle = pMsg->handle, .ahandle = pMsg->ahandle}; + rpcSendResponse(&rsp); } rpcFreeCont(pMsg->pCont); pMsg->pCont = NULL; + taosFreeQitem(pMsg); } diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index 8fbb473af1..50da49d325 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -21,7 +21,6 @@ static int32_t dndInitMnodeReadWorker(SDnode *pDnode); static int32_t dndInitMnodeWriteWorker(SDnode *pDnode); static int32_t dndInitMnodeSyncWorker(SDnode *pDnode); -static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode); static void dndCleanupMnodeReadWorker(SDnode *pDnode); static void dndCleanupMnodeWriteWorker(SDnode *pDnode); static void dndCleanupMnodeSyncWorker(SDnode *pDnode); @@ -29,7 +28,6 @@ static void dndCleanupMnodeMgmtWorker(SDnode *pDnode); static int32_t dndAllocMnodeReadQueue(SDnode *pDnode); static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode); static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode); -static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode); static void dndFreeMnodeReadQueue(SDnode *pDnode); static void dndFreeMnodeWriteQueue(SDnode *pDnode); static void dndFreeMnodeSyncQueue(SDnode *pDnode); @@ -38,12 +36,10 @@ static void dndFreeMnodeMgmtQueue(SDnode *pDnode); static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg); static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg); static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg); -static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg); static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg); void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t dndStartMnodeWorker(SDnode *pDnode); static void dndStopMnodeWorker(SDnode *pDnode); @@ -58,10 +54,6 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption); static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption); static int32_t dndDropMnode(SDnode *pDnode); -static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); -static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); -static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); - static SMnode *dndAcquireMnode(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = NULL; @@ -488,7 +480,7 @@ static SCreateMnodeInMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) { return pMsg; } -static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { +int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { SCreateMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg); if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { @@ -504,7 +496,7 @@ static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { } } -static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { +int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { SAlterMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg); if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { @@ -524,7 +516,7 @@ static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { return dndWriteMnodeFile(pDnode); } -static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { +int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { SDropMnodeInMsg *pMsg = pRpcMsg->pCont; pMsg->dnodeId = htonl(pMsg->dnodeId); @@ -536,33 +528,6 @@ static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { } } -static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { - int32_t code = 0; - - switch (pMsg->msgType) { - case TDMT_DND_CREATE_MNODE: - code = dndProcessCreateMnodeReq(pDnode, pMsg); - break; - case TDMT_DND_ALTER_MNODE: - code = dndProcessAlterMnodeReq(pDnode, pMsg); - break; - case TDMT_DND_DROP_MNODE: - code = dndProcessDropMnodeReq(pDnode, pMsg); - break; - default: - terrno = TSDB_CODE_MSG_NOT_PROCESSED; - code = -1; - break; - } - - if (pMsg->msgType & 1u) { - if (code != 0) code = terrno; - SRpcMsg rsp = {.code = code, .handle = pMsg->handle}; - rpcSendResponse(&rsp); - } - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); -} static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; @@ -622,25 +587,6 @@ static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMs return 0; } -void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - SMnode *pMnode = dndAcquireMnode(pDnode); - - SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg)); - if (pMsg != NULL) *pMsg = *pRpcMsg; - - if (pMsg == NULL || taosWriteQitem(pMgmt->pMgmtQ, pMsg) != 0) { - if (pRpcMsg->msgType & 1u) { - SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY}; - rpcSendResponse(&rsp); - } - rpcFreeCont(pRpcMsg->pCont); - taosFreeQitem(pMsg); - } - - dndReleaseMnode(pDnode, pMnode); -} - void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = dndAcquireMnode(pDnode); @@ -686,42 +632,6 @@ void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { dndReleaseMnode(pDnode, pMnode); } -static int32_t dndAllocMnodeMgmtQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - pMgmt->pMgmtQ = tWorkerAllocQueue(&pMgmt->mgmtPool, pDnode, (FProcessItem)dndProcessMnodeMgmtQueue); - if (pMgmt->pMgmtQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - return 0; -} - -static void dndFreeMnodeMgmtQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - tWorkerFreeQueue(&pMgmt->mgmtPool, pMgmt->pMgmtQ); - pMgmt->pMgmtQ = NULL; -} - -static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - SWorkerPool *pPool = &pMgmt->mgmtPool; - pPool->name = "mnode-mgmt"; - pPool->min = 1; - pPool->max = 1; - if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - dDebug("mnode mgmt worker is initialized"); - return 0; -} - -static void dndCleanupMnodeMgmtWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - tWorkerCleanup(&pMgmt->mgmtPool); - dDebug("mnode mgmt worker is closed"); -} static int32_t dndAllocMnodeReadQueue(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; @@ -842,16 +752,6 @@ int32_t dndInitMnode(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; taosInitRWLatch(&pMgmt->latch); - if (dndInitMnodeMgmtWorker(pDnode) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - if (dndAllocMnodeMgmtQueue(pDnode) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - char path[PATH_MAX]; snprintf(path, PATH_MAX, "%s/mnode.json", pDnode->dir.dnode); pMgmt->file = strdup(path); @@ -894,8 +794,6 @@ void dndCleanupMnode(SDnode *pDnode) { dInfo("dnode-mnode start to clean up"); if (pMgmt->pMnode) dndStopMnodeWorker(pDnode); - dndCleanupMnodeMgmtWorker(pDnode); - dndFreeMnodeMgmtQueue(pDnode); tfree(pMgmt->file); mndClose(pMgmt->pMnode); dInfo("dnode-mnode is cleaned up"); diff --git a/source/dnode/mgmt/impl/src/dndTransport.c b/source/dnode/mgmt/impl/src/dndTransport.c index a5be338a17..f39bbb6ba3 100644 --- a/source/dnode/mgmt/impl/src/dndTransport.c +++ b/source/dnode/mgmt/impl/src/dndTransport.c @@ -83,7 +83,7 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_MND_SHOW_RETRIEVE)] = dndProcessMnodeReadMsg; // message from client to dnode - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_NETWORK_TEST)] = dndProcessDnodeReq; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_NETWORK_TEST)] = dndProcessMgmtMsg; // message from mnode to vnode pMgmt->msgFp[TMSG_INDEX(TDMT_VND_CREATE_STB)] = dndProcessVnodeWriteMsg; @@ -106,24 +106,24 @@ static void dndInitMsgFp(STransMgmt *pMgmt) { pMgmt->msgFp[TMSG_INDEX(TDMT_DND_AUTH_VNODE_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE)] = dndProcessVnodeMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_COMPACT_VNODE_RSP)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE)] = dndProcessMnodeMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE)] = dndProcessMnodeMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE_RSP)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE)] = dndProcessMnodeMgmtMsg; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_DROP_MNODE_RSP)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE)] = dndProcessDnodeReq; + pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CONFIG_DNODE_RSP)] = dndProcessMnodeWriteMsg; // message from dnode to mnode pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessDnodeRsp; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_GRANT_RSP)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_TRANS_RSP)] = dndProcessMnodeWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS)] = dndProcessMnodeWriteMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessDnodeRsp; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_STATUS_RSP)] = dndProcessMgmtMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH)] = dndProcessMnodeReadMsg; - pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH_RSP)] = dndProcessDnodeRsp; + pMgmt->msgFp[TMSG_INDEX(TDMT_MND_AUTH_RSP)] = dndProcessMgmtMsg; } static void dndProcessResponse(void *parent, SRpcMsg *pMsg, SEpSet *pEpSet) { @@ -191,7 +191,7 @@ static void dndProcessRequest(void *param, SRpcMsg *pMsg, SEpSet *pEpSet) { tmsg_t msgType = pMsg->msgType; if (msgType == TDMT_DND_NETWORK_TEST) { dTrace("RPC %p, network test req, app:%p will be processed, code:0x%x", pMsg->handle, pMsg->ahandle, pMsg->code); - dndProcessDnodeReq(pDnode, pMsg, pEpSet); + dndProcessStartupReq(pDnode, pMsg); return; }