From a8b890bcae713138f1b9963849793eaf2f89f0b2 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 12 Apr 2022 19:49:19 +0800 Subject: [PATCH] refact(cluster): node mgmt --- include/util/tprocess.h | 8 +- source/dnode/mgmt/exe/dndMain.c | 2 +- source/dnode/mgmt/implement/inc/dmInt.h | 56 --- .../implement/inc/{dndNode.h => dndImp.h} | 38 +- source/dnode/mgmt/implement/src/dmInt.c | 170 ------- .../mgmt/implement/src/{dmFile.c => dndEps.c} | 22 +- source/dnode/mgmt/implement/src/dndExec.c | 2 +- .../implement/src/{dmHandle.c => dndHandle.c} | 6 +- .../src/{dmMonitor.c => dndMonitor.c} | 2 +- source/dnode/mgmt/implement/src/dndObj.c | 99 +++- .../dnode/mgmt/implement/src/dndTransport.c | 426 ++++++++++-------- .../implement/src/{dmWorker.c => dndWorker.c} | 2 +- source/util/src/tprocess.c | 16 +- source/util/test/procTest.cpp | 28 +- 14 files changed, 409 insertions(+), 468 deletions(-) delete mode 100644 source/dnode/mgmt/implement/inc/dmInt.h rename source/dnode/mgmt/implement/inc/{dndNode.h => dndImp.h} (58%) delete mode 100644 source/dnode/mgmt/implement/src/dmInt.c rename source/dnode/mgmt/implement/src/{dmFile.c => dndEps.c} (94%) rename source/dnode/mgmt/implement/src/{dmHandle.c => dndHandle.c} (98%) rename source/dnode/mgmt/implement/src/{dmMonitor.c => dndMonitor.c} (99%) rename source/dnode/mgmt/implement/src/{dmWorker.c => dndWorker.c} (99%) diff --git a/include/util/tprocess.h b/include/util/tprocess.h index c5f33140dd..2b0fd89aa5 100644 --- a/include/util/tprocess.h +++ b/include/util/tprocess.h @@ -22,13 +22,13 @@ extern "C" { #endif -typedef enum { PROC_REQ = 1, PROC_RSP, PROC_REGIST, PROC_RELEASE } ProcFuncType; +typedef enum { PROC_FUNC_REQ = 1, PROC_FUNC_RSP, PROC_FUNC_REGIST, PROC_FUNC_RELEASE } EProcFuncType; typedef struct SProcObj SProcObj; typedef void *(*ProcMallocFp)(int32_t contLen); typedef void *(*ProcFreeFp)(void *pCont); typedef void (*ProcConsumeFp)(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, - ProcFuncType ftype); + EProcFuncType ftype); typedef struct { ProcConsumeFp childConsumeFp; @@ -53,11 +53,11 @@ int32_t taosProcRun(SProcObj *pProc); void taosProcStop(SProcObj *pProc); int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, - void *handle, ProcFuncType ftype); + void *handle, EProcFuncType ftype); void taosProcRemoveHandle(SProcObj *pProc, void *handle); void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)); void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, - ProcFuncType ftype); + EProcFuncType ftype); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/exe/dndMain.c b/source/dnode/mgmt/exe/dndMain.c index 93892996c7..2454375d46 100644 --- a/source/dnode/mgmt/exe/dndMain.c +++ b/source/dnode/mgmt/exe/dndMain.c @@ -14,7 +14,7 @@ */ #define _DEFAULT_SOURCE -#include "dndNode.h" +#include "dndImp.h" #include "tconfig.h" static struct { diff --git a/source/dnode/mgmt/implement/inc/dmInt.h b/source/dnode/mgmt/implement/inc/dmInt.h deleted file mode 100644 index 5d787b36b5..0000000000 --- a/source/dnode/mgmt/implement/inc/dmInt.h +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_DND_DNODE_INT_H_ -#define _TD_DND_DNODE_INT_H_ - -#include "dndNode.h" - -#ifdef __cplusplus -extern "C" { -#endif - - - -// dmFile.c -int32_t dmReadFile(SDnodeData *pMgmt); -int32_t dmWriteFile(SDnodeData *pMgmt); -void dmUpdateDnodeEps(SDnodeData *pMgmt, SArray *pDnodeEps); - -// dmHandle.c -void dmInitMsgHandle(SMgmtWrapper *pWrapper); -void dmSendStatusReq(SDnodeData *pMgmt); -int32_t dmProcessConfigReq(SDnodeData *pMgmt, SNodeMsg *pMsg); -int32_t dmProcessStatusRsp(SDnodeData *pMgmt, SNodeMsg *pMsg); -int32_t dmProcessAuthRsp(SDnodeData *pMgmt, SNodeMsg *pMsg); -int32_t dmProcessGrantRsp(SDnodeData *pMgmt, SNodeMsg *pMsg); -int32_t dmProcessCDnodeReq(SDnode *pDnode, SNodeMsg *pMsg); - -// dmMonitor.c -void dmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo); -void dmSendMonitorReport(SDnode *pDnode); - -// dmWorker.c -int32_t dmStartThread(SDnodeData *pMgmt); -int32_t dmStartWorker(SDnodeData *pMgmt); -void dmStopWorker(SDnodeData *pMgmt); -int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -int32_t dmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_DND_DNODE_INT_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/implement/inc/dndNode.h b/source/dnode/mgmt/implement/inc/dndImp.h similarity index 58% rename from source/dnode/mgmt/implement/inc/dndNode.h rename to source/dnode/mgmt/implement/inc/dndImp.h index 283df57707..31b2f0a53a 100644 --- a/source/dnode/mgmt/implement/inc/dndNode.h +++ b/source/dnode/mgmt/implement/inc/dndImp.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _TD_DND_NODE_H_ -#define _TD_DND_NODE_H_ +#ifndef _TD_DND_IMP_H_ +#define _TD_DND_IMP_H_ #include "dndInt.h" @@ -30,6 +30,7 @@ int32_t dndInitTrans(SDnode *pDnode); void dndCleanupTrans(SDnode *pDnode); SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper); int32_t dndInitMsgHandle(SDnode *pDnode); +void dndSendMsgToMnode(SDnode *pDnode, SRpcMsg *pReq); void dndSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp); // mgmt @@ -40,19 +41,40 @@ void smSetMgmtFp(SMgmtWrapper *pWrapper); void vmSetMgmtFp(SMgmtWrapper *pWrapper); void mmSetMgmtFp(SMgmtWrapper *pMgmt); -void dmGetMnodeEpSet(SDnodeData *pMgmt, SEpSet *pEpSet); -void dmUpdateMnodeEpSet(SDnodeData *pMgmt, SEpSet *pEpSet); -void dmSendRedirectRsp(SDnodeData *pMgmt, const SRpcMsg *pMsg); - -void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo); +void vmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo); void mmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonMmInfo *mmInfo); void vmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonVmInfo *vmInfo); void qmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonQmInfo *qmInfo); void smGetMonitorInfo(SMgmtWrapper *pWrapper, SMonSmInfo *smInfo); void bmGetMonitorInfo(SMgmtWrapper *pWrapper, SMonBmInfo *bmInfo); +// dmFile.c +int32_t dmReadFile(SDnodeData *pMgmt); +int32_t dmWriteFile(SDnodeData *pMgmt); +void dmUpdateDnodeEps(SDnodeData *pMgmt, SArray *pDnodeEps); + +// dmHandle.c +void dmInitMsgHandle(SMgmtWrapper *pWrapper); +void dmSendStatusReq(SDnodeData *pMgmt); +int32_t dmProcessConfigReq(SDnodeData *pMgmt, SNodeMsg *pMsg); +int32_t dmProcessStatusRsp(SDnodeData *pMgmt, SNodeMsg *pMsg); +int32_t dmProcessAuthRsp(SDnodeData *pMgmt, SNodeMsg *pMsg); +int32_t dmProcessGrantRsp(SDnodeData *pMgmt, SNodeMsg *pMsg); +int32_t dmProcessCDnodeReq(SDnode *pDnode, SNodeMsg *pMsg); + +// dmMonitor.c +void dmGetVnodeLoads(SMgmtWrapper *pWrapper, SMonVloadInfo *pInfo); +void dmSendMonitorReport(SDnode *pDnode); + +// dmWorker.c +int32_t dmStartThread(SDnodeData *pMgmt); +int32_t dmStartWorker(SDnodeData *pMgmt); +void dmStopWorker(SDnodeData *pMgmt); +int32_t dmProcessMgmtMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t dmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); + #ifdef __cplusplus } #endif -#endif /*_TD_DND_NODE_H_*/ \ No newline at end of file +#endif /*_TD_DND_IMP_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/implement/src/dmInt.c b/source/dnode/mgmt/implement/src/dmInt.c deleted file mode 100644 index 8151aa8d1e..0000000000 --- a/source/dnode/mgmt/implement/src/dmInt.c +++ /dev/null @@ -1,170 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -#include "dmInt.h" - -void dmGetMnodeEpSet(SDnodeData *pMgmt, SEpSet *pEpSet) { - taosRLockLatch(&pMgmt->latch); - *pEpSet = pMgmt->mnodeEpSet; - taosRUnLockLatch(&pMgmt->latch); -} - -void dmUpdateMnodeEpSet(SDnodeData *pMgmt, SEpSet *pEpSet) { - dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse); - - taosWLockLatch(&pMgmt->latch); - pMgmt->mnodeEpSet = *pEpSet; - for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { - dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); - } - - taosWUnLockLatch(&pMgmt->latch); -} - -void dmGetDnodeEp(SMgmtWrapper *pWrapper, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) { - SDnodeData *pMgmt = pWrapper->pMgmt; - taosRLockLatch(&pMgmt->latch); - - SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &dnodeId, sizeof(int32_t)); - if (pDnodeEp != NULL) { - if (pPort != NULL) { - *pPort = pDnodeEp->ep.port; - } - if (pFqdn != NULL) { - tstrncpy(pFqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN); - } - if (pEp != NULL) { - snprintf(pEp, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port); - } - } - - taosRUnLockLatch(&pMgmt->latch); -} - -void dmSendRedirectRsp(SDnodeData *pMgmt, const SRpcMsg *pReq) { - SDnode *pDnode = pMgmt->pDnode; - - SEpSet epSet = {0}; - dmGetMnodeEpSet(pMgmt, &epSet); - - dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->handle, epSet.numOfEps, epSet.inUse); - for (int32_t i = 0; i < epSet.numOfEps; ++i) { - dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); - if (strcmp(epSet.eps[i].fqdn, pDnode->data.localFqdn) == 0 && epSet.eps[i].port == pDnode->data.serverPort) { - epSet.inUse = (i + 1) % epSet.numOfEps; - } - - epSet.eps[i].port = htons(epSet.eps[i].port); - } - - rpcSendRedirectRsp(pReq->handle, &epSet); -} - -static int32_t dmStart(SMgmtWrapper *pWrapper) { - dDebug("dnode-mgmt start to run"); - return dmStartThread(pWrapper->pMgmt); -} - -static int32_t dmInit(SMgmtWrapper *pWrapper) { - SDnode *pDnode = pWrapper->pDnode; - SDnodeData *pMgmt = taosMemoryCalloc(1, sizeof(SDnodeData)); - dInfo("dnode-mgmt start to init"); - - pDnode->data.dnodeId = 0; - pDnode->data.dropped = 0; - pDnode->data.clusterId = 0; - pMgmt->path = pWrapper->path; - pMgmt->pDnode = pDnode; - taosInitRWLatch(&pMgmt->latch); - - pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - if (pMgmt->dnodeHash == NULL) { - dError("failed to init dnode hash"); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - if (dmReadFile(pMgmt) != 0) { - dError("failed to read file since %s", terrstr()); - return -1; - } - - if (pDnode->data.dropped) { - dError("dnode will not start since its already dropped"); - return -1; - } - - if (dmStartWorker(pMgmt) != 0) { - return -1; - } - - if (dndInitTrans(pDnode) != 0) { - dError("failed to init transport since %s", terrstr()); - return -1; - } - - pWrapper->pMgmt = pMgmt; - pMgmt->msgCb = dndCreateMsgcb(pWrapper); - - dInfo("dnode-mgmt is initialized"); - return 0; -} - -static void dmCleanup(SMgmtWrapper *pWrapper) { - SDnodeData *pMgmt = pWrapper->pMgmt; - if (pMgmt == NULL) return; - - dInfo("dnode-mgmt start to clean up"); - SDnode *pDnode = pMgmt->pDnode; - dmStopWorker(pMgmt); - - taosWLockLatch(&pMgmt->latch); - - if (pMgmt->dnodeEps != NULL) { - taosArrayDestroy(pMgmt->dnodeEps); - pMgmt->dnodeEps = NULL; - } - - if (pMgmt->dnodeHash != NULL) { - taosHashCleanup(pMgmt->dnodeHash); - pMgmt->dnodeHash = NULL; - } - - taosWUnLockLatch(&pMgmt->latch); - - taosMemoryFree(pMgmt); - pWrapper->pMgmt = NULL; - dndCleanupTrans(pDnode); - - dInfo("dnode-mgmt is cleaned up"); -} - -static int32_t dmRequire(SMgmtWrapper *pWrapper, bool *required) { - *required = true; - return 0; -} - -void dmSetMgmtFp(SMgmtWrapper *pWrapper) { - SMgmtFp mgmtFp = {0}; - mgmtFp.openFp = dmInit; - mgmtFp.closeFp = dmCleanup; - mgmtFp.startFp = dmStart; - mgmtFp.requiredFp = dmRequire; - - dmInitMsgHandle(pWrapper); - pWrapper->name = "dnode"; - pWrapper->fp = mgmtFp; -} diff --git a/source/dnode/mgmt/implement/src/dmFile.c b/source/dnode/mgmt/implement/src/dndEps.c similarity index 94% rename from source/dnode/mgmt/implement/src/dmFile.c rename to source/dnode/mgmt/implement/src/dndEps.c index aefc28c46d..04de19d6a8 100644 --- a/source/dnode/mgmt/implement/src/dmFile.c +++ b/source/dnode/mgmt/implement/src/dndEps.c @@ -14,12 +14,32 @@ */ #define _DEFAULT_SOURCE -#include "dmInt.h" +#include "dndImp.h" static void dmPrintDnodes(SDnodeData *pMgmt); static bool dmIsEpChanged(SDnodeData *pMgmt, int32_t dnodeId, const char *ep); static void dmResetDnodes(SDnodeData *pMgmt, SArray *dnodeEps); +void dmGetDnodeEp(SMgmtWrapper *pWrapper, int32_t dnodeId, char *pEp, char *pFqdn, uint16_t *pPort) { + SDnodeData *pMgmt = pWrapper->pMgmt; + taosRLockLatch(&pMgmt->latch); + + SDnodeEp *pDnodeEp = taosHashGet(pMgmt->dnodeHash, &dnodeId, sizeof(int32_t)); + if (pDnodeEp != NULL) { + if (pPort != NULL) { + *pPort = pDnodeEp->ep.port; + } + if (pFqdn != NULL) { + tstrncpy(pFqdn, pDnodeEp->ep.fqdn, TSDB_FQDN_LEN); + } + if (pEp != NULL) { + snprintf(pEp, TSDB_EP_LEN, "%s:%u", pDnodeEp->ep.fqdn, pDnodeEp->ep.port); + } + } + + taosRUnLockLatch(&pMgmt->latch); +} + int32_t dmReadFile(SDnodeData *pMgmt) { int32_t code = TSDB_CODE_INVALID_JSON_FORMAT; int32_t len = 0; diff --git a/source/dnode/mgmt/implement/src/dndExec.c b/source/dnode/mgmt/implement/src/dndExec.c index 7bdf0f3791..c30e730d8a 100644 --- a/source/dnode/mgmt/implement/src/dndExec.c +++ b/source/dnode/mgmt/implement/src/dndExec.c @@ -14,7 +14,7 @@ */ #define _DEFAULT_SOURCE -#include "dndNode.h" +#include "dndImp.h" static bool dndRequireNode(SMgmtWrapper *pWrapper) { bool required = false; diff --git a/source/dnode/mgmt/implement/src/dmHandle.c b/source/dnode/mgmt/implement/src/dndHandle.c similarity index 98% rename from source/dnode/mgmt/implement/src/dmHandle.c rename to source/dnode/mgmt/implement/src/dndHandle.c index 36edc089f0..196671c916 100644 --- a/source/dnode/mgmt/implement/src/dmHandle.c +++ b/source/dnode/mgmt/implement/src/dndHandle.c @@ -14,7 +14,7 @@ */ #define _DEFAULT_SOURCE -#include "dmInt.h" +#include "dndImp.h" void dmSendStatusReq(SDnodeData *pMgmt) { SDnode *pDnode = pMgmt->pDnode; @@ -57,9 +57,7 @@ void dmSendStatusReq(SDnodeData *pMgmt) { pMgmt->statusSent = 1; dTrace("send req:%s to mnode, app:%p", TMSG_INFO(rpcMsg.msgType), rpcMsg.ahandle); - SEpSet epSet = {0}; - dmGetMnodeEpSet(pMgmt, &epSet); - tmsgSendReq(&pMgmt->msgCb, &epSet, &rpcMsg); + dndSendMsgToMnode(pDnode, &rpcMsg); } static void dmUpdateDnodeCfg(SDnodeData *pMgmt, SDnodeCfg *pCfg) { diff --git a/source/dnode/mgmt/implement/src/dmMonitor.c b/source/dnode/mgmt/implement/src/dndMonitor.c similarity index 99% rename from source/dnode/mgmt/implement/src/dmMonitor.c rename to source/dnode/mgmt/implement/src/dndMonitor.c index dc086ffc96..73aa8d9424 100644 --- a/source/dnode/mgmt/implement/src/dmMonitor.c +++ b/source/dnode/mgmt/implement/src/dndMonitor.c @@ -14,7 +14,7 @@ */ #define _DEFAULT_SOURCE -#include "dmInt.h" +#include "dndImp.h" static void dmGetMonitorBasicInfo(SDnode *pDnode, SMonBasicInfo *pInfo) { pInfo->protocol = 1; diff --git a/source/dnode/mgmt/implement/src/dndObj.c b/source/dnode/mgmt/implement/src/dndObj.c index b008b5ce8a..bfbdb756c6 100644 --- a/source/dnode/mgmt/implement/src/dndObj.c +++ b/source/dnode/mgmt/implement/src/dndObj.c @@ -14,7 +14,104 @@ */ #define _DEFAULT_SOURCE -#include "dndNode.h" +#include "dndImp.h" + + +static int32_t dmStart(SMgmtWrapper *pWrapper) { + dDebug("dnode-mgmt start to run"); + return dmStartThread(pWrapper->pMgmt); +} + +static int32_t dmInit(SMgmtWrapper *pWrapper) { + SDnode *pDnode = pWrapper->pDnode; + SDnodeData *pMgmt = taosMemoryCalloc(1, sizeof(SDnodeData)); + dInfo("dnode-mgmt start to init"); + + pDnode->data.dnodeId = 0; + pDnode->data.dropped = 0; + pDnode->data.clusterId = 0; + pMgmt->path = pWrapper->path; + pMgmt->pDnode = pDnode; + taosInitRWLatch(&pMgmt->latch); + + pMgmt->dnodeHash = taosHashInit(4, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + if (pMgmt->dnodeHash == NULL) { + dError("failed to init dnode hash"); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + if (dmReadFile(pMgmt) != 0) { + dError("failed to read file since %s", terrstr()); + return -1; + } + + if (pDnode->data.dropped) { + dError("dnode will not start since its already dropped"); + return -1; + } + + if (dmStartWorker(pMgmt) != 0) { + return -1; + } + + if (dndInitTrans(pDnode) != 0) { + dError("failed to init transport since %s", terrstr()); + return -1; + } + + pWrapper->pMgmt = pMgmt; + pMgmt->msgCb = dndCreateMsgcb(pWrapper); + + dInfo("dnode-mgmt is initialized"); + return 0; +} + +static void dmCleanup(SMgmtWrapper *pWrapper) { + SDnodeData *pMgmt = pWrapper->pMgmt; + if (pMgmt == NULL) return; + + dInfo("dnode-mgmt start to clean up"); + SDnode *pDnode = pMgmt->pDnode; + dmStopWorker(pMgmt); + + taosWLockLatch(&pMgmt->latch); + + if (pMgmt->dnodeEps != NULL) { + taosArrayDestroy(pMgmt->dnodeEps); + pMgmt->dnodeEps = NULL; + } + + if (pMgmt->dnodeHash != NULL) { + taosHashCleanup(pMgmt->dnodeHash); + pMgmt->dnodeHash = NULL; + } + + taosWUnLockLatch(&pMgmt->latch); + + taosMemoryFree(pMgmt); + pWrapper->pMgmt = NULL; + dndCleanupTrans(pDnode); + + dInfo("dnode-mgmt is cleaned up"); +} + +static int32_t dmRequire(SMgmtWrapper *pWrapper, bool *required) { + *required = true; + return 0; +} + +void dmSetMgmtFp(SMgmtWrapper *pWrapper) { + SMgmtFp mgmtFp = {0}; + mgmtFp.openFp = dmInit; + mgmtFp.closeFp = dmCleanup; + mgmtFp.startFp = dmStart; + mgmtFp.requiredFp = dmRequire; + + dmInitMsgHandle(pWrapper); + pWrapper->name = "dnode"; + pWrapper->fp = mgmtFp; +} static int32_t dndInitVars(SDnode *pDnode, const SDnodeOpt *pOption) { pDnode->data.supportVnodes = pOption->numOfSupportVnodes; diff --git a/source/dnode/mgmt/implement/src/dndTransport.c b/source/dnode/mgmt/implement/src/dndTransport.c index 197cfffd7e..7d9ba1d56d 100644 --- a/source/dnode/mgmt/implement/src/dndTransport.c +++ b/source/dnode/mgmt/implement/src/dndTransport.c @@ -14,19 +14,28 @@ */ #define _DEFAULT_SOURCE -#include "dndNode.h" +#include "dndImp.h" #define INTERNAL_USER "_dnd" #define INTERNAL_CKEY "_key" #define INTERNAL_SECRET "_pwd" -static int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq); -static void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp); -static void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg); -static void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type); -static void dndUpdateMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[NODE_BEGIN]; - dmUpdateMnodeEpSet(pWrapper->pMgmt, pEpSet); +static void dndGetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { + taosRLockLatch(&pDnode->data.latch); + *pEpSet = pDnode->data.mnodeEpSet; + taosRUnLockLatch(&pDnode->data.latch); +} + +static void dndSetMnodeEpSet(SDnode *pDnode, SEpSet *pEpSet) { + dInfo("mnode is changed, num:%d use:%d", pEpSet->numOfEps, pEpSet->inUse); + + taosWLockLatch(&pDnode->data.latch); + pDnode->data.mnodeEpSet = *pEpSet; + for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { + dInfo("mnode index:%d %s:%u", i, pEpSet->eps[i].fqdn, pEpSet->eps[i].port); + } + + taosWUnLockLatch(&pDnode->data.latch); } static inline NodeMsgFp dndGetMsgFp(SMgmtWrapper *pWrapper, SRpcMsg *pRpc) { @@ -60,7 +69,7 @@ static void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpS uint16_t msgType = pRpc->msgType; if (pEpSet && pEpSet->numOfEps > 0 && msgType == TDMT_MND_STATUS_RSP) { - dndUpdateMnodeEpSet(pWrapper->pDnode, pEpSet); + dndSetMnodeEpSet(pWrapper->pDnode, pEpSet); } if (dndMarkWrapper(pWrapper) != 0) goto _OVER; @@ -74,7 +83,7 @@ static void dndProcessRpcMsg(SMgmtWrapper *pWrapper, SRpcMsg *pRpc, SEpSet *pEpS } else if (pWrapper->procType == DND_PROC_PARENT) { dTrace("msg:%p, is created and put into child queue, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user); code = taosProcPutToChildQ(pWrapper->procObj, pMsg, sizeof(SNodeMsg), pRpc->pCont, pRpc->contLen, pRpc->handle, - PROC_REQ); + PROC_FUNC_REQ); } else { dTrace("msg:%p, should not processed in child process, handle:%p user:%s", pMsg, pRpc->handle, pMsg->user); ASSERT(1); @@ -162,11 +171,215 @@ static void dndProcessMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { dndProcessRpcMsg(pWrapper, pMsg, pEpSet); } +int32_t dndInitMsgHandle(SDnode *pDnode) { + SDnodeTrans *pTrans = &pDnode->trans; + + for (EDndNodeType n = NODE_BEGIN + 1; n < NODE_END; ++n) { + SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; + + for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) { + NodeMsgFp msgFp = pWrapper->msgFps[msgIndex]; + int8_t vgId = pWrapper->msgVgIds[msgIndex]; + if (msgFp == NULL) continue; + + SMsgHandle *pHandle = &pTrans->msgHandles[msgIndex]; + if (vgId == QNODE_HANDLE) { + if (pHandle->pQndWrapper != NULL) { + dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]); + return -1; + } + pHandle->pQndWrapper = pWrapper; + } else if (vgId == MNODE_HANDLE) { + if (pHandle->pMndWrapper != NULL) { + dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]); + return -1; + } + pHandle->pMndWrapper = pWrapper; + } else { + if (pHandle->pNdWrapper != NULL) { + dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]); + return -1; + } + pHandle->pNdWrapper = pWrapper; + } + } + } + + return 0; +} + +static inline int32_t dndSendRpcReq(SDnode *pDnode, const SEpSet *pEpSet, SRpcMsg *pReq) { + if (pDnode->trans.clientRpc == NULL) { + terrno = TSDB_CODE_NODE_OFFLINE; + return -1; + } + + rpcSendRequest(pDnode->trans.clientRpc, pEpSet, pReq, NULL); + return 0; +} + +static void dndSendRpcRedirectRsp(SDnode *pDnode, const SRpcMsg *pReq) { + SEpSet epSet = {0}; + dndGetMnodeEpSet(pDnode, &epSet); + + dDebug("RPC %p, req is redirected, num:%d use:%d", pReq->handle, epSet.numOfEps, epSet.inUse); + for (int32_t i = 0; i < epSet.numOfEps; ++i) { + dDebug("mnode index:%d %s:%u", i, epSet.eps[i].fqdn, epSet.eps[i].port); + if (strcmp(epSet.eps[i].fqdn, pDnode->data.localFqdn) == 0 && epSet.eps[i].port == pDnode->data.serverPort) { + epSet.inUse = (i + 1) % epSet.numOfEps; + } + + epSet.eps[i].port = htons(epSet.eps[i].port); + } + + rpcSendRedirectRsp(pReq->handle, &epSet); +} + +static inline void dndSendRpcRsp(SDnode *pDnode, const SRpcMsg *pRsp) { + if (pRsp->code == TSDB_CODE_NODE_REDIRECT) { + dndSendRpcRedirectRsp(pDnode, pRsp); + } else { + rpcSendResponse(pRsp); + } +} + +void dndSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) { + rpcSendRecv(pDnode->trans.clientRpc, pEpSet, pReq, pRsp); +} + +void dndSendMsgToMnode(SDnode *pDnode, SRpcMsg *pReq) { + SEpSet epSet = {0}; + dndGetMnodeEpSet(pDnode, &epSet); + dndSendRpcReq(pDnode, &epSet, pReq); +} + +static inline void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp) { + SEpSet epSet = {0}; + dndGetMnodeEpSet(pDnode, &epSet); + rpcSendRecv(pDnode->trans.clientRpc, &epSet, pReq, pRsp); +} + +static inline int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) { + if (dndGetStatus(pWrapper->pDnode) != DND_STAT_RUNNING) { + terrno = TSDB_CODE_NODE_OFFLINE; + dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle); + return -1; + } + + if (pWrapper->procType != DND_PROC_CHILD) { + return dndSendRpcReq(pWrapper->pDnode, pEpSet, pReq); + } else { + char *pHead = taosMemoryMalloc(sizeof(SRpcMsg) + sizeof(SEpSet)); + if (pHead == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + memcpy(pHead, pReq, sizeof(SRpcMsg)); + memcpy(pHead + sizeof(SRpcMsg), pEpSet, sizeof(SEpSet)); + taosProcPutToParentQ(pWrapper->procObj, pHead, sizeof(SRpcMsg) + sizeof(SEpSet), pReq->pCont, pReq->contLen, + PROC_FUNC_REQ); + taosMemoryFree(pHead); + return 0; + } +} + +static inline void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { + if (pWrapper->procType != DND_PROC_CHILD) { + dndSendRpcRsp(pWrapper->pDnode, pRsp); + } else { + taosProcPutToParentQ(pWrapper->procObj, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_FUNC_RSP); + } +} + +static inline void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { + if (pWrapper->procType != DND_PROC_CHILD) { + rpcRegisterBrokenLinkArg(pMsg); + } else { + taosProcPutToParentQ(pWrapper->procObj, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_FUNC_REGIST); + } +} + +static inline void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) { + if (pWrapper->procType != DND_PROC_CHILD) { + rpcReleaseHandle(handle, type); + } else { + SRpcMsg msg = {.handle = handle, .code = type}; + taosProcPutToParentQ(pWrapper->procObj, &msg, sizeof(SRpcMsg), NULL, 0, PROC_FUNC_RELEASE); + } +} + +static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, + EProcFuncType ftype) { + SRpcMsg *pRpc = &pMsg->rpcMsg; + pRpc->pCont = pCont; + dTrace("msg:%p, get from child queue, handle:%p app:%p", pMsg, pRpc->handle, pRpc->ahandle); + + NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)]; + int32_t code = (*msgFp)(pWrapper, pMsg); + + if (code != 0) { + dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); + if (pRpc->msgType & 1U) { + SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; + dndSendRsp(pWrapper, &rsp); + } + + dTrace("msg:%p, is freed", pMsg); + taosFreeQitem(pMsg); + rpcFreeCont(pCont); + } +} + +static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, + EProcFuncType ftype) { + pMsg->pCont = pCont; + dTrace("msg:%p, get from parent queue, ftype:%d handle:%p code:0x%04x mtype:%d, app:%p", pMsg, ftype, pMsg->handle, + pMsg->code & 0xFFFF, pMsg->msgType, pMsg->ahandle); + + switch (ftype) { + case PROC_FUNC_REGIST: + rpcRegisterBrokenLinkArg(pMsg); + break; + case PROC_FUNC_RELEASE: + taosProcRemoveHandle(pWrapper->procObj, pMsg->handle); + rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code); + rpcFreeCont(pCont); + break; + case PROC_FUNC_REQ: + dndSendRpcReq(pWrapper->pDnode, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg); + break; + case PROC_FUNC_RSP: + taosProcRemoveHandle(pWrapper->procObj, pMsg->handle); + dndSendRpcRsp(pWrapper->pDnode, pMsg); + break; + default: + break; + } + taosMemoryFree(pMsg); +} + +SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) { + SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue, + .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, + .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, + .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue, + .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, + .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, + .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, + .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, + .shm = pWrapper->procShm, + .parent = pWrapper, + .name = pWrapper->name}; + return cfg; +} + static int32_t dndInitClient(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; - SRpcInit rpcInit; - memset(&rpcInit, 0, sizeof(rpcInit)); + SRpcInit rpcInit = {0}; rpcInit.label = "DND"; rpcInit.numOfThreads = 1; rpcInit.cfp = (RpcCfp)dndProcessMsg; @@ -201,13 +414,6 @@ static void dndCleanupClient(SDnode *pDnode) { } } -static inline void dndSendMsgToMnodeRecv(SDnode *pDnode, SRpcMsg *pReq, SRpcMsg *pRsp) { - SEpSet epSet = {0}; - SMgmtWrapper *pWrapper = &pDnode->wrappers[NODE_BEGIN]; - dmGetMnodeEpSet(pWrapper->pMgmt, &epSet); - rpcSendRecv(pDnode->trans.clientRpc, &epSet, pReq, pRsp); -} - static inline int32_t dndGetHideUserAuth(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { int32_t code = 0; @@ -231,7 +437,8 @@ static inline int32_t dndGetHideUserAuth(SDnode *pDnode, char *user, char *spi, return code; } -static int32_t dndRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { +static inline int32_t dndRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, + char *ckey) { if (dndGetHideUserAuth(pDnode, user, spi, encrypt, secret, ckey) == 0) { dTrace("user:%s, get auth from mnode, spi:%d encrypt:%d", user, *spi, *encrypt); return 0; @@ -269,8 +476,7 @@ static int32_t dndRetrieveUserAuthInfo(SDnode *pDnode, char *user, char *spi, ch static int32_t dndInitServer(SDnode *pDnode) { SDnodeTrans *pTrans = &pDnode->trans; - SRpcInit rpcInit; - memset(&rpcInit, 0, sizeof(rpcInit)); + SRpcInit rpcInit = {0}; rpcInit.localPort = pDnode->data.serverPort; rpcInit.label = "DND"; rpcInit.numOfThreads = tsNumOfRpcThreads; @@ -319,179 +525,3 @@ void dndCleanupTrans(SDnode *pDnode) { dndCleanupServer(pDnode); dndCleanupClient(pDnode); } - -int32_t dndInitMsgHandle(SDnode *pDnode) { - SDnodeTrans *pTrans = &pDnode->trans; - - for (EDndNodeType n = 0; n < NODE_END; ++n) { - SMgmtWrapper *pWrapper = &pDnode->wrappers[n]; - - for (int32_t msgIndex = 0; msgIndex < TDMT_MAX; ++msgIndex) { - NodeMsgFp msgFp = pWrapper->msgFps[msgIndex]; - int8_t vgId = pWrapper->msgVgIds[msgIndex]; - if (msgFp == NULL) continue; - - SMsgHandle *pHandle = &pTrans->msgHandles[msgIndex]; - if (vgId == QNODE_HANDLE) { - if (pHandle->pQndWrapper != NULL) { - dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]); - return -1; - } - pHandle->pQndWrapper = pWrapper; - } else if (vgId == MNODE_HANDLE) { - if (pHandle->pMndWrapper != NULL) { - dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]); - return -1; - } - pHandle->pMndWrapper = pWrapper; - } else { - if (pHandle->pNdWrapper != NULL) { - dError("msg:%s has multiple process nodes", tMsgInfo[msgIndex]); - return -1; - } - pHandle->pNdWrapper = pWrapper; - } - } - } - - return 0; -} - -static int32_t dndSendRpcReq(SDnodeTrans *pTrans, const SEpSet *pEpSet, SRpcMsg *pReq) { - if (pTrans->clientRpc == NULL) { - terrno = TSDB_CODE_NODE_OFFLINE; - return -1; - } - - rpcSendRequest(pTrans->clientRpc, pEpSet, pReq, NULL); - return 0; -} - -static void dndSendRpcRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { - if (pRsp->code == TSDB_CODE_NODE_REDIRECT) { - dmSendRedirectRsp(pWrapper->pMgmt, pRsp); - } else { - rpcSendResponse(pRsp); - } -} - -static int32_t dndSendReq(SMgmtWrapper *pWrapper, const SEpSet *pEpSet, SRpcMsg *pReq) { - if (dndGetStatus(pWrapper->pDnode) != DND_STAT_RUNNING) { - terrno = TSDB_CODE_NODE_OFFLINE; - dError("failed to send rpc msg since %s, handle:%p", terrstr(), pReq->handle); - return -1; - } - - if (pWrapper->procType != DND_PROC_CHILD) { - return dndSendRpcReq(&pWrapper->pDnode->trans, pEpSet, pReq); - } else { - char *pHead = taosMemoryMalloc(sizeof(SRpcMsg) + sizeof(SEpSet)); - if (pHead == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - memcpy(pHead, pReq, sizeof(SRpcMsg)); - memcpy(pHead + sizeof(SRpcMsg), pEpSet, sizeof(SEpSet)); - taosProcPutToParentQ(pWrapper->procObj, pHead, sizeof(SRpcMsg) + sizeof(SEpSet), pReq->pCont, pReq->contLen, - PROC_REQ); - taosMemoryFree(pHead); - return 0; - } -} - -static void dndSendRsp(SMgmtWrapper *pWrapper, const SRpcMsg *pRsp) { - if (pWrapper->procType != DND_PROC_CHILD) { - dndSendRpcRsp(pWrapper, pRsp); - } else { - taosProcPutToParentQ(pWrapper->procObj, pRsp, sizeof(SRpcMsg), pRsp->pCont, pRsp->contLen, PROC_RSP); - } -} - -static void dndRegisterBrokenLinkArg(SMgmtWrapper *pWrapper, SRpcMsg *pMsg) { - if (pWrapper->procType != DND_PROC_CHILD) { - rpcRegisterBrokenLinkArg(pMsg); - } else { - taosProcPutToParentQ(pWrapper->procObj, pMsg, sizeof(SRpcMsg), pMsg->pCont, pMsg->contLen, PROC_REGIST); - } -} - -static void dndReleaseHandle(SMgmtWrapper *pWrapper, void *handle, int8_t type) { - if (pWrapper->procType != DND_PROC_CHILD) { - rpcReleaseHandle(handle, type); - } else { - SRpcMsg msg = {.handle = handle, .code = type}; - taosProcPutToParentQ(pWrapper->procObj, &msg, sizeof(SRpcMsg), NULL, 0, PROC_RELEASE); - } -} - -static void dndConsumeChildQueue(SMgmtWrapper *pWrapper, SNodeMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, - ProcFuncType ftype) { - SRpcMsg *pRpc = &pMsg->rpcMsg; - pRpc->pCont = pCont; - dTrace("msg:%p, get from child queue, handle:%p app:%p", pMsg, pRpc->handle, pRpc->ahandle); - - NodeMsgFp msgFp = pWrapper->msgFps[TMSG_INDEX(pRpc->msgType)]; - int32_t code = (*msgFp)(pWrapper, pMsg); - - if (code != 0) { - dError("msg:%p, failed to process since code:0x%04x:%s", pMsg, code & 0XFFFF, tstrerror(code)); - if (pRpc->msgType & 1U) { - SRpcMsg rsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = terrno}; - dndSendRsp(pWrapper, &rsp); - } - - dTrace("msg:%p, is freed", pMsg); - taosFreeQitem(pMsg); - rpcFreeCont(pCont); - } -} - -static void dndConsumeParentQueue(SMgmtWrapper *pWrapper, SRpcMsg *pMsg, int16_t msgLen, void *pCont, int32_t contLen, - ProcFuncType ftype) { - pMsg->pCont = pCont; - dTrace("msg:%p, get from parent queue, ftype:%d handle:%p code:0x%04x mtype:%d, app:%p", pMsg, ftype, pMsg->handle, - pMsg->code & 0xFFFF, pMsg->msgType, pMsg->ahandle); - - switch (ftype) { - case PROC_REGIST: - rpcRegisterBrokenLinkArg(pMsg); - break; - case PROC_RELEASE: - taosProcRemoveHandle(pWrapper->procObj, pMsg->handle); - rpcReleaseHandle(pMsg->handle, (int8_t)pMsg->code); - rpcFreeCont(pCont); - break; - case PROC_REQ: - dndSendRpcReq(&pWrapper->pDnode->trans, (SEpSet *)((char *)pMsg + sizeof(SRpcMsg)), pMsg); - break; - case PROC_RSP: - taosProcRemoveHandle(pWrapper->procObj, pMsg->handle); - dndSendRpcRsp(pWrapper, pMsg); - break; - default: - break; - } - taosMemoryFree(pMsg); -} - -SProcCfg dndGenProcCfg(SMgmtWrapper *pWrapper) { - SProcCfg cfg = {.childConsumeFp = (ProcConsumeFp)dndConsumeChildQueue, - .childMallocHeadFp = (ProcMallocFp)taosAllocateQitem, - .childFreeHeadFp = (ProcFreeFp)taosFreeQitem, - .childMallocBodyFp = (ProcMallocFp)rpcMallocCont, - .childFreeBodyFp = (ProcFreeFp)rpcFreeCont, - .parentConsumeFp = (ProcConsumeFp)dndConsumeParentQueue, - .parentMallocHeadFp = (ProcMallocFp)taosMemoryMalloc, - .parentFreeHeadFp = (ProcFreeFp)taosMemoryFree, - .parentMallocBodyFp = (ProcMallocFp)rpcMallocCont, - .parentFreeBodyFp = (ProcFreeFp)rpcFreeCont, - .shm = pWrapper->procShm, - .parent = pWrapper, - .name = pWrapper->name}; - return cfg; -} - -void dndSendRecv(SDnode *pDnode, SEpSet *pEpSet, SRpcMsg *pReq, SRpcMsg *pRsp) { - rpcSendRecv(pDnode->trans.clientRpc, pEpSet, pReq, pRsp); -} \ No newline at end of file diff --git a/source/dnode/mgmt/implement/src/dmWorker.c b/source/dnode/mgmt/implement/src/dndWorker.c similarity index 99% rename from source/dnode/mgmt/implement/src/dmWorker.c rename to source/dnode/mgmt/implement/src/dndWorker.c index 6886c56fab..1f42c8105b 100644 --- a/source/dnode/mgmt/implement/src/dmWorker.c +++ b/source/dnode/mgmt/implement/src/dndWorker.c @@ -14,7 +14,7 @@ */ #define _DEFAULT_SOURCE -#include "dmInt.h" +#include "dndImp.h" static void *dmThreadRoutine(void *param) { SDnodeData *pMgmt = param; diff --git a/source/util/src/tprocess.c b/source/util/src/tprocess.c index 2cafd3f7f6..ae7b3d6f1f 100644 --- a/source/util/src/tprocess.c +++ b/source/util/src/tprocess.c @@ -154,7 +154,7 @@ static void taosProcCleanupQueue(SProcQueue *pQueue) { } static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char *pHead, int16_t rawHeadLen, - const char *pBody, int32_t rawBodyLen, int64_t handle, ProcFuncType ftype) { + const char *pBody, int32_t rawBodyLen, int64_t handle, EProcFuncType ftype) { if (rawHeadLen == 0 || pHead == NULL) { terrno = TSDB_CODE_INVALID_PARA; return -1; @@ -171,7 +171,7 @@ static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char return -1; } - if (handle != 0 && ftype == PROC_REQ) { + if (handle != 0 && ftype == PROC_FUNC_REQ) { if (taosHashPut(pProc->hash, &handle, sizeof(int64_t), &handle, sizeof(int64_t)) != 0) { taosThreadMutexUnlock(&pQueue->mutex); terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -232,7 +232,7 @@ static int32_t taosProcQueuePush(SProcObj *pProc, SProcQueue *pQueue, const char } static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHeadLen, void **ppBody, int32_t *pBodyLen, - ProcFuncType *pFuncType, ProcMallocFp mallocHeadFp, ProcFreeFp freeHeadFp, + EProcFuncType *pFuncType, ProcMallocFp mallocHeadFp, ProcFreeFp freeHeadFp, ProcMallocFp mallocBodyFp, ProcFreeFp freeBodyFp) { tsem_wait(&pQueue->sem); @@ -309,7 +309,7 @@ static int32_t taosProcQueuePop(SProcQueue *pQueue, void **ppHead, int16_t *pHea *ppBody = pBody; *pHeadLen = rawHeadLen; *pBodyLen = rawBodyLen; - *pFuncType = (ProcFuncType)ftype; + *pFuncType = (EProcFuncType)ftype; uTrace("proc:%s, pop msg at pos:%d ftype:%d remain:%d, head:%d %p body:%d %p", pQueue->name, pos, ftype, pQueue->items, rawHeadLen, pHead, rawBodyLen, pBody); @@ -364,7 +364,7 @@ SProcObj *taosProcInit(const SProcCfg *pCfg) { static void taosProcThreadLoop(SProcObj *pProc) { void *pHead, *pBody; int16_t headLen; - ProcFuncType ftype; + EProcFuncType ftype; int32_t bodyLen; SProcQueue *pQueue; ProcConsumeFp consumeFp; @@ -454,8 +454,8 @@ void taosProcCleanup(SProcObj *pProc) { } int32_t taosProcPutToChildQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, - void *handle, ProcFuncType ftype) { - if (ftype != PROC_REQ) { + void *handle, EProcFuncType ftype) { + if (ftype != PROC_FUNC_REQ) { terrno = TSDB_CODE_INVALID_PARA; return -1; } @@ -482,7 +482,7 @@ void taosProcCloseHandles(SProcObj *pProc, void (*HandleFp)(void *handle)) { } void taosProcPutToParentQ(SProcObj *pProc, const void *pHead, int16_t headLen, const void *pBody, int32_t bodyLen, - ProcFuncType ftype) { + EProcFuncType ftype) { int32_t retry = 0; while (taosProcQueuePush(pProc, pProc->pParentQueue, pHead, headLen, pBody, bodyLen, 0, ftype) != 0) { uWarn("proc:%s, failed to put to queue:%p since %s, retry:%d", pProc->name, pProc->pParentQueue, terrstr(), retry); diff --git a/source/util/test/procTest.cpp b/source/util/test/procTest.cpp index 54aaf49673..e05008e8e1 100644 --- a/source/util/test/procTest.cpp +++ b/source/util/test/procTest.cpp @@ -89,7 +89,7 @@ TEST_F(UtilTesProc, 00_Init_Cleanup) { taosDropShm(&shm); } -void ConsumeChild1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) { +void ConsumeChild1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, EProcFuncType ftype) { STestMsg msg; memcpy(&msg, pHead, headLen); char body[2000] = {0}; @@ -120,20 +120,20 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) { SProcObj *cproc = taosProcInit(&cfg); ASSERT_NE(cproc, nullptr); - ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_RSP), 0); - ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_REGIST), 0); - ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_RELEASE), 0); - ASSERT_NE(taosProcPutToChildQ(cproc, NULL, 12, body, 0, 0, PROC_REQ), 0); - ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_REQ), 0); - ASSERT_NE(taosProcPutToChildQ(cproc, &head, shm.size, body, 0, 0, PROC_REQ), 0); - ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, shm.size, 0, PROC_REQ), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_FUNC_RSP), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_FUNC_REGIST), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_FUNC_RELEASE), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, NULL, 12, body, 0, 0, PROC_FUNC_REQ), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, 0, body, 0, 0, PROC_FUNC_REQ), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, shm.size, body, 0, 0, PROC_FUNC_REQ), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, shm.size, 0, PROC_FUNC_REQ), 0); for (int32_t j = 0; j < 1000; j++) { int32_t i = 0; for (i = 0; i < 20; ++i) { - ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, PROC_REQ), 0); + ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, PROC_FUNC_REQ), 0); } - ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, PROC_REQ), 0); + ASSERT_NE(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, 0, PROC_FUNC_REQ), 0); cfg.isChild = true; cfg.name = "1235_p"; @@ -147,7 +147,7 @@ TEST_F(UtilTesProc, 01_Push_Pop_Child) { taosDropShm(&shm); } -void ConsumeParent1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) { +void ConsumeParent1(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, EProcFuncType ftype) { STestMsg msg; memcpy(&msg, pHead, headLen); char body[2000] = {0}; @@ -186,7 +186,7 @@ TEST_F(UtilTesProc, 02_Push_Pop_Parent) { for (int32_t j = 0; j < 1000; j++) { int32_t i = 0; for (i = 0; i < 20; ++i) { - taosProcPutToParentQ(pproc, &head, sizeof(STestMsg), body, i, PROC_REQ); + taosProcPutToParentQ(pproc, &head, sizeof(STestMsg), body, i, PROC_FUNC_REQ); } taosProcRun(cproc); @@ -198,7 +198,7 @@ TEST_F(UtilTesProc, 02_Push_Pop_Parent) { taosDropShm(&shm); } -void ConsumeChild3(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, ProcFuncType ftype) { +void ConsumeChild3(void *parent, void *pHead, int16_t headLen, void *pBody, int32_t bodyLen, EProcFuncType ftype) { STestMsg msg; memcpy(&msg, pHead, headLen); char body[2000] = {0}; @@ -236,7 +236,7 @@ TEST_F(UtilTesProc, 03_Handle) { int32_t i = 0; for (i = 0; i < 20; ++i) { head.handle = (void *)((int64_t)i); - ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, (void *)((int64_t)i), PROC_REQ), 0); + ASSERT_EQ(taosProcPutToChildQ(cproc, &head, sizeof(STestMsg), body, i, (void *)((int64_t)i), PROC_FUNC_REQ), 0); } cfg.isChild = true;