diff --git a/include/dnode/qnode/qnode.h b/include/dnode/qnode/qnode.h index 69297270b9..df735cf1b3 100644 --- a/include/dnode/qnode/qnode.h +++ b/include/dnode/qnode/qnode.h @@ -36,7 +36,6 @@ typedef struct { } SQnodeLoad; typedef struct { - int32_t sver; int32_t dnodeId; int64_t clusterId; SMgmtWrapper *pWrapper; diff --git a/source/dnode/mgmt/bnode/inc/bmInt.h b/source/dnode/mgmt/bnode/inc/bmInt.h index df7940f014..f748d609b0 100644 --- a/source/dnode/mgmt/bnode/inc/bmInt.h +++ b/source/dnode/mgmt/bnode/inc/bmInt.h @@ -17,7 +17,6 @@ #define _TD_DND_BNODE_INT_H_ #include "bm.h" -#include "dm.h" #ifdef __cplusplus extern "C" { diff --git a/source/dnode/mgmt/qnode/inc/qm.h b/source/dnode/mgmt/qnode/inc/qm.h new file mode 100644 index 0000000000..e28ea3e948 --- /dev/null +++ b/source/dnode/mgmt/qnode/inc/qm.h @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_DND_QNODE_H_ +#define _TD_DND_QNODE_H_ + +#include "dnd.h" + +#ifdef __cplusplus +extern "C" { +#endif + +void qmGetMgmtFp(SMgmtWrapper *pMgmt); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_DND_QNODE_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/qnode/inc/qmInt.h b/source/dnode/mgmt/qnode/inc/qmInt.h index 45412f8a3c..52d5e89492 100644 --- a/source/dnode/mgmt/qnode/inc/qmInt.h +++ b/source/dnode/mgmt/qnode/inc/qmInt.h @@ -16,55 +16,35 @@ #ifndef _TD_DND_QNODE_INT_H_ #define _TD_DND_QNODE_INT_H_ -#include "dnd.h" +#include "qm.h" #ifdef __cplusplus extern "C" { #endif typedef struct SQnodeMgmt { - int32_t refCount; - int8_t deployed; - int8_t dropped; - SQnode *pQnode; - SRWLatch latch; - SDnodeWorker queryWorker; - SDnodeWorker fetchWorker; - - // - SProcObj *pProcess; - bool singleProc; + SQnode *pQnode; + SDnode *pDnode; + SMgmtWrapper *pWrapper; + const char *path; + SDnodeWorker queryWorker; + SDnodeWorker fetchWorker; } SQnodeMgmt; -void qmGetMgmtFp(SMgmtWrapper *pMgmt); - -int32_t dndInitQnode(SDnode *pDnode); -void dndCleanupQnode(SDnode *pDnode); - -void dndProcessQnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -void dndProcessQnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); - -// qmHandle.h -int32_t qmProcessCreateReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg); +// qmInt.c +int32_t qmOpen(SMgmtWrapper *pWrapper); +int32_t qmDrop(SMgmtWrapper *pWrapper); +// qmMsg.c void qmInitMsgHandles(SMgmtWrapper *pWrapper); -int32_t qmProcessCreateReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg); - -int32_t qmStartWorker(SDnode *pDnode); -void qmStopWorker(SDnode *pDnode); -void qmInitMsgFp(SMnodeMgmt *pMgmt); -void qmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -int32_t qmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t qmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); -void qmConsumeChildQueue(SDnode *pDnode, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); -void qmConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); - -void qmProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -void qmProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -void qmProcessReadMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +// qmWorker.c +int32_t qmStartWorker(SQnodeMgmt *pMgmt); +void qmStopWorker(SQnodeMgmt *pMgmt); +int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t qmProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/qnode/src/qmFile.c b/source/dnode/mgmt/qnode/src/qmFile.c deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/source/dnode/mgmt/qnode/src/qmInt.c b/source/dnode/mgmt/qnode/src/qmInt.c index c15a15c6c1..806f77bcf1 100644 --- a/source/dnode/mgmt/qnode/src/qmInt.c +++ b/source/dnode/mgmt/qnode/src/qmInt.c @@ -18,13 +18,110 @@ static int32_t qmRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); } +static void qmInitOption(SQnodeMgmt *pMgmt, SQnodeOpt *pOption) { + SDnode *pDnode = pMgmt->pDnode; + pOption->pWrapper = pMgmt->pWrapper; + pOption->sendReqFp = dndSendReqToDnode; + pOption->sendMnodeReqFp = dndSendReqToMnode; + pOption->dnodeId = pDnode->dnodeId; + pOption->clusterId = pDnode->clusterId; +} + +static int32_t qmOpenImp(SQnodeMgmt *pMgmt) { + SQnodeOpt option = {0}; + qmInitOption(pMgmt, &option); + + pMgmt->pQnode = qndOpen(&option); + if (pMgmt->pQnode == NULL) { + dError("failed to open qnode since %s", terrstr()); + return -1; + } + + if (qmStartWorker(pMgmt) != 0) { + dError("failed to start qnode worker since %s", terrstr()); + return -1; + } + + bool deployed = true; + if (dndWriteFile(pMgmt->pWrapper, deployed) != 0) { + dError("failed to write qnode file since %s", terrstr()); + return -1; + } + + return 0; +} + +static void qmCloseImp(SQnodeMgmt *pMgmt) { + if (pMgmt->pQnode != NULL) { + qmStopWorker(pMgmt); + qndClose(pMgmt->pQnode); + pMgmt->pQnode = NULL; + } +} + +int32_t qmDrop(SMgmtWrapper *pWrapper) { + SQnodeMgmt *pMgmt = pWrapper->pMgmt; + if (pMgmt == NULL) return 0; + + dInfo("qnode-mgmt start to drop"); + bool deployed = false; + if (dndWriteFile(pWrapper, deployed) != 0) { + dError("failed to drop qnode since %s", terrstr()); + return -1; + } + + qmCloseImp(pMgmt); + taosRemoveDir(pMgmt->path); + pWrapper->pMgmt = NULL; + free(pMgmt); + dInfo("qnode-mgmt is dropped"); + return 0; +} + +static void qmClose(SMgmtWrapper *pWrapper) { + SQnodeMgmt *pMgmt = pWrapper->pMgmt; + if (pMgmt == NULL) return; + + dInfo("qnode-mgmt start to cleanup"); + qmCloseImp(pMgmt); + pWrapper->pMgmt = NULL; + free(pMgmt); + dInfo("qnode-mgmt is cleaned up"); +} + +int32_t qmOpen(SMgmtWrapper *pWrapper) { + dInfo("qnode-mgmt start to init"); + SQnodeMgmt *pMgmt = calloc(1, sizeof(SQnodeMgmt)); + if (pMgmt == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pMgmt->path = pWrapper->path; + pMgmt->pDnode = pWrapper->pDnode; + pMgmt->pWrapper = pWrapper; + pWrapper->pMgmt = pMgmt; + + int32_t code = qmOpenImp(pMgmt); + if (code != 0) { + dError("failed to init qnode-mgmt since %s", terrstr()); + qmClose(pWrapper); + } else { + dInfo("qnode-mgmt is initialized"); + } + + return code; +} + void qmGetMgmtFp(SMgmtWrapper *pWrapper) { SMgmtFp mgmtFp = {0}; - mgmtFp.openFp = NULL; - mgmtFp.closeFp = NULL; + mgmtFp.openFp = qmOpen; + mgmtFp.closeFp = qmClose; + mgmtFp.createMsgFp = qmProcessCreateReq; + mgmtFp.dropMsgFp = qmProcessDropReq; mgmtFp.requiredFp = qmRequire; - // qmInitMsgHandles(pWrapper); + qmInitMsgHandles(pWrapper); pWrapper->name = "qnode"; pWrapper->fp = mgmtFp; } diff --git a/source/dnode/mgmt/qnode/src/qmMgmt.c b/source/dnode/mgmt/qnode/src/qmMgmt.c deleted file mode 100644 index 5106b39b14..0000000000 --- a/source/dnode/mgmt/qnode/src/qmMgmt.c +++ /dev/null @@ -1,378 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#define _DEFAULT_SOURCE -// #include "dndQnode.h" -// #include "dm.h" -// #include "dndTransport.h" -// #include "dndWorker.h" - -#if 0 -static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg); - -static SQnode *dndAcquireQnode(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - SQnode *pQnode = NULL; - int32_t refCount = 0; - - taosRLockLatch(&pMgmt->latch); - if (pMgmt->deployed && !pMgmt->dropped && pMgmt->pQnode != NULL) { - refCount = atomic_add_fetch_32(&pMgmt->refCount, 1); - pQnode = pMgmt->pQnode; - } else { - terrno = TSDB_CODE_NODE_NOT_DEPLOYED; - } - taosRUnLockLatch(&pMgmt->latch); - - if (pQnode != NULL) { - dTrace("acquire qnode, refCount:%d", refCount); - } - return pQnode; -} - -static void dndReleaseQnode(SDnode *pDnode, SQnode *pQnode) { - if (pQnode == NULL) return; - - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - taosRLockLatch(&pMgmt->latch); - int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); - taosRUnLockLatch(&pMgmt->latch); - dTrace("release qnode, refCount:%d", refCount); -} - -static int32_t dndReadQnodeFile(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - int32_t code = TSDB_CODE_NODE_PARSE_FILE_ERROR; - int32_t len = 0; - int32_t maxLen = 1024; - char *content = calloc(1, maxLen + 1); - cJSON *root = NULL; - - char file[PATH_MAX + 20]; - snprintf(file, PATH_MAX + 20, "%s/qnode.json", pDnode->dir.dnode); - - // FILE *fp = fopen(file, "r"); - TdFilePtr pFile = taosOpenFile(file, TD_FILE_READ); - if (pFile == NULL) { - dDebug("file %s not exist", file); - code = 0; - goto PRASE_QNODE_OVER; - } - - len = (int32_t)taosReadFile(pFile, content, maxLen); - if (len <= 0) { - dError("failed to read %s since content is null", file); - goto PRASE_QNODE_OVER; - } - - content[len] = 0; - root = cJSON_Parse(content); - if (root == NULL) { - dError("failed to read %s since invalid json format", file); - goto PRASE_QNODE_OVER; - } - - cJSON *deployed = cJSON_GetObjectItem(root, "deployed"); - if (!deployed || deployed->type != cJSON_Number) { - dError("failed to read %s since deployed not found", file); - goto PRASE_QNODE_OVER; - } - pMgmt->deployed = deployed->valueint; - - cJSON *dropped = cJSON_GetObjectItem(root, "dropped"); - if (!dropped || dropped->type != cJSON_Number) { - dError("failed to read %s since dropped not found", file); - goto PRASE_QNODE_OVER; - } - pMgmt->dropped = dropped->valueint; - - code = 0; - dDebug("succcessed to read file %s, deployed:%d dropped:%d", file, pMgmt->deployed, pMgmt->dropped); - -PRASE_QNODE_OVER: - if (content != NULL) free(content); - if (root != NULL) cJSON_Delete(root); - if (pFile != NULL) taosCloseFile(&pFile); - - terrno = code; - return code; -} - -static int32_t dndWriteQnodeFile(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - - char file[PATH_MAX + 20]; - snprintf(file, PATH_MAX + 20, "%s/qnode.json", pDnode->dir.dnode); - - // FILE *fp = fopen(file, "w"); - TdFilePtr pFile = taosOpenFile(file, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_TRUNC); - if (pFile == NULL) { - terrno = TSDB_CODE_NODE_WRITE_FILE_ERROR; - dError("failed to write %s since %s", file, terrstr()); - return -1; - } - - int32_t len = 0; - int32_t maxLen = 1024; - char *content = calloc(1, maxLen + 1); - - len += snprintf(content + len, maxLen - len, "{\n"); - len += snprintf(content + len, maxLen - len, " \"deployed\": %d,\n", pMgmt->deployed); - len += snprintf(content + len, maxLen - len, " \"dropped\": %d\n", pMgmt->dropped); - len += snprintf(content + len, maxLen - len, "}\n"); - - taosWriteFile(pFile, content, len); - taosFsyncFile(pFile); - taosCloseFile(&pFile); - free(content); - - char realfile[PATH_MAX + 20]; - snprintf(realfile, PATH_MAX + 20, "%s/qnode.json", pDnode->dir.dnode); - - if (taosRenameFile(file, realfile) != 0) { - terrno = TSDB_CODE_NODE_WRITE_FILE_ERROR; - dError("failed to rename %s since %s", file, terrstr()); - return -1; - } - - dInfo("successed to write %s, deployed:%d dropped:%d", realfile, pMgmt->deployed, pMgmt->dropped); - return 0; -} - -static int32_t dndStartQnodeWorker(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - if (dndInitWorker(pDnode, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", 0, 1, dndProcessQnodeQueue) != 0) { - dError("failed to start qnode query worker since %s", terrstr()); - return -1; - } - - if (dndInitWorker(pDnode, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", 0, 1, dndProcessQnodeQueue) != 0) { - dError("failed to start qnode fetch worker since %s", terrstr()); - return -1; - } - - return 0; -} - -static void dndStopQnodeWorker(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - - taosWLockLatch(&pMgmt->latch); - pMgmt->deployed = 0; - taosWUnLockLatch(&pMgmt->latch); - - while (pMgmt->refCount > 0) { - taosMsleep(10); - } - - dndCleanupWorker(&pMgmt->queryWorker); - dndCleanupWorker(&pMgmt->fetchWorker); -} - -static void dndBuildQnodeOption(SDnode *pDnode, SQnodeOpt *pOption) { - pOption->pDnode = pDnode; - pOption->sendReqFp = dndSendReqToDnode; - pOption->sendMnodeReqFp = dndSendReqToMnode; - pOption->sendRedirectRspFp = dndSendRedirectRsp; - pOption->dnodeId = pDnode->dnodeId; - pOption->clusterId = pDnode->clusterId; - pOption->sver = tsVersion; -} - -static int32_t dndOpenQnode(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - - SQnode *pQnode = dndAcquireQnode(pDnode); - if (pQnode != NULL) { - dndReleaseQnode(pDnode, pQnode); - terrno = TSDB_CODE_NODE_ALREADY_DEPLOYED; - dError("failed to create qnode since %s", terrstr()); - return -1; - } - - SQnodeOpt option = {0}; - dndBuildQnodeOption(pDnode, &option); - - pQnode = qndOpen(&option); - if (pQnode == NULL) { - dError("failed to open qnode since %s", terrstr()); - return -1; - } - - if (dndStartQnodeWorker(pDnode) != 0) { - dError("failed to start qnode worker since %s", terrstr()); - qndClose(pQnode); - return -1; - } - - pMgmt->deployed = 1; - if (dndWriteQnodeFile(pDnode) != 0) { - pMgmt->deployed = 0; - dError("failed to write qnode file since %s", terrstr()); - dndStopQnodeWorker(pDnode); - qndClose(pQnode); - return -1; - } - - taosWLockLatch(&pMgmt->latch); - pMgmt->pQnode = pQnode; - taosWUnLockLatch(&pMgmt->latch); - - dInfo("qnode open successfully"); - return 0; -} - -static int32_t dndDropQnode(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - - SQnode *pQnode = dndAcquireQnode(pDnode); - if (pQnode == NULL) { - dError("failed to drop qnode since %s", terrstr()); - return -1; - } - - taosRLockLatch(&pMgmt->latch); - pMgmt->dropped = 1; - taosRUnLockLatch(&pMgmt->latch); - - if (dndWriteQnodeFile(pDnode) != 0) { - taosRLockLatch(&pMgmt->latch); - pMgmt->dropped = 0; - taosRUnLockLatch(&pMgmt->latch); - - dndReleaseQnode(pDnode, pQnode); - dError("failed to drop qnode since %s", terrstr()); - return -1; - } - - dndReleaseQnode(pDnode, pQnode); - dndStopQnodeWorker(pDnode); - pMgmt->deployed = 0; - dndWriteQnodeFile(pDnode); - qndClose(pQnode); - pMgmt->pQnode = NULL; - - return 0; -} - -int32_t qmProcessCreateReq(SDnode *pDnode, SRpcMsg *pReq) { - SDCreateQnodeReq createReq = {0}; - if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - if (createReq.dnodeId != pDnode->dnodeId) { - terrno = TSDB_CODE_NODE_INVALID_OPTION; - dError("failed to create qnode since %s", terrstr()); - return -1; - } else { - return dndOpenQnode(pDnode); - } -} - -int32_t qmProcessDropReq(SDnode *pDnode, SRpcMsg *pReq) { - SDDropQnodeReq dropReq = {0}; - if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { - terrno = TSDB_CODE_INVALID_MSG; - return -1; - } - - if (dropReq.dnodeId != pDnode->dnodeId) { - terrno = TSDB_CODE_NODE_INVALID_OPTION; - dError("failed to drop qnode since %s", terrstr()); - return -1; - } else { - return dndDropQnode(pDnode); - } -} - -static void dndProcessQnodeQueue(SDnode *pDnode, SRpcMsg *pMsg) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - SRpcMsg *pRsp = NULL; - int32_t code = TSDB_CODE_NODE_NOT_DEPLOYED; - - SQnode *pQnode = dndAcquireQnode(pDnode); - if (pQnode != NULL) { - code = qndProcessMsg(pQnode, pMsg, &pRsp); - } - dndReleaseQnode(pDnode, pQnode); - - if (pMsg->msgType & 1u) { - if (pRsp != NULL) { - pRsp->ahandle = pMsg->ahandle; - rpcSendResponse(pRsp); - free(pRsp); - } else { - if (code != 0) code = terrno; - SRpcMsg rpcRsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; - rpcSendResponse(&rpcRsp); - } - } - - rpcFreeCont(pMsg->pCont); - taosFreeQitem(pMsg); -} - -static void dndWriteQnodeMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMsg *pMsg) { - int32_t code = TSDB_CODE_NODE_NOT_DEPLOYED; - - SQnode *pQnode = dndAcquireQnode(pDnode); - if (pQnode != NULL) { - code = dndWriteMsgToWorker(pWorker, pMsg, sizeof(SRpcMsg)); - } - dndReleaseQnode(pDnode, pQnode); - - if (code != 0) { - if (pMsg->msgType & 1u) { - SRpcMsg rsp = {.handle = pMsg->handle, .ahandle = pMsg->ahandle, .code = code}; - rpcSendResponse(&rsp); - } - rpcFreeCont(pMsg->pCont); - } -} - -void dndProcessQnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - dndWriteQnodeMsgToWorker(pDnode, &pDnode->qmgmt.queryWorker, pMsg); -} - -void dndProcessQnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { - dndWriteQnodeMsgToWorker(pDnode, &pDnode->qmgmt.queryWorker, pMsg); -} - -int32_t dndInitQnode(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - taosInitRWLatch(&pMgmt->latch); - - if (dndReadQnodeFile(pDnode) != 0) { - return -1; - } - - if (pMgmt->dropped) return 0; - if (!pMgmt->deployed) return 0; - - return dndOpenQnode(pDnode); -} - -void dndCleanupQnode(SDnode *pDnode) { - SQnodeMgmt *pMgmt = &pDnode->qmgmt; - if (pMgmt->pQnode) { - dndStopQnodeWorker(pDnode); - qndClose(pMgmt->pQnode); - pMgmt->pQnode = NULL; - } -} - -#endif \ No newline at end of file diff --git a/source/dnode/mgmt/qnode/src/qmMsg.c b/source/dnode/mgmt/qnode/src/qmMsg.c index 2673a9138c..fa7d42b3e3 100644 --- a/source/dnode/mgmt/qnode/src/qmMsg.c +++ b/source/dnode/mgmt/qnode/src/qmMsg.c @@ -16,9 +16,42 @@ #define _DEFAULT_SOURCE #include "qmInt.h" -int32_t qmProcessCreateReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) {return 0;} -int32_t qmProcessDropReq(SQnodeMgmt *pMgmt, SNodeMsg *pMsg){return 0;} +int32_t qmProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SDnode *pDnode = pWrapper->pDnode; + SRpcMsg *pReq = &pMsg->rpcMsg; -void qmInitMsgHandles(SMgmtWrapper *pWrapper) { + SDCreateQnodeReq createReq = {0}; + if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + if (createReq.dnodeId != pDnode->dnodeId) { + terrno = TSDB_CODE_NODE_INVALID_OPTION; + dError("failed to create qnode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pDnode->dnodeId); + return -1; + } else { + return qmOpen(pWrapper); + } } +int32_t qmProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SDnode *pDnode = pWrapper->pDnode; + SRpcMsg *pReq = &pMsg->rpcMsg; + + SDDropQnodeReq dropReq = {0}; + if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + if (dropReq.dnodeId != pDnode->dnodeId) { + terrno = TSDB_CODE_NODE_INVALID_OPTION; + dError("failed to drop qnode since %s", terrstr()); + return -1; + } else { + return qmDrop(pWrapper); + } +} + +void qmInitMsgHandles(SMgmtWrapper *pWrapper) {} diff --git a/source/dnode/mgmt/qnode/src/qmWorker.c b/source/dnode/mgmt/qnode/src/qmWorker.c index e69de29bb2..a55253ef70 100644 --- a/source/dnode/mgmt/qnode/src/qmWorker.c +++ b/source/dnode/mgmt/qnode/src/qmWorker.c @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "qmInt.h" + +static void qmProcessQueue(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { + dTrace("msg:%p, will be processed in qnode queue", pMsg); + SRpcMsg *pRsp = NULL; + SRpcMsg *pRpc = &pMsg->rpcMsg; + int32_t code = qndProcessMsg(pMgmt->pQnode, pRpc, &pRsp); + + if (pRpc->msgType & 1u) { + if (pRsp != NULL) { + pRsp->ahandle = pRpc->ahandle; + dndSendRsp(pMgmt->pWrapper, pRsp); + free(pRsp); + } else { + if (code != 0) code = terrno; + SRpcMsg rpcRsp = {.handle = pRpc->handle, .ahandle = pRpc->ahandle, .code = code}; + dndSendRsp(pMgmt->pWrapper, &rpcRsp); + } + } + + dTrace("msg:%p, is freed", pMsg); + rpcFreeCont(pRpc->pCont); + taosFreeQitem(pMsg); +} + +int32_t qmProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { + SDnodeWorker *pWorker = &pMgmt->queryWorker; + + dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); + return dndWriteMsgToWorker(pWorker, pMsg, 0); +} + +int32_t qmProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg) { + SDnodeWorker *pWorker = &pMgmt->fetchWorker; + + dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name); + return dndWriteMsgToWorker(pWorker, pMsg, 0); +} + +int32_t qmStartWorker(SQnodeMgmt *pMgmt) { + if (dndInitWorker(pMgmt, &pMgmt->queryWorker, DND_WORKER_SINGLE, "qnode-query", 0, 1, qmProcessQueue) != 0) { + dError("failed to start qnode query worker since %s", terrstr()); + return -1; + } + + if (dndInitWorker(pMgmt, &pMgmt->fetchWorker, DND_WORKER_SINGLE, "qnode-fetch", 0, 1, qmProcessQueue) != 0) { + dError("failed to start qnode fetch worker since %s", terrstr()); + return -1; + } + + return 0; +} + +void qmStopWorker(SQnodeMgmt *pMgmt) { + dndCleanupWorker(&pMgmt->queryWorker); + dndCleanupWorker(&pMgmt->fetchWorker); +}