From d60941d4e3a74e94f1f0f0fce44e5b06761aa879 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 28 Dec 2021 00:20:48 -0800 Subject: [PATCH 1/4] add bnode --- include/dnode/bnode/bnode.h | 10 +- include/dnode/snode/snode.h | 10 +- source/dnode/bnode/src/bnode.c | 4 +- source/dnode/mgmt/impl/inc/dndBnode.h | 2 +- source/dnode/mgmt/impl/inc/dndInt.h | 25 +- source/dnode/mgmt/impl/inc/dndWorker.h | 4 +- source/dnode/mgmt/impl/src/dndBnode.c | 369 +++++++++++++++++++++++++ source/dnode/mgmt/impl/src/dndQnode.c | 1 - source/dnode/mgmt/impl/src/dndSnode.c | 344 +++++++++++++++++++++++ source/dnode/mgmt/impl/src/dndWorker.c | 36 ++- source/dnode/snode/src/snode.c | 4 +- 11 files changed, 778 insertions(+), 31 deletions(-) create mode 100644 source/dnode/mgmt/impl/src/dndBnode.c create mode 100644 source/dnode/mgmt/impl/src/dndSnode.c diff --git a/include/dnode/bnode/bnode.h b/include/dnode/bnode/bnode.h index 74574f5462..23cc3ca617 100644 --- a/include/dnode/bnode/bnode.h +++ b/include/dnode/bnode/bnode.h @@ -49,10 +49,11 @@ typedef struct { /** * @brief Start one Bnode in Dnode. * + * @param path Path of the bnode. * @param pOption Option of the bnode. * @return SBnode* The bnode object. */ -SBnode *bndOpen(const SBnodeOpt *pOption); +SBnode *bndOpen(const char *path, const SBnodeOpt *pOption); /** * @brief Stop Bnode in Dnode. @@ -79,6 +80,13 @@ int32_t bndGetLoad(SBnode *pBnode, SBnodeLoad *pLoad); */ int32_t bndProcessWMsgs(SBnode *pBnode, SArray *pMsgs); +/** + * @brief Drop a bnode. + * + * @param path Path of the bnode. + */ +void bndDestroy(const char *path); + #ifdef __cplusplus } #endif diff --git a/include/dnode/snode/snode.h b/include/dnode/snode/snode.h index 1d30bd1e43..97069437f2 100644 --- a/include/dnode/snode/snode.h +++ b/include/dnode/snode/snode.h @@ -49,10 +49,11 @@ typedef struct { /** * @brief Start one Snode in Dnode. * + * @param path Path of the snode. * @param pOption Option of the snode. * @return SSnode* The snode object. */ -SSnode *sndOpen(const SSnodeOpt *pOption); +SSnode *sndOpen(const char *path, const SSnodeOpt *pOption); /** * @brief Stop Snode in Dnode. @@ -80,6 +81,13 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad); */ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +/** + * @brief Drop a snode. + * + * @param path Path of the snode. + */ +void sndDestroy(const char *path); + #ifdef __cplusplus } #endif diff --git a/source/dnode/bnode/src/bnode.c b/source/dnode/bnode/src/bnode.c index 40b22dd58d..9570bc72a0 100644 --- a/source/dnode/bnode/src/bnode.c +++ b/source/dnode/bnode/src/bnode.c @@ -15,7 +15,7 @@ #include "bndInt.h" -SBnode *bndOpen(const SBnodeOpt *pOption) { +SBnode *bndOpen(const char *path, const SBnodeOpt *pOption) { SBnode *pBnode = calloc(1, sizeof(SBnode)); return pBnode; } @@ -25,3 +25,5 @@ void bndClose(SBnode *pBnode) { free(pBnode); } int32_t bndGetLoad(SBnode *pBnode, SBnodeLoad *pLoad) { return 0; } int32_t bndProcessWMsgs(SBnode *pBnode, SArray *pMsgs) { return 0; } + +void bndDestroy(const char *path) {} diff --git a/source/dnode/mgmt/impl/inc/dndBnode.h b/source/dnode/mgmt/impl/inc/dndBnode.h index a350eae2d4..853b54ff69 100644 --- a/source/dnode/mgmt/impl/inc/dndBnode.h +++ b/source/dnode/mgmt/impl/inc/dndBnode.h @@ -24,7 +24,7 @@ extern "C" { int32_t dndInitBnode(SDnode *pDnode); void dndCleanupBnode(SDnode *pDnode); -ioid dndProcessBnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); +void dndProcessBnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); int32_t dndProcessCreateBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t dndProcessDropBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg); diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 0d37828ecd..ff96b7cfdf 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -54,18 +54,19 @@ extern int32_t dDebugFlag; #define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }} typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EStat; -typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EDndWorkerType; +typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType; typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps); typedef struct { - EDndWorkerType type; + EWorkerType type; const char *name; int32_t minNum; int32_t maxNum; - FProcessItem fp; + void *queueFp; SDnode *pDnode; taos_queue queue; SWorkerPool pool; + SMWorkerPool mpool; } SDnodeWorker; typedef struct { @@ -122,25 +123,21 @@ typedef struct { } SQnodeMgmt; typedef struct { - int32_t refCount; - int8_t deployed; - int8_t dropped; - char *file; - SSnode *pSnode; - SRWLatch latch; - taos_queue pWriteQ; - SWorkerPool writePool; + int32_t refCount; + int8_t deployed; + int8_t dropped; + SSnode *pSnode; + SRWLatch latch; + SDnodeWorker writeWorker; } SSnodeMgmt; typedef struct { int32_t refCount; int8_t deployed; int8_t dropped; - char *file; SBnode *pBnode; SRWLatch latch; - taos_queue pWriteQ; - SMWorkerPool writePool; + SDnodeWorker writeWorker; } SBnodeMgmt; typedef struct { diff --git a/source/dnode/mgmt/impl/inc/dndWorker.h b/source/dnode/mgmt/impl/inc/dndWorker.h index 237c0518e8..49ef88e67d 100644 --- a/source/dnode/mgmt/impl/inc/dndWorker.h +++ b/source/dnode/mgmt/impl/inc/dndWorker.h @@ -21,8 +21,8 @@ extern "C" { #endif #include "dndInt.h" -int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EDndWorkerType type, const char *name, int32_t minNum, - int32_t maxNum, FProcessItem fp); +int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum, + int32_t maxNum, void *queueFp); void dndCleanupWorker(SDnodeWorker *pWorker); int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen); diff --git a/source/dnode/mgmt/impl/src/dndBnode.c b/source/dnode/mgmt/impl/src/dndBnode.c new file mode 100644 index 0000000000..b978c1102f --- /dev/null +++ b/source/dnode/mgmt/impl/src/dndBnode.c @@ -0,0 +1,369 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "dndBnode.h" +#include "dndDnode.h" +#include "dndTransport.h" +#include "dndWorker.h" + +static void dndProcessBnodeQueue(SDnode *pDnode, taos_qall qall, int32_t numOfMsgs); + +static SBnode *dndAcquireBnode(SDnode *pDnode) { + SBnodeMgmt *pMgmt = &pDnode->bmgmt; + SBnode *pBnode = NULL; + int32_t refCount = 0; + + taosRLockLatch(&pMgmt->latch); + if (pMgmt->deployed && !pMgmt->dropped) { + refCount = atomic_add_fetch_32(&pMgmt->refCount, 1); + pBnode = pMgmt->pBnode; + } else { + terrno = TSDB_CODE_DND_BNODE_NOT_DEPLOYED; + } + taosRUnLockLatch(&pMgmt->latch); + + if (pBnode != NULL) { + dTrace("acquire bnode, refCount:%d", refCount); + } + return pBnode; +} + +static void dndReleaseBnode(SDnode *pDnode, SBnode *pBnode) { + SBnodeMgmt *pMgmt = &pDnode->bmgmt; + int32_t refCount = 0; + + taosRLockLatch(&pMgmt->latch); + if (pBnode != NULL) { + refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); + } + taosRUnLockLatch(&pMgmt->latch); + + if (pBnode != NULL) { + dTrace("release bnode, refCount:%d", refCount); + } +} + +static int32_t dndReadBnodeFile(SDnode *pDnode) { + SBnodeMgmt *pMgmt = &pDnode->bmgmt; + int32_t code = TSDB_CODE_DND_BNODE_READ_FILE_ERROR; + int32_t len = 0; + int32_t maxLen = 4096; + char *content = calloc(1, maxLen + 1); + cJSON *root = NULL; + + char file[PATH_MAX + 20]; + snprintf(file, PATH_MAX + 20, "%s/bnode.json", pDnode->dir.dnode); + + FILE *fp = fopen(file, "r"); + if (fp == NULL) { + dDebug("file %s not exist", file); + code = 0; + goto PRASE_BNODE_OVER; + } + + len = (int32_t)fread(content, 1, maxLen, fp); + if (len <= 0) { + dError("failed to read %s since content is null", file); + goto PRASE_BNODE_OVER; + } + + content[len] = 0; + root = cJSON_Parse(content); + if (root == NULL) { + dError("failed to read %s since invalid json format", file); + goto PRASE_BNODE_OVER; + } + + cJSON *deployed = cJSON_GetObjectItem(root, "deployed"); + if (!deployed || deployed->type != cJSON_Number) { + dError("failed to read %s since deployed not found", file); + goto PRASE_BNODE_OVER; + } + pMgmt->deployed = deployed->valueint; + + cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); + if (!dropped || dropped->type != cJSON_Number) { + dError("failed to read %s since dropped not found", file); + goto PRASE_BNODE_OVER; + } + pMgmt->dropped = dropped->valueint; + + code = 0; + dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); + +PRASE_BNODE_OVER: + if (content != NULL) free(content); + if (root != NULL) cJSON_Delete(root); + if (fp != NULL) fclose(fp); + + terrno = code; + return code; +} + +static int32_t dndWriteBnodeFile(SDnode *pDnode) { + SBnodeMgmt *pMgmt = &pDnode->bmgmt; + + char file[PATH_MAX + 20]; + snprintf(file, PATH_MAX + 20, "%s/bnode.json", pDnode->dir.dnode); + + FILE *fp = fopen(file, "w"); + if (fp == NULL) { + terrno = TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR; + dError("failed to write %s since %s", file, terrstr()); + return -1; + } + + int32_t len = 0; + int32_t maxLen = 4096; + char *content = calloc(1, maxLen + 1); + + len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed); + len += snprintf(content + len, maxLen - len, " \"dropped\": %d\n", pMgmt->dropped); + len += snprintf(content + len, maxLen - len, "}\n"); + + fwrite(content, 1, len, fp); + taosFsyncFile(fileno(fp)); + fclose(fp); + free(content); + + if (taosRenameFile(file, file) != 0) { + terrno = TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR; + dError("failed to rename %s since %s", file, terrstr()); + return -1; + } + + dInfo("successed to write %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); + return 0; +} + +static int32_t dndStartBnodeWorker(SDnode *pDnode) { + SBnodeMgmt *pMgmt = &pDnode->bmgmt; + if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "bnode-write", 0, 1, + (FProcessItem)dndProcessBnodeQueue) != 0) { + dError("failed to start bnode write worker since %s", terrstr()); + return -1; + } + + return 0; +} + +static void dndStopBnodeWorker(SDnode *pDnode) { + SBnodeMgmt *pMgmt = &pDnode->bmgmt; + + taosWLockLatch(&pMgmt->latch); + pMgmt->deployed = 0; + taosWUnLockLatch(&pMgmt->latch); + + while (pMgmt->refCount > 1) { + taosMsleep(10); + } + + dndCleanupWorker(&pMgmt->writeWorker); +} + +static void dndBuildBnodeOption(SDnode *pDnode, SBnodeOpt *pOption) { + pOption->pDnode = pDnode; + pOption->sendMsgToDnodeFp = dndSendMsgToDnode; + pOption->sendMsgToMnodeFp = dndSendMsgToMnode; + pOption->sendRedirectMsgFp = dndSendRedirectMsg; + pOption->dnodeId = dndGetDnodeId(pDnode); + pOption->clusterId = dndGetClusterId(pDnode); + pOption->cfg.sver = pDnode->opt.sver; +} + +static int32_t dndOpenBnode(SDnode *pDnode) { + SBnodeMgmt *pMgmt = &pDnode->bmgmt; + SBnodeOpt option = {0}; + dndBuildBnodeOption(pDnode, &option); + + SBnode *pBnode = bndOpen(pDnode->dir.bnode, &option); + if (pBnode == NULL) { + dError("failed to open bnode since %s", terrstr()); + return -1; + } + + if (dndStartBnodeWorker(pDnode) != 0) { + dError("failed to start bnode worker since %s", terrstr()); + bndClose(pBnode); + return -1; + } + + if (dndWriteBnodeFile(pDnode) != 0) { + dError("failed to write bnode file since %s", terrstr()); + dndStopBnodeWorker(pDnode); + bndClose(pBnode); + return -1; + } + + taosWLockLatch(&pMgmt->latch); + pMgmt->pBnode = pBnode; + pMgmt->deployed = 1; + taosWUnLockLatch(&pMgmt->latch); + + dInfo("bnode open successfully"); + return 0; +} + +static int32_t dndDropBnode(SDnode *pDnode) { + SBnodeMgmt *pMgmt = &pDnode->bmgmt; + + SBnode *pBnode = dndAcquireBnode(pDnode); + if (pBnode == NULL) { + dError("failed to drop bnode since %s", terrstr()); + return -1; + } + + taosRLockLatch(&pMgmt->latch); + pMgmt->dropped = 1; + taosRUnLockLatch(&pMgmt->latch); + + if (dndWriteBnodeFile(pDnode) != 0) { + taosRLockLatch(&pMgmt->latch); + pMgmt->dropped = 0; + taosRUnLockLatch(&pMgmt->latch); + + dndReleaseBnode(pDnode, pBnode); + dError("failed to drop bnode since %s", terrstr()); + return -1; + } + + dndReleaseBnode(pDnode, pBnode); + dndStopBnodeWorker(pDnode); + bndClose(pBnode); + pMgmt->pBnode = NULL; + bndDestroy(pDnode->dir.bnode); + + return 0; +} + +int32_t dndProcessCreateBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { + SCreateBnodeInMsg *pMsg = pRpcMsg->pCont; + pMsg->dnodeId = htonl(pMsg->dnodeId); + + if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { + terrno = TSDB_CODE_DND_BNODE_ID_INVALID; + return -1; + } else { + return dndOpenBnode(pDnode); + } +} + +int32_t dndProcessDropBnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { + SDropBnodeInMsg *pMsg = pRpcMsg->pCont; + pMsg->dnodeId = htonl(pMsg->dnodeId); + + if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { + terrno = TSDB_CODE_DND_BNODE_ID_INVALID; + return -1; + } else { + return dndDropBnode(pDnode); + } +} + +static void dndSendBnodeErrorRsp(SRpcMsg *pMsg, int32_t code) { + SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; + rpcSendResponse(&rpcRsp); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); +} + +static void dndSendBnodeErrorRsps(taos_qall qall, int32_t numOfMsgs, int32_t code) { + for (int32_t i = 0; i < numOfMsgs; ++i) { + SRpcMsg *pMsg = NULL; + taosGetQitem(qall, (void **)&pMsg); + dndSendBnodeErrorRsp(pMsg, code); + } +} + +static void dndProcessBnodeQueue(SDnode *pDnode, taos_qall qall, int32_t numOfMsgs) { + SBnode *pBnode = dndAcquireBnode(pDnode); + if (pBnode == NULL) { + dndSendBnodeErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY); + return; + } + + SArray *pArray = taosArrayInit(numOfMsgs, sizeof(SRpcMsg *)); + if (pArray == NULL) { + dndReleaseBnode(pDnode, pBnode); + dndSendBnodeErrorRsps(qall, numOfMsgs, TSDB_CODE_OUT_OF_MEMORY); + return; + } + + for (int32_t i = 0; i < numOfMsgs; ++i) { + SRpcMsg *pMsg = NULL; + taosGetQitem(qall, (void **)&pMsg); + void *ptr = taosArrayPush(pArray, &pMsg); + if (ptr == NULL) { + dndSendBnodeErrorRsp(pMsg, TSDB_CODE_OUT_OF_MEMORY); + } + } + + bndProcessWMsgs(pBnode, pArray); + + for (size_t i = 0; i < numOfMsgs; i++) { + SRpcMsg *pMsg = *(SRpcMsg **)taosArrayGet(pArray, i); + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); + } + taosArrayDestroy(pArray); + dndReleaseBnode(pDnode, pBnode); +} + +static void dndWriteBnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) { + int32_t code = TSDB_CODE_DND_BNODE_NOT_DEPLOYED; + + SBnode *pBnode = dndAcquireBnode(pDnode); + if (pBnode != NULL) { + code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)); + } + dndReleaseBnode(pDnode, pBnode); + + if (code != 0) { + if (pMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; + rpcSendResponse(&rsp); + } + rpcFreeCont(pMsg->pCont); + } +} + +void dndProcessBnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { + dndWriteBnodeMsgToWorker(pDnode, &pDnode->bmgmt.writeWorker, pMsg); +} + +int32_t dndInitBnode(SDnode *pDnode) { + SBnodeMgmt *pMgmt = &pDnode->bmgmt; + taosInitRWLatch(&pMgmt->latch); + + if (dndReadBnodeFile(pDnode) != 0) { + return -1; + } + + if (pMgmt->dropped) return 0; + if (!pMgmt->deployed) return 0; + + return dndOpenBnode(pDnode); +} + +void dndCleanupBnode(SDnode *pDnode) { + SBnodeMgmt *pMgmt = &pDnode->bmgmt; + if (pMgmt->pBnode) { + dndStopBnodeWorker(pDnode); + bndClose(pMgmt->pBnode); + pMgmt->pBnode = NULL; + } +} diff --git a/source/dnode/mgmt/impl/src/dndQnode.c b/source/dnode/mgmt/impl/src/dndQnode.c index 8c76bf95a6..0b92de81c1 100644 --- a/source/dnode/mgmt/impl/src/dndQnode.c +++ b/source/dnode/mgmt/impl/src/dndQnode.c @@ -252,7 +252,6 @@ static int32_t dndDropQnode(SDnode *pDnode) { dndStopQnodeWorker(pDnode); qndClose(pQnode); pMgmt->pQnode = NULL; - // qndDestroy(pDnode->dir.qnode); return 0; } diff --git a/source/dnode/mgmt/impl/src/dndSnode.c b/source/dnode/mgmt/impl/src/dndSnode.c new file mode 100644 index 0000000000..c1eb347350 --- /dev/null +++ b/source/dnode/mgmt/impl/src/dndSnode.c @@ -0,0 +1,344 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "dndSnode.h" +#include "dndDnode.h" +#include "dndTransport.h" +#include "dndWorker.h" + +static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg); + +static SSnode *dndAcquireSnode(SDnode *pDnode) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + SSnode *pSnode = NULL; + int32_t refCount = 0; + + taosRLockLatch(&pMgmt->latch); + if (pMgmt->deployed && !pMgmt->dropped) { + refCount = atomic_add_fetch_32(&pMgmt->refCount, 1); + pSnode = pMgmt->pSnode; + } else { + terrno = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; + } + taosRUnLockLatch(&pMgmt->latch); + + if (pSnode != NULL) { + dTrace("acquire snode, refCount:%d", refCount); + } + return pSnode; +} + +static void dndReleaseSnode(SDnode *pDnode, SSnode *pSnode) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + int32_t refCount = 0; + + taosRLockLatch(&pMgmt->latch); + if (pSnode != NULL) { + refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); + } + taosRUnLockLatch(&pMgmt->latch); + + if (pSnode != NULL) { + dTrace("release snode, refCount:%d", refCount); + } +} + +static int32_t dndReadSnodeFile(SDnode *pDnode) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + int32_t code = TSDB_CODE_DND_SNODE_READ_FILE_ERROR; + int32_t len = 0; + int32_t maxLen = 4096; + char *content = calloc(1, maxLen + 1); + cJSON *root = NULL; + + char file[PATH_MAX + 20]; + snprintf(file, PATH_MAX + 20, "%s/snode.json", pDnode->dir.dnode); + + FILE *fp = fopen(file, "r"); + if (fp == NULL) { + dDebug("file %s not exist", file); + code = 0; + goto PRASE_SNODE_OVER; + } + + len = (int32_t)fread(content, 1, maxLen, fp); + if (len <= 0) { + dError("failed to read %s since content is null", file); + goto PRASE_SNODE_OVER; + } + + content[len] = 0; + root = cJSON_Parse(content); + if (root == NULL) { + dError("failed to read %s since invalid json format", file); + goto PRASE_SNODE_OVER; + } + + cJSON *deployed = cJSON_GetObjectItem(root, "deployed"); + if (!deployed || deployed->type != cJSON_Number) { + dError("failed to read %s since deployed not found", file); + goto PRASE_SNODE_OVER; + } + pMgmt->deployed = deployed->valueint; + + cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); + if (!dropped || dropped->type != cJSON_Number) { + dError("failed to read %s since dropped not found", file); + goto PRASE_SNODE_OVER; + } + pMgmt->dropped = dropped->valueint; + + code = 0; + dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); + +PRASE_SNODE_OVER: + if (content != NULL) free(content); + if (root != NULL) cJSON_Delete(root); + if (fp != NULL) fclose(fp); + + terrno = code; + return code; +} + +static int32_t dndWriteSnodeFile(SDnode *pDnode) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + + char file[PATH_MAX + 20]; + snprintf(file, PATH_MAX + 20, "%s/snode.json", pDnode->dir.dnode); + + FILE *fp = fopen(file, "w"); + if (fp == NULL) { + terrno = TSDB_CODE_DND_SNODE_WRITE_FILE_ERROR; + dError("failed to write %s since %s", file, terrstr()); + return -1; + } + + int32_t len = 0; + int32_t maxLen = 4096; + char *content = calloc(1, maxLen + 1); + + len += snprintf(content + len, maxLen - len, "{\n"); + len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed); + len += snprintf(content + len, maxLen - len, " \"dropped\": %d\n", pMgmt->dropped); + len += snprintf(content + len, maxLen - len, "}\n"); + + fwrite(content, 1, len, fp); + taosFsyncFile(fileno(fp)); + fclose(fp); + free(content); + + if (taosRenameFile(file, file) != 0) { + terrno = TSDB_CODE_DND_SNODE_WRITE_FILE_ERROR; + dError("failed to rename %s since %s", file, terrstr()); + return -1; + } + + dInfo("successed to write %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); + return 0; +} + +static int32_t dndStartSnodeWorker(SDnode *pDnode) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "snode-write", 0, 1, + (FProcessItem)dndProcessSnodeQueue) != 0) { + dError("failed to start snode write worker since %s", terrstr()); + return -1; + } + + return 0; +} + +static void dndStopSnodeWorker(SDnode *pDnode) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + + taosWLockLatch(&pMgmt->latch); + pMgmt->deployed = 0; + taosWUnLockLatch(&pMgmt->latch); + + while (pMgmt->refCount > 1) { + taosMsleep(10); + } + + dndCleanupWorker(&pMgmt->writeWorker); +} + +static void dndBuildSnodeOption(SDnode *pDnode, SSnodeOpt *pOption) { + pOption->pDnode = pDnode; + pOption->sendMsgToDnodeFp = dndSendMsgToDnode; + pOption->sendMsgToMnodeFp = dndSendMsgToMnode; + pOption->sendRedirectMsgFp = dndSendRedirectMsg; + pOption->dnodeId = dndGetDnodeId(pDnode); + pOption->clusterId = dndGetClusterId(pDnode); + pOption->cfg.sver = pDnode->opt.sver; +} + +static int32_t dndOpenSnode(SDnode *pDnode) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + SSnodeOpt option = {0}; + dndBuildSnodeOption(pDnode, &option); + + SSnode *pSnode = sndOpen(pDnode->dir.snode, &option); + if (pSnode == NULL) { + dError("failed to open snode since %s", terrstr()); + return -1; + } + + if (dndStartSnodeWorker(pDnode) != 0) { + dError("failed to start snode worker since %s", terrstr()); + sndClose(pSnode); + return -1; + } + + if (dndWriteSnodeFile(pDnode) != 0) { + dError("failed to write snode file since %s", terrstr()); + dndStopSnodeWorker(pDnode); + sndClose(pSnode); + return -1; + } + + taosWLockLatch(&pMgmt->latch); + pMgmt->pSnode = pSnode; + pMgmt->deployed = 1; + taosWUnLockLatch(&pMgmt->latch); + + dInfo("snode open successfully"); + return 0; +} + +static int32_t dndDropSnode(SDnode *pDnode) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + + SSnode *pSnode = dndAcquireSnode(pDnode); + if (pSnode == NULL) { + dError("failed to drop snode since %s", terrstr()); + return -1; + } + + taosRLockLatch(&pMgmt->latch); + pMgmt->dropped = 1; + taosRUnLockLatch(&pMgmt->latch); + + if (dndWriteSnodeFile(pDnode) != 0) { + taosRLockLatch(&pMgmt->latch); + pMgmt->dropped = 0; + taosRUnLockLatch(&pMgmt->latch); + + dndReleaseSnode(pDnode, pSnode); + dError("failed to drop snode since %s", terrstr()); + return -1; + } + + dndReleaseSnode(pDnode, pSnode); + dndStopSnodeWorker(pDnode); + sndClose(pSnode); + pMgmt->pSnode = NULL; + sndDestroy(pDnode->dir.snode); + + return 0; +} + +int32_t dndProcessCreateSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { + SCreateSnodeInMsg *pMsg = pRpcMsg->pCont; + pMsg->dnodeId = htonl(pMsg->dnodeId); + + if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { + terrno = TSDB_CODE_DND_SNODE_ID_INVALID; + return -1; + } else { + return dndOpenSnode(pDnode); + } +} + +int32_t dndProcessDropSnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { + SDropSnodeInMsg *pMsg = pRpcMsg->pCont; + pMsg->dnodeId = htonl(pMsg->dnodeId); + + if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { + terrno = TSDB_CODE_DND_SNODE_ID_INVALID; + return -1; + } else { + return dndDropSnode(pDnode); + } +} + +static void dndProcessSnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + SRpcMsg *pRsp = NULL; + int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; + + SSnode *pSnode = dndAcquireSnode(pDnode); + if (pSnode != NULL) { + code = sndProcessMsg(pSnode, pMsg, &pRsp); + } + + if (pRsp != NULL) { + pRsp->ahandle = pMsg->ahandle; + rpcSendResponse(pRsp); + free(pRsp); + } else { + if (code != 0) code = terrno; + SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; + rpcSendResponse(&rpcRsp); + } + + rpcFreeCont(pMsg->pCont); + taosFreeQitem(pMsg); +} + +static void dndWriteSnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) { + int32_t code = TSDB_CODE_DND_SNODE_NOT_DEPLOYED; + + SSnode *pSnode = dndAcquireSnode(pDnode); + if (pSnode != NULL) { + code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)); + } + dndReleaseSnode(pDnode, pSnode); + + if (code != 0) { + if (pMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; + rpcSendResponse(&rsp); + } + rpcFreeCont(pMsg->pCont); + } +} + +void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { + dndWriteSnodeMsgToWorker(pDnode, &pDnode->smgmt.writeWorker, pMsg); +} + +int32_t dndInitSnode(SDnode *pDnode) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + taosInitRWLatch(&pMgmt->latch); + + if (dndReadSnodeFile(pDnode) != 0) { + return -1; + } + + if (pMgmt->dropped) return 0; + if (!pMgmt->deployed) return 0; + + return dndOpenSnode(pDnode); +} + +void dndCleanupSnode(SDnode *pDnode) { + SSnodeMgmt *pMgmt = &pDnode->smgmt; + if (pMgmt->pSnode) { + dndStopSnodeWorker(pDnode); + sndClose(pMgmt->pSnode); + pMgmt->pSnode = NULL; + } +} diff --git a/source/dnode/mgmt/impl/src/dndWorker.c b/source/dnode/mgmt/impl/src/dndWorker.c index da0e3a9319..c421437e4d 100644 --- a/source/dnode/mgmt/impl/src/dndWorker.c +++ b/source/dnode/mgmt/impl/src/dndWorker.c @@ -16,9 +16,9 @@ #define _DEFAULT_SOURCE #include "dndWorker.h" -int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EDndWorkerType type, const char *name, int32_t minNum, - int32_t maxNum, FProcessItem fp) { - if (pDnode == NULL || pWorker == NULL || name == NULL || minNum < 0 || maxNum <= 0 || fp == NULL) { +int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EWorkerType type, const char *name, int32_t minNum, + int32_t maxNum, void *queueFp) { + if (pDnode == NULL || pWorker == NULL || name == NULL || minNum < 0 || maxNum <= 0 || queueFp == NULL) { terrno = TSDB_CODE_INVALID_PARA; return -1; } @@ -27,19 +27,32 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EDndWorkerType type pWorker->name = name; pWorker->minNum = minNum; pWorker->maxNum = maxNum; - pWorker->fp = fp; + pWorker->queueFp = queueFp; pWorker->pDnode = pDnode; if (pWorker->type == DND_WORKER_SINGLE) { SWorkerPool *pPool = &pWorker->pool; + pPool->name = name; pPool->min = minNum; pPool->max = maxNum; if (tWorkerInit(pPool) != 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - - pWorker->queue = tWorkerAllocQueue(&pPool, pDnode, fp); + pWorker->queue = tWorkerAllocQueue(pPool, pDnode, (FProcessItem)queueFp); + if (pWorker->queue == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + } else if (pWorker->type == DND_WORKER_MULTI) { + SMWorkerPool *pPool = &pWorker->mpool; + pPool->name = name; + pPool->max = maxNum; + if (tMWorkerInit(pPool) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + pWorker->queue = tMWorkerAllocQueue(pPool, pDnode, (FProcessItems)queueFp); if (pWorker->queue == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; @@ -52,12 +65,17 @@ int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EDndWorkerType type } void dndCleanupWorker(SDnodeWorker *pWorker) { + while (!taosQueueEmpty(pWorker->queue)) { + taosMsleep(10); + } + if (pWorker->type == DND_WORKER_SINGLE) { - while (!taosQueueEmpty(pWorker->queue)) { - taosMsleep(10); - } tWorkerCleanup(&pWorker->pool); tWorkerFreeQueue(&pWorker->pool, pWorker->queue); + } else if (pWorker->type == DND_WORKER_MULTI) { + tWorkerCleanup(&pWorker->mpool); + tMWorkerFreeQueue(&pWorker->mpool, pWorker->queue); + } else { } } diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 3423ce41e2..7ae4d49059 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -15,7 +15,7 @@ #include "sndInt.h" -SSnode *sndOpen(const SSnodeOpt *pOption) { +SSnode *sndOpen(const char *path, const SSnodeOpt *pOption) { SSnode *pSnode = calloc(1, sizeof(SSnode)); return pSnode; } @@ -28,3 +28,5 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { *pRsp = NULL; return 0; } + +void sndDestroy(const char *path) {} \ No newline at end of file From c9f38d2be294544308154059da53d6023e6941d3 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 28 Dec 2021 17:01:09 +0800 Subject: [PATCH 2/4] send create topic req --- include/common/tmsg.h | 21 +++++++++++++ include/libs/parser/parser.h | 4 +-- source/client/src/clientImpl.c | 56 +++++++++++++++++++++++++++++++++- 3 files changed, 78 insertions(+), 3 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 6727dd3289..25002a9f92 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1063,6 +1063,27 @@ typedef struct STaskDropRsp { int32_t code; } STaskDropRsp; +typedef struct { + int8_t igExists; + char* name; + char* phyPlan; +} SCMCreateTopicReq; + +static FORCE_INLINE int tSerializeSCMCreateTopicReq(void** buf, const SCMCreateTopicReq* pReq) { + int tlen = 0; + tlen += taosEncodeString(buf, pReq->name); + tlen += taosEncodeFixedI8(buf, pReq->igExists); + tlen += taosEncodeString(buf, pReq->phyPlan); + return tlen; +} + +static FORCE_INLINE void* tDeserializeSCMCreateTopicReq(void* buf, SCMCreateTopicReq* pReq) { + buf = taosDecodeFixedI8(buf, &(pReq->igExists)); + buf = taosDecodeString(buf, &(pReq->name)); + buf = taosDecodeString(buf, &(pReq->phyPlan)); + return buf; +} + typedef struct { char name[TSDB_TOPIC_FNAME_LEN]; int8_t igExists; diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index f2f3fcd49b..c7d637ee8a 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -46,7 +46,7 @@ int32_t qParseQuerySql(SParseContext* pContext, SQueryNode** pQuery); bool qIsDdlQuery(const SQueryNode* pQuery); -void qDestoryQuery(SQueryNode* pQuery); +void qDestroyQuery(SQueryNode* pQuery); /** * Convert a normal sql statement to only query tags information to enable that the subscribe client can be aware quickly of the true vgroup ids that @@ -86,4 +86,4 @@ void addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* p } #endif -#endif /*_TD_PARSER_H_*/ \ No newline at end of file +#endif /*_TD_PARSER_H_*/ diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 0ee99f77aa..6220d7eb50 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -212,6 +212,60 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, void** pJob) { return scheduleAsyncExecJob(pRequest->pTscObj->pTransporter, NULL/*todo appInfo.xxx*/, pDag, pJob); } +TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen) { + STscObj* pTscObj = (STscObj*)taos; + SRequestObj* pRequest = NULL; + SQueryNode* pQuery = NULL; + SQueryDag* pDag = NULL; + char *dagStr = NULL; + + //parse sql to logical plan and physical plan + //send topic name and plans to mnode + + terrno = TSDB_CODE_SUCCESS; + + CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); + CHECK_CODE_GOTO(parseSql(pRequest, &pQuery), _return); + //TODO: check sql valid + + CHECK_CODE_GOTO(qCreateQueryDag(pQuery, &pDag), _return); + + dagStr = qDagToString(pDag); + if(dagStr == NULL) { + //TODO + } + + SCMCreateTopicReq req = { + .name = (char*)name, + .igExists = 0, + .phyPlan = dagStr, + }; + + void* buf = NULL; + int tlen = tSerializeSCMCreateTopicReq(&buf, &req); + + pRequest->body.requestMsg = (SDataBuf){ .pData = buf, .len = tlen }; + + SMsgSendInfo* body = buildSendMsgInfoImpl(pRequest); + SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet; + + int64_t transporterId = 0; + asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, body); + + tsem_wait(&pRequest->body.rspSem); + + destroySendMsgInfo(body); + +_return: + qDestroyQuery(pQuery); + qDestroyQueryDag(pDag); + destroySendMsgInfo(body); + if (pRequest != NULL && terrno != TSDB_CODE_SUCCESS) { + pRequest->code = terrno; + } + return pRequest; +} + TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { STscObj *pTscObj = (STscObj *)taos; if (sqlLen > (size_t) tsMaxSQLStringLen) { @@ -239,7 +293,7 @@ TAOS_RES *taos_query_l(TAOS *taos, const char *sql, int sqlLen) { } _return: - qDestoryQuery(pQuery); + qDestroyQuery(pQuery); qDestroyQueryDag(pDag); if (NULL != pRequest && TSDB_CODE_SUCCESS != terrno) { pRequest->code = terrno; From 041e6716010b2e641e076c2ed2b566200392bc38 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 28 Dec 2021 01:38:15 -0800 Subject: [PATCH 3/4] refact mnode queue --- include/dnode/mnode/mnode.h | 20 +- include/dnode/snode/snode.h | 2 +- source/dnode/mgmt/impl/inc/dndInt.h | 32 +- source/dnode/mgmt/impl/src/dndBnode.c | 22 +- source/dnode/mgmt/impl/src/dndMnode.c | 368 ++++---------------- source/dnode/mgmt/impl/src/dndQnode.c | 18 +- source/dnode/mgmt/impl/src/dndSnode.c | 22 +- source/dnode/mgmt/impl/src/dndWorker.c | 19 +- source/dnode/mgmt/impl/test/dnode/dnode.cpp | 2 +- source/dnode/mnode/impl/src/mndDnode.c | 2 +- source/dnode/mnode/impl/src/mnode.c | 8 +- source/libs/parser/src/astToMsg.c | 1 + tests/script/unique/dnode/basic1.sim | 11 + 13 files changed, 160 insertions(+), 367 deletions(-) diff --git a/include/dnode/mnode/mnode.h b/include/dnode/mnode/mnode.h index e0619b2133..a288e3e630 100644 --- a/include/dnode/mnode/mnode.h +++ b/include/dnode/mnode/mnode.h @@ -147,28 +147,12 @@ void mndCleanupMsg(SMnodeMsg *pMsg); void mndSendRsp(SMnodeMsg *pMsg, int32_t code); /** - * @brief Process the read request. + * @brief Process the read, write, sync request. * * @param pMsg The request msg. * @return int32_t 0 for success, -1 for failure. */ -void mndProcessReadMsg(SMnodeMsg *pMsg); - -/** - * @brief Process the write request. - * - * @param pMsg The request msg. - * @return int32_t 0 for success, -1 for failure. - */ -void mndProcessWriteMsg(SMnodeMsg *pMsg); - -/** - * @brief Process the sync request. - * - * @param pMsg The request msg. - * @return int32_t 0 for success, -1 for failure. - */ -void mndProcessSyncMsg(SMnodeMsg *pMsg); +void mndProcessMsg(SMnodeMsg *pMsg); #ifdef __cplusplus } diff --git a/include/dnode/snode/snode.h b/include/dnode/snode/snode.h index 97069437f2..4913d2572f 100644 --- a/include/dnode/snode/snode.h +++ b/include/dnode/snode/snode.h @@ -79,7 +79,7 @@ int32_t sndGetLoad(SSnode *pSnode, SSnodeLoad *pLoad); * @param pRsp The response message * @return int32_t 0 for success, -1 for failure */ -int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int32_t sndProcessMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg **pRsp); /** * @brief Drop a snode. diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index ff96b7cfdf..954e21aefa 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -65,8 +65,10 @@ typedef struct { void *queueFp; SDnode *pDnode; taos_queue queue; - SWorkerPool pool; - SMWorkerPool mpool; + union { + SWorkerPool pool; + SMWorkerPool mpool; + }; } SDnodeWorker; typedef struct { @@ -95,21 +97,17 @@ typedef struct { } SDnodeMgmt; typedef struct { - int32_t refCount; - int8_t deployed; - int8_t dropped; - int8_t replica; - int8_t selfIndex; - SReplica replicas[TSDB_MAX_REPLICA]; - char *file; - SMnode *pMnode; - SRWLatch latch; - taos_queue pReadQ; - taos_queue pWriteQ; - taos_queue pSyncQ; - SWorkerPool readPool; - SWorkerPool writePool; - SWorkerPool syncPool; + int32_t refCount; + int8_t deployed; + int8_t dropped; + SMnode *pMnode; + SRWLatch latch; + SDnodeWorker readWorker; + SDnodeWorker writeWorker; + SDnodeWorker syncWorker; + int8_t replica; + int8_t selfIndex; + SReplica replicas[TSDB_MAX_REPLICA]; } SMnodeMgmt; typedef struct { diff --git a/source/dnode/mgmt/impl/src/dndBnode.c b/source/dnode/mgmt/impl/src/dndBnode.c index b978c1102f..992f6ac0a1 100644 --- a/source/dnode/mgmt/impl/src/dndBnode.c +++ b/source/dnode/mgmt/impl/src/dndBnode.c @@ -140,20 +140,22 @@ static int32_t dndWriteBnodeFile(SDnode *pDnode) { fclose(fp); free(content); - if (taosRenameFile(file, file) != 0) { + char realfile[PATH_MAX + 20]; + snprintf(realfile, PATH_MAX + 20, "%s/bnode.json", pDnode->dir.dnode); + + if (taosRenameFile(file, realfile) != 0) { terrno = TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR; dError("failed to rename %s since %s", file, terrstr()); return -1; } - dInfo("successed to write %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); + dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped); return 0; } static int32_t dndStartBnodeWorker(SDnode *pDnode) { SBnodeMgmt *pMgmt = &pDnode->bmgmt; - if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "bnode-write", 0, 1, - (FProcessItem)dndProcessBnodeQueue) != 0) { + if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_MULTI, "bnode-write", 0, 1, dndProcessBnodeQueue) != 0) { dError("failed to start bnode write worker since %s", terrstr()); return -1; } @@ -202,7 +204,9 @@ static int32_t dndOpenBnode(SDnode *pDnode) { return -1; } + pMgmt->deployed = 1; if (dndWriteBnodeFile(pDnode) != 0) { + pMgmt->deployed = 0; dError("failed to write bnode file since %s", terrstr()); dndStopBnodeWorker(pDnode); bndClose(pBnode); @@ -211,7 +215,6 @@ static int32_t dndOpenBnode(SDnode *pDnode) { taosWLockLatch(&pMgmt->latch); pMgmt->pBnode = pBnode; - pMgmt->deployed = 1; taosWUnLockLatch(&pMgmt->latch); dInfo("bnode open successfully"); @@ -243,6 +246,8 @@ static int32_t dndDropBnode(SDnode *pDnode) { dndReleaseBnode(pDnode, pBnode); dndStopBnodeWorker(pDnode); + pMgmt->deployed = 0; + dndWriteBnodeFile(pDnode); bndClose(pBnode); pMgmt->pBnode = NULL; bndDestroy(pDnode->dir.bnode); @@ -353,7 +358,12 @@ int32_t dndInitBnode(SDnode *pDnode) { return -1; } - if (pMgmt->dropped) return 0; + if (pMgmt->dropped) { + dInfo("bnode has been deployed and needs to be deleted"); + bndDestroy(pDnode->dir.bnode); + return 0; + } + if (!pMgmt->deployed) return 0; return dndOpenBnode(pDnode); diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index 59f809489e..8fb95c0b75 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -17,42 +17,9 @@ #include "dndMnode.h" #include "dndDnode.h" #include "dndTransport.h" +#include "dndWorker.h" -static int32_t dndInitMnodeReadWorker(SDnode *pDnode); -static int32_t dndInitMnodeWriteWorker(SDnode *pDnode); -static int32_t dndInitMnodeSyncWorker(SDnode *pDnode); -static void dndCleanupMnodeReadWorker(SDnode *pDnode); -static void dndCleanupMnodeWriteWorker(SDnode *pDnode); -static void dndCleanupMnodeSyncWorker(SDnode *pDnode); -static void dndCleanupMnodeMgmtWorker(SDnode *pDnode); -static int32_t dndAllocMnodeReadQueue(SDnode *pDnode); -static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode); -static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode); -static void dndFreeMnodeReadQueue(SDnode *pDnode); -static void dndFreeMnodeWriteQueue(SDnode *pDnode); -static void dndFreeMnodeSyncQueue(SDnode *pDnode); -static void dndFreeMnodeMgmtQueue(SDnode *pDnode); - -static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg); -static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg); -static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg); -static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg); -void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); - -static int32_t dndStartMnodeWorker(SDnode *pDnode); -static void dndStopMnodeWorker(SDnode *pDnode); - -static SMnode *dndAcquireMnode(SDnode *pDnode); -static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode); - -static int32_t dndReadMnodeFile(SDnode *pDnode); -static int32_t dndWriteMnodeFile(SDnode *pDnode); - -static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption); -static int32_t dndAlterMnode(SDnode *pDnode, SMnodeOpt *pOption); -static int32_t dndDropMnode(SDnode *pDnode); +static void dndProcessMnodeQueue(SDnode *pDnode, SMnodeMsg *pMsg); static SMnode *dndAcquireMnode(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; @@ -97,49 +64,52 @@ static int32_t dndReadMnodeFile(SDnode *pDnode) { char *content = calloc(1, maxLen + 1); cJSON *root = NULL; - FILE *fp = fopen(pMgmt->file, "r"); + char file[PATH_MAX + 20]; + snprintf(file, PATH_MAX + 20, "%s/mnode.json", pDnode->dir.dnode); + + FILE *fp = fopen(file, "r"); if (fp == NULL) { - dDebug("file %s not exist", pMgmt->file); + dDebug("file %s not exist", file); code = 0; goto PRASE_MNODE_OVER; } len = (int32_t)fread(content, 1, maxLen, fp); if (len <= 0) { - dError("failed to read %s since content is null", pMgmt->file); + dError("failed to read %s since content is null", file); goto PRASE_MNODE_OVER; } content[len] = 0; root = cJSON_Parse(content); if (root == NULL) { - dError("failed to read %s since invalid json format", pMgmt->file); + dError("failed to read %s since invalid json format", file); goto PRASE_MNODE_OVER; } cJSON *deployed = cJSON_GetObjectItem(root, "deployed"); if (!deployed || deployed->type != cJSON_Number) { - dError("failed to read %s since deployed not found", pMgmt->file); + dError("failed to read %s since deployed not found", file); goto PRASE_MNODE_OVER; } pMgmt->deployed = deployed->valueint; cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); if (!dropped || dropped->type != cJSON_Number) { - dError("failed to read %s since dropped not found", pMgmt->file); + dError("failed to read %s since dropped not found", file); goto PRASE_MNODE_OVER; } pMgmt->dropped = dropped->valueint; cJSON *mnodes = cJSON_GetObjectItem(root, "mnodes"); if (!mnodes || mnodes->type != cJSON_Array) { - dError("failed to read %s since nodes not found", pMgmt->file); + dError("failed to read %s since nodes not found", file); goto PRASE_MNODE_OVER; } pMgmt->replica = cJSON_GetArraySize(mnodes); if (pMgmt->replica <= 0 || pMgmt->replica > TSDB_MAX_REPLICA) { - dError("failed to read %s since mnodes size %d invalid", pMgmt->file, pMgmt->replica); + dError("failed to read %s since mnodes size %d invalid", file, pMgmt->replica); goto PRASE_MNODE_OVER; } @@ -151,28 +121,28 @@ static int32_t dndReadMnodeFile(SDnode *pDnode) { cJSON *id = cJSON_GetObjectItem(node, "id"); if (!id || id->type != cJSON_Number) { - dError("failed to read %s since id not found", pMgmt->file); + dError("failed to read %s since id not found", file); goto PRASE_MNODE_OVER; } pReplica->id = id->valueint; cJSON *fqdn = cJSON_GetObjectItem(node, "fqdn"); if (!fqdn || fqdn->type != cJSON_String || fqdn->valuestring == NULL) { - dError("failed to read %s since fqdn not found", pMgmt->file); + dError("failed to read %s since fqdn not found", file); goto PRASE_MNODE_OVER; } tstrncpy(pReplica->fqdn, fqdn->valuestring, TSDB_FQDN_LEN); cJSON *port = cJSON_GetObjectItem(node, "port"); if (!port || port->type != cJSON_Number) { - dError("failed to read %s since port not found", pMgmt->file); + dError("failed to read %s since port not found", file); goto PRASE_MNODE_OVER; } pReplica->port = port->valueint; } code = 0; - dDebug("succcessed to read file %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped); + dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); PRASE_MNODE_OVER: if (content != NULL) free(content); @@ -186,8 +156,8 @@ PRASE_MNODE_OVER: static int32_t dndWriteMnodeFile(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; - char file[PATH_MAX + 20] = {0}; - snprintf(file, sizeof(file), "%s.bak", pMgmt->file); + char file[PATH_MAX + 20]; + snprintf(file, PATH_MAX + 20, "%s/mnode.json.bak", pDnode->dir.dnode); FILE *fp = fopen(file, "w"); if (fp == NULL) { @@ -223,47 +193,36 @@ static int32_t dndWriteMnodeFile(SDnode *pDnode) { fclose(fp); free(content); - if (taosRenameFile(file, pMgmt->file) != 0) { + char realfile[PATH_MAX + 20]; + snprintf(realfile, PATH_MAX + 20, "%s/mnode.json", pDnode->dir.dnode); + + if (taosRenameFile(file, realfile) != 0) { terrno = TSDB_CODE_DND_MNODE_WRITE_FILE_ERROR; - dError("failed to rename %s since %s", pMgmt->file, terrstr()); + dError("failed to rename %s since %s", file, terrstr()); return -1; } - dInfo("successed to write %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped); + dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped); return 0; } static int32_t dndStartMnodeWorker(SDnode *pDnode) { - if (dndInitMnodeReadWorker(pDnode) != 0) { + SMnodeMgmt *pMgmt = &pDnode->mmgmt; + if (dndInitWorker(pDnode, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, dndProcessMnodeQueue) != 0) { dError("failed to start mnode read worker since %s", terrstr()); return -1; } - if (dndInitMnodeWriteWorker(pDnode) != 0) { + if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "mnode-write", 0, 1, dndProcessMnodeQueue) != 0) { dError("failed to start mnode write worker since %s", terrstr()); return -1; } - if (dndInitMnodeSyncWorker(pDnode) != 0) { + if (dndInitWorker(pDnode, &pMgmt->syncWorker, DND_WORKER_SINGLE, "mnode-sync", 0, 1, dndProcessMnodeQueue) != 0) { dError("failed to start mnode sync worker since %s", terrstr()); return -1; } - if (dndAllocMnodeReadQueue(pDnode) != 0) { - dError("failed to alloc mnode read queue since %s", terrstr()); - return -1; - } - - if (dndAllocMnodeWriteQueue(pDnode) != 0) { - dError("failed to alloc mnode write queue since %s", terrstr()); - return -1; - } - - if (dndAllocMnodeSyncQueue(pDnode) != 0) { - dError("failed to alloc mnode sync queue since %s", terrstr()); - return -1; - } - return 0; } @@ -274,18 +233,13 @@ static void dndStopMnodeWorker(SDnode *pDnode) { pMgmt->deployed = 0; taosWUnLockLatch(&pMgmt->latch); - while (pMgmt->refCount > 1) taosMsleep(10); - while (!taosQueueEmpty(pMgmt->pReadQ)) taosMsleep(10); - while (!taosQueueEmpty(pMgmt->pWriteQ)) taosMsleep(10); - while (!taosQueueEmpty(pMgmt->pSyncQ)) taosMsleep(10); + while (pMgmt->refCount > 1) { + taosMsleep(10); + } - dndCleanupMnodeReadWorker(pDnode); - dndCleanupMnodeWriteWorker(pDnode); - dndCleanupMnodeSyncWorker(pDnode); - - dndFreeMnodeReadQueue(pDnode); - dndFreeMnodeWriteQueue(pDnode); - dndFreeMnodeSyncQueue(pDnode); + dndCleanupWorker(&pMgmt->readWorker); + dndCleanupWorker(&pMgmt->writeWorker); + dndCleanupWorker(&pMgmt->syncWorker); } static bool dndNeedDeployMnode(SDnode *pDnode) { @@ -383,28 +337,21 @@ static int32_t dndOpenMnode(SDnode *pDnode, SMnodeOpt *pOption) { dError("failed to open mnode since %s", terrstr()); return -1; } - pMgmt->deployed = 1; - int32_t code = dndWriteMnodeFile(pDnode); - if (code != 0) { - dError("failed to write mnode file since %s", terrstr()); - code = terrno; - pMgmt->deployed = 0; + if (dndStartMnodeWorker(pDnode) != 0) { + dError("failed to start mnode worker since %s", terrstr()); mndClose(pMnode); mndDestroy(pDnode->dir.mnode); - terrno = code; return -1; } - code = dndStartMnodeWorker(pDnode); - if (code != 0) { - dError("failed to start mnode worker since %s", terrstr()); - code = terrno; + pMgmt->deployed = 1; + if (dndWriteMnodeFile(pDnode) != 0) { + dError("failed to write mnode file since %s", terrstr()); pMgmt->deployed = 0; dndStopMnodeWorker(pDnode); mndClose(pMnode); mndDestroy(pDnode->dir.mnode); - terrno = code; return -1; } @@ -461,6 +408,7 @@ static int32_t dndDropMnode(SDnode *pDnode) { dndReleaseMnode(pDnode, pMnode); dndStopMnodeWorker(pDnode); + pMgmt->deployed = 0; dndWriteMnodeFile(pDnode); mndClose(pMnode); pMgmt->pMnode = NULL; @@ -528,13 +476,12 @@ int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { } } - -static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) { +static void dndProcessMnodeQueue(SDnode *pDnode, SMnodeMsg *pMsg) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = dndAcquireMnode(pDnode); if (pMnode != NULL) { - mndProcessReadMsg(pMsg); + mndProcessMsg(pMsg); dndReleaseMnode(pDnode, pMnode); } else { mndSendRsp(pMsg, terrno); @@ -543,208 +490,43 @@ static void dndProcessMnodeReadQueue(SDnode *pDnode, SMnodeMsg *pMsg) { mndCleanupMsg(pMsg); } -static void dndProcessMnodeWriteQueue(SDnode *pDnode, SMnodeMsg *pMsg) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; +static void dndWriteMnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pRpcMsg) { + int32_t code = TSDB_CODE_DND_MNODE_NOT_DEPLOYED; SMnode *pMnode = dndAcquireMnode(pDnode); if (pMnode != NULL) { - mndProcessWriteMsg(pMsg); - dndReleaseMnode(pDnode, pMnode); - } else { - mndSendRsp(pMsg, terrno); + SMnodeMsg *pMsg = mndInitMsg(pMnode, pRpcMsg); + if (pMsg == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + } else { + code = dndWriteMsgToWorker(pWorker, pMsg, 0); + } + + if (code != 0) { + mndCleanupMsg(pMsg); + } } + dndReleaseMnode(pDnode, pMnode); - mndCleanupMsg(pMsg); -} - -static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - - SMnode *pMnode = dndAcquireMnode(pDnode); - if (pMnode != NULL) { - mndProcessSyncMsg(pMsg); - dndReleaseMnode(pDnode, pMnode); - } else { - mndSendRsp(pMsg, terrno); + if (code != 0) { + if (pRpcMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pRpcMsg->handle, .ahandle = pRpcMsg->ahandle, .code = code}; + rpcSendResponse(&rsp); + } + rpcFreeCont(pRpcMsg->pCont); } - - mndCleanupMsg(pMsg); -} - -static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg) { - SMnodeMsg *pMsg = mndInitMsg(pMnode, pRpcMsg); - if (pMsg == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - if (taosWriteQitem(pQueue, pMsg) != 0) { - mndCleanupMsg(pMsg); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; } void dndProcessMnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - SMnode *pMnode = dndAcquireMnode(pDnode); - if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pWriteQ, pMsg) != 0) { - if (pMsg->msgType & 1u) { - SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno}; - rpcSendResponse(&rsp); - } - rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; - } - - dndReleaseMnode(pDnode, pMnode); + dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.writeWorker, pMsg); } void dndProcessMnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - SMnode *pMnode = dndAcquireMnode(pDnode); - if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pSyncQ, pMsg) != 0) { - if (pMsg->msgType & 1u) { - SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno}; - rpcSendResponse(&rsp); - } - rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; - } - - dndReleaseMnode(pDnode, pMnode); + dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.syncWorker, pMsg); } void dndProcessMnodeReadMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - SMnode *pMnode = dndAcquireMnode(pDnode); - if (pMnode == NULL || dndWriteMnodeMsgToQueue(pMnode, pMgmt->pReadQ, pMsg) != 0) { - if (pMsg->msgType & 1u) { - SRpcMsg rsp = {.handle = pMsg->handle, .code = terrno}; - rpcSendResponse(&rsp); - } - rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; - } - - dndReleaseMnode(pDnode, pMnode); -} - - -static int32_t dndAllocMnodeReadQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - pMgmt->pReadQ = tWorkerAllocQueue(&pMgmt->readPool, pDnode, (FProcessItem)dndProcessMnodeReadQueue); - if (pMgmt->pReadQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static void dndFreeMnodeReadQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - tWorkerFreeQueue(&pMgmt->readPool, pMgmt->pReadQ); - pMgmt->pReadQ = NULL; -} - -static int32_t dndInitMnodeReadWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - SWorkerPool *pPool = &pMgmt->readPool; - pPool->name = "mnode-read"; - pPool->min = 0; - pPool->max = 1; - if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - dDebug("mnode read worker is initialized"); - return 0; -} - -static void dndCleanupMnodeReadWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - tWorkerCleanup(&pMgmt->readPool); - dDebug("mnode read worker is closed"); -} - -static int32_t dndAllocMnodeWriteQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - pMgmt->pWriteQ = tWorkerAllocQueue(&pMgmt->writePool, pDnode, (FProcessItem)dndProcessMnodeWriteQueue); - if (pMgmt->pWriteQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static void dndFreeMnodeWriteQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - tWorkerFreeQueue(&pMgmt->writePool, pMgmt->pWriteQ); - pMgmt->pWriteQ = NULL; -} - -static int32_t dndInitMnodeWriteWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - SWorkerPool *pPool = &pMgmt->writePool; - pPool->name = "mnode-write"; - pPool->min = 0; - pPool->max = 1; - if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - dDebug("mnode write worker is initialized"); - return 0; -} - -static void dndCleanupMnodeWriteWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - tWorkerCleanup(&pMgmt->writePool); - dDebug("mnode write worker is closed"); -} - -static int32_t dndAllocMnodeSyncQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - pMgmt->pSyncQ = tWorkerAllocQueue(&pMgmt->syncPool, pDnode, (FProcessItem)dndProcessMnodeSyncQueue); - if (pMgmt->pSyncQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static void dndFreeMnodeSyncQueue(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - tWorkerFreeQueue(&pMgmt->syncPool, pMgmt->pSyncQ); - pMgmt->pSyncQ = NULL; -} - -static int32_t dndInitMnodeSyncWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - SWorkerPool *pPool = &pMgmt->syncPool; - pPool->name = "mnode-sync"; - pPool->min = 0; - pPool->max = 1; - if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - dDebug("mnode sync worker is initialized"); - return 0; -} - -static void dndCleanupMnodeSyncWorker(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - tWorkerCleanup(&pMgmt->syncPool); - dDebug("mnode sync worker is closed"); + dndWriteMnodeMsgToWorker(pDnode, &pDnode->mmgmt.readWorker, pMsg); } int32_t dndInitMnode(SDnode *pDnode) { @@ -752,14 +534,6 @@ int32_t dndInitMnode(SDnode *pDnode) { SMnodeMgmt *pMgmt = &pDnode->mmgmt; taosInitRWLatch(&pMgmt->latch); - char path[PATH_MAX]; - snprintf(path, PATH_MAX, "%s/mnode.json", pDnode->dir.dnode); - pMgmt->file = strdup(path); - if (pMgmt->file == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - if (dndReadMnodeFile(pDnode) != 0) { return -1; } @@ -790,13 +564,13 @@ int32_t dndInitMnode(SDnode *pDnode) { } void dndCleanupMnode(SDnode *pDnode) { - SMnodeMgmt *pMgmt = &pDnode->mmgmt; - dInfo("dnode-mnode start to clean up"); - if (pMgmt->pMnode) dndStopMnodeWorker(pDnode); - tfree(pMgmt->file); - mndClose(pMgmt->pMnode); - pMgmt->pMnode = NULL; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; + if (pMgmt->pMnode) { + dndStopMnodeWorker(pDnode); + mndClose(pMgmt->pMnode); + pMgmt->pMnode = NULL; + } dInfo("dnode-mnode is cleaned up"); } diff --git a/source/dnode/mgmt/impl/src/dndQnode.c b/source/dnode/mgmt/impl/src/dndQnode.c index 0b92de81c1..5d04a4f449 100644 --- a/source/dnode/mgmt/impl/src/dndQnode.c +++ b/source/dnode/mgmt/impl/src/dndQnode.c @@ -140,26 +140,27 @@ static int32_t dndWriteQnodeFile(SDnode *pDnode) { fclose(fp); free(content); - if (taosRenameFile(file, file) != 0) { + char realfile[PATH_MAX + 20]; + snprintf(realfile, PATH_MAX + 20, "%s/qnode.json", pDnode->dir.dnode); + + if (taosRenameFile(file, realfile) != 0) { terrno = TSDB_CODE_DND_QNODE_WRITE_FILE_ERROR; dError("failed to rename %s since %s", file, terrstr()); return -1; } - dInfo("successed to write %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); + dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped); return 0; } static int32_t dndStartQnodeWorker(SDnode *pDnode) { SQnodeMgmt *pMgmt = &pDnode->qmgmt; - if (dndInitWorker(pDnode, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", 0, 1, - (FProcessItem)dndProcessQnodeQueue) != 0) { + if (dndInitWorker(pDnode, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", 0, 1, dndProcessQnodeQueue) != 0) { dError("failed to start qnode query worker since %s", terrstr()); return -1; } - if (dndInitWorker(pDnode, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", 0, 1, - (FProcessItem)dndProcessQnodeQueue) != 0) { + if (dndInitWorker(pDnode, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", 0, 1, dndProcessQnodeQueue) != 0) { dError("failed to start qnode fetch worker since %s", terrstr()); return -1; } @@ -209,7 +210,9 @@ static int32_t dndOpenQnode(SDnode *pDnode) { return -1; } + pMgmt->deployed = 1; if (dndWriteQnodeFile(pDnode) != 0) { + pMgmt->deployed = 0; dError("failed to write qnode file since %s", terrstr()); dndStopQnodeWorker(pDnode); qndClose(pQnode); @@ -218,7 +221,6 @@ static int32_t dndOpenQnode(SDnode *pDnode) { taosWLockLatch(&pMgmt->latch); pMgmt->pQnode = pQnode; - pMgmt->deployed = 1; taosWUnLockLatch(&pMgmt->latch); dInfo("qnode open successfully"); @@ -250,6 +252,8 @@ static int32_t dndDropQnode(SDnode *pDnode) { dndReleaseQnode(pDnode, pQnode); dndStopQnodeWorker(pDnode); + pMgmt->deployed = 0; + dndWriteQnodeFile(pDnode); qndClose(pQnode); pMgmt->pQnode = NULL; diff --git a/source/dnode/mgmt/impl/src/dndSnode.c b/source/dnode/mgmt/impl/src/dndSnode.c index c1eb347350..151fc7e6a1 100644 --- a/source/dnode/mgmt/impl/src/dndSnode.c +++ b/source/dnode/mgmt/impl/src/dndSnode.c @@ -140,20 +140,22 @@ static int32_t dndWriteSnodeFile(SDnode *pDnode) { fclose(fp); free(content); - if (taosRenameFile(file, file) != 0) { + char realfile[PATH_MAX + 20]; + snprintf(realfile, PATH_MAX + 20, "%s/snode.json", pDnode->dir.dnode); + + if (taosRenameFile(file, realfile) != 0) { terrno = TSDB_CODE_DND_SNODE_WRITE_FILE_ERROR; dError("failed to rename %s since %s", file, terrstr()); return -1; } - dInfo("successed to write %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); + dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped); return 0; } static int32_t dndStartSnodeWorker(SDnode *pDnode) { SSnodeMgmt *pMgmt = &pDnode->smgmt; - if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "snode-write", 0, 1, - (FProcessItem)dndProcessSnodeQueue) != 0) { + if (dndInitWorker(pDnode, &pMgmt->writeWorker, DND_WORKER_SINGLE, "snode-write", 0, 1, dndProcessSnodeQueue) != 0) { dError("failed to start snode write worker since %s", terrstr()); return -1; } @@ -202,7 +204,9 @@ static int32_t dndOpenSnode(SDnode *pDnode) { return -1; } + pMgmt->deployed = 1; if (dndWriteSnodeFile(pDnode) != 0) { + pMgmt->deployed = 0; dError("failed to write snode file since %s", terrstr()); dndStopSnodeWorker(pDnode); sndClose(pSnode); @@ -211,7 +215,6 @@ static int32_t dndOpenSnode(SDnode *pDnode) { taosWLockLatch(&pMgmt->latch); pMgmt->pSnode = pSnode; - pMgmt->deployed = 1; taosWUnLockLatch(&pMgmt->latch); dInfo("snode open successfully"); @@ -243,6 +246,8 @@ static int32_t dndDropSnode(SDnode *pDnode) { dndReleaseSnode(pDnode, pSnode); dndStopSnodeWorker(pDnode); + pMgmt->deployed = 0; + dndWriteSnodeFile(pDnode); sndClose(pSnode); pMgmt->pSnode = NULL; sndDestroy(pDnode->dir.snode); @@ -328,7 +333,12 @@ int32_t dndInitSnode(SDnode *pDnode) { return -1; } - if (pMgmt->dropped) return 0; + if (pMgmt->dropped) { + dInfo("snode has been deployed and needs to be deleted"); + sndDestroy(pDnode->dir.snode); + return 0; + } + if (!pMgmt->deployed) return 0; return dndOpenSnode(pDnode); diff --git a/source/dnode/mgmt/impl/src/dndWorker.c b/source/dnode/mgmt/impl/src/dndWorker.c index c421437e4d..b1107fd185 100644 --- a/source/dnode/mgmt/impl/src/dndWorker.c +++ b/source/dnode/mgmt/impl/src/dndWorker.c @@ -73,7 +73,7 @@ void dndCleanupWorker(SDnodeWorker *pWorker) { tWorkerCleanup(&pWorker->pool); tWorkerFreeQueue(&pWorker->pool, pWorker->queue); } else if (pWorker->type == DND_WORKER_MULTI) { - tWorkerCleanup(&pWorker->mpool); + tMWorkerCleanup(&pWorker->mpool); tMWorkerFreeQueue(&pWorker->mpool, pWorker->queue); } else { } @@ -85,16 +85,23 @@ int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen) return -1; } - void *pMsg = taosAllocateQitem(contLen); + void *pMsg = NULL; + if (contLen != 0) { + pMsg = taosAllocateQitem(contLen); + if (pMsg != NULL) { + memcpy(pMsg, pCont, contLen); + } + } else { + pMsg = pCont; + } + if (pMsg == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } - memcpy(pMsg, pCont, contLen); - - if (taosWriteQitem(pWorker, pMsg) != 0) { - taosFreeItem(pMsg); + if (taosWriteQitem(pWorker->queue, pMsg) != 0) { + taosFreeQitem(pMsg); terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } diff --git a/source/dnode/mgmt/impl/test/dnode/dnode.cpp b/source/dnode/mgmt/impl/test/dnode/dnode.cpp index dc352c5a3f..ec2c2d9a44 100644 --- a/source/dnode/mgmt/impl/test/dnode/dnode.cpp +++ b/source/dnode/mgmt/impl/test/dnode/dnode.cpp @@ -162,7 +162,7 @@ TEST_F(DndTestDnode, 03_Create_Drop_Restart_Dnode) { SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); strcpy(pReq->fqdn, "localhost"); - pReq->port = htonl(904); + pReq->port = htonl(9044); SRpcMsg* pMsg = test.SendMsg(TDMT_MND_CREATE_DNODE, pReq, contLen); ASSERT_NE(pMsg, nullptr); diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 56559cbea1..2d236906e1 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -388,7 +388,7 @@ static int32_t mndCreateDnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateDnodeMsg * dnodeObj.updateTime = dnodeObj.createdTime; dnodeObj.port = pCreate->port; memcpy(dnodeObj.fqdn, pCreate->fqdn, TSDB_FQDN_LEN); - snprintf(dnodeObj.ep, "%s:%u", dnodeObj.fqdn, dnodeObj.port); + snprintf(dnodeObj.ep, TSDB_EP_LEN, "%s:%u", dnodeObj.fqdn, dnodeObj.port); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, &pMsg->rpcMsg); if (pTrans == NULL) { diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index 64ea85044a..9281e46f4f 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -390,7 +390,7 @@ void mndSendRsp(SMnodeMsg *pMsg, int32_t code) { rpcSendResponse(&rpcRsp); } -static void mndProcessRpcMsg(SMnodeMsg *pMsg) { +void mndProcessMsg(SMnodeMsg *pMsg) { SMnode *pMnode = pMsg->pMnode; int32_t code = 0; tmsg_t msgType = pMsg->rpcMsg.msgType; @@ -451,12 +451,6 @@ void mndSetMsgHandle(SMnode *pMnode, tmsg_t msgType, MndMsgFp fp) { } } -void mndProcessReadMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); } - -void mndProcessWriteMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); } - -void mndProcessSyncMsg(SMnodeMsg *pMsg) { mndProcessRpcMsg(pMsg); } - uint64_t mndGenerateUid(char *name, int32_t len) { int64_t us = taosGetTimestampUs(); int32_t hashval = MurmurHash3_32(name, len); diff --git a/source/libs/parser/src/astToMsg.c b/source/libs/parser/src/astToMsg.c index 2f80af225a..6c99411b71 100644 --- a/source/libs/parser/src/astToMsg.c +++ b/source/libs/parser/src/astToMsg.c @@ -428,6 +428,7 @@ SDropDnodeMsg *buildDropDnodeMsg(SSqlInfo* pInfo, int32_t* len, SMsgBuf* pMsgBuf char* end = NULL; SDropDnodeMsg * pDrop = (SDropDnodeMsg *)calloc(1, sizeof(SDropDnodeMsg)); pDrop->dnodeId = strtoll(pzName->z, &end, 10); + pDrop->dnodeId = htonl(pDrop->dnodeId); *len = sizeof(SDropDnodeMsg); if (end - pzName->z != pzName->n) { diff --git a/tests/script/unique/dnode/basic1.sim b/tests/script/unique/dnode/basic1.sim index 730864ef26..49b29a4ac8 100644 --- a/tests/script/unique/dnode/basic1.sim +++ b/tests/script/unique/dnode/basic1.sim @@ -94,5 +94,16 @@ if $rows != 2 then return -1 endi +print =============== drop dnode +sql drop dnode 2; +sql show dnodes; +if $rows != 1 then + return -1 +endi + +if $data00 != 1 then + return -1 +endi + system sh/exec.sh -n dnode1 -s stop -x SIGINT system sh/exec.sh -n dnode2 -s stop -x SIGINT \ No newline at end of file From 97f334c9dd05bbcc0ca9efbf42828a97886a50a6 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 28 Dec 2021 17:51:13 +0800 Subject: [PATCH 4/4] send create topic req --- include/client/taos.h | 3 +++ include/dnode/vnode/tq/tq.h | 2 ++ source/client/src/clientImpl.c | 3 --- source/client/test/clientTests.cpp | 35 ++++++++++++++++++++++++++++++ source/libs/parser/src/parser.c | 2 +- 5 files changed, 41 insertions(+), 4 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 7357478555..4669ca51f7 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -193,6 +193,9 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr); DLL_EXPORT int taos_load_table_info(TAOS *taos, const char* tableNameList); DLL_EXPORT TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision); + +DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sqlLen); + #ifdef __cplusplus } #endif diff --git a/include/dnode/vnode/tq/tq.h b/include/dnode/vnode/tq/tq.h index b6cb96a57b..5774131377 100644 --- a/include/dnode/vnode/tq/tq.h +++ b/include/dnode/vnode/tq/tq.h @@ -251,6 +251,8 @@ typedef struct STqMetaStore { STqMetaList* bucket[TQ_BUCKET_SIZE]; // a table head STqMetaList* unpersistHead; + // topics that are not connectted + STqMetaList* unconnectTopic; // TODO:temporaral use, to be replaced by unified tfile int fileFd; diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index c3c2f70b33..b40c718d51 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -237,9 +237,6 @@ TAOS_RES *tmq_create_topic(TAOS* taos, const char* name, const char* sql, int sq SQueryDag* pDag = NULL; char *dagStr = NULL; - //parse sql to logical plan and physical plan - //send topic name and plans to mnode - terrno = TSDB_CODE_SUCCESS; CHECK_CODE_GOTO(buildRequest(pTscObj, sql, sqlLen, &pRequest), _return); diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 83d0e61eb3..de494cb031 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -399,6 +399,41 @@ TEST(testCase, drop_stable_Test) { taos_close(pConn); } +TEST(testCase, create_topic_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "create database abc1"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create stable st1(ts timestamp, k int) tags(a int)"); + if (taos_errno(pRes) != 0) { + printf("error in create stable, reason:%s\n", taos_errstr(pRes)); + } + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + ASSERT_TRUE(pFields == NULL); + + int32_t numOfFields = taos_num_fields(pRes); + ASSERT_EQ(numOfFields, 0); + + taos_free_result(pRes); + + char* sql = "select * from st1"; + tmq_create_topic(pConn, "test_topic_1", sql, strlen(sql)); + taos_close(pConn); +} + + //TEST(testCase, show_table_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); diff --git a/source/libs/parser/src/parser.c b/source/libs/parser/src/parser.c index 5c9a48e52f..2ccd76723b 100644 --- a/source/libs/parser/src/parser.c +++ b/source/libs/parser/src/parser.c @@ -229,6 +229,6 @@ void qParserClearupMetaRequestInfo(SCatalogReq* pMetaReq) { taosArrayDestroy(pMetaReq->pUdf); } -void qDestoryQuery(SQueryNode* pQuery) { +void qDestroyQuery(SQueryNode* pQuery) { // todo }