From 938bd01b17ee68f8df6dfd3166fd0529df489965 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 27 Dec 2021 19:57:24 -0800 Subject: [PATCH] add qnode --- include/dnode/qnode/qnode.h | 14 +- include/util/taoserror.h | 1 + source/dnode/mgmt/impl/inc/dndInt.h | 29 ++- source/dnode/mgmt/impl/inc/dndWorker.h | 33 ++++ source/dnode/mgmt/impl/src/dndQnode.c | 257 ++++++------------------- source/dnode/mgmt/impl/src/dndWorker.c | 85 ++++++++ source/util/src/terror.c | 1 + 7 files changed, 198 insertions(+), 222 deletions(-) create mode 100644 source/dnode/mgmt/impl/inc/dndWorker.h create mode 100644 source/dnode/mgmt/impl/src/dndWorker.c diff --git a/include/dnode/qnode/qnode.h b/include/dnode/qnode/qnode.h index 29a3d1af20..8084175a90 100644 --- a/include/dnode/qnode/qnode.h +++ b/include/dnode/qnode/qnode.h @@ -78,24 +78,14 @@ void qndClose(SQnode *pQnode); int32_t qndGetLoad(SQnode *pQnode, SQnodeLoad *pLoad); /** - * @brief Process a query message. + * @brief Process a query or fetch message. * * @param pQnode The qnode object. * @param pMsg The request message * @param pRsp The response message * @return int32_t 0 for success, -1 for failure */ -int32_t qndProcessQueryReq(SQnode *pQnode, SRpcMsg *pMsg, SRpcMsg **pRsp); - -/** - * @brief Process a fetch message. - * - * @param pQnode The qnode object. - * @param pMsg The request message - * @param pRsp The response message - * @return int32_t 0 for success, -1 for failure - */ -int32_t qndProcessFetchReq(SQnode *pQnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int32_t qndProcessMsg(SQnode *pQnode, SRpcMsg *pMsg, SRpcMsg **pRsp); #ifdef __cplusplus } diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 408a656e83..ba3e122db3 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -69,6 +69,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_CHECKSUM_ERROR TAOS_DEF_ERROR_CODE(0, 0x0107) #define TSDB_CODE_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x0108) #define TSDB_CODE_MSG_NOT_PROCESSED TAOS_DEF_ERROR_CODE(0, 0x0109) +#define TSDB_CODE_INVALID_PARA TAOS_DEF_ERROR_CODE(0, 0x010A) #define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0110) #define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0111) #define TSDB_CODE_REF_ID_REMOVED TAOS_DEF_ERROR_CODE(0, 0x0112) diff --git a/source/dnode/mgmt/impl/inc/dndInt.h b/source/dnode/mgmt/impl/inc/dndInt.h index 9679a03c3e..0d37828ecd 100644 --- a/source/dnode/mgmt/impl/inc/dndInt.h +++ b/source/dnode/mgmt/impl/inc/dndInt.h @@ -54,8 +54,20 @@ extern int32_t dDebugFlag; #define dTrace(...) { if (dDebugFlag & DEBUG_TRACE) { taosPrintLog("DND ", dDebugFlag, __VA_ARGS__); }} typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EStat; +typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EDndWorkerType; typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps); +typedef struct { + EDndWorkerType type; + const char *name; + int32_t minNum; + int32_t maxNum; + FProcessItem fp; + SDnode *pDnode; + taos_queue queue; + SWorkerPool pool; +} SDnodeWorker; + typedef struct { char *dnode; char *mnode; @@ -100,16 +112,13 @@ typedef struct { } SMnodeMgmt; typedef struct { - int32_t refCount; - int8_t deployed; - int8_t dropped; - char *file; - SQnode *pQnode; - SRWLatch latch; - taos_queue pQueryQ; - taos_queue pFetchQ; - SWorkerPool queryPool; - SWorkerPool fetchPool; + int32_t refCount; + int8_t deployed; + int8_t dropped; + SQnode *pQnode; + SRWLatch latch; + SDnodeWorker queryWorker; + SDnodeWorker fetchWorker; } SQnodeMgmt; typedef struct { diff --git a/source/dnode/mgmt/impl/inc/dndWorker.h b/source/dnode/mgmt/impl/inc/dndWorker.h new file mode 100644 index 0000000000..237c0518e8 --- /dev/null +++ b/source/dnode/mgmt/impl/inc/dndWorker.h @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_DND_WORKER_H_ +#define _TD_DND_WORKER_H_ + +#ifdef __cplusplus +extern "C" { +#endif +#include "dndInt.h" + +int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EDndWorkerType type, const char *name, int32_t minNum, + int32_t maxNum, FProcessItem fp); +void dndCleanupWorker(SDnodeWorker *pWorker); +int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_DND_WORKER_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/impl/src/dndQnode.c b/source/dnode/mgmt/impl/src/dndQnode.c index 08044ea263..8c76bf95a6 100644 --- a/source/dnode/mgmt/impl/src/dndQnode.c +++ b/source/dnode/mgmt/impl/src/dndQnode.c @@ -17,30 +17,9 @@ #include "dndQnode.h" #include "dndDnode.h" #include "dndTransport.h" +#include "dndWorker.h" -static int32_t dndInitQnodeQueryWorker(SDnode *pDnode); -static int32_t dndInitQnodeFetchWorker(SDnode *pDnode); -static void dndCleanupQnodeQueryWorker(SDnode *pDnode); -static void dndCleanupQnodeFetchWorker(SDnode *pDnode); -static int32_t dndAllocQnodeQueryQueue(SDnode *pDnode); -static int32_t dndAllocQnodeFetchQueue(SDnode *pDnode); -static void dndFreeQnodeQueryQueue(SDnode *pDnode); -static void dndFreeQnodeFetchQueue(SDnode *pDnode); - -static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg); -static int32_t dndWriteQnodeMsgToQueue(SQnode *pQnode, taos_queue pQueue, SRpcMsg *pRpcMsg); - -static int32_t dndStartQnodeWorker(SDnode *pDnode); -static void dndStopQnodeWorker(SDnode *pDnode); - -static SQnode *dndAcquireQnode(SDnode *pDnode); -static void dndReleaseQnode(SDnode *pDnode, SQnode *pQnode); - -static int32_t dndReadQnodeFile(SDnode *pDnode); -static int32_t dndWriteQnodeFile(SDnode *pDnode); - -static int32_t dndOpenQnode(SDnode *pDnode); -static int32_t dndDropQnode(SDnode *pDnode); +static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg); static SQnode *dndAcquireQnode(SDnode *pDnode) { SQnodeMgmt *pMgmt = &pDnode->qmgmt; @@ -85,44 +64,47 @@ static int32_t dndReadQnodeFile(SDnode *pDnode) { char *content = calloc(1, maxLen + 1); cJSON *root = NULL; - FILE *fp = fopen(pMgmt->file, "r"); + char file[PATH_MAX + 20]; + snprintf(file, PATH_MAX + 20, "%s/qnode.json", pDnode->dir.dnode); + + FILE *fp = fopen(file, "r"); if (fp == NULL) { - dDebug("file %s not exist", pMgmt->file); + dDebug("file %s not exist", file); code = 0; - goto PRASE_MNODE_OVER; + goto PRASE_QNODE_OVER; } len = (int32_t)fread(content, 1, maxLen, fp); if (len <= 0) { - dError("failed to read %s since content is null", pMgmt->file); - goto PRASE_MNODE_OVER; + dError("failed to read %s since content is null", file); + goto PRASE_QNODE_OVER; } content[len] = 0; root = cJSON_Parse(content); if (root == NULL) { - dError("failed to read %s since invalid json format", pMgmt->file); - goto PRASE_MNODE_OVER; + dError("failed to read %s since invalid json format", file); + goto PRASE_QNODE_OVER; } cJSON *deployed = cJSON_GetObjectItem(root, "deployed"); if (!deployed || deployed->type != cJSON_Number) { - dError("failed to read %s since deployed not found", pMgmt->file); - goto PRASE_MNODE_OVER; + dError("failed to read %s since deployed not found", file); + goto PRASE_QNODE_OVER; } pMgmt->deployed = deployed->valueint; cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); if (!dropped || dropped->type != cJSON_Number) { - dError("failed to read %s since dropped not found", pMgmt->file); - goto PRASE_MNODE_OVER; + dError("failed to read %s since dropped not found", file); + goto PRASE_QNODE_OVER; } pMgmt->dropped = dropped->valueint; code = 0; - dDebug("succcessed to read file %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped); + dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); -PRASE_MNODE_OVER: +PRASE_QNODE_OVER: if (content != NULL) free(content); if (root != NULL) cJSON_Delete(root); if (fp != NULL) fclose(fp); @@ -134,8 +116,8 @@ PRASE_MNODE_OVER: static int32_t dndWriteQnodeFile(SDnode *pDnode) { SQnodeMgmt *pMgmt = &pDnode->qmgmt; - char file[PATH_MAX + 20] = {0}; - snprintf(file, sizeof(file), "%s.bak", pMgmt->file); + char file[PATH_MAX + 20]; + snprintf(file, PATH_MAX + 20, "%s/qnode.json", pDnode->dir.dnode); FILE *fp = fopen(file, "w"); if (fp == NULL) { @@ -154,41 +136,34 @@ static int32_t dndWriteQnodeFile(SDnode *pDnode) { len += snprintf(content + len, maxLen - len, "}\n"); fwrite(content, 1, len, fp); - taosFfetchFile(fileno(fp)); + taosFsyncFile(fileno(fp)); fclose(fp); free(content); - if (taosRenameFile(file, pMgmt->file) != 0) { + if (taosRenameFile(file, file) != 0) { terrno = TSDB_CODE_DND_QNODE_WRITE_FILE_ERROR; - dError("failed to rename %s since %s", pMgmt->file, terrstr()); + dError("failed to rename %s since %s", file, terrstr()); return -1; } - dInfo("successed to write %s, deployed:%d dropped:%d", pMgmt->file, pMgmt->deployed, pMgmt->dropped); + dInfo("successed to write %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); return 0; } static int32_t dndStartQnodeWorker(SDnode *pDnode) { - if (dndInitQnodeQueryWorker(pDnode) != 0) { + SQnodeMgmt *pMgmt = &pDnode->qmgmt; + if (dndInitWorker(pDnode, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", 0, 1, + (FProcessItem)dndProcessQnodeQueue) != 0) { dError("failed to start qnode query worker since %s", terrstr()); return -1; } - if (dndInitQnodeFetchWorker(pDnode) != 0) { + if (dndInitWorker(pDnode, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", 0, 1, + (FProcessItem)dndProcessQnodeQueue) != 0) { dError("failed to start qnode fetch worker since %s", terrstr()); return -1; } - if (dndAllocQnodeQueryQueue(pDnode) != 0) { - dError("failed to alloc qnode query queue since %s", terrstr()); - return -1; - } - - if (dndAllocQnodeFetchQueue(pDnode) != 0) { - dError("failed to alloc qnode fetch queue since %s", terrstr()); - return -1; - } - return 0; } @@ -199,15 +174,12 @@ static void dndStopQnodeWorker(SDnode *pDnode) { pMgmt->deployed = 0; taosWUnLockLatch(&pMgmt->latch); - while (pMgmt->refCount > 1) taosMsleep(10); - while (!taosQueueEmpty(pMgmt->pQueryQ)) taosMsleep(10); - while (!taosQueueEmpty(pMgmt->pFetchQ)) taosMsleep(10); + while (pMgmt->refCount > 1) { + taosMsleep(10); + } - dndCleanupQnodeQueryWorker(pDnode); - dndCleanupQnodeFetchWorker(pDnode); - - dndFreeQnodeQueryQueue(pDnode); - dndFreeQnodeFetchQueue(pDnode); + dndCleanupWorker(&pMgmt->queryWorker); + dndCleanupWorker(&pMgmt->fetchWorker); } static void dndBuildQnodeOption(SDnode *pDnode, SQnodeOpt *pOption) { @@ -230,28 +202,17 @@ static int32_t dndOpenQnode(SDnode *pDnode) { dError("failed to open qnode since %s", terrstr()); return -1; } - pMgmt->deployed = 1; - int32_t code = dndWriteQnodeFile(pDnode); - if (code != 0) { - dError("failed to write qnode file since %s", terrstr()); - code = terrno; - pMgmt->deployed = 0; + if (dndStartQnodeWorker(pDnode) != 0) { + dError("failed to start qnode worker since %s", terrstr()); qndClose(pQnode); - // qndDestroy(pDnode->dir.qnode); - terrno = code; return -1; } - code = dndStartQnodeWorker(pDnode); - if (code != 0) { - dError("failed to start qnode worker since %s", terrstr()); - code = terrno; - pMgmt->deployed = 0; + if (dndWriteQnodeFile(pDnode) != 0) { + dError("failed to write qnode file since %s", terrstr()); dndStopQnodeWorker(pDnode); qndClose(pQnode); - // qndDestroy(pDnode->dir.qnode); - terrno = code; return -1; } @@ -289,7 +250,6 @@ static int32_t dndDropQnode(SDnode *pDnode) { dndReleaseQnode(pDnode, pQnode); dndStopQnodeWorker(pDnode); - dndWriteQnodeFile(pDnode); qndClose(pQnode); pMgmt->pQnode = NULL; // qndDestroy(pDnode->dir.qnode); @@ -324,13 +284,11 @@ int32_t dndProcessDropQnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) { SQnodeMgmt *pMgmt = &pDnode->qmgmt; SRpcMsg *pRsp = NULL; - int32_t code = 0; + int32_t code = TSDB_CODE_DND_QNODE_NOT_DEPLOYED; SQnode *pQnode = dndAcquireQnode(pDnode); - if (pQnode == NULL) { - code = -1; - } else { - code = qndProcessQueryReq(pQnode, pMsg, &pRsp); + if (pQnode != NULL) { + code = qndProcessMsg(pQnode, pMsg, &pRsp); } if (pRsp != NULL) { @@ -347,135 +305,36 @@ static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) { taosFreeQitem(pMsg); } -static int32_t dndWriteQnodeMsgToQueue(SQnode *pQnode, taos_queue pQueue, SRpcMsg *pRpcMsg) { - int32_t code = 0; +static void dndWriteQnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) { + int32_t code = TSDB_CODE_DND_QNODE_NOT_DEPLOYED; - if (pQnode == NULL || pQueue == NULL) { - code = TSDB_CODE_DND_QNODE_NOT_DEPLOYED; - } else { - SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg)); - if (pMsg == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - } else { - *pMsg = *pRpcMsg; - if (taosWriteQitem(pQueue, pMsg) != 0) { - code = TSDB_CODE_OUT_OF_MEMORY; - } - } + SQnode *pQnode = dndAcquireQnode(pDnode); + if (pQnode != NULL) { + code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)); } + dndReleaseQnode(pDnode, pQnode); if (code != 0) { - if (pRpcMsg->msgType & 1u) { - SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code}; + if (pMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; rpcSendResponse(&rsp); } - rpcFreeCont(pRpcMsg->pCont); + rpcFreeCont(pMsg->pCont); } } void dndProcessQnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - SQnode *pQnode = dndAcquireQnode(pDnode); - dndWriteQnodeMsgToQueue(pQnode, pMgmt->pQueryQ, pMsg); - dndReleaseQnode(pDnode, pQnode); + dndWriteQnodeMsgToWorker(pDnode, &pDnode->qmgmt.queryWorker, pMsg); } void dndProcessQnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - SQnode *pQnode = dndAcquireQnode(pDnode); - dndWriteQnodeMsgToQueue(pQnode, pMgmt->pFetchQ, pMsg); - dndReleaseQnode(pDnode, pQnode); -} - -static int32_t dndAllocQnodeQueryQueue(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - pMgmt->pQueryQ = tWorkerAllocQueue(&pMgmt->queryPool, pDnode, (FProcessItem)dndProcessQnodeQueue); - if (pMgmt->pQueryQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static void dndFreeQnodeQueryQueue(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - tWorkerFreeQueue(&pMgmt->queryPool, pMgmt->pQueryQ); - pMgmt->pQueryQ = NULL; -} - -static int32_t dndInitQnodeQueryWorker(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - SWorkerPool *pPool = &pMgmt->queryPool; - pPool->name = "qnode-query"; - pPool->min = 0; - pPool->max = 1; - if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - dDebug("qnode query worker is initialized"); - return 0; -} - -static void dndCleanupQnodeQueryWorker(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - tWorkerCleanup(&pMgmt->queryPool); - dDebug("qnode query worker is closed"); -} - -static int32_t dndAllocQnodeFetchQueue(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - pMgmt->pFetchQ = tWorkerAllocQueue(&pMgmt->queryPool, pDnode, (FProcessItem)dndProcessQnodeQueue); - if (pMgmt->pFetchQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static void dndFreeQnodeFetchQueue(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - tWorkerFreeQueue(&pMgmt->fetchPool, pMgmt->pFetchQ); - pMgmt->pFetchQ = NULL; -} - -static int32_t dndInitQnodeFetchWorker(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - SWorkerPool *pPool = &pMgmt->fetchPool; - pPool->name = "qnode-fetch"; - pPool->min = 0; - pPool->max = 1; - if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - dDebug("qnode fetch worker is initialized"); - return 0; -} - -static void dndCleanupQnodeFetchWorker(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - tWorkerCleanup(&pMgmt->fetchPool); - dDebug("qnode fetch worker is closed"); + dndWriteQnodeMsgToWorker(pDnode, &pDnode->qmgmt.queryWorker, pMsg); } int32_t dndInitQnode(SDnode *pDnode) { - dInfo("dnode-qnode start to init"); SQnodeMgmt *pMgmt = &pDnode->qmgmt; taosInitRWLatch(&pMgmt->latch); - char path[PATH_MAX]; - snprintf(path, PATH_MAX, "%s/qnode.json", pDnode->dir.dnode); - pMgmt->file = strdup(path); - if (pMgmt->file == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - if (dndReadQnodeFile(pDnode) != 0) { return -1; } @@ -488,11 +347,9 @@ int32_t dndInitQnode(SDnode *pDnode) { void dndCleanupQnode(SDnode *pDnode) { SQnodeMgmt *pMgmt = &pDnode->qmgmt; - - dInfo("dnode-qnode start to clean up"); - if (pMgmt->pQnode) dndStopQnodeWorker(pDnode); - tfree(pMgmt->file); - qndClose(pMgmt->pQnode); - pMgmt->pQnode = NULL; - dInfo("dnode-qnode is cleaned up"); + if (pMgmt->pQnode) { + dndStopQnodeWorker(pDnode); + qndClose(pMgmt->pQnode); + pMgmt->pQnode = NULL; + } } diff --git a/source/dnode/mgmt/impl/src/dndWorker.c b/source/dnode/mgmt/impl/src/dndWorker.c new file mode 100644 index 0000000000..da0e3a9319 --- /dev/null +++ b/source/dnode/mgmt/impl/src/dndWorker.c @@ -0,0 +1,85 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "dndWorker.h" + +int32_t dndInitWorker(SDnode *pDnode, SDnodeWorker *pWorker, EDndWorkerType type, const char *name, int32_t minNum, + int32_t maxNum, FProcessItem fp) { + if (pDnode == NULL || pWorker == NULL || name == NULL || minNum < 0 || maxNum <= 0 || fp == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + return -1; + } + + pWorker->type = type; + pWorker->name = name; + pWorker->minNum = minNum; + pWorker->maxNum = maxNum; + pWorker->fp = fp; + pWorker->pDnode = pDnode; + + if (pWorker->type == DND_WORKER_SINGLE) { + SWorkerPool *pPool = &pWorker->pool; + pPool->min = minNum; + pPool->max = maxNum; + if (tWorkerInit(pPool) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pWorker->queue = tWorkerAllocQueue(&pPool, pDnode, fp); + if (pWorker->queue == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + } else { + terrno = TSDB_CODE_INVALID_PARA; + } + + return 0; +} + +void dndCleanupWorker(SDnodeWorker *pWorker) { + if (pWorker->type == DND_WORKER_SINGLE) { + while (!taosQueueEmpty(pWorker->queue)) { + taosMsleep(10); + } + tWorkerCleanup(&pWorker->pool); + tWorkerFreeQueue(&pWorker->pool, pWorker->queue); + } +} + +int32_t dndWriteMsgToWorker(SDnodeWorker *pWorker, void *pCont, int32_t contLen) { + if (pWorker == NULL || pWorker->queue == NULL) { + terrno = TSDB_CODE_INVALID_PARA; + return -1; + } + + void *pMsg = taosAllocateQitem(contLen); + if (pMsg == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + memcpy(pMsg, pCont, contLen); + + if (taosWriteQitem(pWorker, pMsg) != 0) { + taosFreeItem(pMsg); + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + return 0; +} \ No newline at end of file diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 451b7e4fa4..3a923c6653 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -79,6 +79,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_FILE_CORRUPTED, "Data file corrupted") TAOS_DEFINE_ERROR(TSDB_CODE_CHECKSUM_ERROR, "Checksum error") TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_MSG, "Invalid config message") TAOS_DEFINE_ERROR(TSDB_CODE_MSG_NOT_PROCESSED, "Message not processed") +TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_PARA, "Invalid parameters") TAOS_DEFINE_ERROR(TSDB_CODE_REF_NO_MEMORY, "Ref out of memory") TAOS_DEFINE_ERROR(TSDB_CODE_REF_FULL, "too many Ref Objs") TAOS_DEFINE_ERROR(TSDB_CODE_REF_ID_REMOVED, "Ref ID is removed")