From 7906e78257565977d4a5ac1f7a81ebfe267dd657 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 26 Nov 2021 14:43:56 +0800 Subject: [PATCH] TD-11265 refact dndVnodes --- include/dnode/mgmt/dnode.h | 2 +- include/dnode/vnode/vnode.h | 18 ----------- source/dnode/mgmt/daemon/src/daemon.c | 2 +- source/dnode/mgmt/impl/src/dndDnode.c | 13 ++++---- source/dnode/mgmt/impl/src/dndMnode.c | 6 ++-- source/dnode/mgmt/impl/src/dndVnodes.c | 44 +++++++++++++++++--------- source/dnode/mgmt/impl/src/dnode.c | 1 + source/dnode/vnode/impl/src/vnodeInt.c | 7 ++-- 8 files changed, 43 insertions(+), 50 deletions(-) diff --git a/include/dnode/mgmt/dnode.h b/include/dnode/mgmt/dnode.h index 7dd7730443..f43fe107fe 100644 --- a/include/dnode/mgmt/dnode.h +++ b/include/dnode/mgmt/dnode.h @@ -78,7 +78,7 @@ typedef struct { * @brief data file's directory. * */ - char dataDir[PATH_MAX]; + char dataDir[TSDB_FILENAME_LEN]; /** * @brief local endpoint. diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index 3f6705fac6..1edd93f509 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -184,27 +184,9 @@ typedef struct { SRpcMsg rpcMsg[]; } SVnodeMsg; -typedef struct SDnode SDnode; -typedef void (*SendMsgToDnodeFp)(SDnode *pDnode, struct SEpSet *epSet, struct SRpcMsg *rpcMsg); -typedef void (*SendMsgToMnodeFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); -typedef void (*SendRedirectMsgFp)(SDnode *pDnode, struct SRpcMsg *rpcMsg); -typedef int32_t (*PutMsgToVnodeQFp)(SDnode *pDnode, int32_t vgId, SVnodeMsg *pMsg); - -typedef struct { - PutMsgToVnodeQFp putMsgToApplyQueueFp; - SendMsgToDnodeFp sendMsgToDnodeFp; - SendMsgToMnodeFp sendMsgToMnodeFp; -} SVnodePara; - -int32_t vnodeInit(SVnodePara); -void vnodeCleanup(); - int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg); -SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg); -void vnodeDrop(SVnode *pVnode); int32_t vnodeCompact(SVnode *pVnode); int32_t vnodeSync(SVnode *pVnode); - int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); SVnodeMsg *vnodeInitMsg(int32_t msgNum); diff --git a/source/dnode/mgmt/daemon/src/daemon.c b/source/dnode/mgmt/daemon/src/daemon.c index 429b097fb8..effaec66a8 100644 --- a/source/dnode/mgmt/daemon/src/daemon.c +++ b/source/dnode/mgmt/daemon/src/daemon.c @@ -141,7 +141,7 @@ void dmnInitOption(SDnodeOpt *pOption) { pOption->shellActivityTimer = tsShellActivityTimer; pOption->statusInterval = tsStatusInterval; pOption->serverPort = tsServerPort; - tstrncpy(pOption->dataDir, tsDataDir, PATH_MAX); + tstrncpy(pOption->dataDir, tsDataDir, TSDB_FILENAME_LEN); tstrncpy(pOption->localEp, tsLocalEp, TSDB_EP_LEN); tstrncpy(pOption->localFqdn, tsLocalFqdn, TSDB_FQDN_LEN); tstrncpy(pOption->firstEp, tsFirst, TSDB_EP_LEN); diff --git a/source/dnode/mgmt/impl/src/dndDnode.c b/source/dnode/mgmt/impl/src/dndDnode.c index ef30503494..fa601a0d99 100644 --- a/source/dnode/mgmt/impl/src/dndDnode.c +++ b/source/dnode/mgmt/impl/src/dndDnode.c @@ -455,12 +455,11 @@ static void *dnodeThreadRoutine(void *param) { while (true) { taosMsleep(ms); - if (dndGetStat(pDnode) != DND_STAT_RUNNING) { - continue; - } - pthread_testcancel(); - dndSendStatusMsg(pDnode); + + if (dndGetStat(pDnode) == DND_STAT_RUNNING) { + dndSendStatusMsg(pDnode); + } } } @@ -501,7 +500,7 @@ int32_t dndInitDnode(SDnode *pDnode) { return -1; } - dInfo("dnd-dnode is initialized"); + dInfo("dnode-dnode is initialized"); return 0; } @@ -531,7 +530,7 @@ void dndCleanupDnode(SDnode *pDnode) { } dndWUnLockDnode(pDnode); - dInfo("dnd-dnode is cleaned up"); + dInfo("dnode-dnode is cleaned up"); } void dndProcessDnodeReq(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index d581c7761c..9b3435a49e 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -616,7 +616,7 @@ static int32_t dndInitMnodeMgmtWorker(SDnode *pDnode) { pPool->min = 1; pPool->max = 1; if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -652,7 +652,7 @@ static int32_t dndInitMnodeReadWorker(SDnode *pDnode) { pPool->min = 0; pPool->max = 1; if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -703,7 +703,7 @@ static int32_t dndInitMnodeWriteWorker(SDnode *pDnode) { pPool->min = 0; pPool->max = 1; if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index d5e94106a7..ac3e55ffa7 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -22,6 +22,7 @@ typedef struct { int32_t refCount; int8_t dropped; int8_t accessState; + char *path; SVnode *pImpl; taos_queue pWriteQ; taos_queue pSyncQ; @@ -74,7 +75,7 @@ static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SVnode static SVnodeObj *dndAcquireVnode(SDnode *pDnode, int32_t vgId); static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode); -static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, SVnode *pImpl); +static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl); static void dndDropVnodeWrapper(SDnode *pDnode, SVnodeObj *pVnode); static SVnodeObj **dndGetVnodesFromHash(SDnode *pDnode, int32_t *numOfVnodes); static int32_t dndGetVnodesFromFile(SDnode *pDnode, SVnodeObj **ppVnodes, int32_t *numOfVnodes); @@ -125,7 +126,7 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) { } } -static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, SVnode *pImpl) { +static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, char *path, SVnode *pImpl) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; SVnodeObj *pVnode = calloc(1, sizeof(SVnodeObj)); if (pVnode == NULL) { @@ -139,6 +140,12 @@ static int32_t dndCreateVnodeWrapper(SDnode *pDnode, int32_t vgId, SVnode *pImpl pVnode->accessState = TSDB_VN_ALL_ACCCESS; pVnode->pImpl = pImpl; + pVnode->path = tstrdup(path); + if (pVnode->path == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + if (dndAllocVnodeQueryQueue(pDnode, pVnode) != 0) { return -1; } @@ -354,22 +361,25 @@ static int32_t dndWriteVnodesToFile(SDnode *pDnode) { static int32_t dndCreateVnode(SDnode *pDnode, int32_t vgId, SVnodeCfg *pCfg) { char path[PATH_MAX + 20] = {0}; snprintf(path, sizeof(path), "%s/vnode%d", pDnode->dir.vnodes, vgId); - SVnode *pImpl = vnodeCreate(vgId, path, pCfg); + // SVnode *pImpl = vnodeCreate(vgId, path, pCfg); + SVnode *pImpl = vnodeOpen(path, NULL); if (pImpl == NULL) { return -1; } - int32_t code = dndCreateVnodeWrapper(pDnode, vgId, pImpl); + int32_t code = dndCreateVnodeWrapper(pDnode, vgId, path, pImpl); if (code != 0) { - vnodeDrop(pImpl); + vnodeClose(pImpl); + vnodeDestroy(path); terrno = code; return code; } code = dndWriteVnodesToFile(pDnode); if (code != 0) { - vnodeDrop(pImpl); + vnodeClose(pImpl); + vnodeDestroy(path); terrno = code; return code; } @@ -385,7 +395,8 @@ static int32_t dndDropVnode(SDnode *pDnode, SVnodeObj *pVnode) { } dndDropVnodeWrapper(pDnode, pVnode); - vnodeDrop(pVnode->pImpl); + vnodeClose(pVnode->pImpl); + vnodeDestroy(pVnode->path); dndWriteVnodesToFile(pDnode); return 0; } @@ -413,7 +424,7 @@ static void *dnodeOpenVnodeFunc(void *param) { dError("vgId:%d, failed to open vnode by thread:%d", pVnode->vgId, pThread->threadIndex); pThread->failed++; } else { - dndCreateVnodeWrapper(pDnode, pVnode->vgId, pImpl); + dndCreateVnodeWrapper(pDnode, pVnode->vgId, path, pImpl); dDebug("vgId:%d, is opened by thread:%d", pVnode->vgId, pThread->threadIndex); pThread->opened++; } @@ -433,7 +444,7 @@ static int32_t dndOpenVnodes(SDnode *pDnode) { pMgmt->hash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); if (pMgmt->hash == NULL) { dError("failed to init vnode hash"); - terrno = TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -874,13 +885,13 @@ static int32_t dndInitVnodeMgmtWorker(SDnode *pDnode) { pPool->min = 1; pPool->max = 1; if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } pMgmt->pMgmtQ = tWorkerAllocQueue(pPool, pDnode, (FProcessItem)dndProcessVnodeMgmtQueue); if (pMgmt->pMgmtQ == NULL) { - terrno = TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -918,6 +929,7 @@ static int32_t dndAllocVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + return 0; } @@ -938,7 +950,8 @@ static int32_t dndInitVnodeReadWorker(SDnode *pDnode) { pPool->min = (int32_t)threadsForQuery; pPool->max = pPool->min; if (tWorkerInit(pPool) != 0) { - return TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } pPool = &pMgmt->fetchPool; @@ -946,7 +959,8 @@ static int32_t dndInitVnodeReadWorker(SDnode *pDnode) { pPool->min = MIN(maxFetchThreads, pDnode->opt.numOfCores); pPool->max = pPool->min; if (tWorkerInit(pPool) != 0) { - TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; } return 0; @@ -998,7 +1012,7 @@ static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) { pPool->name = "vnode-write"; pPool->max = tsNumOfCores; if (tMWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -1036,7 +1050,7 @@ static int32_t dndInitVnodeSyncWorker(SDnode *pDnode) { pPool->name = "vnode-sync"; pPool->max = maxThreads; if (tMWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } diff --git a/source/dnode/mgmt/impl/src/dnode.c b/source/dnode/mgmt/impl/src/dnode.c index 0bfcf5d721..aa0070cfa9 100644 --- a/source/dnode/mgmt/impl/src/dnode.c +++ b/source/dnode/mgmt/impl/src/dnode.c @@ -116,6 +116,7 @@ static int32_t dndInitEnv(SDnode *pDnode, SDnodeOpt *pOptions) { return -1; } + memcpy(&pDnode->opt, pOptions, sizeof(SDnodeOpt)); return 0; } diff --git a/source/dnode/vnode/impl/src/vnodeInt.c b/source/dnode/vnode/impl/src/vnodeInt.c index 2cbdf318a2..8a6fc8bf5e 100644 --- a/source/dnode/vnode/impl/src/vnodeInt.c +++ b/source/dnode/vnode/impl/src/vnodeInt.c @@ -17,9 +17,6 @@ #include "vnodeInt.h" #include "tqueue.h" -int32_t vnodeInit(SVnodePara para) { return 0; } -void vnodeCleanup() {} - int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; } SVnode *vnodeCreate(int32_t vgId, const char *path, const SVnodeCfg *pCfg) { return NULL; } void vnodeDrop(SVnode *pVnode) {} @@ -31,7 +28,7 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) { return 0; } SVnodeMsg *vnodeInitMsg(int32_t msgNum) { SVnodeMsg *pMsg = taosAllocateQitem(msgNum * sizeof(SRpcMsg *) + sizeof(SVnodeMsg)); if (pMsg == NULL) { - terrno = TSDB_CODE_VND_OUT_OF_MEMORY; + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } else { pMsg->allocNum = msgNum; @@ -41,7 +38,7 @@ SVnodeMsg *vnodeInitMsg(int32_t msgNum) { int32_t vnodeAppendMsg(SVnodeMsg *pMsg, SRpcMsg *pRpcMsg) { if (pMsg->curNum >= pMsg->allocNum) { - return TSDB_CODE_VND_OUT_OF_MEMORY; + return TSDB_CODE_OUT_OF_MEMORY; } pMsg->rpcMsg[pMsg->curNum++] = *pRpcMsg;