From 06a2b831b47191c734837f864d00626e7af727ec Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 18 Mar 2022 13:37:19 +0800 Subject: [PATCH] shm --- include/dnode/snode/snode.h | 1 - source/dnode/mgmt/snode/inc/sm.h | 31 ++++++++ source/dnode/mgmt/snode/inc/smInt.h | 52 +++++-------- source/dnode/mgmt/snode/src/smFile.c | 0 source/dnode/mgmt/snode/src/smInt.c | 103 ++++++++++++++++++++++++- source/dnode/mgmt/snode/src/smMsg.c | 40 +++++++++- source/dnode/mgmt/snode/src/smWorker.c | 0 7 files changed, 189 insertions(+), 38 deletions(-) create mode 100644 source/dnode/mgmt/snode/inc/sm.h delete mode 100644 source/dnode/mgmt/snode/src/smFile.c delete mode 100644 source/dnode/mgmt/snode/src/smWorker.c diff --git a/include/dnode/snode/snode.h b/include/dnode/snode/snode.h index 32d711ad17..14819eecbd 100644 --- a/include/dnode/snode/snode.h +++ b/include/dnode/snode/snode.h @@ -33,7 +33,6 @@ typedef struct { } SSnodeLoad; typedef struct { - int32_t sver; int32_t dnodeId; int64_t clusterId; SMgmtWrapper *pWrapper; diff --git a/source/dnode/mgmt/snode/inc/sm.h b/source/dnode/mgmt/snode/inc/sm.h new file mode 100644 index 0000000000..3ab5102340 --- /dev/null +++ b/source/dnode/mgmt/snode/inc/sm.h @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_DND_SNODE_H_ +#define _TD_DND_SNODE_H_ + +#include "dnd.h" + +#ifdef __cplusplus +extern "C" { +#endif + +void bmGetMgmtFp(SMgmtWrapper *pWrapper); + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_DND_SNODE_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/snode/inc/smInt.h b/source/dnode/mgmt/snode/inc/smInt.h index fccccafb62..95b130b023 100644 --- a/source/dnode/mgmt/snode/inc/smInt.h +++ b/source/dnode/mgmt/snode/inc/smInt.h @@ -16,47 +16,37 @@ #ifndef _TD_DND_SNODE_INT_H_ #define _TD_DND_SNODE_INT_H_ -#include "dnd.h" +#include "sm.h" #ifdef __cplusplus extern "C" { #endif typedef struct SSnodeMgmt { - int32_t refCount; - int8_t deployed; - int8_t dropped; - int8_t uniqueWorkerInUse; - SSnode *pSnode; - SRWLatch latch; - SArray *uniqueWorkers; // SArray - SDnodeWorker sharedWorker; + SSnode *pSnode; + SDnode *pDnode; + SMgmtWrapper *pWrapper; + const char *path; + SRWLatch latch; + int8_t uniqueWorkerInUse; + SArray *uniqueWorkers; // SArray + SDnodeWorker sharedWorker; } SSnodeMgmt; -void smGetMgmtFp(SMgmtWrapper *pMgmt); +// smInt.c +int32_t smOpen(SMgmtWrapper *pWrapper); +int32_t smDrop(SMgmtWrapper *pWrapper); -int32_t dndInitSnode(SDnode *pDnode); -void dndCleanupSnode(SDnode *pDnode); - -void dndProcessSnodeWriteMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -int32_t smProcessCreateReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); -int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg); - -void smInitMsgHandles(SMgmtWrapper *pWrapper); - -int32_t smStartWorker(SDnode *pDnode); -void smStopWorker(SDnode *pDnode); -void smInitMsgFp(SMnodeMgmt *pMgmt); -void smProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -int32_t smPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); -int32_t smPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); -void smConsumeChildQueue(SDnode *pDnode, SNodeMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); -void smConsumeParentQueue(SDnode *pDnode, SRpcMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen); - -void smProcessWriteMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -void smProcessSyncMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg); -void smProcessReadMsg(SDnode *pDnode, SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +// smMsg.c +void smInitMsgHandles(SMgmtWrapper *pWrapper); +int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); +// smWorker.c +int32_t smStartWorker(SQnodeMgmt *pMgmt); +void smStopWorker(SQnodeMgmt *pMgmt); +int32_t smProcessQueryMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg); +int32_t smProcessFetchMsg(SQnodeMgmt *pMgmt, SNodeMsg *pMsg); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/snode/src/smFile.c b/source/dnode/mgmt/snode/src/smFile.c deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/source/dnode/mgmt/snode/src/smInt.c b/source/dnode/mgmt/snode/src/smInt.c index 051fe265f6..0b29649ad4 100644 --- a/source/dnode/mgmt/snode/src/smInt.c +++ b/source/dnode/mgmt/snode/src/smInt.c @@ -18,13 +18,110 @@ static int32_t smRequire(SMgmtWrapper *pWrapper, bool *required) { return dndReadFile(pWrapper, required); } +static void smInitOption(SSnodeMgmt *pMgmt, SSnodeOpt *pOption) { + SDnode *pDnode = pMgmt->pDnode; + pOption->pWrapper = pMgmt->pWrapper; + pOption->sendReqFp = dndSendReqToDnode; + pOption->sendMnodeReqFp = dndSendReqToMnode; + pOption->dnodeId = pDnode->dnodeId; + pOption->clusterId = pDnode->clusterId; +} + +static int32_t smOpenImp(SSnodeMgmt *pMgmt) { + SSnodeOpt option = {0}; + smInitOption(pMgmt, &option); + + pMgmt->pSnode = sndOpen(pMgmt->path, &option); + if (pMgmt->pSnode == NULL) { + dError("failed to open snode since %s", terrstr()); + return -1; + } + + if (smStartWorker(pMgmt) != 0) { + dError("failed to start snode worker since %s", terrstr()); + return -1; + } + + bool deployed = true; + if (dndWriteFile(pMgmt->pWrapper, deployed) != 0) { + dError("failed to write snode file since %s", terrstr()); + return -1; + } + + return 0; +} + +static void smCloseImp(SSnodeMgmt *pMgmt) { + if (pMgmt->pSnode != NULL) { + smStopWorker(pMgmt); + sndClose(pMgmt->pSnode); + pMgmt->pSnode = NULL; + } +} + +int32_t smDrop(SMgmtWrapper *pWrapper) { + SSnodeMgmt *pMgmt = pWrapper->pMgmt; + if (pMgmt == NULL) return 0; + + dInfo("snode-mgmt start to drop"); + bool deployed = false; + if (dndWriteFile(pWrapper, deployed) != 0) { + dError("failed to drop snode since %s", terrstr()); + return -1; + } + + smCloseImp(pMgmt); + taosRemoveDir(pMgmt->path); + pWrapper->pMgmt = NULL; + free(pMgmt); + dInfo("snode-mgmt is dropped"); + return 0; +} + +static void smClose(SMgmtWrapper *pWrapper) { + SSnodeMgmt *pMgmt = pWrapper->pMgmt; + if (pMgmt == NULL) return; + + dInfo("snode-mgmt start to cleanup"); + smCloseImp(pMgmt); + pWrapper->pMgmt = NULL; + free(pMgmt); + dInfo("snode-mgmt is cleaned up"); +} + +int32_t smOpen(SMgmtWrapper *pWrapper) { + dInfo("snode-mgmt start to init"); + SSnodeMgmt *pMgmt = calloc(1, sizeof(SSnodeMgmt)); + if (pMgmt == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pMgmt->path = pWrapper->path; + pMgmt->pDnode = pWrapper->pDnode; + pMgmt->pWrapper = pWrapper; + pWrapper->pMgmt = pMgmt; + + int32_t code = smOpenImp(pMgmt); + if (code != 0) { + dError("failed to init snode-mgmt since %s", terrstr()); + smClose(pWrapper); + } else { + dInfo("snode-mgmt is initialized"); + } + + return code; +} + void smGetMgmtFp(SMgmtWrapper *pWrapper) { SMgmtFp mgmtFp = {0}; - mgmtFp.openFp = NULL; - mgmtFp.closeFp = NULL; + mgmtFp.openFp = smOpen; + mgmtFp.closeFp = smClose; + mgmtFp.createMsgFp = smProcessCreateReq; + mgmtFp.dropMsgFp = smProcessDropReq; mgmtFp.requiredFp = smRequire; - // smInitMsgHandles(pWrapper); + smInitMsgHandles(pWrapper); pWrapper->name = "snode"; pWrapper->fp = mgmtFp; } diff --git a/source/dnode/mgmt/snode/src/smMsg.c b/source/dnode/mgmt/snode/src/smMsg.c index 7ebc11c5a5..ad325f336f 100644 --- a/source/dnode/mgmt/snode/src/smMsg.c +++ b/source/dnode/mgmt/snode/src/smMsg.c @@ -16,8 +16,42 @@ #define _DEFAULT_SOURCE #include "smInt.h" -int32_t smProcessCreateReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {return 0;} -int32_t smProcessDropReq(SSnodeMgmt *pMgmt, SNodeMsg *pMsg) {return 0;} +int32_t smProcessCreateReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SDnode *pDnode = pWrapper->pDnode; + SRpcMsg *pReq = &pMsg->rpcMsg; -void smInitMsgHandles(SMgmtWrapper *pWrapper) { + SDCreateSnodeReq createReq = {0}; + if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &createReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + if (createReq.dnodeId != pDnode->dnodeId) { + terrno = TSDB_CODE_NODE_INVALID_OPTION; + dError("failed to create snode since %s, input:%d cur:%d", terrstr(), createReq.dnodeId, pDnode->dnodeId); + return -1; + } else { + return smOpen(pWrapper); + } } + +int32_t smProcessDropReq(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { + SDnode *pDnode = pWrapper->pDnode; + SRpcMsg *pReq = &pMsg->rpcMsg; + + SDDropSnodeReq dropReq = {0}; + if (tDeserializeSMCreateDropQSBNodeReq(pReq->pCont, pReq->contLen, &dropReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + return -1; + } + + if (dropReq.dnodeId != pDnode->dnodeId) { + terrno = TSDB_CODE_NODE_INVALID_OPTION; + dError("failed to drop snode since %s", terrstr()); + return -1; + } else { + return smDrop(pWrapper); + } +} + +void smInitMsgHandles(SMgmtWrapper *pWrapper) {} diff --git a/source/dnode/mgmt/snode/src/smWorker.c b/source/dnode/mgmt/snode/src/smWorker.c deleted file mode 100644 index e69de29bb2..0000000000