diff --git a/include/libs/transport/trpc.h b/include/libs/transport/trpc.h index fdc9368b76..1f9cc06946 100644 --- a/include/libs/transport/trpc.h +++ b/include/libs/transport/trpc.h @@ -42,10 +42,17 @@ typedef struct SRpcMsg { void * pCont; int contLen; int32_t code; - void * handle; // rpc handle returned to app - void * ahandle; // app handle set by client + void *handle; // rpc handle returned to app + void *ahandle; // app handle set by client } SRpcMsg; +typedef struct { + char user[TSDB_USER_LEN]; + SRpcMsg rpcMsg; + SEpSet rpcEpSet; + int32_t rspLen; + void *pRsp; +} SNodeMsg; typedef struct SRpcInit { uint16_t localPort; // local port diff --git a/source/dnode/mgmt/dnode/inc/dndInt.h b/source/dnode/mgmt/dnode/inc/dndInt.h index c47d50597b..e8e2e5953b 100644 --- a/source/dnode/mgmt/dnode/inc/dndInt.h +++ b/source/dnode/mgmt/dnode/inc/dndInt.h @@ -61,29 +61,152 @@ typedef enum { DND_STAT_INIT, DND_STAT_RUNNING, DND_STAT_STOPPED } EDndStatus; typedef enum { DND_WORKER_SINGLE, DND_WORKER_MULTI } EWorkerType; typedef enum { DND_ENV_INIT, DND_ENV_READY, DND_ENV_CLEANU } EEnvStat; +typedef struct SMgmtFp SMgmtFp; +typedef struct SMgmtWrapper SMgmtWrapper; + typedef void (*DndMsgFp)(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEps); typedef int32_t (*MndMsgFp)(SDnode *pDnode, SMndMsg *pMsg); +typedef SMgmtWrapper *(*MgmtOpenFp)(SDnode *pDnode, const char *path); +typedef void (*MgmtCloseFp)(SDnode *pDnode, SMgmtWrapper *pMgmt); +typedef bool (*MgmtRequiredFp)(SDnode *pDnode, const char *path); +typedef SArray *(*MgmtMsgFp)(SMgmtWrapper *pNode, SNodeMsg *pMsg); +typedef struct { + EWorkerType type; + const char *name; + int32_t minNum; + int32_t maxNum; + void *queueFp; + SDnode *pDnode; + STaosQueue *queue; + union { + SQWorkerPool pool; + SWWorkerPool mpool; + }; +} SDnodeWorker; +typedef struct { + int32_t dnodeId; + int32_t dropped; + int64_t clusterId; + int64_t dver; + int64_t rebootTime; + int64_t updateTime; + int8_t statusSent; + SEpSet mnodeEpSet; + char *file; + SHashObj *dnodeHash; + SArray *pDnodeEps; + pthread_t *threadId; + SRWLatch latch; + SDnodeWorker mgmtWorker; + SDnodeWorker statusWorker; +} SDnodeMgmt; + +typedef struct { + int32_t refCount; + int8_t deployed; + int8_t dropped; + SMnode *pMnode; + SRWLatch latch; + SDnodeWorker readWorker; + SDnodeWorker writeWorker; + SDnodeWorker syncWorker; + int8_t replica; + int8_t selfIndex; + SReplica replicas[TSDB_MAX_REPLICA]; + + // + MndMsgFp msgFp[TDMT_MAX]; + SProcObj *pProcess; + bool singleProc; +} SMnodeMgmt; + +typedef struct { + int32_t refCount; + int8_t deployed; + int8_t dropped; + SQnode *pQnode; + SRWLatch latch; + SDnodeWorker queryWorker; + SDnodeWorker fetchWorker; +} SQnodeMgmt; + +typedef struct { + int32_t refCount; + int8_t deployed; + int8_t dropped; + SSnode *pSnode; + SRWLatch latch; + SDnodeWorker writeWorker; +} SSnodeMgmt; + +typedef struct { + int32_t openVnodes; + int32_t totalVnodes; + int32_t masterNum; + int64_t numOfSelectReqs; + int64_t numOfInsertReqs; + int64_t numOfInsertSuccessReqs; + int64_t numOfBatchInsertReqs; + int64_t numOfBatchInsertSuccessReqs; +} SVnodesStat; + +typedef struct { + int32_t refCount; + int8_t deployed; + int8_t dropped; + SBnode *pBnode; + SRWLatch latch; + SDnodeWorker writeWorker; +} SBnodeMgmt; + +typedef struct { + SVnodesStat stat; + SHashObj *hash; + SRWLatch latch; + SQWorkerPool queryPool; + SFWorkerPool fetchPool; + SWWorkerPool syncPool; + SWWorkerPool writePool; +} SVnodesMgmt; + +typedef struct { + void *serverRpc; + void *clientRpc; + DndMsgFp msgFp[TDMT_MAX]; +} STransMgmt; + +typedef struct SMgmtFp { + MgmtOpenFp openFp; + MgmtCloseFp closeFp; + MgmtRequiredFp requiredFp; + MgmtMsgFp msgFp; +} SMgmtFp; + +typedef struct SMgmtWrapper { + const char *name; + char *path; + bool required; + EProcType procType; + SProcObj *pProc; + void *pMgmt; + SMgmtFp fp; +} SMgmtWrapper; typedef struct SDnode { EDndStatus status; - SDndCfg cfg; - SDnodeDir dir; + EDndEvent event; + EProcType procType; + SDndCfg cfg; + SStartupReq startup; TdFilePtr pLockFile; SDnodeMgmt dmgmt; - SMndMgmt mmgmt; - SQnodeMgmt qmgmt; - SSnodeMgmt smgmt; - SBnodeMgmt bmgmt; - SVnodesMgmt vmgmt; STransMgmt tmgmt; - STfs *pTfs; - SStartupReq startup; - EDndEvent event; + SMgmtFp fps[NODE_MAX]; + SMgmtWrapper mgmts[NODE_MAX]; } SDnode; - EDndStatus dndGetStatus(SDnode *pDnode); void dndSetStatus(SDnode *pDnode, EDndStatus stat); const char *dndStatStr(EDndStatus stat); diff --git a/source/dnode/mgmt/dnode/inc/dndMain.h b/source/dnode/mgmt/dnode/inc/dndMain.h index 3a18db4590..003f07bbd8 100644 --- a/source/dnode/mgmt/dnode/inc/dndMain.h +++ b/source/dnode/mgmt/dnode/inc/dndMain.h @@ -13,8 +13,8 @@ * along with this program. If not, see . */ -#ifndef _TD_DND_ENV_H_ -#define _TD_DND_ENV_H_ +#ifndef _TD_DND_MAIN_H_ +#define _TD_DND_MAIN_H_ #ifdef __cplusplus extern "C" { @@ -22,126 +22,8 @@ extern "C" { #include "dndInt.h" -typedef struct { - EWorkerType type; - const char *name; - int32_t minNum; - int32_t maxNum; - void *queueFp; - SDnode *pDnode; - STaosQueue *queue; - union { - SQWorkerPool pool; - SWWorkerPool mpool; - }; -} SDnodeWorker; - -typedef struct { - char *dnode; - char *mnode; - char *snode; - char *bnode; - char *vnodes; -} SDnodeDir; - -typedef struct { - int32_t dnodeId; - int32_t dropped; - int64_t clusterId; - int64_t dver; - int64_t rebootTime; - int64_t updateTime; - int8_t statusSent; - SEpSet mnodeEpSet; - char *file; - SHashObj *dnodeHash; - SArray *pDnodeEps; - pthread_t *threadId; - SRWLatch latch; - SDnodeWorker mgmtWorker; - SDnodeWorker statusWorker; -} SDnodeMgmt; - -typedef enum { SINGLE_PROC, MULTI_PROC_PARENT, MULTI_PROC_CHILD } EProcType; - -typedef struct { - int32_t refCount; - int8_t deployed; - int8_t dropped; - SMnode *pMnode; - SRWLatch latch; - SDnodeWorker readWorker; - SDnodeWorker writeWorker; - SDnodeWorker syncWorker; - int8_t replica; - int8_t selfIndex; - SReplica replicas[TSDB_MAX_REPLICA]; - - // - MndMsgFp msgFp[TDMT_MAX]; - SProcObj *pProcess; - bool singleProc; -} SMndMgmt; - -typedef struct { - int32_t refCount; - int8_t deployed; - int8_t dropped; - SQnode *pQnode; - SRWLatch latch; - SDnodeWorker queryWorker; - SDnodeWorker fetchWorker; -} SQnodeMgmt; - -typedef struct { - int32_t refCount; - int8_t deployed; - int8_t dropped; - SSnode *pSnode; - SRWLatch latch; - SDnodeWorker writeWorker; -} SSnodeMgmt; - -typedef struct { - int32_t refCount; - int8_t deployed; - int8_t dropped; - SBnode *pBnode; - SRWLatch latch; - SDnodeWorker writeWorker; -} SBnodeMgmt; - -typedef struct { - int32_t openVnodes; - int32_t totalVnodes; - int32_t masterNum; - int64_t numOfSelectReqs; - int64_t numOfInsertReqs; - int64_t numOfInsertSuccessReqs; - int64_t numOfBatchInsertReqs; - int64_t numOfBatchInsertSuccessReqs; -} SVnodesStat; - -typedef struct { - SVnodesStat stat; - SHashObj *hash; - SRWLatch latch; - SQWorkerPool queryPool; - SFWorkerPool fetchPool; - SWWorkerPool syncPool; - SWWorkerPool writePool; -} SVnodesMgmt; - -typedef struct { - void *serverRpc; - void *clientRpc; - DndMsgFp msgFp[TDMT_MAX]; -} STransMgmt; - - - #ifdef __cplusplus } #endif -#endif /*_TD_DND_ENV_H_*/ \ No newline at end of file +#endif /*_TD_DND_MAIN_H_*/ \ No newline at end of file diff --git a/source/dnode/mgmt/dnode/src/dndInt.c b/source/dnode/mgmt/dnode/src/dndInt.c index 9203ce5838..d47b2d3a42 100644 --- a/source/dnode/mgmt/dnode/src/dndInt.c +++ b/source/dnode/mgmt/dnode/src/dndInt.c @@ -78,5 +78,6 @@ int32_t dndGetMonitorDiskInfo(SDnode *pDnode, SMonDiskInfo *pInfo) { tstrncpy(pInfo->tempdir.name, tsTempDir, sizeof(pInfo->tempdir.name)); pInfo->tempdir.size = tsTempSpace.size; - return tfsGetMonitorInfo(pDnode->pTfs, pInfo); + //return tfsGetMonitorInfo(pDnode->pTfs, pInfo); + return tfsGetMonitorInfo(NULL, pInfo); } diff --git a/source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h b/source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h index d882a3bbc9..2dd3ce0b5b 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h +++ b/source/dnode/mgmt/impl/mnodeMgmt/inc/mm.h @@ -48,7 +48,7 @@ int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeRe // mmWorker int32_t mmStartWorker(SDnode *pDnode); void mmStopWorker(SDnode *pDnode); -void mmInitMsgFp(SMndMgmt *pMgmt); +void mmInitMsgFp(SMnodeMgmt *pMgmt); void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); int32_t mmPutMsgToWriteQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); int32_t mmPutMsgToReadQueue(SDnode *pDnode, SRpcMsg *pRpcMsg); diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmFile.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmFile.c index 42158f631c..89d7eefab2 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmFile.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmFile.c @@ -17,7 +17,7 @@ #include "mm.h" int32_t mmReadFile(SDnode *pDnode) { - SMndMgmt *pMgmt = &pDnode->mmgmt; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; int32_t code = TSDB_CODE_DND_MNODE_READ_FILE_ERROR; int32_t len = 0; @@ -115,7 +115,7 @@ PRASE_MNODE_OVER: } int32_t mmWriteFile(SDnode *pDnode) { - SMndMgmt *pMgmt = &pDnode->mmgmt; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; char file[PATH_MAX]; snprintf(file, sizeof(file), "%s%smnode.json.bak", pDnode->dir.dnode, TD_DIRSEP); diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmHandle.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmHandle.c index 4aab5aa40d..34fe71eef7 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmHandle.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmHandle.c @@ -122,7 +122,7 @@ int32_t mmGetMonitorInfo(SDnode *pDnode, SMonClusterInfo *pClusterInfo, SMonVgro } int32_t dndGetUserAuthFromMnode(SDnode *pDnode, char *user, char *spi, char *encrypt, char *secret, char *ckey) { - SMndMgmt *pMgmt = &pDnode->mmgmt; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = mmAcquire(pDnode); if (pMnode == NULL) { diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c index add2f76cb4..9c1728542b 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmMgmt.c @@ -29,7 +29,7 @@ int32_t mmInit(SDnode *pDnode) { dInfo("mnode mgmt start to init"); int32_t code = -1; - SMndMgmt *pMgmt = &pDnode->mmgmt; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; taosInitRWLatch(&pMgmt->latch); mmInitMsgFp(pMgmt); @@ -76,7 +76,7 @@ _OVER: void mmCleanup(SDnode *pDnode) { dInfo("mnode mgmt start to clean up"); - SMndMgmt *pMgmt = &pDnode->mmgmt; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; if (pMgmt->pMnode) { mmStopWorker(pDnode); mndClose(pMgmt->pMnode); @@ -86,7 +86,7 @@ void mmCleanup(SDnode *pDnode) { } SMnode *mmAcquire(SDnode *pDnode) { - SMndMgmt *pMgmt = &pDnode->mmgmt; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = NULL; int32_t refCount = 0; @@ -108,7 +108,7 @@ SMnode *mmAcquire(SDnode *pDnode) { void mmRelease(SDnode *pDnode, SMnode *pMnode) { if (pMnode == NULL) return; - SMndMgmt *pMgmt = &pDnode->mmgmt; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; taosRLockLatch(&pMgmt->latch); int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); taosRUnLockLatch(&pMgmt->latch); @@ -116,7 +116,7 @@ void mmRelease(SDnode *pDnode, SMnode *pMnode) { } int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) { - SMndMgmt *pMgmt = &pDnode->mmgmt; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; pMgmt->singleProc = true; int32_t code = mmOpenImp(pDnode, pOption); @@ -150,7 +150,7 @@ int32_t mmOpen(SDnode *pDnode, SMnodeOpt *pOption) { } int32_t mmAlter(SDnode *pDnode, SMnodeOpt *pOption) { - SMndMgmt *pMgmt = &pDnode->mmgmt; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = mmAcquire(pDnode); if (pMnode == NULL) { @@ -169,7 +169,7 @@ int32_t mmAlter(SDnode *pDnode, SMnodeOpt *pOption) { } int32_t mmDrop(SDnode *pDnode) { - SMndMgmt *pMgmt = &pDnode->mmgmt; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = mmAcquire(pDnode); if (pMnode == NULL) { @@ -238,7 +238,7 @@ static void mmBuildOptionForDeploy(SDnode *pDnode, SMnodeOpt *pOption) { pReplica->port = pDnode->cfg.serverPort; memcpy(pReplica->fqdn, pDnode->cfg.localFqdn, TSDB_FQDN_LEN); - SMndMgmt *pMgmt = &pDnode->mmgmt; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; pMgmt->selfIndex = pOption->selfIndex; pMgmt->replica = pOption->replica; memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); @@ -246,7 +246,7 @@ static void mmBuildOptionForDeploy(SDnode *pDnode, SMnodeOpt *pOption) { static void mmBuildOptionForOpen(SDnode *pDnode, SMnodeOpt *pOption) { mmInitOption(pDnode, pOption); - SMndMgmt *pMgmt = &pDnode->mmgmt; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; pOption->selfIndex = pMgmt->selfIndex; pOption->replica = pMgmt->replica; memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); @@ -274,7 +274,7 @@ int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeRe return -1; } - SMndMgmt *pMgmt = &pDnode->mmgmt; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; pMgmt->selfIndex = pOption->selfIndex; pMgmt->replica = pOption->replica; memcpy(&pMgmt->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); @@ -282,7 +282,7 @@ int32_t mmBuildOptionFromReq(SDnode *pDnode, SMnodeOpt *pOption, SDCreateMnodeRe } static int32_t mmOpenImp(SDnode *pDnode, SMnodeOpt *pOption) { - SMndMgmt *pMgmt = &pDnode->mmgmt; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; SMnode *pMnode = mndOpen(pDnode->dir.mnode, pOption); if (pMnode == NULL) { diff --git a/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c b/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c index 8d400cf867..a721c5977a 100644 --- a/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c +++ b/source/dnode/mgmt/impl/mnodeMgmt/src/mmWorker.c @@ -28,7 +28,7 @@ static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMs static void mmConsumeMsgQueue(SDnode *pDnode, SMndMsg *pMsg); int32_t mmStartWorker(SDnode *pDnode) { - SMndMgmt *pMgmt = &pDnode->mmgmt; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; if (dndInitWorker(pDnode, &pMgmt->readWorker, DND_WORKER_SINGLE, "mnode-read", 0, 1, mmConsumeMsgQueue) != 0) { dError("failed to start mnode read worker since %s", terrstr()); return -1; @@ -48,7 +48,7 @@ int32_t mmStartWorker(SDnode *pDnode) { } void mmStopWorker(SDnode *pDnode) { - SMndMgmt *pMgmt = &pDnode->mmgmt; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; taosWLockLatch(&pMgmt->latch); pMgmt->deployed = 0; @@ -63,7 +63,7 @@ void mmStopWorker(SDnode *pDnode) { dndCleanupWorker(&pMgmt->syncWorker); } -void mmInitMsgFp(SMndMgmt *pMgmt) { +void mmInitMsgFp(SMnodeMgmt *pMgmt) { // Requests handled by DNODE pMgmt->msgFp[TMSG_INDEX(TDMT_DND_CREATE_MNODE_RSP)] = mmProcessWriteMsg; pMgmt->msgFp[TMSG_INDEX(TDMT_DND_ALTER_MNODE_RSP)] = mmProcessWriteMsg; @@ -163,7 +163,7 @@ static int32_t mmBuildMsg(SMndMsg *pMsg, SRpcMsg *pRpc) { } void mmProcessRpcMsg(SDnode *pDnode, SRpcMsg *pRpc, SEpSet *pEpSet) { - SMndMgmt *pMgmt = &pDnode->mmgmt; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; int32_t code = -1; SMndMsg *pMsg = NULL; @@ -261,7 +261,7 @@ static int32_t mmPutRpcMsgToWorker(SDnode *pDnode, SDnodeWorker *pWorker, SRpcMs } void mmPutRpcRspToWorker(SDnode *pDnode, SRpcMsg *pRpc) { - SMndMgmt *pMgmt = &pDnode->mmgmt; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; int32_t code = -1; if (pMgmt->singleProc) { @@ -278,7 +278,7 @@ void mmPutRpcRspToWorker(SDnode *pDnode, SRpcMsg *pRpc) { void mmConsumeChildQueue(SDnode *pDnode, SMndMsg *pMsg, int32_t msgLen, void *pCont, int32_t contLen) { dTrace("msg:%p, get from child queue", pMsg); - SMndMgmt *pMgmt = &pDnode->mmgmt; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; SRpcMsg *pRpc = &pMsg->rpcMsg; pRpc->pCont = pCont;